Pular para o conteúdoPedro Farbo

Mensageria com RabbitMQ: Comunicação Assíncrona entre Microserviços

Aprenda a implementar comunicação assíncrona entre microserviços usando RabbitMQ, incluindo padrões de mensageria, dead letter queues, retry policies e observabilidade.

Este conteúdo é gratuito! Ajude a manter o projeto no ar.

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e

No artigo anterior sobre API Gateway com Kong, vimos como gerenciar a comunicação síncrona entre clientes e microserviços. Agora, vamos explorar a comunicação assíncrona usando RabbitMQ, essencial para sistemas distribuídos resilientes.

Por que Comunicação Assíncrona?

Em arquiteturas de microserviços, a comunicação síncrona (HTTP/REST) cria acoplamento temporal entre serviços. Se o serviço destino estiver indisponível, a requisição falha.

Comunicação Síncrona (HTTP):
┌──────────┐     HTTP Request      ┌──────────┐
│ Service  │ ──────────────────►   │ Service  │
│    A     │ ◄──────────────────   │    B     │
└──────────┘     HTTP Response     └──────────┘
     ⚠️ Se B estiver down, A falha

Comunicação Assíncrona (Message Broker):
┌──────────┐    Publish     ┌──────────┐    Consume    ┌──────────┐
│ Service  │ ──────────►    │ RabbitMQ │ ──────────►   │ Service  │
│    A     │                │  Broker  │               │    B     │
└──────────┘                └──────────┘               └──────────┘
     ✅ Se B estiver down, mensagem fica na fila

Quando Usar Mensageria

CenárioRecomendação
Operações que podem ser processadas depois✅ Assíncrono
Notificações e emails✅ Assíncrono
Processamento de arquivos/imagens✅ Assíncrono
Sincronização de dados entre serviços✅ Assíncrono
Consulta que precisa de resposta imediata❌ Síncrono
Validação em tempo real❌ Síncrono

RabbitMQ: Conceitos Fundamentais

RabbitMQ é um message broker que implementa o protocolo AMQP (Advanced Message Queuing Protocol).

Arquitetura

┌─────────────────────────────────────────────────────────────────────┐
│                          RabbitMQ Broker                             │
│                                                                      │
│  Producer ──► Exchange ──► Binding ──► Queue ──► Consumer           │
│                  │                       │                           │
│            ┌─────┴─────┐           ┌─────┴─────┐                    │
│            │  Routing  │           │  Message  │                    │
│            │   Rules   │           │  Storage  │                    │
│            └───────────┘           └───────────┘                    │
└─────────────────────────────────────────────────────────────────────┘

Componentes Principais

ComponenteDescrição
ProducerAplicação que envia mensagens
ExchangeRecebe mensagens e roteia para filas
QueueArmazena mensagens até serem consumidas
BindingRegra que liga Exchange à Queue
ConsumerAplicação que recebe e processa mensagens
Virtual HostIsolamento lógico (multi-tenancy)

Tipos de Exchange

