Skip to contentPedro Farbo

Messaging with RabbitMQ: Asynchronous Communication between Microservices

Learn how to implement asynchronous communication between microservices using RabbitMQ, including messaging patterns, dead letter queues, retry policies, and observability.

This content is free! Help keep the project running.

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e

In the previous article about API Gateway with Kong, we saw how to manage synchronous communication between clients and microservices. Now, let's explore asynchronous communication using RabbitMQ, essential for resilient distributed systems.

Why Asynchronous Communication?

In microservices architectures, synchronous communication (HTTP/REST) creates temporal coupling between services. If the target service is unavailable, the request fails.

Synchronous Communication (HTTP):
┌──────────┐     HTTP Request      ┌──────────┐
│ Service  │ ──────────────────►   │ Service  │
│    A     │ ◄──────────────────   │    B     │
└──────────┘     HTTP Response     └──────────┘
     ⚠️ If B is down, A fails

Asynchronous Communication (Message Broker):
┌──────────┐    Publish     ┌──────────┐    Consume    ┌──────────┐
│ Service  │ ──────────►    │ RabbitMQ │ ──────────►   │ Service  │
│    A     │                │  Broker  │               │    B     │
└──────────┘                └──────────┘               └──────────┘
     ✅ If B is down, message stays in queue

When to Use Messaging

ScenarioRecommendation
Operations that can be processed later✅ Asynchronous
Notifications and emails✅ Asynchronous
File/image processing✅ Asynchronous
Data synchronization between services✅ Asynchronous
Query that needs immediate response❌ Synchronous
Real-time validation❌ Synchronous

RabbitMQ: Fundamental Concepts

RabbitMQ is a message broker that implements the AMQP (Advanced Message Queuing Protocol) protocol.

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                          RabbitMQ Broker                             │
│                                                                      │
│  Producer ──► Exchange ──► Binding ──► Queue ──► Consumer           │
│                  │                       │                           │
│            ┌─────┴─────┐           ┌─────┴─────┐                    │
│            │  Routing  │           │  Message  │                    │
│            │   Rules   │           │  Storage  │                    │
│            └───────────┘           └───────────┘                    │
└─────────────────────────────────────────────────────────────────────┘

Main Components

ComponentDescription
ProducerApplication that sends messages
ExchangeReceives messages and routes to queues
QueueStores messages until consumed
BindingRule that connects Exchange to Queue
ConsumerApplication that receives and processes messages
Virtual HostLogical isolation (multi-tenancy)

Exchange Types

