Messaging with RabbitMQ: Asynchronous Communication between Microservices
Learn how to implement asynchronous communication between microservices using RabbitMQ, including messaging patterns, dead letter queues, retry policies, and observability.
In the previous article about API Gateway with Kong, we saw how to manage synchronous communication between clients and microservices. Now, let's explore asynchronous communication using RabbitMQ, essential for resilient distributed systems.
Why Asynchronous Communication?
In microservices architectures, synchronous communication (HTTP/REST) creates temporal coupling between services: the caller and the target must both be available at the same moment. If the target service is unavailable, the request fails.
Synchronous Communication (HTTP):
┌──────────┐ HTTP Request ┌──────────┐
│ Service │ ──────────────────► │ Service │
│ A │ ◄────────────────── │ B │
└──────────┘ HTTP Response └──────────┘
⚠️ If B is down, A fails
Asynchronous Communication (Message Broker):
┌──────────┐ Publish ┌──────────┐ Consume ┌──────────┐
│ Service │ ──────────► │ RabbitMQ │ ──────────► │ Service │
│ A │ │ Broker │ │ B │
└──────────┘ └──────────┘ └──────────┘
✅ If B is down, message stays in queue
When to Use Messaging
| Scenario | Recommendation |
|---|---|
| Operations that can be processed later | ✅ Asynchronous |
| Notifications and emails | ✅ Asynchronous |
| File/image processing | ✅ Asynchronous |
| Data synchronization between services | ✅ Asynchronous |
| Query that needs immediate response | ❌ Synchronous |
| Real-time validation | ❌ Synchronous |
RabbitMQ: Fundamental Concepts
RabbitMQ is a message broker that implements AMQP (the Advanced Message Queuing Protocol).
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ │
│ Producer ──► Exchange ──► Binding ──► Queue ──► Consumer │
│ │ │ │
│ ┌─────┴─────┐ ┌─────┴─────┐ │
│ │ Routing │ │ Message │ │
│ │ Rules │ │ Storage │ │
│ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Main Components
| Component | Description |
|---|---|
| Producer | Application that sends messages |
| Exchange | Receives messages and routes to queues |
| Queue | Stores messages until consumed |
| Binding | Rule that connects Exchange to Queue |
| Consumer | Application that receives and processes messages |
| Virtual Host | Logical isolation (multi-tenancy) |
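To make the roles in the table concrete, here is a toy in-memory model of a direct exchange, a binding, and a queue. It is only a sketch for illustration: the class names `Queue` and `DirectExchange` are hypothetical and this is not the RabbitMQ client API. It also shows the property from the earlier diagram, namely that a message waits in the queue while no consumer is attached.

```typescript
// Toy in-memory model of the broker components; not the RabbitMQ client API.
type Handler = (message: string) => void;

class Queue {
  private readonly messages: string[] = [];
  private consumer: Handler | null = null;

  // Store the message, or hand it over right away if a consumer is attached.
  enqueue(message: string): void {
    if (this.consumer) this.consumer(message);
    else this.messages.push(message);
  }

  // Attach a consumer and drain everything that accumulated while it was away.
  consume(handler: Handler): void {
    this.consumer = handler;
    while (this.messages.length > 0) handler(this.messages.shift()!);
  }

  get depth(): number {
    return this.messages.length;
  }
}

class DirectExchange {
  // Each binding connects this exchange to a queue under a routing key.
  private readonly bindings: Array<{ routingKey: string; queue: Queue }> = [];

  bind(queue: Queue, routingKey: string): void {
    this.bindings.push({ routingKey, queue });
  }

  // Route to every queue whose binding key equals the routing key exactly.
  // Returns the number of queues that received the message.
  publish(routingKey: string, message: string): number {
    const targets = this.bindings.filter((b) => b.routingKey === routingKey);
    targets.forEach((b) => b.queue.enqueue(message));
    return targets.length;
  }
}

const exchange = new DirectExchange();
const orderQueue = new Queue();
exchange.bind(orderQueue, 'order.created');

// The producer publishes while no consumer is running: the queue holds the message.
exchange.publish('order.created', 'order-1');

// The consumer starts later and still receives it.
const received: string[] = [];
orderQueue.consume((m) => received.push(m));
```

A real broker adds persistence, acknowledgements, and network transport on top of this flow, but the routing path (producer, exchange, binding, queue, consumer) is the same.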
Exchange Types
┌─────────────────────────────────────────────────────────────────────┐
│ Exchange Types │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ DIRECT: Exact routing key │
│ ┌──────────┐ │
│ │ Exchange │──── routing_key="order.created" ────► Queue A │
│ └──────────┘ │
│ │
│ FANOUT: Broadcast to all queues │
│ ┌──────────┐────────────────────────────────────► Queue A │
│ │ Exchange │────────────────────────────────────► Queue B │
│ └──────────┘────────────────────────────────────► Queue C │
│ │
│ TOPIC: Pattern matching with wildcards │
│ ┌──────────┐ │
│ │ Exchange │──── "order.*" ─────────────────────► Queue A │
│ └──────────┘──── "order.#" ─────────────────────► Queue B │
│ (* = one word, # = zero or more) │
│ │
│ HEADERS: Match by message headers │
│ ┌──────────┐ │
│ │ Exchange │──── x-match: all, type: order ─────► Queue A │
│ └──────────┘ │
└─────────────────────────────────────────────────────────────────────┘
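The topic wildcard rules in the diagram can be checked with a small matcher. This is a minimal sketch of the matching rule only (the function name `topicMatches` is hypothetical, and the broker's own implementation is different and more efficient): words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words.

```typescript
// Returns true when routingKey matches a topic binding pattern.
// '*' stands for exactly one word, '#' for zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: it is a match only if the key is exhausted too.
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may absorb zero words (skip it) or one more word (stay on it).
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('order.*', 'order.created'));    // one word after "order"
console.log(topicMatches('order.*', 'order.created.eu')); // two words: "*" is not enough
console.log(topicMatches('order.#', 'order.created.eu')); // "#" absorbs both words
```

In practice this means a queue bound with `order.*` sees `order.created` and `order.paid` but not deeper keys, while a queue bound with `order.#` sees every key that starts with `order`.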
Project Structure
messaging-service/
├── src/
│ ├── config/
│ │ ├── rabbitmq.config.ts
│ │ └── index.ts
│ │
│ ├── infrastructure/
│ │ ├── messaging/
│ │ │ ├── RabbitMQConnection.ts
│ │ │ ├── RabbitMQPublisher.ts
│ │ │ ├── RabbitMQConsumer.ts
│ │ │ ├── MessageSerializer.ts
│ │ │ └── index.ts
│ │ │
│ │ └── health/
│ │ └── RabbitMQHealthCheck.ts
│ │
│ ├── domain/
│ │ ├── events/
│ │ │ ├── DomainEvent.ts
│ │ │ ├── OrderCreatedEvent.ts
│ │ │ ├── OrderPaidEvent.ts
│ │ │ ├── UserRegisteredEvent.ts
│ │ │ └── index.ts
│ │ │
│ │ └── interfaces/
│ │ ├── IMessagePublisher.ts
│ │ ├── IMessageConsumer.ts
│ │ └── IMessageHandler.ts
│ │
│ ├── application/
│ │ ├── handlers/
│ │ │ ├── OrderCreatedHandler.ts
│ │ │ ├── OrderPaidHandler.ts
│ │ │ ├── UserRegisteredHandler.ts
│ │ │ └── index.ts
│ │ │
│ │ └── services/
│ │ └── EventDispatcher.ts
│ │
│ ├── presentation/
│ │ └── consumers/
│ │ ├── OrderConsumer.ts
│ │ ├── NotificationConsumer.ts
│ │ └── index.ts
│ │
│ └── shared/
│ ├── errors/
│ │ ├── MessageProcessingError.ts
│ │ └── RetryableError.ts
│ │
│ └── utils/
│ ├── retry.ts
│ └── logger.ts
│
├── docker/
│ ├── docker-compose.yml
│ └── rabbitmq/
│ ├── definitions.json
│ └── rabbitmq.conf
│
├── tests/
│ ├── unit/
│ │ └── handlers/
│ ├── integration/
│ │ └── messaging/
│ └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md
Implementation
1. RabbitMQ Configuration
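The queue definitions in this section use field names such as `deadLetterExchange`, `deadLetterRoutingKey`, `messageTtl`, and `maxLength`. On the broker side these correspond to optional queue arguments prefixed with `x-`. The sketch below shows that mapping; `QueueSettings` and `toQueueArguments` are hypothetical names introduced for illustration. As far as I know, amqplib's `assertQueue` accepts these option names directly, so a helper like this is mainly useful when arguments must be written by hand, for example in a broker definitions file.

```typescript
// Hypothetical shape mirroring the optional fields of the queue entries in this section.
interface QueueSettings {
  name: string;
  deadLetterExchange?: string;
  deadLetterRoutingKey?: string;
  messageTtl?: number; // milliseconds
  maxLength?: number;
}

// Translate the friendly field names into RabbitMQ's optional queue arguments.
function toQueueArguments(q: QueueSettings): Record<string, string | number> {
  const args: Record<string, string | number> = {};
  if (q.deadLetterExchange !== undefined) args['x-dead-letter-exchange'] = q.deadLetterExchange;
  if (q.deadLetterRoutingKey !== undefined) args['x-dead-letter-routing-key'] = q.deadLetterRoutingKey;
  if (q.messageTtl !== undefined) args['x-message-ttl'] = q.messageTtl;
  if (q.maxLength !== undefined) args['x-max-length'] = q.maxLength;
  return args;
}

const orderCreatedArgs = toQueueArguments({
  name: 'orders.created.queue',
  deadLetterExchange: 'dlx.exchange',
  deadLetterRoutingKey: 'orders.created.dlq',
  messageTtl: 86400000, // 24 hours
});

console.log(orderCreatedArgs);
```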
// src/config/rabbitmq.config.ts
export interface RabbitMQConfig {
  host: string;
  port: number;
  username: string;
  password: string;
  vhost: string;
  heartbeat: number;
  prefetch: number;
  reconnectDelay: number;
  maxReconnectAttempts: number;
}

// Shapes of the exchange and queue entries declared below.
export interface ExchangeConfig {
  name: string;
  type: 'direct' | 'fanout' | 'topic' | 'headers';
  durable: boolean;
  autoDelete: boolean;
}

export interface QueueConfig {
  name: string;
  durable: boolean;
  exclusive: boolean;
  autoDelete: boolean;
  deadLetterExchange?: string;
  deadLetterRoutingKey?: string;
  messageTtl?: number; // milliseconds
  maxLength?: number;
}

export const rabbitmqConfig: RabbitMQConfig = {
  host: process.env.RABBITMQ_HOST || 'localhost',
  port: parseInt(process.env.RABBITMQ_PORT || '5672', 10),
  username: process.env.RABBITMQ_USER || 'guest',
  password: process.env.RABBITMQ_PASS || 'guest',
  vhost: process.env.RABBITMQ_VHOST || '/',
  heartbeat: 60, // seconds
  prefetch: 10,
  reconnectDelay: 5000, // milliseconds
  maxReconnectAttempts: 10,
};

export const exchanges: Record<string, ExchangeConfig> = {
  orders: {
    name: 'orders.exchange',
    type: 'topic',
    durable: true,
    autoDelete: false,
  },
  notifications: {
    name: 'notifications.exchange',
    type: 'fanout',
    durable: true,
    autoDelete: false,
  },
  deadLetter: {
    name: 'dlx.exchange',
    type: 'direct',
    durable: true,
    autoDelete: false,
  },
};

export const queues: Record<string, QueueConfig> = {
  orderCreated: {
    name: 'orders.created.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    deadLetterExchange: 'dlx.exchange',
    deadLetterRoutingKey: 'orders.created.dlq',
    messageTtl: 86400000, // 24 hours
  },
  orderPaid: {
    name: 'orders.paid.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    deadLetterExchange: 'dlx.exchange',
    deadLetterRoutingKey: 'orders.paid.dlq',
  },
  notifications: {
    name: 'notifications.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
    maxLength: 10000,
  },
  deadLetter: {
    name: 'dead-letter.queue',
    durable: true,
    exclusive: false,
    autoDelete: false,
  },
};

2. RabbitMQ Connection
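The connection class in this section reconnects with a delay that grows linearly: `reconnectDelay` multiplied by the attempt number. With the defaults from the configuration (`reconnectDelay: 5000`, `maxReconnectAttempts: 10`), the resulting schedule can be worked out with a few lines. The helper names here are hypothetical and exist only to show the arithmetic.

```typescript
// Delay before the n-th reconnect attempt (1-based), growing linearly.
function reconnectDelayMs(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * attempt;
}

// Full schedule of delays up to the attempt limit.
function reconnectSchedule(baseDelayMs: number, maxAttempts: number): number[] {
  return Array.from({ length: maxAttempts }, (_, i) => reconnectDelayMs(i + 1, baseDelayMs));
}

// Defaults from the configuration: 5000 ms base delay, 10 attempts.
const schedule = reconnectSchedule(5000, 10);
const totalWaitMs = schedule.reduce((sum, delay) => sum + delay, 0);

console.log(schedule);    // 5000, 10000, ... up to 50000
console.log(totalWaitMs); // 275000 ms in total
```

In other words, with these defaults the client gives up after a little over four and a half minutes of accumulated waiting. If the broker may be down for longer (for example during a cluster upgrade), the limits need to be raised or the `maxReconnectAttempts` event handled by the process supervisor.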
// src/infrastructure/messaging/RabbitMQConnection.ts
import amqp, { Connection, ConfirmChannel } from 'amqplib';
import { EventEmitter } from 'events';
import { RabbitMQConfig } from '../../config/rabbitmq.config';
import { Logger } from '../../shared/utils/logger'; // assumed to export a Logger type

export class RabbitMQConnection extends EventEmitter {
  private connection: Connection | null = null;
  private channel: ConfirmChannel | null = null;
  private reconnectAttempts = 0;
  private isConnecting = false;
  private isClosing = false;

  constructor(
    private readonly config: RabbitMQConfig,
    private readonly logger: Logger
  ) {
    super();
  }

  async connect(): Promise<void> {
    if (this.isConnecting) return;
    this.isConnecting = true;

    try {
      // Object form of the connection options: no vhost URL-encoding needed,
      // and the heartbeat interval travels with the other settings.
      this.connection = await amqp.connect({
        protocol: 'amqp',
        hostname: this.config.host,
        port: this.config.port,
        username: this.config.username,
        password: this.config.password,
        vhost: this.config.vhost,
        heartbeat: this.config.heartbeat,
      });

      this.connection.on('error', (err) => {
        this.logger.error('RabbitMQ connection error', { error: err.message });
        this.emit('error', err);
      });

      this.connection.on('close', () => {
        this.connection = null;
        this.channel = null;
        this.logger.warn('RabbitMQ connection closed');
        this.emit('disconnected');
        // Reconnect only after an unexpected close, not after close().
        if (!this.isClosing) this.scheduleReconnect();
      });

      this.channel = await this.connection.createConfirmChannel();
      await this.channel.prefetch(this.config.prefetch);

      this.reconnectAttempts = 0;
      this.isConnecting = false;
      this.emit('connected');
      this.logger.info('Connected to RabbitMQ');
    } catch (error) {
      this.isConnecting = false;
      this.scheduleReconnect();
      throw error;
    }
  }

  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {
      this.logger.error('Max reconnect attempts reached');
      this.emit('maxReconnectAttempts');
      return;
    }

    this.reconnectAttempts++;
    // Linear backoff: the delay grows with each attempt.
    const delay = this.config.reconnectDelay * this.reconnectAttempts;
    this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {
      delayMs: delay,
    });

    // connect() schedules the next attempt itself when it fails,
    // so the rejection is swallowed here to avoid an unhandled rejection.
    setTimeout(() => {
      this.connect().catch(() => undefined);
    }, delay);
  }

  getChannel(): ConfirmChannel {
    if (!this.channel) throw new Error('Channel not initialized');
    return this.channel;
  }

  async close(): Promise<void> {
    this.isClosing = true;
    if (this.channel) await this.channel.close();
    if (this.connection) await this.connection.close();
    this.logger.info('RabbitMQ connection closed gracefully');
  }

  isConnected(): boolean {
    return this.connection !== null && this.channel !== null;
  }
}

3. Domain Events
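The `OrderCreatedEvent` class in this section extends `BaseDomainEvent`, whose definition is not part of the article. A minimal sketch of what such a base class might look like follows. Everything here is an assumption made for illustration: the use of `randomUUID` for `eventId`, the fixed `version` of 1, and the `UserRegisteredPayload` fields used in the example.

```typescript
import { randomUUID } from 'node:crypto';

interface EventMetadata {
  correlationId: string;
  causationId?: string;
  userId?: string;
  traceId?: string;
  spanId?: string;
}

interface DomainEvent<T = unknown> {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  version: number;
  payload: T;
  metadata: EventMetadata;
}

// Hypothetical base class: fills in the fields every event shares.
abstract class BaseDomainEvent<T> implements DomainEvent<T> {
  readonly eventId: string = randomUUID(); // assumed ID strategy
  readonly timestamp: Date = new Date();
  readonly version: number = 1; // assumed schema version

  constructor(
    readonly eventType: string,
    readonly aggregateId: string,
    readonly aggregateType: string,
    readonly payload: T,
    readonly metadata: EventMetadata,
  ) {}
}

// Example subclass; the payload fields are illustrative only.
interface UserRegisteredPayload {
  userId: string;
  email: string;
}

class UserRegisteredEvent extends BaseDomainEvent<UserRegisteredPayload> {
  static readonly EVENT_TYPE = 'user.registered';

  constructor(payload: UserRegisteredPayload, metadata: EventMetadata) {
    super(UserRegisteredEvent.EVENT_TYPE, payload.userId, 'User', payload, metadata);
  }
}

const event = new UserRegisteredEvent(
  { userId: 'u-1', email: 'user@example.com' },
  { correlationId: 'c-1' },
);
console.log(JSON.stringify(event));
```

Generating `eventId` and `timestamp` in the base class keeps concrete events down to their payload and routing information, which is why the five-argument `super(...)` call is enough.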
// src/domain/events/DomainEvent.ts
export interface DomainEvent<T = unknown> {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  version: number;
  payload: T;
  metadata: EventMetadata;
}

export interface EventMetadata {
  correlationId: string;
  causationId?: string;
  userId?: string;
  traceId?: string;
  spanId?: string;
}

// src/domain/events/OrderCreatedEvent.ts
import { EventMetadata } from './DomainEvent';

export interface OrderCreatedPayload {
  orderId: string;
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  currency: string;
  shippingAddress: {
    street: string;
    city: string;
    state: string;
    zipCode: string;
    country: string;
  };
}

// BaseDomainEvent is assumed to implement DomainEvent<T> and to generate
// eventId, timestamp, and version itself.
export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> {
  static readonly EVENT_TYPE = 'order.created';

  constructor(payload: OrderCreatedPayload, metadata: EventMetadata) {
    super(
      OrderCreatedEvent.EVENT_TYPE,
      payload.orderId, // aggregateId
      'Order',         // aggregateType
      payload,
      metadata
    );
  }
}

4. Consumer with Retry and Dead Letter
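The consumer in this section chooses between retrying and dead-lettering based on a retry count. One way to derive that count, assuming retries travel through a dead letter exchange and come back to the original queue, is to read the `x-death` header that RabbitMQ adds when it dead-letters a message. The sketch below is a standalone helper that can be tested without a broker; `countRejections` is a hypothetical name, and only a subset of the header's fields is modelled.

```typescript
// Subset of the entries RabbitMQ records in the "x-death" header when dead-lettering.
interface XDeathEntry {
  queue: string;
  reason: string; // for example 'rejected' or 'expired'
  count?: number;
}

// How many times the message was rejected from the given queue.
// Returns 0 when the message has never been dead-lettered.
function countRejections(
  headers: Record<string, unknown> | undefined,
  queue: string,
): number {
  const deaths = headers?.['x-death'];
  if (!Array.isArray(deaths)) return 0;

  return (deaths as XDeathEntry[])
    .filter((d) => d.queue === queue && d.reason === 'rejected')
    .reduce((total, d) => total + Number(d.count ?? 0), 0);
}

// A fresh message carries no x-death header at all.
console.log(countRejections(undefined, 'orders.created.queue'));
```

Inside a consumer this would be called with `msg.properties.headers` and the name of the queue being consumed. Filtering on the `rejected` reason keeps TTL expiries in a retry queue from being counted as failed processing attempts.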
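// src/infrastructure/messaging/RabbitMQConsumer.ts
import { ConfirmChannel, ConsumeMessage } from 'amqplib';
import { RabbitMQConnection } from './RabbitMQConnection';
// The exported names below are assumed to match the file names in the project tree.
import { MessageSerializer } from './MessageSerializer';
import { DomainEvent } from '../../domain/events/DomainEvent';
import { IMessageHandler } from '../../domain/interfaces/IMessageHandler';
import { RetryableError } from '../../shared/errors/RetryableError';
import { Logger } from '../../shared/utils/logger';

// Options inferred from how the consumer uses them.
export interface ConsumerOptions {
  queue: string;
  noAck?: boolean;
  maxRetries?: number;
}

export class RabbitMQConsumer {
  private channel: ConfirmChannel | null = null;
  private handlers: Map<string, IMessageHandler> = new Map();

  constructor(
    private readonly connection: RabbitMQConnection,
    private readonly serializer: MessageSerializer,
    private readonly logger: Logger,
    private readonly options: ConsumerOptions
  ) {
    this.connection.on('connected', () => {
      this.channel = this.connection.getChannel();
    });
  }

  registerHandler(eventType: string, handler: IMessageHandler): void {
    this.handlers.set(eventType, handler);
    this.logger.info(`Handler registered for event type: ${eventType}`);
  }

  async start(): Promise<void> {
    // The connection may already be open when the consumer is created,
    // in which case the 'connected' event was emitted before we subscribed.
    if (!this.channel && this.connection.isConnected()) {
      this.channel = this.connection.getChannel();
    }
    if (!this.channel) throw new Error('Channel not available');

    await this.channel.consume(
      this.options.queue,
      (msg) => this.handleMessage(msg),
      { noAck: this.options.noAck ?? false }
    );

    this.logger.info(`Consumer started for queue: ${this.options.queue}`);
  }

  private async handleMessage(msg: ConsumeMessage | null): Promise<void> {
    if (!msg || !this.channel) return;

    const startTime = Date.now();
    const eventType = msg.properties.type;

    try {
      const event = this.serializer.deserialize<DomainEvent>(msg.content);
      const handler = this.handlers.get(eventType);

      if (!handler) {
        // Acknowledge unknown event types so they do not block the queue.
        this.logger.warn(`No handler found for event type: ${eventType}`);
        this.channel.ack(msg);
        return;
      }

      await handler.handle(event);
      this.channel.ack(msg);

      this.logger.info('Message processed successfully', {
        eventType,
        durationMs: Date.now() - startTime,
      });
    } catch (error) {
      await this.handleError(msg, error as Error);
    }
  }

  // Both branches nack without requeue, so the broker dead-letters the message
  // either way; what happens next depends on how the dead letter exchange is bound.
  // A delayed retry needs separate routing (for example a retry queue with a TTL
  // that dead-letters back to this queue), which the configuration above does not define.
  private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {
    const retryCount = this.getRetryCount(msg);
    const maxRetries = this.options.maxRetries ?? 3;

    if (error instanceof RetryableError && retryCount < maxRetries) {
      // Will be requeued with delay via dead letter exchange
      this.channel!.nack(msg, false, false);
      this.logger.info('Message dead-lettered for retry', { retryCount });
    } else {
      // Send to dead letter queue permanently
      this.channel!.nack(msg, false, false);
      this.logger.warn('Message sent to dead letter queue', {
        reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error',
      });
    }
  }

  // Assumed implementation: reads the count from the first entry of the
  // "x-death" header that RabbitMQ adds when it dead-letters a message.
  private getRetryCount(msg: ConsumeMessage): number {
    const deaths = msg.properties.headers?.['x-death'];
    if (!Array.isArray(deaths) || deaths.length === 0) return 0;
    return Number(deaths[0].count ?? 0);
  }
}

Messaging Patterns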
Saga Pattern
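// src/application/sagas/OrderSaga.ts
import { Logger } from '../../shared/utils/logger'; // assumed to export a Logger type

// Only orderId is used here; the remaining saga state is omitted.
interface OrderSagaData {
  orderId: string;
}

interface SagaStep<T> {
  name: string;
  execute: (data: T) => Promise<void>;
  compensate: (data: T) => Promise<void>;
}

export class OrderSaga {
  private steps: SagaStep<OrderSagaData>[] = [];
  private executedSteps: SagaStep<OrderSagaData>[] = [];

  constructor(private readonly logger: Logger) {}

  // Steps run in the order they are added.
  addStep(step: SagaStep<OrderSagaData>): this {
    this.steps.push(step);
    return this;
  }

  async execute(data: OrderSagaData): Promise<void> {
    this.logger.info('Starting order saga', { orderId: data.orderId });
    this.executedSteps = []; // start each run with a clean history

    for (const step of this.steps) {
      try {
        await step.execute(data);
        this.executedSteps.push(step);
      } catch (error) {
        // Undo the steps that already succeeded, then surface the failure.
        await this.compensate(data);
        throw error;
      }
    }
  }

  private async compensate(data: OrderSagaData): Promise<void> {
    // Compensate in reverse order; copy first because reverse() mutates the array.
    for (const step of [...this.executedSteps].reverse()) {
      try {
        await step.compensate(data);
      } catch (error) {
        // Keep compensating the remaining steps even if one compensation fails.
        this.logger.error(`Compensation failed for step: ${step.name}`);
      }
    }
  }
}

Outbox Pattern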
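// src/infrastructure/outbox/OutboxProcessor.ts
import { IMessagePublisher } from '../../domain/interfaces/IMessagePublisher';

// Shapes inferred from how the processor uses them.
interface OutboxMessage {
  id: string;
  aggregateType: string;
  eventType: string;
  payload: string; // JSON-encoded event
}

interface OutboxRepository {
  getUnprocessed(limit: number): Promise<OutboxMessage[]>;
  markAsProcessed(id: string): Promise<void>;
  incrementRetry(id: string): Promise<void>;
}

export class OutboxProcessor {
  private timer: ReturnType<typeof setInterval> | null = null;
  private isProcessing = false;

  constructor(
    private readonly repository: OutboxRepository,
    private readonly publisher: IMessagePublisher,
    private readonly intervalMs: number = 1000
  ) {}

  start(): void {
    if (this.timer) return;
    // A repository failure rejects the batch; it is retried on the next tick.
    this.timer = setInterval(() => {
      this.processMessages().catch(() => undefined);
    }, this.intervalMs);
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer);
    this.timer = null;
  }

  private async processMessages(): Promise<void> {
    if (this.isProcessing) return; // skip the tick if the previous batch is still running
    this.isProcessing = true;

    try {
      const messages = await this.repository.getUnprocessed(100);

      for (const message of messages) {
        try {
          const event = JSON.parse(message.payload);
          await this.publisher.publish(
            `${message.aggregateType.toLowerCase()}.exchange`,
            message.eventType,
            event
          );
          // Mark as processed only after the publish succeeded.
          await this.repository.markAsProcessed(message.id);
        } catch (error) {
          await this.repository.incrementRetry(message.id);
        }
      }
    } finally {
      this.isProcessing = false;
    }
  }
}

Observability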
Prometheus Metrics
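// src/infrastructure/metrics/MessagingMetrics.ts
import { Counter, Histogram } from 'prom-client';

// Metric names and help texts are illustrative.
export class MessagingMetrics {
  private readonly messagesPublished = new Counter({
    name: 'messaging_messages_published_total',
    help: 'Messages published',
    labelNames: ['exchange', 'routing_key', 'event_type'],
  });

  private readonly messagesConsumed = new Counter({
    name: 'messaging_messages_consumed_total',
    help: 'Messages consumed',
    labelNames: ['queue', 'event_type', 'status'],
  });

  private readonly messagesFailed = new Counter({
    name: 'messaging_messages_failed_total',
    help: 'Messages that failed processing',
    labelNames: ['queue', 'event_type'],
  });

  private readonly messageProcessingDuration = new Histogram({
    name: 'messaging_processing_duration_seconds',
    help: 'Message processing duration in seconds',
    labelNames: ['queue', 'event_type'],
  });

  recordPublish(exchange: string, routingKey: string, eventType: string): void {
    this.messagesPublished.labels(exchange, routingKey, eventType).inc();
  }

  recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void {
    this.messagesConsumed.labels(queue, eventType, status).inc();
    if (status === 'failed') this.messagesFailed.labels(queue, eventType).inc();
  }

  recordProcessingDuration(queue: string, eventType: string, durationMs: number): void {
    // Prometheus convention: durations are observed in seconds.
    this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000);
  }
}

Production Checklist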
Configuration
- Secure credentials (not hardcoded)
- Separate virtual hosts per environment
- Prefetch configured appropriately
- Heartbeat enabled
- TLS configured
Resilience
- Dead Letter Queues configured
- Retry policies implemented
- Circuit breaker for publishing
- Automatic reconnect
- Graceful shutdown
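The "Circuit breaker for publishing" item can be sketched as a small wrapper around any async publish call. This is a minimal illustration under stated assumptions, not a production implementation: the class name, the thresholds, and the injectable clock are all hypothetical, and a real service might prefer an existing resilience library.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = 'closed';

  constructor(
    private readonly failureThreshold: number,
    private readonly resetTimeoutMs: number,
    private readonly now: () => number = Date.now, // injectable clock for tests
  ) {}

  async run<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      // While open, fail fast without touching the broker.
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error('Circuit open: publish skipped');
      }
      this.state = 'half-open'; // timeout elapsed: allow one trial call
    }

    try {
      const result = await operation();
      this.failures = 0;
      this.state = 'closed';
      return result;
    } catch (error) {
      this.failures++;
      // A failed trial call, or too many consecutive failures, opens the circuit.
      if (this.state === 'half-open' || this.failures >= this.failureThreshold) {
        this.state = 'open';
        this.openedAt = this.now();
      }
      throw error;
    }
  }

  getState(): BreakerState {
    return this.state;
  }
}

// Usage sketch: wrap the publish call, e.g.
//   await breaker.run(() => publisher.publish(exchange, routingKey, event));
const breaker = new CircuitBreaker(5, 30000);
console.log(breaker.getState());
```

Combined with the outbox pattern, a skipped publish is not a lost message: the row stays unprocessed and is picked up again once the circuit closes.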
Observability
- Prometheus metrics
- Structured logs
- Alerts for full queues
- Alerts for messages in DLQ
- Distributed tracing
Conclusion
Asynchronous communication with RabbitMQ is fundamental for building resilient and scalable microservices. Key points:
- Decoupling: Services don't need to be available simultaneously
- Resilience: Dead Letter Queues and retry policies keep failed messages from being silently lost
- Scalability: Consumers can be scaled independently
- Observability: Metrics and logs allow quick problem identification
In the next article, we'll explore how to implement complete observability with OpenTelemetry, Prometheus, and Grafana for your microservices: Observability with OpenTelemetry.