┌─────────────────────────────────────────────────────────────────────┐
│                        Exchange Types                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  DIRECT: Routing key exato                                          │
│  ┌──────────┐                                                       │
│  │ Exchange │──── routing_key="order.created" ────► Queue A         │
│  └──────────┘                                                       │
│                                                                      │
│  FANOUT: Broadcast para todas as filas                              │
│  ┌──────────┐────────────────────────────────────► Queue A          │
│  │ Exchange │────────────────────────────────────► Queue B          │
│  └──────────┘────────────────────────────────────► Queue C          │
│                                                                      │
│  TOPIC: Pattern matching com wildcards                              │
│  ┌──────────┐                                                       │
│  │ Exchange │──── "order.*" ─────────────────────► Queue A          │
│  └──────────┘──── "order.#" ─────────────────────► Queue B          │
│               (* = uma palavra, # = zero ou mais)                   │
│                                                                      │
│  HEADERS: Match por headers da mensagem                             │
│  ┌──────────┐                                                       │
│  │ Exchange │──── x-match: all, type: order ─────► Queue A          │
│  └──────────┘                                                       │
└─────────────────────────────────────────────────────────────────────┘

Estrutura do Projeto

messaging-service/
├── src/
│   ├── config/
│   │   ├── rabbitmq.config.ts
│   │   └── index.ts
│   │
│   ├── infrastructure/
│   │   ├── messaging/
│   │   │   ├── RabbitMQConnection.ts
│   │   │   ├── RabbitMQPublisher.ts
│   │   │   ├── RabbitMQConsumer.ts
│   │   │   ├── MessageSerializer.ts
│   │   │   └── index.ts
│   │   │
│   │   └── health/
│   │       └── RabbitMQHealthCheck.ts
│   │
│   ├── domain/
│   │   ├── events/
│   │   │   ├── DomainEvent.ts
│   │   │   ├── OrderCreatedEvent.ts
│   │   │   ├── OrderPaidEvent.ts
│   │   │   ├── UserRegisteredEvent.ts
│   │   │   └── index.ts
│   │   │
│   │   └── interfaces/
│   │       ├── IMessagePublisher.ts
│   │       ├── IMessageConsumer.ts
│   │       └── IMessageHandler.ts
│   │
│   ├── application/
│   │   ├── handlers/
│   │   │   ├── OrderCreatedHandler.ts
│   │   │   ├── OrderPaidHandler.ts
│   │   │   ├── UserRegisteredHandler.ts
│   │   │   └── index.ts
│   │   │
│   │   └── services/
│   │       └── EventDispatcher.ts
│   │
│   ├── presentation/
│   │   └── consumers/
│   │       ├── OrderConsumer.ts
│   │       ├── NotificationConsumer.ts
│   │       └── index.ts
│   │
│   └── shared/
│       ├── errors/
│       │   ├── MessageProcessingError.ts
│       │   └── RetryableError.ts
│       │
│       └── utils/
│           ├── retry.ts
│           └── logger.ts
│
├── docker/
│   ├── docker-compose.yml
│   └── rabbitmq/
│       ├── definitions.json
│       └── rabbitmq.conf
│
├── tests/
│   ├── unit/
│   │   └── handlers/
│   ├── integration/
│   │   └── messaging/
│   └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md

Implementação

1. Configuração do RabbitMQ

typescript
// src/config/rabbitmq.config.tsexport interface RabbitMQConfig {  host: string;  port: number;  username: string;  password: string;  vhost: string;  heartbeat: number;  prefetch: number;  reconnectDelay: number;  maxReconnectAttempts: number;} export interface ExchangeConfig {  name: string;  type: 'direct' | 'fanout' | 'topic' | 'headers';  durable: boolean;  autoDelete: boolean;} export interface QueueConfig {  name: string;  durable: boolean;  exclusive: boolean;  autoDelete: boolean;  deadLetterExchange?: string;  deadLetterRoutingKey?: string;  messageTtl?: number;  maxLength?: number;  maxPriority?: number;} export const rabbitmqConfig: RabbitMQConfig = {  host: process.env.RABBITMQ_HOST || 'localhost',  port: parseInt(process.env.RABBITMQ_PORT || '5672'),  username: process.env.RABBITMQ_USER || 'guest',  password: process.env.RABBITMQ_PASS || 'guest',  vhost: process.env.RABBITMQ_VHOST || '/',  heartbeat: 60,  prefetch: 10,  reconnectDelay: 5000,  maxReconnectAttempts: 10,}; export const exchanges: Record<string, ExchangeConfig> = {  orders: {    name: 'orders.exchange',    type: 'topic',    durable: true,    autoDelete: false,  },  notifications: {    name: 'notifications.exchange',    type: 'fanout',    durable: true,    autoDelete: false,  },  deadLetter: {    name: 'dlx.exchange',    type: 'direct',    durable: true,    autoDelete: false,  },}; export const queues: Record<string, QueueConfig> = {  orderCreated: {    name: 'orders.created.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.created.dlq',    messageTtl: 86400000, // 24 hours  },  orderPaid: {    name: 'orders.paid.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.paid.dlq',  },  notifications: {    name: 'notifications.queue',    durable: true,    exclusive: false,    autoDelete: false,    maxLength: 10000,  },  deadLetter: {    name: 'dead-letter.queue',    durable: true,    exclusive: false,    autoDelete: false,  },};

2. Conexão com RabbitMQ

typescript
// src/infrastructure/messaging/RabbitMQConnection.tsimport amqp, { Connection, Channel, ConfirmChannel } from 'amqplib';import { EventEmitter } from 'events';import { RabbitMQConfig, ExchangeConfig, QueueConfig } from '../../config/rabbitmq.config';import { Logger } from '../../shared/utils/logger'; export class RabbitMQConnection extends EventEmitter {  private connection: Connection | null = null;  private channel: ConfirmChannel | null = null;  private reconnectAttempts = 0;  private isConnecting = false;   constructor(    private readonly config: RabbitMQConfig,    private readonly logger: Logger  ) {    super();  }   async connect(): Promise<void> {    if (this.isConnecting) return;    this.isConnecting = true;     try {      const url = this.buildConnectionUrl();      this.connection = await amqp.connect(url, {        heartbeat: this.config.heartbeat,      });       this.connection.on('error', (err) => {        this.logger.error('RabbitMQ connection error', { error: err.message });        this.emit('error', err);      });       this.connection.on('close', () => {        this.logger.warn('RabbitMQ connection closed');        this.emit('disconnected');        this.scheduleReconnect();      });       this.channel = await this.connection.createConfirmChannel();      await this.channel.prefetch(this.config.prefetch);       this.channel.on('error', (err) => {        this.logger.error('RabbitMQ channel error', { error: err.message });      });       this.channel.on('close', () => {        this.logger.warn('RabbitMQ channel closed');      });       this.reconnectAttempts = 0;      this.isConnecting = false;      this.emit('connected');      this.logger.info('Connected to RabbitMQ');    } catch (error) {      this.isConnecting = false;      this.logger.error('Failed to connect to RabbitMQ', { error });      this.scheduleReconnect();      throw error;    }  }   private buildConnectionUrl(): string {    const { username, password, host, port, vhost } = this.config;    return `amqp://${username}:${password}@${host}:${port}${vhost}`;  }   private scheduleReconnect(): void {    if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {      this.logger.error('Max reconnect attempts reached');      this.emit('maxReconnectAttempts');      return;    }     this.reconnectAttempts++;    const delay = this.config.reconnectDelay * this.reconnectAttempts;     this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {      delayMs: delay,    });     setTimeout(() => this.connect(), delay);  }   async setupExchange(config: ExchangeConfig): Promise<void> {    if (!this.channel) throw new Error('Channel not initialized');     await this.channel.assertExchange(config.name, config.type, {      durable: config.durable,      autoDelete: config.autoDelete,    });     this.logger.info(`Exchange created: ${config.name}`);  }   async setupQueue(config: QueueConfig): Promise<void> {    if (!this.channel) throw new Error('Channel not initialized');     const options: amqp.Options.AssertQueue = {      durable: config.durable,      exclusive: config.exclusive,      autoDelete: config.autoDelete,      arguments: {},    };     if (config.deadLetterExchange) {      options.arguments!['x-dead-letter-exchange'] = config.deadLetterExchange;    }    if (config.deadLetterRoutingKey) {      options.arguments!['x-dead-letter-routing-key'] = config.deadLetterRoutingKey;    }    if (config.messageTtl) {      options.arguments!['x-message-ttl'] = config.messageTtl;    }    if (config.maxLength) {      options.arguments!['x-max-length'] = config.maxLength;    }    if (config.maxPriority) {      options.arguments!['x-max-priority'] = config.maxPriority;    }     await this.channel.assertQueue(config.name, options);    this.logger.info(`Queue created: ${config.name}`);  }   async bindQueue(    queue: string,    exchange: string,    routingKey: string  ): Promise<void> {    if (!this.channel) throw new Error('Channel not initialized');     await this.channel.bindQueue(queue, exchange, routingKey);    this.logger.info(`Queue ${queue} bound to ${exchange} with key ${routingKey}`);  }   getChannel(): ConfirmChannel {    if (!this.channel) throw new Error('Channel not initialized');    return this.channel;  }   async close(): Promise<void> {    try {      if (this.channel) await this.channel.close();      if (this.connection) await this.connection.close();      this.logger.info('RabbitMQ connection closed gracefully');    } catch (error) {      this.logger.error('Error closing RabbitMQ connection', { error });    }  }   isConnected(): boolean {    return this.connection !== null && this.channel !== null;  }}

3. Domain Events

typescript
// src/domain/events/DomainEvent.tsexport interface DomainEvent<T = unknown> {  eventId: string;  eventType: string;  aggregateId: string;  aggregateType: string;  timestamp: Date;  version: number;  payload: T;  metadata: EventMetadata;} export interface EventMetadata {  correlationId: string;  causationId?: string;  userId?: string;  traceId?: string;  spanId?: string;} export abstract class BaseDomainEvent<T> implements DomainEvent<T> {  public readonly eventId: string;  public readonly timestamp: Date;  public readonly version: number = 1;   constructor(    public readonly eventType: string,    public readonly aggregateId: string,    public readonly aggregateType: string,    public readonly payload: T,    public readonly metadata: EventMetadata  ) {    this.eventId = this.generateEventId();    this.timestamp = new Date();  }   private generateEventId(): string {    return `${this.aggregateType}-${this.aggregateId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;  }} // src/domain/events/OrderCreatedEvent.tsexport interface OrderCreatedPayload {  orderId: string;  customerId: string;  items: Array<{    productId: string;    quantity: number;    price: number;  }>;  totalAmount: number;  currency: string;  shippingAddress: {    street: string;    city: string;    state: string;    zipCode: string;    country: string;  };} export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> {  static readonly EVENT_TYPE = 'order.created';   constructor(payload: OrderCreatedPayload, metadata: EventMetadata) {    super(      OrderCreatedEvent.EVENT_TYPE,      payload.orderId,      'Order',      payload,      metadata    );  }} // src/domain/events/OrderPaidEvent.tsexport interface OrderPaidPayload {  orderId: string;  paymentId: string;  amount: number;  currency: string;  paymentMethod: string;  paidAt: Date;} export class OrderPaidEvent extends BaseDomainEvent<OrderPaidPayload> {  static readonly EVENT_TYPE = 'order.paid';   constructor(payload: OrderPaidPayload, metadata: EventMetadata) {    super(      OrderPaidEvent.EVENT_TYPE,      payload.orderId,      'Order',      payload,      metadata    );  }}

4. Publisher

typescript
// src/infrastructure/messaging/RabbitMQPublisher.tsimport { ConfirmChannel } from 'amqplib';import { DomainEvent } from '../../domain/events/DomainEvent';import { IMessagePublisher, PublishOptions } from '../../domain/interfaces/IMessagePublisher';import { MessageSerializer } from './MessageSerializer';import { Logger } from '../../shared/utils/logger';import { RabbitMQConnection } from './RabbitMQConnection'; export class RabbitMQPublisher implements IMessagePublisher {  private channel: ConfirmChannel | null = null;   constructor(    private readonly connection: RabbitMQConnection,    private readonly serializer: MessageSerializer,    private readonly logger: Logger  ) {    this.connection.on('connected', () => {      this.channel = this.connection.getChannel();    });  }   async publish<T>(    exchange: string,    routingKey: string,    event: DomainEvent<T>,    options: PublishOptions = {}  ): Promise<void> {    if (!this.channel) {      throw new Error('Channel not available');    }     const message = this.serializer.serialize(event);    const publishOptions = this.buildPublishOptions(event, options);     return new Promise((resolve, reject) => {      this.channel!.publish(        exchange,        routingKey,        message,        publishOptions,        (err) => {          if (err) {            this.logger.error('Failed to publish message', {              exchange,              routingKey,              eventId: event.eventId,              error: err.message,            });            reject(err);          } else {            this.logger.info('Message published', {              exchange,              routingKey,              eventId: event.eventId,              eventType: event.eventType,            });            resolve();          }        }      );    });  }   async publishBatch<T>(    exchange: string,    events: Array<{ routingKey: string; event: DomainEvent<T> }>  ): Promise<void> {    const publishPromises = events.map(({ routingKey, event }) =>      this.publish(exchange, routingKey, event)    );     await Promise.all(publishPromises);  }   private buildPublishOptions(    event: DomainEvent,    options: PublishOptions  ): amqp.Options.Publish {    return {      persistent: options.persistent ?? true,      messageId: event.eventId,      timestamp: event.timestamp.getTime(),      type: event.eventType,      contentType: 'application/json',      contentEncoding: 'utf-8',      correlationId: event.metadata.correlationId,      headers: {        'x-event-type': event.eventType,        'x-aggregate-type': event.aggregateType,        'x-aggregate-id': event.aggregateId,        'x-trace-id': event.metadata.traceId,        ...options.headers,      },      priority: options.priority,      expiration: options.expiration?.toString(),    };  }} // src/domain/interfaces/IMessagePublisher.tsexport interface PublishOptions {  persistent?: boolean;  priority?: number;  expiration?: number;  headers?: Record<string, string>;} export interface IMessagePublisher {  publish<T>(    exchange: string,    routingKey: string,    event: DomainEvent<T>,    options?: PublishOptions  ): Promise<void>;   publishBatch<T>(    exchange: string,    events: Array<{ routingKey: string; event: DomainEvent<T> }>  ): Promise<void>;}

5. Consumer com Retry e Dead Letter

typescript
// src/infrastructure/messaging/RabbitMQConsumer.tsimport { ConsumeMessage, ConfirmChannel } from 'amqplib';import { DomainEvent } from '../../domain/events/DomainEvent';import { IMessageHandler } from '../../domain/interfaces/IMessageHandler';import { MessageSerializer } from './MessageSerializer';import { Logger } from '../../shared/utils/logger';import { RabbitMQConnection } from './RabbitMQConnection';import { RetryableError } from '../../shared/errors/RetryableError'; export interface ConsumerOptions {  queue: string;  prefetch?: number;  noAck?: boolean;  maxRetries?: number;  retryDelay?: number;} export class RabbitMQConsumer {  private channel: ConfirmChannel | null = null;  private handlers: Map<string, IMessageHandler> = new Map();   constructor(    private readonly connection: RabbitMQConnection,    private readonly serializer: MessageSerializer,    private readonly logger: Logger,    private readonly options: ConsumerOptions  ) {    this.connection.on('connected', () => {      this.channel = this.connection.getChannel();    });  }   registerHandler(eventType: string, handler: IMessageHandler): void {    this.handlers.set(eventType, handler);    this.logger.info(`Handler registered for event type: ${eventType}`);  }   async start(): Promise<void> {    if (!this.channel) {      throw new Error('Channel not available');    }     if (this.options.prefetch) {      await this.channel.prefetch(this.options.prefetch);    }     await this.channel.consume(      this.options.queue,      (msg) => this.handleMessage(msg),      { noAck: this.options.noAck ?? false }    );     this.logger.info(`Consumer started for queue: ${this.options.queue}`);  }   private async handleMessage(msg: ConsumeMessage | null): Promise<void> {    if (!msg || !this.channel) return;     const startTime = Date.now();    const eventType = msg.properties.type;    const messageId = msg.properties.messageId;     try {      const event = this.serializer.deserialize<DomainEvent>(msg.content);      const handler = this.handlers.get(eventType);       if (!handler) {        this.logger.warn(`No handler found for event type: ${eventType}`);        this.channel.ack(msg);        return;      }       this.logger.info('Processing message', {        messageId,        eventType,        queue: this.options.queue,      });       await handler.handle(event);       this.channel.ack(msg);       this.logger.info('Message processed successfully', {        messageId,        eventType,        durationMs: Date.now() - startTime,      });    } catch (error) {      await this.handleError(msg, error as Error);    }  }   private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {    const retryCount = this.getRetryCount(msg);    const maxRetries = this.options.maxRetries ?? 3;    const messageId = msg.properties.messageId;     this.logger.error('Error processing message', {      messageId,      error: error.message,      retryCount,      maxRetries,    });     if (error instanceof RetryableError && retryCount < maxRetries) {      await this.scheduleRetry(msg, retryCount);    } else {      // Send to dead letter queue      this.channel!.nack(msg, false, false);      this.logger.warn('Message sent to dead letter queue', {        messageId,        reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error',      });    }  }   private getRetryCount(msg: ConsumeMessage): number {    const deaths = msg.properties.headers?.['x-death'];    if (!deaths || !Array.isArray(deaths)) return 0;    return deaths.reduce((count, death) => count + (death.count || 0), 0);  }   private async scheduleRetry(    msg: ConsumeMessage,    retryCount: number  ): Promise<void> {    const delay = this.calculateRetryDelay(retryCount);     // Requeue with delay using dead letter exchange    this.channel!.nack(msg, false, false);     this.logger.info('Message scheduled for retry', {      messageId: msg.properties.messageId,      retryCount: retryCount + 1,      delayMs: delay,    });  }   private calculateRetryDelay(retryCount: number): number {    const baseDelay = this.options.retryDelay ?? 1000;    // Exponential backoff: 1s, 2s, 4s, 8s...    return baseDelay * Math.pow(2, retryCount);  }   async stop(): Promise<void> {    if (this.channel) {      await this.channel.cancel(this.options.queue);      this.logger.info(`Consumer stopped for queue: ${this.options.queue}`);    }  }}

6. Message Handlers

typescript
// src/application/handlers/OrderCreatedHandler.tsimport { IMessageHandler } from '../../domain/interfaces/IMessageHandler';import { OrderCreatedEvent, OrderCreatedPayload } from '../../domain/events/OrderCreatedEvent';import { Logger } from '../../shared/utils/logger';import { NotificationService } from '../services/NotificationService';import { InventoryService } from '../services/InventoryService'; export class OrderCreatedHandler implements IMessageHandler<OrderCreatedPayload> {  constructor(    private readonly notificationService: NotificationService,    private readonly inventoryService: InventoryService,    private readonly logger: Logger  ) {}   async handle(event: OrderCreatedEvent): Promise<void> {    const { payload } = event;     this.logger.info('Handling order created event', {      orderId: payload.orderId,      customerId: payload.customerId,      totalAmount: payload.totalAmount,    });     // Reserve inventory    await this.inventoryService.reserveItems(      payload.orderId,      payload.items.map((item) => ({        productId: item.productId,        quantity: item.quantity,      }))    );     // Send confirmation notification    await this.notificationService.sendOrderConfirmation({      customerId: payload.customerId,      orderId: payload.orderId,      totalAmount: payload.totalAmount,      currency: payload.currency,    });     this.logger.info('Order created event handled successfully', {      orderId: payload.orderId,    });  }} // src/application/handlers/OrderPaidHandler.tsimport { IMessageHandler } from '../../domain/interfaces/IMessageHandler';import { OrderPaidEvent, OrderPaidPayload } from '../../domain/events/OrderPaidEvent';import { Logger } from '../../shared/utils/logger';import { ShippingService } from '../services/ShippingService';import { InvoiceService } from '../services/InvoiceService'; export class OrderPaidHandler implements IMessageHandler<OrderPaidPayload> {  constructor(    private readonly shippingService: ShippingService,    private readonly invoiceService: InvoiceService,    private readonly logger: Logger  ) {}   async handle(event: OrderPaidEvent): Promise<void> {    const { payload } = event;     this.logger.info('Handling order paid event', {      orderId: payload.orderId,      paymentId: payload.paymentId,      amount: payload.amount,    });     // Generate invoice    const invoice = await this.invoiceService.generate({      orderId: payload.orderId,      paymentId: payload.paymentId,      amount: payload.amount,      currency: payload.currency,      paidAt: payload.paidAt,    });     // Schedule shipping    await this.shippingService.scheduleShipment({      orderId: payload.orderId,      invoiceId: invoice.id,    });     this.logger.info('Order paid event handled successfully', {      orderId: payload.orderId,      invoiceId: invoice.id,    });  }}

7. Docker Compose

yaml
# docker/docker-compose.ymlversion: '3.8' services:  rabbitmq:    image: rabbitmq:3.12-management-alpine    container_name: rabbitmq    hostname: rabbitmq    environment:      RABBITMQ_DEFAULT_USER: admin      RABBITMQ_DEFAULT_PASS: admin123      RABBITMQ_DEFAULT_VHOST: /    ports:      - "5672:5672"   # AMQP      - "15672:15672" # Management UI      - "15692:15692" # Prometheus metrics    volumes:      - rabbitmq_data:/var/lib/rabbitmq      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json      - ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf    healthcheck:      test: ["CMD", "rabbitmq-diagnostics", "check_running"]      interval: 10s      timeout: 5s      retries: 5    networks:      - messaging-net   order-service:    build:      context: ../      dockerfile: Dockerfile    container_name: order-service    environment:      NODE_ENV: development      RABBITMQ_HOST: rabbitmq      RABBITMQ_PORT: 5672      RABBITMQ_USER: admin      RABBITMQ_PASS: admin123    depends_on:      rabbitmq:        condition: service_healthy    networks:      - messaging-net   notification-service:    build:      context: ../notification-service      dockerfile: Dockerfile    container_name: notification-service    environment:      NODE_ENV: development      RABBITMQ_HOST: rabbitmq      RABBITMQ_PORT: 5672      RABBITMQ_USER: admin      RABBITMQ_PASS: admin123    depends_on:      rabbitmq:        condition: service_healthy    networks:      - messaging-net volumes:  rabbitmq_data: networks:  messaging-net:    driver: bridge

8. Definições do RabbitMQ

json
{  "rabbit_version": "3.12.0",  "users": [    {      "name": "admin",      "password_hash": "hashed_password",      "hashing_algorithm": "rabbit_password_hashing_sha256",      "tags": ["administrator"]    }  ],  "vhosts": [    {"name": "/"}  ],  "permissions": [    {      "user": "admin",      "vhost": "/",      "configure": ".*",      "write": ".*",      "read": ".*"    }  ],  "exchanges": [    {      "name": "orders.exchange",      "vhost": "/",      "type": "topic",      "durable": true,      "auto_delete": false    },    {      "name": "notifications.exchange",      "vhost": "/",      "type": "fanout",      "durable": true,      "auto_delete": false    },    {      "name": "dlx.exchange",      "vhost": "/",      "type": "direct",      "durable": true,      "auto_delete": false    }  ],  "queues": [    {      "name": "orders.created.queue",      "vhost": "/",      "durable": true,      "auto_delete": false,      "arguments": {        "x-dead-letter-exchange": "dlx.exchange",        "x-dead-letter-routing-key": "orders.created.dlq"      }    },    {      "name": "orders.paid.queue",      "vhost": "/",      "durable": true,      "auto_delete": false,      "arguments": {        "x-dead-letter-exchange": "dlx.exchange",        "x-dead-letter-routing-key": "orders.paid.dlq"      }    },    {      "name": "notifications.queue",      "vhost": "/",      "durable": true,      "auto_delete": false    },    {      "name": "dead-letter.queue",      "vhost": "/",      "durable": true,      "auto_delete": false    }  ],  "bindings": [    {      "source": "orders.exchange",      "vhost": "/",      "destination": "orders.created.queue",      "destination_type": "queue",      "routing_key": "order.created"    },    {      "source": "orders.exchange",      "vhost": "/",      "destination": "orders.paid.queue",      "destination_type": "queue",      "routing_key": "order.paid"    },    {      "source": "notifications.exchange",      "vhost": "/",      "destination": "notifications.queue",      "destination_type": "queue",      "routing_key": ""    },    {      "source": "dlx.exchange",      "vhost": "/",      "destination": "dead-letter.queue",      "destination_type": "queue",      "routing_key": "#"    }  ]}

Padrões de Mensageria

1. Saga Pattern

typescript
// src/application/sagas/OrderSaga.tsimport { DomainEvent } from '../../domain/events/DomainEvent';import { IMessagePublisher } from '../../domain/interfaces/IMessagePublisher';import { Logger } from '../../shared/utils/logger'; interface SagaStep<T> {  name: string;  execute: (data: T) => Promise<void>;  compensate: (data: T) => Promise<void>;} export class OrderSaga {  private steps: SagaStep<OrderSagaData>[] = [];  private executedSteps: SagaStep<OrderSagaData>[] = [];   constructor(    private readonly publisher: IMessagePublisher,    private readonly logger: Logger  ) {    this.initializeSteps();  }   private initializeSteps(): void {    this.steps = [      {        name: 'reserve-inventory',        execute: async (data) => {          await this.publisher.publish(            'inventory.exchange',            'inventory.reserve',            this.createEvent('inventory.reserve.requested', data)          );        },        compensate: async (data) => {          await this.publisher.publish(            'inventory.exchange',            'inventory.release',            this.createEvent('inventory.release.requested', data)          );        },      },      {        name: 'process-payment',        execute: async (data) => {          await this.publisher.publish(            'payment.exchange',            'payment.process',            this.createEvent('payment.process.requested', data)          );        },        compensate: async (data) => {          await this.publisher.publish(            'payment.exchange',            'payment.refund',            this.createEvent('payment.refund.requested', data)          );        },      },      {        name: 'create-shipment',        execute: async (data) => {          await this.publisher.publish(            'shipping.exchange',            'shipment.create',            this.createEvent('shipment.create.requested', data)          );        },        compensate: async (data) => {          await this.publisher.publish(            'shipping.exchange',            'shipment.cancel',            this.createEvent('shipment.cancel.requested', data)          );        },      },    ];  }   async execute(data: OrderSagaData): Promise<void> {    this.logger.info('Starting order saga', { orderId: data.orderId });     for (const step of this.steps) {      try {        this.logger.info(`Executing saga step: ${step.name}`, {          orderId: data.orderId,        });         await step.execute(data);        this.executedSteps.push(step);      } catch (error) {        this.logger.error(`Saga step failed: ${step.name}`, {          orderId: data.orderId,          error: (error as Error).message,        });         await this.compensate(data);        throw error;      }    }     this.logger.info('Order saga completed successfully', {      orderId: data.orderId,    });  }   private async compensate(data: OrderSagaData): Promise<void> {    this.logger.info('Starting saga compensation', {      orderId: data.orderId,      stepsToCompensate: this.executedSteps.length,    });     // Compensate in reverse order    for (const step of this.executedSteps.reverse()) {      try {        this.logger.info(`Compensating step: ${step.name}`, {          orderId: data.orderId,        });        await step.compensate(data);      } catch (error) {        this.logger.error(`Compensation failed for step: ${step.name}`, {          orderId: data.orderId,          error: (error as Error).message,        });        // Continue compensating other steps      }    }  }   private createEvent(type: string, data: OrderSagaData): DomainEvent {    return {      eventId: `${type}-${data.orderId}-${Date.now()}`,      eventType: type,      aggregateId: data.orderId,      aggregateType: 'Order',      timestamp: new Date(),      version: 1,      payload: data,      metadata: {        correlationId: data.correlationId,      },    };  }} interface OrderSagaData {  orderId: string;  correlationId: string;  customerId: string;  items: Array<{ productId: string; quantity: number }>;  totalAmount: number;}

2. Outbox Pattern

typescript
// src/infrastructure/outbox/OutboxRepository.tsexport interface OutboxMessage {  id: string;  aggregateType: string;  aggregateId: string;  eventType: string;  payload: string;  createdAt: Date;  processedAt: Date | null;  retryCount: number;} export class OutboxRepository {  constructor(private readonly db: Database) {}   async save(message: Omit<OutboxMessage, 'id' | 'createdAt' | 'processedAt' | 'retryCount'>): Promise<OutboxMessage> {    const result = await this.db.query(`      INSERT INTO outbox_messages (aggregate_type, aggregate_id, event_type, payload)      VALUES ($1, $2, $3, $4)      RETURNING *    `, [message.aggregateType, message.aggregateId, message.eventType, message.payload]);     return this.mapToEntity(result.rows[0]);  }   async getUnprocessed(limit: number = 100): Promise<OutboxMessage[]> {    const result = await this.db.query(`      SELECT * FROM outbox_messages      WHERE processed_at IS NULL      AND retry_count < 5      ORDER BY created_at ASC      LIMIT $1      FOR UPDATE SKIP LOCKED    `, [limit]);     return result.rows.map(this.mapToEntity);  }   async markAsProcessed(id: string): Promise<void> {    await this.db.query(`      UPDATE outbox_messages      SET processed_at = NOW()      WHERE id = $1    `, [id]);  }   async incrementRetry(id: string): Promise<void> {    await this.db.query(`      UPDATE outbox_messages      SET retry_count = retry_count + 1      WHERE id = $1    `, [id]);  }   private mapToEntity(row: any): OutboxMessage {    return {      id: row.id,      aggregateType: row.aggregate_type,      aggregateId: row.aggregate_id,      eventType: row.event_type,      payload: row.payload,      createdAt: row.created_at,      processedAt: row.processed_at,      retryCount: row.retry_count,    };  }} // src/infrastructure/outbox/OutboxProcessor.tsexport class OutboxProcessor {  private isRunning = false;  private intervalId: NodeJS.Timeout | null = null;   constructor(    private readonly repository: OutboxRepository,    private readonly publisher: IMessagePublisher,    private readonly logger: Logger,    private readonly intervalMs: number = 1000  ) {}   start(): void {    if (this.isRunning) return;     this.isRunning = true;    this.intervalId = setInterval(() => this.processMessages(), this.intervalMs);    this.logger.info('Outbox processor started');  }   stop(): void {    if (this.intervalId) {      clearInterval(this.intervalId);      this.intervalId = null;    }    this.isRunning = false;    this.logger.info('Outbox processor stopped');  }   private async processMessages(): Promise<void> {    try {      const messages = await this.repository.getUnprocessed(100);       for (const message of messages) {        try {          const event = JSON.parse(message.payload);           await this.publisher.publish(            `${message.aggregateType.toLowerCase()}.exchange`,            message.eventType,            event          );           await this.repository.markAsProcessed(message.id);           this.logger.debug('Outbox message processed', {            messageId: message.id,            eventType: message.eventType,          });        } catch (error) {          await this.repository.incrementRetry(message.id);           this.logger.error('Failed to process outbox message', {            messageId: message.id,            error: (error as Error).message,          });        }      }    } catch (error) {      this.logger.error('Outbox processor error', {        error: (error as Error).message,      });    }  }}

Observabilidade

Métricas Prometheus

typescript
// src/infrastructure/metrics/MessagingMetrics.tsimport { Counter, Histogram, Gauge } from 'prom-client'; export class MessagingMetrics {  private readonly messagesPublished: Counter;  private readonly messagesConsumed: Counter;  private readonly messagesFailed: Counter;  private readonly messageProcessingDuration: Histogram;  private readonly queueDepth: Gauge;   constructor() {    this.messagesPublished = new Counter({      name: 'rabbitmq_messages_published_total',      help: 'Total number of messages published',      labelNames: ['exchange', 'routing_key', 'event_type'],    });     this.messagesConsumed = new Counter({      name: 'rabbitmq_messages_consumed_total',      help: 'Total number of messages consumed',      labelNames: ['queue', 'event_type', 'status'],    });     this.messagesFailed = new Counter({      name: 'rabbitmq_messages_failed_total',      help: 'Total number of failed messages',      labelNames: ['queue', 'event_type', 'error_type'],    });     this.messageProcessingDuration = new Histogram({      name: 'rabbitmq_message_processing_duration_seconds',      help: 'Message processing duration in seconds',      labelNames: ['queue', 'event_type'],      buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],    });     this.queueDepth = new Gauge({      name: 'rabbitmq_queue_depth',      help: 'Current number of messages in queue',      labelNames: ['queue'],    });  }   recordPublish(exchange: string, routingKey: string, eventType: string): void {    this.messagesPublished.labels(exchange, routingKey, eventType).inc();  }   recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void {    this.messagesConsumed.labels(queue, eventType, status).inc();  }   recordFailure(queue: string, eventType: string, errorType: string): void {    this.messagesFailed.labels(queue, eventType, errorType).inc();  }   recordProcessingDuration(queue: string, eventType: string, durationMs: number): void {    this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000);  }   setQueueDepth(queue: string, depth: number): void {    this.queueDepth.labels(queue).set(depth);  }}

Health Check

typescript
// src/infrastructure/health/RabbitMQHealthCheck.tsexport class RabbitMQHealthCheck {  constructor(private readonly connection: RabbitMQConnection) {}   async check(): Promise<HealthCheckResult> {    const startTime = Date.now();     try {      if (!this.connection.isConnected()) {        return {          status: 'unhealthy',          details: {            connected: false,            error: 'Not connected to RabbitMQ',          },          durationMs: Date.now() - startTime,        };      }       // Try to declare a temporary queue to verify connectivity      const channel = this.connection.getChannel();      await channel.checkQueue('health-check-queue');       return {        status: 'healthy',        details: {          connected: true,        },        durationMs: Date.now() - startTime,      };    } catch (error) {      return {        status: 'unhealthy',        details: {          connected: false,          error: (error as Error).message,        },        durationMs: Date.now() - startTime,      };    }  }} interface HealthCheckResult {  status: 'healthy' | 'unhealthy';  details: Record<string, unknown>;  durationMs: number;}

Testes

Testes de Integração

typescript
// tests/integration/messaging/RabbitMQPublisher.test.tsimport { RabbitMQConnection } from '../../../src/infrastructure/messaging/RabbitMQConnection';import { RabbitMQPublisher } from '../../../src/infrastructure/messaging/RabbitMQPublisher';import { OrderCreatedEvent } from '../../../src/domain/events/OrderCreatedEvent'; describe('RabbitMQPublisher', () => {  let connection: RabbitMQConnection;  let publisher: RabbitMQPublisher;   beforeAll(async () => {    connection = new RabbitMQConnection(testConfig, logger);    await connection.connect();    await connection.setupExchange(exchanges.orders);    publisher = new RabbitMQPublisher(connection, serializer, logger);  });   afterAll(async () => {    await connection.close();  });   it('should publish message to exchange', async () => {    const event = new OrderCreatedEvent(      {        orderId: 'order-123',        customerId: 'customer-456',        items: [{ productId: 'prod-1', quantity: 2, price: 100 }],        totalAmount: 200,        currency: 'BRL',        shippingAddress: {          street: 'Rua Test',          city: 'São Paulo',          state: 'SP',          zipCode: '01234-567',          country: 'Brazil',        },      },      { correlationId: 'corr-123' }    );     await expect(      publisher.publish('orders.exchange', 'order.created', event)    ).resolves.not.toThrow();  });   it('should handle publish failure gracefully', async () => {    await connection.close();     const event = new OrderCreatedEvent(      { orderId: 'order-123', /* ... */ },      { correlationId: 'corr-123' }    );     await expect(      publisher.publish('orders.exchange', 'order.created', event)    ).rejects.toThrow('Channel not available');  });});

Checklist de Produção

Configuração

  • Credenciais seguras (não hardcoded)
  • Virtual hosts separados por ambiente
  • Prefetch configurado adequadamente
  • Heartbeat habilitado
  • TLS configurado

Resiliência

  • Dead Letter Queues configuradas
  • Retry policies implementadas
  • Circuit breaker para publicação
  • Reconnect automático
  • Graceful shutdown

Observabilidade

  • Métricas Prometheus
  • Logs estruturados
  • Alertas para filas cheias
  • Alertas para mensagens na DLQ
  • Tracing distribuído

Performance

  • Connection pooling
  • Publisher confirms habilitado
  • Consumer acknowledgment correto
  • Batch processing quando apropriado

Conclusão

A comunicação assíncrona com RabbitMQ é fundamental para construir microserviços resilientes e escaláveis. Os pontos-chave são:

  1. Desacoplamento: Serviços não precisam estar disponíveis simultaneamente
  2. Resiliência: Dead Letter Queues e retry policies garantem que mensagens não sejam perdidas
  3. Escalabilidade: Consumers podem ser escalados independentemente
  4. Observabilidade: Métricas e logs permitem identificar problemas rapidamente

No próximo artigo, exploraremos como implementar observabilidade completa com OpenTelemetry, Prometheus e Grafana para seus microserviços: Observabilidade com OpenTelemetry.

PF
Sobre o autor

Pedro Farbo

Platform Engineering Lead & Solutions Architect com +10 anos de experiência. CEO da Farbo TSC. Especialista em Microserviços, Kong, Backstage e Cloud.

Gostou do conteúdo? Sua contribuição ajuda a manter tudo online e gratuito!

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e