┌─────────────────────────────────────────────────────────────────────┐
│                        Exchange Types                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  DIRECT: Exact routing key                                          │
│  ┌──────────┐                                                       │
│  │ Exchange │──── routing_key="order.created" ────► Queue A         │
│  └──────────┘                                                       │
│                                                                      │
│  FANOUT: Broadcast to all queues                                    │
│  ┌──────────┐────────────────────────────────────► Queue A          │
│  │ Exchange │────────────────────────────────────► Queue B          │
│  └──────────┘────────────────────────────────────► Queue C          │
│                                                                      │
│  TOPIC: Pattern matching with wildcards                             │
│  ┌──────────┐                                                       │
│  │ Exchange │──── "order.*" ─────────────────────► Queue A          │
│  └──────────┘──── "order.#" ─────────────────────► Queue B          │
│               (* = one word, # = zero or more)                      │
│                                                                      │
│  HEADERS: Match by message headers                                  │
│  ┌──────────┐                                                       │
│  │ Exchange │──── x-match: all, type: order ─────► Queue A          │
│  └──────────┘                                                       │
└─────────────────────────────────────────────────────────────────────┘

Project Structure

messaging-service/
├── src/
│   ├── config/
│   │   ├── rabbitmq.config.ts
│   │   └── index.ts
│   │
│   ├── infrastructure/
│   │   ├── messaging/
│   │   │   ├── RabbitMQConnection.ts
│   │   │   ├── RabbitMQPublisher.ts
│   │   │   ├── RabbitMQConsumer.ts
│   │   │   ├── MessageSerializer.ts
│   │   │   └── index.ts
│   │   │
│   │   └── health/
│   │       └── RabbitMQHealthCheck.ts
│   │
│   ├── domain/
│   │   ├── events/
│   │   │   ├── DomainEvent.ts
│   │   │   ├── OrderCreatedEvent.ts
│   │   │   ├── OrderPaidEvent.ts
│   │   │   ├── UserRegisteredEvent.ts
│   │   │   └── index.ts
│   │   │
│   │   └── interfaces/
│   │       ├── IMessagePublisher.ts
│   │       ├── IMessageConsumer.ts
│   │       └── IMessageHandler.ts
│   │
│   ├── application/
│   │   ├── handlers/
│   │   │   ├── OrderCreatedHandler.ts
│   │   │   ├── OrderPaidHandler.ts
│   │   │   ├── UserRegisteredHandler.ts
│   │   │   └── index.ts
│   │   │
│   │   └── services/
│   │       └── EventDispatcher.ts
│   │
│   ├── presentation/
│   │   └── consumers/
│   │       ├── OrderConsumer.ts
│   │       ├── NotificationConsumer.ts
│   │       └── index.ts
│   │
│   └── shared/
│       ├── errors/
│       │   ├── MessageProcessingError.ts
│       │   └── RetryableError.ts
│       │
│       └── utils/
│           ├── retry.ts
│           └── logger.ts
│
├── docker/
│   ├── docker-compose.yml
│   └── rabbitmq/
│       ├── definitions.json
│       └── rabbitmq.conf
│
├── tests/
│   ├── unit/
│   │   └── handlers/
│   ├── integration/
│   │   └── messaging/
│   └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md

Implementation

1. RabbitMQ Configuration

typescript
// src/config/rabbitmq.config.tsexport interface RabbitMQConfig {  host: string;  port: number;  username: string;  password: string;  vhost: string;  heartbeat: number;  prefetch: number;  reconnectDelay: number;  maxReconnectAttempts: number;} export const rabbitmqConfig: RabbitMQConfig = {  host: process.env.RABBITMQ_HOST || 'localhost',  port: parseInt(process.env.RABBITMQ_PORT || '5672'),  username: process.env.RABBITMQ_USER || 'guest',  password: process.env.RABBITMQ_PASS || 'guest',  vhost: process.env.RABBITMQ_VHOST || '/',  heartbeat: 60,  prefetch: 10,  reconnectDelay: 5000,  maxReconnectAttempts: 10,}; export const exchanges: Record<string, ExchangeConfig> = {  orders: {    name: 'orders.exchange',    type: 'topic',    durable: true,    autoDelete: false,  },  notifications: {    name: 'notifications.exchange',    type: 'fanout',    durable: true,    autoDelete: false,  },  deadLetter: {    name: 'dlx.exchange',    type: 'direct',    durable: true,    autoDelete: false,  },}; export const queues: Record<string, QueueConfig> = {  orderCreated: {    name: 'orders.created.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.created.dlq',    messageTtl: 86400000, // 24 hours  },  orderPaid: {    name: 'orders.paid.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.paid.dlq',  },  notifications: {    name: 'notifications.queue',    durable: true,    exclusive: false,    autoDelete: false,    maxLength: 10000,  },  deadLetter: {    name: 'dead-letter.queue',    durable: true,    exclusive: false,    autoDelete: false,  },};

2. RabbitMQ Connection

typescript
// src/infrastructure/messaging/RabbitMQConnection.tsimport amqp, { Connection, Channel, ConfirmChannel } from 'amqplib';import { EventEmitter } from 'events'; export class RabbitMQConnection extends EventEmitter {  private connection: Connection | null = null;  private channel: ConfirmChannel | null = null;  private reconnectAttempts = 0;  private isConnecting = false;   constructor(    private readonly config: RabbitMQConfig,    private readonly logger: Logger  ) {    super();  }   async connect(): Promise<void> {    if (this.isConnecting) return;    this.isConnecting = true;     try {      const url = this.buildConnectionUrl();      this.connection = await amqp.connect(url, {        heartbeat: this.config.heartbeat,      });       this.connection.on('error', (err) => {        this.logger.error('RabbitMQ connection error', { error: err.message });        this.emit('error', err);      });       this.connection.on('close', () => {        this.logger.warn('RabbitMQ connection closed');        this.emit('disconnected');        this.scheduleReconnect();      });       this.channel = await this.connection.createConfirmChannel();      await this.channel.prefetch(this.config.prefetch);       this.reconnectAttempts = 0;      this.isConnecting = false;      this.emit('connected');      this.logger.info('Connected to RabbitMQ');    } catch (error) {      this.isConnecting = false;      this.scheduleReconnect();      throw error;    }  }   private scheduleReconnect(): void {    if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {      this.logger.error('Max reconnect attempts reached');      this.emit('maxReconnectAttempts');      return;    }     this.reconnectAttempts++;    const delay = this.config.reconnectDelay * this.reconnectAttempts;     this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {      delayMs: delay,    });     setTimeout(() => this.connect(), delay);  }   getChannel(): ConfirmChannel {    if (!this.channel) throw new Error('Channel not initialized');    return this.channel;  }   async close(): Promise<void> {    if (this.channel) await this.channel.close();    if (this.connection) await this.connection.close();    this.logger.info('RabbitMQ connection closed gracefully');  }   isConnected(): boolean {    return this.connection !== null && this.channel !== null;  }}

3. Domain Events

typescript
// src/domain/events/DomainEvent.tsexport interface DomainEvent<T = unknown> {  eventId: string;  eventType: string;  aggregateId: string;  aggregateType: string;  timestamp: Date;  version: number;  payload: T;  metadata: EventMetadata;} export interface EventMetadata {  correlationId: string;  causationId?: string;  userId?: string;  traceId?: string;  spanId?: string;} // src/domain/events/OrderCreatedEvent.tsexport interface OrderCreatedPayload {  orderId: string;  customerId: string;  items: Array<{    productId: string;    quantity: number;    price: number;  }>;  totalAmount: number;  currency: string;  shippingAddress: {    street: string;    city: string;    state: string;    zipCode: string;    country: string;  };} export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> {  static readonly EVENT_TYPE = 'order.created';   constructor(payload: OrderCreatedPayload, metadata: EventMetadata) {    super(      OrderCreatedEvent.EVENT_TYPE,      payload.orderId,      'Order',      payload,      metadata    );  }}

4. Consumer with Retry and Dead Letter

typescript
// src/infrastructure/messaging/RabbitMQConsumer.tsexport class RabbitMQConsumer {  private channel: ConfirmChannel | null = null;  private handlers: Map<string, IMessageHandler> = new Map();   constructor(    private readonly connection: RabbitMQConnection,    private readonly serializer: MessageSerializer,    private readonly logger: Logger,    private readonly options: ConsumerOptions  ) {    this.connection.on('connected', () => {      this.channel = this.connection.getChannel();    });  }   registerHandler(eventType: string, handler: IMessageHandler): void {    this.handlers.set(eventType, handler);    this.logger.info(`Handler registered for event type: ${eventType}`);  }   async start(): Promise<void> {    if (!this.channel) throw new Error('Channel not available');     await this.channel.consume(      this.options.queue,      (msg) => this.handleMessage(msg),      { noAck: this.options.noAck ?? false }    );     this.logger.info(`Consumer started for queue: ${this.options.queue}`);  }   private async handleMessage(msg: ConsumeMessage | null): Promise<void> {    if (!msg || !this.channel) return;     const startTime = Date.now();    const eventType = msg.properties.type;     try {      const event = this.serializer.deserialize<DomainEvent>(msg.content);      const handler = this.handlers.get(eventType);       if (!handler) {        this.logger.warn(`No handler found for event type: ${eventType}`);        this.channel.ack(msg);        return;      }       await handler.handle(event);      this.channel.ack(msg);       this.logger.info('Message processed successfully', {        eventType,        durationMs: Date.now() - startTime,      });    } catch (error) {      await this.handleError(msg, error as Error);    }  }   private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {    const retryCount = this.getRetryCount(msg);    const maxRetries = this.options.maxRetries ?? 3;     if (error instanceof RetryableError && retryCount < maxRetries) {      // Will be requeued with delay via dead letter exchange      this.channel!.nack(msg, false, false);    } else {      // Send to dead letter queue permanently      this.channel!.nack(msg, false, false);      this.logger.warn('Message sent to dead letter queue', {        reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error',      });    }  }}

Messaging Patterns

Saga Pattern

typescript
// src/application/sagas/OrderSaga.tsinterface SagaStep<T> {  name: string;  execute: (data: T) => Promise<void>;  compensate: (data: T) => Promise<void>;} export class OrderSaga {  private steps: SagaStep<OrderSagaData>[] = [];  private executedSteps: SagaStep<OrderSagaData>[] = [];   async execute(data: OrderSagaData): Promise<void> {    this.logger.info('Starting order saga', { orderId: data.orderId });     for (const step of this.steps) {      try {        await step.execute(data);        this.executedSteps.push(step);      } catch (error) {        await this.compensate(data);        throw error;      }    }  }   private async compensate(data: OrderSagaData): Promise<void> {    // Compensate in reverse order    for (const step of this.executedSteps.reverse()) {      try {        await step.compensate(data);      } catch (error) {        this.logger.error(`Compensation failed for step: ${step.name}`);      }    }  }}

Outbox Pattern

typescript
// src/infrastructure/outbox/OutboxProcessor.tsexport class OutboxProcessor {  private isRunning = false;   constructor(    private readonly repository: OutboxRepository,    private readonly publisher: IMessagePublisher,    private readonly intervalMs: number = 1000  ) {}   start(): void {    if (this.isRunning) return;    this.isRunning = true;    setInterval(() => this.processMessages(), this.intervalMs);  }   private async processMessages(): Promise<void> {    const messages = await this.repository.getUnprocessed(100);     for (const message of messages) {      try {        const event = JSON.parse(message.payload);        await this.publisher.publish(          `${message.aggregateType.toLowerCase()}.exchange`,          message.eventType,          event        );        await this.repository.markAsProcessed(message.id);      } catch (error) {        await this.repository.incrementRetry(message.id);      }    }  }}

Observability

Prometheus Metrics

typescript
// src/infrastructure/metrics/MessagingMetrics.tsexport class MessagingMetrics {  private readonly messagesPublished: Counter;  private readonly messagesConsumed: Counter;  private readonly messagesFailed: Counter;  private readonly messageProcessingDuration: Histogram;   recordPublish(exchange: string, routingKey: string, eventType: string): void {    this.messagesPublished.labels(exchange, routingKey, eventType).inc();  }   recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void {    this.messagesConsumed.labels(queue, eventType, status).inc();  }   recordProcessingDuration(queue: string, eventType: string, durationMs: number): void {    this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000);  }}

Production Checklist

Configuration

  • Secure credentials (not hardcoded)
  • Separate virtual hosts per environment
  • Prefetch configured appropriately
  • Heartbeat enabled
  • TLS configured

Resilience

  • Dead Letter Queues configured
  • Retry policies implemented
  • Circuit breaker for publishing
  • Automatic reconnect
  • Graceful shutdown

Observability

  • Prometheus metrics
  • Structured logs
  • Alerts for full queues
  • Alerts for messages in DLQ
  • Distributed tracing

Conclusion

Asynchronous communication with RabbitMQ is fundamental for building resilient and scalable microservices. Key points:

  1. Decoupling: Services don't need to be available simultaneously
  2. Resilience: Dead Letter Queues and retry policies ensure messages aren't lost
  3. Scalability: Consumers can be scaled independently
  4. Observability: Metrics and logs allow quick problem identification

In the next article, we'll explore how to implement complete observability with OpenTelemetry, Prometheus, and Grafana for your microservices: Observability with OpenTelemetry.

PF
About the author

Pedro Farbo

Platform Engineering Lead & Solutions Architect with 10+ years of experience. CEO at Farbo TSC. Expert in Microservices, Kong, Backstage, and Cloud.

Enjoyed the content? Your contribution helps keep everything online and free!

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e