Messaging with RabbitMQ: Asynchronous Communication between Microservices
Learn how to implement asynchronous communication between microservices using RabbitMQ, including messaging patterns, dead letter queues, retry policies, and observability.
In the previous article on API Gateway with Kong, we saw how to manage synchronous communication between clients and microservices. Now we will explore asynchronous communication using RabbitMQ, which is essential for resilient distributed systems.
Why Asynchronous Communication?
In microservice architectures, synchronous communication (HTTP/REST) creates temporal coupling between services: the caller needs the target to be available at the moment of the call. If the target service is unavailable, the request fails.
Synchronous Communication (HTTP):
```text
┌──────────┐    HTTP Request     ┌──────────┐
│ Service  │ ──────────────────► │ Service  │
│    A     │ ◄────────────────── │    B     │
└──────────┘    HTTP Response    └──────────┘
⚠️ If B is down, A fails
```
Asynchronous Communication (Message Broker):
```text
┌──────────┐   Publish   ┌──────────┐   Consume   ┌──────────┐
│ Service  │ ──────────► │ RabbitMQ │ ──────────► │ Service  │
│    A     │             │  Broker  │             │    B     │
└──────────┘             └──────────┘             └──────────┘
✅ If B is down, the message stays in the queue
```
When to Use Messaging
| Scenario | Recommendation |
|---|---|
| Operations that can be processed later | ✅ Asynchronous |
| Notifications and emails | ✅ Asynchronous |
| File/image processing | ✅ Asynchronous |
| Data synchronization between services | ✅ Asynchronous |
| Queries that need an immediate response | ❌ Synchronous |
| Real-time validation | ❌ Synchronous |
RabbitMQ: Core Concepts
RabbitMQ is a message broker that implements AMQP (Advanced Message Queuing Protocol). The examples in this article use AMQP 0-9-1 through the `amqplib` client.
Architecture
```text
┌─────────────────────────────────────────────────────────────────────┐
│                           RabbitMQ Broker                           │
│                                                                     │
│  Producer ──► Exchange ──► Binding ──► Queue ──► Consumer           │
│                   │                      │                          │
│             ┌─────┴─────┐          ┌─────┴─────┐                    │
│             │  Routing  │          │  Message  │                    │
│             │   Rules   │          │  Storage  │                    │
│             └───────────┘          └───────────┘                    │
└─────────────────────────────────────────────────────────────────────┘
```
Main Components
| Component | Description |
|---|---|
| Producer | Application that sends messages |
| Exchange | Receives messages and routes them to queues |
| Queue | Stores messages until they are consumed |
| Binding | Rule that links an exchange to a queue |
| Consumer | Application that receives and processes messages |
| Virtual Host | Logical isolation (multi-tenancy) |
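To make the roles in the table concrete, here is a minimal in-memory sketch of how a producer, exchange, binding, queue, and consumer relate. It is a toy model for illustration only: the `ToyBroker` class and its methods are hypothetical, are not RabbitMQ or `amqplib` APIs, and model only exact-match (direct) routing.

```typescript
// Toy in-memory model of the broker components; not a RabbitMQ client.
type Handler = (body: string) => void;

class ToyBroker {
  // exchange name -> bindings (routing key -> queue name)
  private bindings = new Map<string, Array<{ routingKey: string; queue: string }>>();
  // queue name -> messages waiting for a consumer
  private queues = new Map<string, string[]>();

  declareQueue(queue: string): void {
    if (!this.queues.has(queue)) this.queues.set(queue, []);
  }

  bind(exchange: string, routingKey: string, queue: string): void {
    this.declareQueue(queue);
    const list = this.bindings.get(exchange) ?? [];
    list.push({ routingKey, queue });
    this.bindings.set(exchange, list);
  }

  // Producer side: the exchange routes by exact routing key.
  // Returns the number of queues that received the message.
  publish(exchange: string, routingKey: string, body: string): number {
    let routed = 0;
    for (const binding of this.bindings.get(exchange) ?? []) {
      if (binding.routingKey === routingKey) {
        this.queues.get(binding.queue)!.push(body);
        routed++;
      }
    }
    return routed;
  }

  // Consumer side: drain whatever is currently stored in the queue.
  consume(queue: string, handler: Handler): void {
    const messages = this.queues.get(queue) ?? [];
    while (messages.length > 0) handler(messages.shift()!);
  }
}
```

Note that `publish` and `consume` never call each other: a message published while no consumer is running simply waits in the queue, which is the decoupling described earlier.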
Exchange Types
```text
┌─────────────────────────────────────────────────────────────────────┐
│                           Exchange Types                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  DIRECT: exact routing key match                                    │
│  ┌──────────┐                                                       │
│  │ Exchange │──── routing_key="order.created" ────► Queue A         │
│  └──────────┘                                                       │
│                                                                     │
│  FANOUT: broadcast to all bound queues                              │
│  ┌──────────┐─────────────────────────────────────► Queue A         │
│  │ Exchange │─────────────────────────────────────► Queue B         │
│  └──────────┘─────────────────────────────────────► Queue C         │
│                                                                     │
│  TOPIC: pattern matching with wildcards                             │
│  ┌──────────┐                                                       │
│  │ Exchange │──── "order.*" ──────────────────────► Queue A         │
│  └──────────┘──── "order.#" ──────────────────────► Queue B         │
│  (* = exactly one word, # = zero or more words)                     │
│                                                                     │
│  HEADERS: match on message headers                                  │
│  ┌──────────┐                                                       │
│  │ Exchange │──── x-match: all, type: order ──────► Queue A         │
│  └──────────┘                                                       │
└─────────────────────────────────────────────────────────────────────┘
```
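The topic wildcard rules are easy to misread, so here is a small sketch of the matching logic. The function `topicMatches` is a hypothetical helper written for this article, not part of RabbitMQ or `amqplib`; the broker performs this matching internally.

```typescript
// Sketch of AMQP topic matching on dot-separated words:
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: it matches only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' absorbs zero words (advance the pattern) or one more word (advance the key).
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('order.*', 'order.created'));    // true
console.log(topicMatches('order.*', 'order.created.eu')); // false
console.log(topicMatches('order.#', 'order'));            // true
console.log(topicMatches('order.#', 'order.created.eu')); // true
```

In the diagram above, this is why Queue B (bound with `order.#`) receives every message Queue A receives, plus keys with more or fewer words after `order`.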
Project Structure
```text
messaging-service/
├── src/
│   ├── config/
│   │   ├── rabbitmq.config.ts
│   │   └── index.ts
│   │
│   ├── infrastructure/
│   │   ├── messaging/
│   │   │   ├── RabbitMQConnection.ts
│   │   │   ├── RabbitMQPublisher.ts
│   │   │   ├── RabbitMQConsumer.ts
│   │   │   ├── MessageSerializer.ts
│   │   │   └── index.ts
│   │   │
│   │   └── health/
│   │       └── RabbitMQHealthCheck.ts
│   │
│   ├── domain/
│   │   ├── events/
│   │   │   ├── DomainEvent.ts
│   │   │   ├── OrderCreatedEvent.ts
│   │   │   ├── OrderPaidEvent.ts
│   │   │   ├── UserRegisteredEvent.ts
│   │   │   └── index.ts
│   │   │
│   │   └── interfaces/
│   │       ├── IMessagePublisher.ts
│   │       ├── IMessageConsumer.ts
│   │       └── IMessageHandler.ts
│   │
│   ├── application/
│   │   ├── handlers/
│   │   │   ├── OrderCreatedHandler.ts
│   │   │   ├── OrderPaidHandler.ts
│   │   │   ├── UserRegisteredHandler.ts
│   │   │   └── index.ts
│   │   │
│   │   └── services/
│   │       └── EventDispatcher.ts
│   │
│   ├── presentation/
│   │   └── consumers/
│   │       ├── OrderConsumer.ts
│   │       ├── NotificationConsumer.ts
│   │       └── index.ts
│   │
│   └── shared/
│       ├── errors/
│       │   ├── MessageProcessingError.ts
│       │   └── RetryableError.ts
│       │
│       └── utils/
│           ├── retry.ts
│           └── logger.ts
│
├── docker/
│   ├── docker-compose.yml
│   └── rabbitmq/
│       ├── definitions.json
│       └── rabbitmq.conf
│
├── tests/
│   ├── unit/
│   │   └── handlers/
│   ├── integration/
│   │   └── messaging/
│   └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md
```
Implementation
1. RabbitMQ Configuration
```typescript
// src/config/rabbitmq.config.ts
export interface RabbitMQConfig {
  host: string;
  port: number;
  username: string;
  password: string;
  vhost: string;
  heartbeat: number; // seconds
  prefetch: number;
  reconnectDelay: number; // milliseconds
  maxReconnectAttempts: number;
}

export interface ExchangeConfig {
  name: string;
  type: 'direct' | 'fanout' | 'topic' | 'headers';
  durable: boolean;
  autoDelete: boolean;
}

export interface QueueConfig {
  name: string;
  durable: boolean;
  exclusive: boolean;
  autoDelete: boolean;
  deadLetterExchange?: string;
  deadLetterRoutingKey?: string;
  messageTtl?: number; // milliseconds
  maxLength?: number;
  maxPriority?: number;
}

export const rabbitmqConfig: RabbitMQConfig = {
  host: process.env.RABBITMQ_HOST || 'localhost',
  port: parseInt(process.env.RABBITMQ_PORT || '5672', 10),
  username: process.env.RABBITMQ_USER || 'guest',
  password: process.env.RABBITMQ_PASS || 'guest',
  vhost: process.env.RABBITMQ_VHOST || '/',
  heartbeat: 60,
  prefetch: 10,
  reconnectDelay: 5000,
  maxReconnectAttempts: 10,
};

export const exchanges: Record<string, ExchangeConfig> = {
  orders: {
    name: 'orders.exchange',
    type: 'topic',
    durable: true,
    autoDelete: false,
  },
  notifications: {
    name: 'notifications.exchange',
    type: 'fanout',
    durable: true,
    autoDelete: false,
  },
  deadLetter: {
    name: 'dlx.exchange',
    type: 'direct',
    durable: true,
    autoDelete: false,
  },
};

export const queues: Record<string, QueueConfig> = {
  orderCreated: {
    name: 'orders.created.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    deadLetterExchange: 'dlx.exchange',
    deadLetterRoutingKey: 'orders.created.dlq',
    messageTtl: 86400000, // 24 hours
  },
  orderPaid: {
    name: 'orders.paid.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    deadLetterExchange: 'dlx.exchange',
    deadLetterRoutingKey: 'orders.paid.dlq',
  },
  notifications: {
    name: 'notifications.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    maxLength: 10000,
  },
  deadLetter: {
    name: 'dead-letter.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
  },
};
```
2. Connecting to RabbitMQ
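The connection class in this section waits `reconnectDelay * attempt` milliseconds before each reconnect attempt, so the delay grows linearly. The short sketch below works out what that means for the configured values (`reconnectDelay: 5000`, `maxReconnectAttempts: 10`); `reconnectSchedule` is a hypothetical helper used only for this arithmetic.

```typescript
// Delays (in ms) produced by a linear policy of `reconnectDelay * attempt`.
function reconnectSchedule(reconnectDelay: number, maxReconnectAttempts: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxReconnectAttempts; attempt++) {
    delays.push(reconnectDelay * attempt);
  }
  return delays;
}

const schedule = reconnectSchedule(5000, 10);
const totalMs = schedule.reduce((sum, delay) => sum + delay, 0);

console.log(schedule[0]);                   // 5000  (first retry after 5 s)
console.log(schedule[schedule.length - 1]); // 50000 (last retry after 50 s)
console.log(totalMs);                       // 275000 (about 4 min 35 s in total)
```

With these defaults the service gives up after roughly four and a half minutes of broker downtime, which is worth keeping in mind when choosing the two values.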
```typescript
// src/infrastructure/messaging/RabbitMQConnection.ts
import amqp, { Connection, ConfirmChannel } from 'amqplib';
import { EventEmitter } from 'events';
import { RabbitMQConfig, ExchangeConfig, QueueConfig } from '../../config/rabbitmq.config';
import { Logger } from '../../shared/utils/logger';

export class RabbitMQConnection extends EventEmitter {
  // Note: newer @types/amqplib releases type the result of connect() as ChannelModel;
  // adjust this field's type if your version does.
  private connection: Connection | null = null;
  private channel: ConfirmChannel | null = null;
  private reconnectAttempts = 0;
  private isConnecting = false;
  private isClosing = false;

  constructor(
    private readonly config: RabbitMQConfig,
    private readonly logger: Logger
  ) {
    super();
  }

  async connect(): Promise<void> {
    if (this.isConnecting) return;
    this.isConnecting = true;
    this.isClosing = false;

    try {
      const url = this.buildConnectionUrl();
      this.connection = await amqp.connect(url);

      this.connection.on('error', (err) => {
        this.logger.error('RabbitMQ connection error', { error: err.message });
        this.emit('error', err);
      });

      this.connection.on('close', () => {
        this.logger.warn('RabbitMQ connection closed');
        this.connection = null;
        this.channel = null;
        this.emit('disconnected');
        // Do not reconnect after an intentional close().
        if (!this.isClosing) this.scheduleReconnect();
      });

      this.channel = await this.connection.createConfirmChannel();
      await this.channel.prefetch(this.config.prefetch);

      this.channel.on('error', (err) => {
        this.logger.error('RabbitMQ channel error', { error: err.message });
      });

      this.channel.on('close', () => {
        this.logger.warn('RabbitMQ channel closed');
      });

      this.reconnectAttempts = 0;
      this.isConnecting = false;
      this.emit('connected');
      this.logger.info('Connected to RabbitMQ');
    } catch (error) {
      this.isConnecting = false;
      this.logger.error('Failed to connect to RabbitMQ', { error });
      this.scheduleReconnect();
      throw error;
    }
  }

  private buildConnectionUrl(): string {
    const { username, password, host, port, vhost, heartbeat } = this.config;
    // Credentials are URI-encoded so special characters do not break the URL.
    const user = encodeURIComponent(username);
    const pass = encodeURIComponent(password);
    // amqplib documents the heartbeat (in seconds) as a URL query parameter.
    return `amqp://${user}:${pass}@${host}:${port}${vhost}?heartbeat=${heartbeat}`;
  }

  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {
      this.logger.error('Max reconnect attempts reached');
      this.emit('maxReconnectAttempts');
      return;
    }

    this.reconnectAttempts++;
    const delay = this.config.reconnectDelay * this.reconnectAttempts;

    this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {
      delayMs: delay,
    });

    setTimeout(() => {
      // connect() logs the failure and schedules the next attempt itself,
      // so the rejection is swallowed here to avoid an unhandled rejection.
      this.connect().catch(() => undefined);
    }, delay);
  }

  async setupExchange(config: ExchangeConfig): Promise<void> {
    if (!this.channel) throw new Error('Channel not initialized');

    await this.channel.assertExchange(config.name, config.type, {
      durable: config.durable,
      autoDelete: config.autoDelete,
    });
    this.logger.info(`Exchange created: ${config.name}`);
  }

  async setupQueue(config: QueueConfig): Promise<void> {
    if (!this.channel) throw new Error('Channel not initialized');

    const options: amqp.Options.AssertQueue = {
      durable: config.durable,
      exclusive: config.exclusive,
      autoDelete: config.autoDelete,
      arguments: {},
    };

    if (config.deadLetterExchange) {
      options.arguments!['x-dead-letter-exchange'] = config.deadLetterExchange;
    }
    if (config.deadLetterRoutingKey) {
      options.arguments!['x-dead-letter-routing-key'] = config.deadLetterRoutingKey;
    }
    if (config.messageTtl) {
      options.arguments!['x-message-ttl'] = config.messageTtl;
    }
    if (config.maxLength) {
      options.arguments!['x-max-length'] = config.maxLength;
    }
    if (config.maxPriority) {
      options.arguments!['x-max-priority'] = config.maxPriority;
    }

    await this.channel.assertQueue(config.name, options);
    this.logger.info(`Queue created: ${config.name}`);
  }

  async bindQueue(
    queue: string,
    exchange: string,
    routingKey: string
  ): Promise<void> {
    if (!this.channel) throw new Error('Channel not initialized');

    await this.channel.bindQueue(queue, exchange, routingKey);
    this.logger.info(`Queue ${queue} bound to ${exchange} with key ${routingKey}`);
  }

  getChannel(): ConfirmChannel {
    if (!this.channel) throw new Error('Channel not initialized');
    return this.channel;
  }

  async close(): Promise<void> {
    this.isClosing = true;
    try {
      if (this.channel) await this.channel.close();
      if (this.connection) await this.connection.close();
      this.logger.info('RabbitMQ connection closed gracefully');
    } catch (error) {
      this.logger.error('Error closing RabbitMQ connection', { error });
    }
  }

  isConnected(): boolean {
    return this.connection !== null && this.channel !== null;
  }
}
```
3. Domain Events
```typescript
// src/domain/events/DomainEvent.ts
export interface DomainEvent<T = unknown> {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  version: number;
  payload: T;
  metadata: EventMetadata;
}

export interface EventMetadata {
  correlationId: string;
  causationId?: string;
  userId?: string;
  traceId?: string;
  spanId?: string;
}

export abstract class BaseDomainEvent<T> implements DomainEvent<T> {
  public readonly eventId: string;
  public readonly timestamp: Date;
  public readonly version: number = 1;

  constructor(
    public readonly eventType: string,
    public readonly aggregateId: string,
    public readonly aggregateType: string,
    public readonly payload: T,
    public readonly metadata: EventMetadata
  ) {
    this.eventId = this.generateEventId();
    this.timestamp = new Date();
  }

  private generateEventId(): string {
    // Aggregate identity + creation time + a short random suffix.
    const suffix = Math.random().toString(36).slice(2, 11);
    return `${this.aggregateType}-${this.aggregateId}-${Date.now()}-${suffix}`;
  }
}
```

```typescript
// src/domain/events/OrderCreatedEvent.ts
import { BaseDomainEvent, EventMetadata } from './DomainEvent';

export interface OrderCreatedPayload {
  orderId: string;
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  currency: string;
  shippingAddress: {
    street: string;
    city: string;
    state: string;
    zipCode: string;
    country: string;
  };
}

export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> {
  static readonly EVENT_TYPE = 'order.created';

  constructor(payload: OrderCreatedPayload, metadata: EventMetadata) {
    super(
      OrderCreatedEvent.EVENT_TYPE,
      payload.orderId,
      'Order',
      payload,
      metadata
    );
  }
}
```

```typescript
// src/domain/events/OrderPaidEvent.ts
import { BaseDomainEvent, EventMetadata } from './DomainEvent';

export interface OrderPaidPayload {
  orderId: string;
  paymentId: string;
  amount: number;
  currency: string;
  paymentMethod: string;
  paidAt: Date;
}

export class OrderPaidEvent extends BaseDomainEvent<OrderPaidPayload> {
  static readonly EVENT_TYPE = 'order.paid';

  constructor(payload: OrderPaidPayload, metadata: EventMetadata) {
    super(
      OrderPaidEvent.EVENT_TYPE,
      payload.orderId,
      'Order',
      payload,
      metadata
    );
  }
}
```
4. Publisher
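The publisher and consumer both depend on a `MessageSerializer`, which appears in the project structure but is not shown in this article. The following is one possible sketch, assuming JSON over UTF-8; the `SerializableEvent` shape and the validation rules are assumptions made for this example, not the author's implementation.

```typescript
// Hypothetical MessageSerializer: JSON over UTF-8, reviving the top-level timestamp.
interface SerializableEvent {
  eventId: string;
  eventType: string;
  timestamp: Date;
  payload: unknown;
}

class MessageSerializer {
  serialize(event: SerializableEvent): Buffer {
    // JSON.stringify converts Date values to ISO-8601 strings.
    return Buffer.from(JSON.stringify(event), 'utf-8');
  }

  deserialize<T extends SerializableEvent>(content: Buffer): T {
    const parsed = JSON.parse(content.toString('utf-8'));
    if (typeof parsed.eventId !== 'string' || typeof parsed.eventType !== 'string') {
      throw new Error('Malformed event: missing eventId or eventType');
    }
    // Restore the Date that JSON flattened to a string.
    parsed.timestamp = new Date(parsed.timestamp);
    return parsed as T;
  }
}

const serializer = new MessageSerializer();
const original: SerializableEvent = {
  eventId: 'Order-42-1',
  eventType: 'order.created',
  timestamp: new Date('2024-01-01T00:00:00.000Z'),
  payload: { orderId: '42' },
};
const restored = serializer.deserialize(serializer.serialize(original));
console.log(restored.timestamp instanceof Date); // true
```

Only the top-level `timestamp` is revived here. Dates nested inside the payload, such as `paidAt` in `OrderPaidPayload`, arrive as strings and would need their own conversion in the handler.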
```typescript
// src/infrastructure/messaging/RabbitMQPublisher.ts
import { ConfirmChannel, Options } from 'amqplib';
import { DomainEvent } from '../../domain/events/DomainEvent';
import { IMessagePublisher, PublishOptions } from '../../domain/interfaces/IMessagePublisher';
import { MessageSerializer } from './MessageSerializer';
import { Logger } from '../../shared/utils/logger';
import { RabbitMQConnection } from './RabbitMQConnection';

export class RabbitMQPublisher implements IMessagePublisher {
  private channel: ConfirmChannel | null = null;

  constructor(
    private readonly connection: RabbitMQConnection,
    private readonly serializer: MessageSerializer,
    private readonly logger: Logger
  ) {
    // Pick up the channel if the connection is already open,
    // and refresh it after every (re)connect.
    if (this.connection.isConnected()) {
      this.channel = this.connection.getChannel();
    }
    this.connection.on('connected', () => {
      this.channel = this.connection.getChannel();
    });
  }

  async publish<T>(
    exchange: string,
    routingKey: string,
    event: DomainEvent<T>,
    options: PublishOptions = {}
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('Channel not available');
    }

    const message = this.serializer.serialize(event);
    const publishOptions = this.buildPublishOptions(event, options);

    // On a confirm channel the callback fires once the broker acks or nacks the message.
    return new Promise((resolve, reject) => {
      this.channel!.publish(
        exchange,
        routingKey,
        message,
        publishOptions,
        (err) => {
          if (err) {
            this.logger.error('Failed to publish message', {
              exchange,
              routingKey,
              eventId: event.eventId,
              error: err.message,
            });
            reject(err);
          } else {
            this.logger.info('Message published', {
              exchange,
              routingKey,
              eventId: event.eventId,
              eventType: event.eventType,
            });
            resolve();
          }
        }
      );
    });
  }

  async publishBatch<T>(
    exchange: string,
    events: Array<{ routingKey: string; event: DomainEvent<T> }>
  ): Promise<void> {
    const publishPromises = events.map(({ routingKey, event }) =>
      this.publish(exchange, routingKey, event)
    );
    await Promise.all(publishPromises);
  }

  private buildPublishOptions(
    event: DomainEvent,
    options: PublishOptions
  ): Options.Publish {
    return {
      persistent: options.persistent ?? true,
      messageId: event.eventId,
      timestamp: event.timestamp.getTime(),
      type: event.eventType,
      contentType: 'application/json',
      contentEncoding: 'utf-8',
      correlationId: event.metadata.correlationId,
      headers: {
        'x-event-type': event.eventType,
        'x-aggregate-type': event.aggregateType,
        'x-aggregate-id': event.aggregateId,
        'x-trace-id': event.metadata.traceId,
        ...options.headers,
      },
      priority: options.priority,
      expiration: options.expiration?.toString(),
    };
  }
}
```

```typescript
// src/domain/interfaces/IMessagePublisher.ts
import { DomainEvent } from '../events/DomainEvent';

export interface PublishOptions {
  persistent?: boolean;
  priority?: number;
  expiration?: number;
  headers?: Record<string, string>;
}

export interface IMessagePublisher {
  publish<T>(
    exchange: string,
    routingKey: string,
    event: DomainEvent<T>,
    options?: PublishOptions
  ): Promise<void>;

  publishBatch<T>(
    exchange: string,
    events: Array<{ routingKey: string; event: DomainEvent<T> }>
  ): Promise<void>;
}
```
5. Consumer with Retry and Dead Letter
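The consumer in this section rejects failed messages so that the broker dead-letters them. For that to behave as a delayed retry rather than a final stop, a common approach is a set of "parking" queues: each has a message TTL and dead-letters expired messages back to the work exchange. The sketch below only builds the queue arguments for such a setup. The function name, the queue naming scheme, and the one-queue-per-delay layout are assumptions for illustration; the `x-*` argument names are standard RabbitMQ queue arguments.

```typescript
interface RetryQueueSpec {
  name: string;
  arguments: {
    'x-message-ttl': number;
    'x-dead-letter-exchange': string;
    'x-dead-letter-routing-key': string;
  };
}

// Builds one parking queue per retry level. A message waits there for the TTL,
// then the broker dead-letters it back to the work exchange with the original key.
function buildRetryQueues(
  workQueue: string,
  workExchange: string,
  routingKey: string,
  maxRetries = 3,
  baseDelayMs = 1000
): RetryQueueSpec[] {
  const specs: RetryQueueSpec[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const delayMs = baseDelayMs * Math.pow(2, attempt); // 1s, 2s, 4s, ...
    specs.push({
      name: `${workQueue}.retry.${delayMs}ms`,
      arguments: {
        'x-message-ttl': delayMs,
        'x-dead-letter-exchange': workExchange,
        'x-dead-letter-routing-key': routingKey,
      },
    });
  }
  return specs;
}

const retryQueues = buildRetryQueues('orders.created.queue', 'orders.exchange', 'order.created');
console.log(retryQueues.map((q) => q.name));
```

Each spec could then be declared with `channel.assertQueue(spec.name, { durable: true, arguments: spec.arguments })`. The consumer would also need to send a failed message to the parking queue that matches its retry count, which the article's configuration does not currently do.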
```typescript
// src/infrastructure/messaging/RabbitMQConsumer.ts
import { ConsumeMessage, ConfirmChannel } from 'amqplib';
import { DomainEvent } from '../../domain/events/DomainEvent';
import { IMessageHandler } from '../../domain/interfaces/IMessageHandler';
import { MessageSerializer } from './MessageSerializer';
import { Logger } from '../../shared/utils/logger';
import { RabbitMQConnection } from './RabbitMQConnection';
import { RetryableError } from '../../shared/errors/RetryableError';

export interface ConsumerOptions {
  queue: string;
  prefetch?: number;
  noAck?: boolean;
  maxRetries?: number;
  retryDelay?: number;
}

export class RabbitMQConsumer {
  private channel: ConfirmChannel | null = null;
  private consumerTag: string | null = null;
  private handlers: Map<string, IMessageHandler> = new Map();

  constructor(
    private readonly connection: RabbitMQConnection,
    private readonly serializer: MessageSerializer,
    private readonly logger: Logger,
    private readonly options: ConsumerOptions
  ) {
    // Pick up the channel if the connection is already open,
    // and refresh it after every (re)connect.
    if (this.connection.isConnected()) {
      this.channel = this.connection.getChannel();
    }
    this.connection.on('connected', () => {
      this.channel = this.connection.getChannel();
    });
  }

  registerHandler(eventType: string, handler: IMessageHandler): void {
    this.handlers.set(eventType, handler);
    this.logger.info(`Handler registered for event type: ${eventType}`);
  }

  async start(): Promise<void> {
    if (!this.channel) {
      throw new Error('Channel not available');
    }

    if (this.options.prefetch) {
      await this.channel.prefetch(this.options.prefetch);
    }

    // Keep the consumer tag: it is what cancel() needs later, not the queue name.
    const { consumerTag } = await this.channel.consume(
      this.options.queue,
      (msg) => this.handleMessage(msg),
      { noAck: this.options.noAck ?? false }
    );
    this.consumerTag = consumerTag;

    this.logger.info(`Consumer started for queue: ${this.options.queue}`);
  }

  private async handleMessage(msg: ConsumeMessage | null): Promise<void> {
    if (!msg || !this.channel) return;

    const startTime = Date.now();
    const eventType = msg.properties.type;
    const messageId = msg.properties.messageId;

    try {
      const event = this.serializer.deserialize<DomainEvent>(msg.content);
      const handler = this.handlers.get(eventType);

      if (!handler) {
        // Unknown event types are acknowledged so they do not block the queue.
        this.logger.warn(`No handler found for event type: ${eventType}`);
        this.channel.ack(msg);
        return;
      }

      this.logger.info('Processing message', {
        messageId,
        eventType,
        queue: this.options.queue,
      });

      await handler.handle(event);
      this.channel.ack(msg);

      this.logger.info('Message processed successfully', {
        messageId,
        eventType,
        durationMs: Date.now() - startTime,
      });
    } catch (error) {
      await this.handleError(msg, error as Error);
    }
  }

  private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {
    const retryCount = this.getRetryCount(msg);
    const maxRetries = this.options.maxRetries ?? 3;
    const messageId = msg.properties.messageId;

    this.logger.error('Error processing message', {
      messageId,
      error: error.message,
      retryCount,
      maxRetries,
    });

    if (error instanceof RetryableError && retryCount < maxRetries) {
      await this.scheduleRetry(msg, retryCount);
    } else {
      // Reject without requeue: the broker routes the message to the dead letter exchange.
      this.channel!.nack(msg, false, false);
      this.logger.warn('Message sent to dead letter queue', {
        messageId,
        reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error',
      });
    }
  }

  private getRetryCount(msg: ConsumeMessage): number {
    // RabbitMQ appends an x-death entry each time a message is dead-lettered.
    const deaths = msg.properties.headers?.['x-death'];
    if (!deaths || !Array.isArray(deaths)) return 0;
    return deaths.reduce(
      (count: number, death: { count?: number }) => count + (death.count || 0),
      0
    );
  }

  private async scheduleRetry(
    msg: ConsumeMessage,
    retryCount: number
  ): Promise<void> {
    const delay = this.calculateRetryDelay(retryCount);

    // Reject without requeue so the broker dead-letters the message.
    // For this to act as a delayed retry, the dead letter exchange must route to a
    // retry queue whose TTL dead-letters back to the original queue. That topology
    // is not part of the configuration shown in this article, so `delay` is only logged.
    this.channel!.nack(msg, false, false);

    this.logger.info('Message scheduled for retry', {
      messageId: msg.properties.messageId,
      retryCount: retryCount + 1,
      delayMs: delay,
    });
  }

  private calculateRetryDelay(retryCount: number): number {
    const baseDelay = this.options.retryDelay ?? 1000;
    // Exponential backoff: 1s, 2s, 4s, 8s...
    return baseDelay * Math.pow(2, retryCount);
  }

  async stop(): Promise<void> {
    if (this.channel && this.consumerTag) {
      await this.channel.cancel(this.consumerTag);
      this.consumerTag = null;
      this.logger.info(`Consumer stopped for queue: ${this.options.queue}`);
    }
  }
}
```
6. Message Handlers
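The handlers in this section implement `IMessageHandler`, and the consumer decides between retry and dead-lettering based on `RetryableError`. Neither file is shown in the article, so here is a minimal sketch of what they might look like. The `HandledEvent` shape, the `originalError` field, and the `decideOutcome` helper are assumptions for illustration; `decideOutcome` mirrors the condition used in the consumer's `handleError`.

```typescript
// Hypothetical src/shared/errors/RetryableError.ts
class RetryableError extends Error {
  constructor(message: string, public readonly originalError?: unknown) {
    super(message);
    this.name = 'RetryableError';
    // Keeps `instanceof` working when the code is compiled to older targets.
    Object.setPrototypeOf(this, RetryableError.prototype);
  }
}

// Hypothetical src/domain/interfaces/IMessageHandler.ts (reduced event shape).
interface HandledEvent<T> {
  eventType: string;
  payload: T;
}

interface IMessageHandler<T = unknown> {
  handle(event: HandledEvent<T>): Promise<void>;
}

// Same rule as the consumer: retry only transient errors, and only up to maxRetries.
function decideOutcome(error: Error, retryCount: number, maxRetries = 3): 'retry' | 'dead-letter' {
  return error instanceof RetryableError && retryCount < maxRetries ? 'retry' : 'dead-letter';
}

// Example handler that marks a (simulated) transient failure as retryable.
class FlakyHandler implements IMessageHandler<{ orderId: string }> {
  async handle(event: HandledEvent<{ orderId: string }>): Promise<void> {
    throw new RetryableError(`Inventory service timed out for order ${event.payload.orderId}`);
  }
}

console.log(decideOutcome(new RetryableError('timeout'), 0));   // 'retry'
console.log(decideOutcome(new RetryableError('timeout'), 3));   // 'dead-letter'
console.log(decideOutcome(new Error('invalid payload'), 0));    // 'dead-letter'
```

The design choice is that the handler, which knows whether a failure is transient, classifies the error, while the consumer only applies the policy.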
```typescript
// src/application/handlers/OrderCreatedHandler.ts
import { IMessageHandler } from '../../domain/interfaces/IMessageHandler';
import { OrderCreatedEvent, OrderCreatedPayload } from '../../domain/events/OrderCreatedEvent';
import { Logger } from '../../shared/utils/logger';
import { NotificationService } from '../services/NotificationService';
import { InventoryService } from '../services/InventoryService';

export class OrderCreatedHandler implements IMessageHandler<OrderCreatedPayload> {
  constructor(
    private readonly notificationService: NotificationService,
    private readonly inventoryService: InventoryService,
    private readonly logger: Logger
  ) {}

  async handle(event: OrderCreatedEvent): Promise<void> {
    const { payload } = event;

    this.logger.info('Handling order created event', {
      orderId: payload.orderId,
      customerId: payload.customerId,
      totalAmount: payload.totalAmount,
    });

    // Reserve inventory
    await this.inventoryService.reserveItems(
      payload.orderId,
      payload.items.map((item) => ({
        productId: item.productId,
        quantity: item.quantity,
      }))
    );

    // Send confirmation notification
    await this.notificationService.sendOrderConfirmation({
      customerId: payload.customerId,
      orderId: payload.orderId,
      totalAmount: payload.totalAmount,
      currency: payload.currency,
    });

    this.logger.info('Order created event handled successfully', {
      orderId: payload.orderId,
    });
  }
}
```

```typescript
// src/application/handlers/OrderPaidHandler.ts
import { IMessageHandler } from '../../domain/interfaces/IMessageHandler';
import { OrderPaidEvent, OrderPaidPayload } from '../../domain/events/OrderPaidEvent';
import { Logger } from '../../shared/utils/logger';
import { ShippingService } from '../services/ShippingService';
import { InvoiceService } from '../services/InvoiceService';

export class OrderPaidHandler implements IMessageHandler<OrderPaidPayload> {
  constructor(
    private readonly shippingService: ShippingService,
    private readonly invoiceService: InvoiceService,
    private readonly logger: Logger
  ) {}

  async handle(event: OrderPaidEvent): Promise<void> {
    const { payload } = event;

    this.logger.info('Handling order paid event', {
      orderId: payload.orderId,
      paymentId: payload.paymentId,
      amount: payload.amount,
    });

    // Generate invoice
    const invoice = await this.invoiceService.generate({
      orderId: payload.orderId,
      paymentId: payload.paymentId,
      amount: payload.amount,
      currency: payload.currency,
      paidAt: payload.paidAt,
    });

    // Schedule shipping
    await this.shippingService.scheduleShipment({
      orderId: payload.orderId,
      invoiceId: invoice.id,
    });

    this.logger.info('Order paid event handled successfully', {
      orderId: payload.orderId,
      invoiceId: invoice.id,
    });
  }
}
```
7. Docker Compose
```yaml
# docker/docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    container_name: rabbitmq
    hostname: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
      - "15692:15692" # Prometheus metrics
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
      - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - messaging-net

  order-service:
    build:
      context: ../
      dockerfile: Dockerfile
    container_name: order-service
    environment:
      NODE_ENV: development
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: admin
      RABBITMQ_PASS: admin123
    depends_on:
      rabbitmq:
        condition: service_healthy
    networks:
      - messaging-net

  notification-service:
    build:
      context: ../notification-service
      dockerfile: Dockerfile
    container_name: notification-service
    environment:
      NODE_ENV: development
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: admin
      RABBITMQ_PASS: admin123
    depends_on:
      rabbitmq:
        condition: service_healthy
    networks:
      - messaging-net

volumes:
  rabbitmq_data:

networks:
  messaging-net:
    driver: bridge
```
8. RabbitMQ Definitions
```json
{
  "rabbit_version": "3.12.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "hashed_password",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": ["administrator"]
    }
  ],
  "vhosts": [
    { "name": "/" }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "exchanges": [
    {
      "name": "orders.exchange",
      "vhost": "/",
      "type": "topic",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "notifications.exchange",
      "vhost": "/",
      "type": "fanout",
      "durable": true,
      "auto_delete": false
    },
    {
      "name": "dlx.exchange",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false
    }
  ],
  "queues": [
    {
      "name": "orders.created.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-dead-letter-exchange": "dlx.exchange",
        "x-dead-letter-routing-key": "orders.created.dlq",
        "x-message-ttl": 86400000
      }
    },
    {
      "name": "orders.paid.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-dead-letter-exchange": "dlx.exchange",
        "x-dead-letter-routing-key": "orders.paid.dlq"
      }
    },
    {
      "name": "notifications.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {
        "x-max-length": 10000
      }
    },
    {
      "name": "dead-letter.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false
    }
  ],
  "bindings": [
    {
      "source": "orders.exchange",
      "vhost": "/",
      "destination": "orders.created.queue",
      "destination_type": "queue",
      "routing_key": "order.created"
    },
    {
      "source": "orders.exchange",
      "vhost": "/",
      "destination": "orders.paid.queue",
      "destination_type": "queue",
      "routing_key": "order.paid"
    },
    {
      "source": "notifications.exchange",
      "vhost": "/",
      "destination": "notifications.queue",
      "destination_type": "queue",
      "routing_key": ""
    },
    {
      "source": "dlx.exchange",
      "vhost": "/",
      "destination": "dead-letter.queue",
      "destination_type": "queue",
      "routing_key": "orders.created.dlq"
    },
    {
      "source": "dlx.exchange",
      "vhost": "/",
      "destination": "dead-letter.queue",
      "destination_type": "queue",
      "routing_key": "orders.paid.dlq"
    }
  ]
}
```
Two details matter here. Because `dlx.exchange` is a direct exchange, `dead-letter.queue` is bound once per dead-letter routing key; a `#` binding only acts as a wildcard on a topic exchange. The queue arguments (`x-message-ttl`, `x-max-length`) must also match the ones the application passes to `assertQueue`, otherwise the broker rejects the declaration with `PRECONDITION_FAILED`.
Messaging Patterns
1. Saga Pattern
```typescript
// src/application/sagas/OrderSaga.ts
import { DomainEvent } from '../../domain/events/DomainEvent';
import { IMessagePublisher } from '../../domain/interfaces/IMessagePublisher';
import { Logger } from '../../shared/utils/logger';

interface OrderSagaData {
  orderId: string;
  correlationId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
}

interface SagaStep<T> {
  name: string;
  execute: (data: T) => Promise<void>;
  compensate: (data: T) => Promise<void>;
}

export class OrderSaga {
  private steps: SagaStep<OrderSagaData>[] = [];

  constructor(
    private readonly publisher: IMessagePublisher,
    private readonly logger: Logger
  ) {
    this.initializeSteps();
  }

  private initializeSteps(): void {
    this.steps = [
      {
        name: 'reserve-inventory',
        execute: async (data) => {
          await this.publisher.publish(
            'inventory.exchange',
            'inventory.reserve',
            this.createEvent('inventory.reserve.requested', data)
          );
        },
        compensate: async (data) => {
          await this.publisher.publish(
            'inventory.exchange',
            'inventory.release',
            this.createEvent('inventory.release.requested', data)
          );
        },
      },
      {
        name: 'process-payment',
        execute: async (data) => {
          await this.publisher.publish(
            'payment.exchange',
            'payment.process',
            this.createEvent('payment.process.requested', data)
          );
        },
        compensate: async (data) => {
          await this.publisher.publish(
            'payment.exchange',
            'payment.refund',
            this.createEvent('payment.refund.requested', data)
          );
        },
      },
      {
        name: 'create-shipment',
        execute: async (data) => {
          await this.publisher.publish(
            'shipping.exchange',
            'shipment.create',
            this.createEvent('shipment.create.requested', data)
          );
        },
        compensate: async (data) => {
          await this.publisher.publish(
            'shipping.exchange',
            'shipment.cancel',
            this.createEvent('shipment.cancel.requested', data)
          );
        },
      },
    ];
  }

  async execute(data: OrderSagaData): Promise<void> {
    this.logger.info('Starting order saga', { orderId: data.orderId });

    // Tracked per run, so one saga instance can safely execute several orders.
    const executedSteps: SagaStep<OrderSagaData>[] = [];

    for (const step of this.steps) {
      try {
        this.logger.info(`Executing saga step: ${step.name}`, {
          orderId: data.orderId,
        });
        await step.execute(data);
        executedSteps.push(step);
      } catch (error) {
        this.logger.error(`Saga step failed: ${step.name}`, {
          orderId: data.orderId,
          error: (error as Error).message,
        });
        await this.compensate(data, executedSteps);
        throw error;
      }
    }

    this.logger.info('Order saga completed successfully', {
      orderId: data.orderId,
    });
  }

  private async compensate(
    data: OrderSagaData,
    executedSteps: SagaStep<OrderSagaData>[]
  ): Promise<void> {
    this.logger.info('Starting saga compensation', {
      orderId: data.orderId,
      stepsToCompensate: executedSteps.length,
    });

    // Compensate in reverse order (on a copy, so the input array is not mutated)
    for (const step of [...executedSteps].reverse()) {
      try {
        this.logger.info(`Compensating step: ${step.name}`, {
          orderId: data.orderId,
        });
        await step.compensate(data);
      } catch (error) {
        this.logger.error(`Compensation failed for step: ${step.name}`, {
          orderId: data.orderId,
          error: (error as Error).message,
        });
        // Continue compensating other steps
      }
    }
  }

  private createEvent(type: string, data: OrderSagaData): DomainEvent {
    return {
      eventId: `${type}-${data.orderId}-${Date.now()}`,
      eventType: type,
      aggregateId: data.orderId,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      payload: data,
      metadata: {
        correlationId: data.correlationId,
      },
    };
  }
}
```
2. Outbox Pattern
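The point of the outbox is that the business change and the event record are written in the same database transaction, so neither can exist without the other. The repository and processor in this section cover storing and relaying outbox rows; the sketch below illustrates the write side with an in-memory stand-in for a transactional database. `InMemoryDb`, `createOrder`, and the row shapes are all hypothetical and exist only to show the atomicity requirement.

```typescript
// In-memory stand-in for a transactional database (illustration only).
interface OrderRow { id: string; total: number; }
interface OutboxRow { aggregateId: string; eventType: string; payload: string; }
interface Tx { orders: OrderRow[]; outbox: OutboxRow[]; }

class InMemoryDb {
  orders: OrderRow[] = [];
  outbox: OutboxRow[] = [];

  // Runs `work` against staged copies and commits both tables only if it does not throw.
  transaction(work: (tx: Tx) => void): void {
    const tx: Tx = { orders: [...this.orders], outbox: [...this.outbox] };
    work(tx); // an exception here leaves this.orders and this.outbox untouched
    this.orders = tx.orders;
    this.outbox = tx.outbox;
  }
}

// Writes the order and its 'order.created' outbox row atomically.
function createOrder(db: InMemoryDb, order: OrderRow): void {
  db.transaction((tx) => {
    tx.orders.push(order);
    if (order.total <= 0) throw new Error('Invalid order total');
    tx.outbox.push({
      aggregateId: order.id,
      eventType: 'order.created',
      payload: JSON.stringify(order),
    });
  });
}

const db = new InMemoryDb();
createOrder(db, { id: 'o-1', total: 100 });
console.log(db.orders.length, db.outbox.length); // 1 1

try {
  createOrder(db, { id: 'o-2', total: 0 });
} catch {
  // The failed transaction wrote neither the order nor the event.
}
console.log(db.orders.length, db.outbox.length); // 1 1
```

With a real database the same guarantee comes from wrapping both `INSERT` statements in one transaction; the outbox processor then publishes the stored rows to RabbitMQ afterwards.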
```typescript
// src/infrastructure/outbox/OutboxRepository.ts

// Minimal shape of the database client this repository relies on
// (an assumption; for example a thin wrapper around a PostgreSQL pool).
export interface Database {
  query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>;
}

export interface OutboxMessage {
  id: string;
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  payload: string;
  createdAt: Date;
  processedAt: Date | null;
  retryCount: number;
}

export class OutboxRepository {
  constructor(private readonly db: Database) {}

  async save(
    message: Omit<OutboxMessage, 'id' | 'createdAt' | 'processedAt' | 'retryCount'>
  ): Promise<OutboxMessage> {
    const result = await this.db.query(`
      INSERT INTO outbox_messages (aggregate_type, aggregate_id, event_type, payload)
      VALUES ($1, $2, $3, $4)
      RETURNING *
    `, [message.aggregateType, message.aggregateId, message.eventType, message.payload]);

    return this.mapToEntity(result.rows[0]);
  }

  async getUnprocessed(limit: number = 100): Promise<OutboxMessage[]> {
    // FOR UPDATE SKIP LOCKED only holds the row locks for the duration of the
    // surrounding transaction; run this inside one if several processors compete.
    const result = await this.db.query(`
      SELECT * FROM outbox_messages
      WHERE processed_at IS NULL
        AND retry_count < 5
      ORDER BY created_at ASC
      LIMIT $1
      FOR UPDATE SKIP LOCKED
    `, [limit]);

    return result.rows.map((row) => this.mapToEntity(row));
  }

  async markAsProcessed(id: string): Promise<void> {
    await this.db.query(`
      UPDATE outbox_messages
      SET processed_at = NOW()
      WHERE id = $1
    `, [id]);
  }

  async incrementRetry(id: string): Promise<void> {
    await this.db.query(`
      UPDATE outbox_messages
      SET retry_count = retry_count + 1
      WHERE id = $1
    `, [id]);
  }

  private mapToEntity(row: any): OutboxMessage {
    return {
      id: row.id,
      aggregateType: row.aggregate_type,
      aggregateId: row.aggregate_id,
      eventType: row.event_type,
      payload: row.payload,
      createdAt: row.created_at,
      processedAt: row.processed_at,
      retryCount: row.retry_count,
    };
  }
}
```

```typescript
// src/infrastructure/outbox/OutboxProcessor.ts
import { IMessagePublisher } from '../../domain/interfaces/IMessagePublisher';
import { Logger } from '../../shared/utils/logger';
import { OutboxRepository } from './OutboxRepository';

export class OutboxProcessor {
  private isRunning = false;
  private isProcessing = false;
  private intervalId: NodeJS.Timeout | null = null;

  constructor(
    private readonly repository: OutboxRepository,
    private readonly publisher: IMessagePublisher,
    private readonly logger: Logger,
    private readonly intervalMs: number = 1000
  ) {}

  start(): void {
    if (this.isRunning) return;

    this.isRunning = true;
    this.intervalId = setInterval(() => this.processMessages(), this.intervalMs);
    this.logger.info('Outbox processor started');
  }

  stop(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
    }
    this.isRunning = false;
    this.logger.info('Outbox processor stopped');
  }

  private async processMessages(): Promise<void> {
    // Skip this tick if the previous batch is still running, so batches never overlap.
    if (this.isProcessing) return;
    this.isProcessing = true;

    try {
      const messages = await this.repository.getUnprocessed(100);

      for (const message of messages) {
        try {
          // Assumes the payload column holds a serialized DomainEvent.
          const event = JSON.parse(message.payload);
          // JSON turns the timestamp into a string; the publisher expects a Date.
          event.timestamp = new Date(event.timestamp);

          // Note: for aggregate type 'Order' this convention yields 'order.exchange'.
          // Make sure it matches an exchange that is actually declared
          // (this article declares 'orders.exchange').
          await this.publisher.publish(
            `${message.aggregateType.toLowerCase()}.exchange`,
            message.eventType,
            event
          );

          await this.repository.markAsProcessed(message.id);

          this.logger.debug('Outbox message processed', {
            messageId: message.id,
            eventType: message.eventType,
          });
        } catch (error) {
          await this.repository.incrementRetry(message.id);

          this.logger.error('Failed to process outbox message', {
            messageId: message.id,
            error: (error as Error).message,
          });
        }
      }
    } catch (error) {
      this.logger.error('Outbox processor error', {
        error: (error as Error).message,
      });
    } finally {
      this.isProcessing = false;
    }
  }
}
```
Observability
Prometheus Metrics
// src/infrastructure/metrics/MessagingMetrics.ts
import { Counter, Histogram, Gauge } from 'prom-client';

export class MessagingMetrics {
  private readonly messagesPublished: Counter;
  private readonly messagesConsumed: Counter;
  private readonly messagesFailed: Counter;
  private readonly messageProcessingDuration: Histogram;
  private readonly queueDepth: Gauge;

  constructor() {
    this.messagesPublished = new Counter({
      name: 'rabbitmq_messages_published_total',
      help: 'Total number of messages published',
      labelNames: ['exchange', 'routing_key', 'event_type'],
    });

    this.messagesConsumed = new Counter({
      name: 'rabbitmq_messages_consumed_total',
      help: 'Total number of messages consumed',
      labelNames: ['queue', 'event_type', 'status'],
    });

    this.messagesFailed = new Counter({
      name: 'rabbitmq_messages_failed_total',
      help: 'Total number of failed messages',
      labelNames: ['queue', 'event_type', 'error_type'],
    });

    this.messageProcessingDuration = new Histogram({
      name: 'rabbitmq_message_processing_duration_seconds',
      help: 'Message processing duration in seconds',
      labelNames: ['queue', 'event_type'],
      buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
    });

    this.queueDepth = new Gauge({
      name: 'rabbitmq_queue_depth',
      help: 'Current number of messages in queue',
      labelNames: ['queue'],
    });
  }

  recordPublish(exchange: string, routingKey: string, eventType: string): void {
    this.messagesPublished.labels(exchange, routingKey, eventType).inc();
  }

  recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void {
    this.messagesConsumed.labels(queue, eventType, status).inc();
  }

  recordFailure(queue: string, eventType: string, errorType: string): void {
    this.messagesFailed.labels(queue, eventType, errorType).inc();
  }

  // The histogram is in seconds, so the millisecond input is converted here.
  recordProcessingDuration(queue: string, eventType: string, durationMs: number): void {
    this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000);
  }

  setQueueDepth(queue: string, depth: number): void {
    this.queueDepth.labels(queue).set(depth);
  }
}

Health Check
// src/infrastructure/health/RabbitMQHealthCheck.ts
export class RabbitMQHealthCheck {
  constructor(private readonly connection: RabbitMQConnection) {}

  async check(): Promise<HealthCheckResult> {
    const startTime = Date.now();

    try {
      if (!this.connection.isConnected()) {
        return {
          status: 'unhealthy',
          details: {
            connected: false,
            error: 'Not connected to RabbitMQ',
          },
          durationMs: Date.now() - startTime,
        };
      }

      // checkQueue only verifies that the queue already exists; it does not declare it.
      // 'health-check-queue' must be declared beforehand, because checking a missing
      // queue makes the broker close the channel.
      const channel = this.connection.getChannel();
      await channel.checkQueue('health-check-queue');

      return {
        status: 'healthy',
        details: {
          connected: true,
        },
        durationMs: Date.now() - startTime,
      };
    } catch (error) {
      return {
        status: 'unhealthy',
        details: {
          connected: false,
          error: (error as Error).message,
        },
        durationMs: Date.now() - startTime,
      };
    }
  }
}

interface HealthCheckResult {
  status: 'healthy' | 'unhealthy';
  details: Record<string, unknown>;
  durationMs: number;
}

Tests
Integration Tests
// tests/integration/messaging/RabbitMQPublisher.test.ts
import { RabbitMQConnection } from '../../../src/infrastructure/messaging/RabbitMQConnection';
import { RabbitMQPublisher } from '../../../src/infrastructure/messaging/RabbitMQPublisher';
import { OrderCreatedEvent } from '../../../src/domain/events/OrderCreatedEvent';

// testConfig, logger, serializer and exchanges are assumed to come from shared
// test fixtures that are not shown here.

describe('RabbitMQPublisher', () => {
  let connection: RabbitMQConnection;
  let publisher: RabbitMQPublisher;

  beforeAll(async () => {
    connection = new RabbitMQConnection(testConfig, logger);
    await connection.connect();
    await connection.setupExchange(exchanges.orders);

    publisher = new RabbitMQPublisher(connection, serializer, logger);
  });

  afterAll(async () => {
    await connection.close();
  });

  it('should publish message to exchange', async () => {
    const event = new OrderCreatedEvent(
      {
        orderId: 'order-123',
        customerId: 'customer-456',
        items: [{ productId: 'prod-1', quantity: 2, price: 100 }],
        totalAmount: 200,
        currency: 'BRL',
        shippingAddress: {
          street: 'Rua Test',
          city: 'São Paulo',
          state: 'SP',
          zipCode: '01234-567',
          country: 'Brazil',
        },
      },
      { correlationId: 'corr-123' }
    );

    await expect(
      publisher.publish('orders.exchange', 'order.created', event)
    ).resolves.not.toThrow();
  });

  it('should handle publish failure gracefully', async () => {
    // Closing the connection first forces the publish call to fail.
    await connection.close();

    const event = new OrderCreatedEvent(
      { orderId: 'order-123', /* ... */ },
      { correlationId: 'corr-123' }
    );

    await expect(
      publisher.publish('orders.exchange', 'order.created', event)
    ).rejects.toThrow('Channel not available');
  });
});

Production Checklist
Configuration
- Secure credentials (not hardcoded)
- Separate virtual hosts per environment
- Prefetch tuned to the workload
- Heartbeat enabled
- TLS configured
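Several of the configuration items above can be captured in the connection URL itself. The following is a minimal sketch, not the article's implementation: the `RABBITMQ_*` variable names and the `buildAmqpUrl` helper are hypothetical, and the defaults (port 5671, 30-second heartbeat) are assumptions to adjust for your environment.

```typescript
// Hypothetical helper: builds an AMQP-over-TLS URL from environment variables
// so that credentials never appear in source code.
export type Env = Record<string, string | undefined>;

function requireVar(env: Env, key: string): string {
  const value = env[key];
  if (!value) {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

export function buildAmqpUrl(env: Env): string {
  const host = requireVar(env, 'RABBITMQ_HOST');
  // User, password and vhost are percent-encoded because they may contain
  // characters such as '/' or '@' that would otherwise break the URL.
  const user = encodeURIComponent(requireVar(env, 'RABBITMQ_USER'));
  const password = encodeURIComponent(requireVar(env, 'RABBITMQ_PASSWORD'));
  const vhost = encodeURIComponent(requireVar(env, 'RABBITMQ_VHOST'));

  // Assumed defaults: 5671 is the conventional AMQPS port; the heartbeat is in seconds.
  const port = env['RABBITMQ_PORT'] ?? '5671';
  const heartbeat = env['RABBITMQ_HEARTBEAT'] ?? '30';

  // The amqps:// scheme requests TLS; one vhost per environment keeps them isolated.
  return `amqps://${user}:${password}@${host}:${port}/${vhost}?heartbeat=${heartbeat}`;
}
```

The resulting string can be handed to the client library's connect call (amqplib reads the `heartbeat` query parameter from the URL). Prefetch is not part of the URL; in amqplib it is set per channel with `channel.prefetch(n)`.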
Resilience
- Dead Letter Queues configured
- Retry policies implemented
- Circuit breaker for publishing
- Automatic reconnect
- Graceful shutdown
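The circuit breaker item can be illustrated with a small state machine. This is a simplified sketch under assumptions: the class name `PublishCircuitBreaker`, the thresholds, and the injected clock are all hypothetical, and it allows any number of trial calls while half-open, which a production breaker would usually limit.

```typescript
export type BreakerState = 'closed' | 'open' | 'half-open';

// Hypothetical breaker: opens after N consecutive failures and allows a trial
// publish again once the reset timeout has elapsed.
export class PublishCircuitBreaker {
  private consecutiveFailures = 0;
  private openedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(
    failureThreshold: number = 5,
    resetTimeoutMs: number = 30000,
    now: () => number = () => Date.now()
  ) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now; // injectable clock, which keeps the class testable
  }

  state(): BreakerState {
    if (this.openedAt === null) return 'closed';
    const elapsed = this.now() - this.openedAt;
    return elapsed >= this.resetTimeoutMs ? 'half-open' : 'open';
  }

  // Publishing is allowed while closed, and as a trial while half-open.
  canPublish(): boolean {
    return this.state() !== 'open';
  }

  recordSuccess(): void {
    this.consecutiveFailures = 0;
    this.openedAt = null;
  }

  recordFailure(): void {
    const wasHalfOpen = this.state() === 'half-open';
    this.consecutiveFailures += 1;
    // A failed trial, or too many consecutive failures, (re)opens the breaker.
    if (wasHalfOpen || this.consecutiveFailures >= this.failureThreshold) {
      this.openedAt = this.now();
    }
  }
}
```

A publisher would check `canPublish()` before each publish and call `recordSuccess()` or `recordFailure()` afterwards. While the breaker is open, the event can stay in the outbox table and be retried on a later tick instead of hammering an unavailable broker.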
Observability
- Prometheus metrics
- Structured logs
- Alerts for full queues
- Alerts for messages in the DLQ
- Distributed tracing
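For the structured logging item, one possible shape is a single JSON object per log line that always carries the message's correlation ID, so one event can be followed across services. The `formatLogEntry` function and its field names below are hypothetical, shown only as a sketch.

```typescript
export type LogLevel = 'debug' | 'info' | 'warn' | 'error';

// Hypothetical context shape; correlationId mirrors the event metadata
// used elsewhere in this article.
export interface MessageLogContext {
  correlationId: string;
  queue?: string;
  eventType?: string;
  messageId?: string;
}

// Serializes one log entry as a single JSON line. The timestamp parameter
// is injectable so that output is deterministic in tests.
export function formatLogEntry(
  level: LogLevel,
  message: string,
  context: MessageLogContext,
  timestamp: Date = new Date()
): string {
  return JSON.stringify({
    timestamp: timestamp.toISOString(),
    level,
    message,
    ...context,
  });
}
```

Because every entry is machine-parseable and keyed by `correlationId`, a log aggregator can filter all lines for one order flow, which complements the distributed tracing item rather than replacing it.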
Performance
- Connection pooling
- Publisher confirms enabled
- Correct consumer acknowledgment
- Batch processing when appropriate
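Batch processing usually starts with splitting buffered messages into fixed-size groups. The `chunk` helper below is a hypothetical utility sketched for illustration, not part of any library.

```typescript
// Splits a list into consecutive batches of at most `size` items,
// preserving the original order.
export function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }

  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

A consumer could, for example, write each batch to the database in one statement and then acknowledge the whole batch at once; amqplib's `channel.ack(message, allUpTo)` acknowledges every outstanding message up to the given one when `allUpTo` is true. Keep the batch size at or below the channel's prefetch count, otherwise the batch can never fill.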
Conclusion
Asynchronous communication with RabbitMQ is fundamental to building resilient, scalable microservices. The key points are:
- Decoupling: Services do not need to be available at the same time
- Resilience: Dead Letter Queues and retry policies keep failed messages from being silently lost
- Scalability: Consumers can be scaled independently
- Observability: Metrics and logs make it possible to identify problems quickly
In the next article, we will explore how to implement complete observability with OpenTelemetry, Prometheus, and Grafana for your microservices: Observability with OpenTelemetry.