Mensajería con RabbitMQ: Comunicación Asíncrona entre Microservicios
Aprende a implementar comunicación asíncrona entre microservicios usando RabbitMQ, incluyendo patrones de mensajería, dead letter queues, retry policies y observabilidad.
¡Este contenido es gratuito! Ayuda a mantener el proyecto en línea.
0737160d-e98f-4a65-8392-5dba70e7ff3eEn el artículo anterior sobre API Gateway con Kong, vimos cómo gestionar la comunicación síncrona entre clientes y microservicios. Ahora, vamos a explorar la comunicación asíncrona usando RabbitMQ, esencial para sistemas distribuidos resilientes.
¿Por qué Comunicación Asíncrona?
En arquitecturas de microservicios, la comunicación síncrona (HTTP/REST) crea acoplamiento temporal entre servicios. Si el servicio destino está indisponible, la solicitud falla.
Comunicación Síncrona (HTTP):
┌──────────┐ HTTP Request ┌──────────┐
│ Service │ ──────────────────► │ Service │
│ A │ ◄────────────────── │ B │
└──────────┘ HTTP Response └──────────┘
⚠️ Si B está caído, A falla
Comunicación Asíncrona (Message Broker):
┌──────────┐ Publish ┌──────────┐ Consume ┌──────────┐
│ Service │ ──────────► │ RabbitMQ │ ──────────► │ Service │
│ A │ │ Broker │ │ B │
└──────────┘ └──────────┘ └──────────┘
✅ Si B está caído, mensaje queda en la cola
Cuándo Usar Mensajería
| Escenario | Recomendación |
|---|---|
| Operaciones que pueden procesarse después | ✅ Asíncrono |
| Notificaciones y emails | ✅ Asíncrono |
| Procesamiento de archivos/imágenes | ✅ Asíncrono |
| Sincronización de datos entre servicios | ✅ Asíncrono |
| Consulta que necesita respuesta inmediata | ❌ Síncrono |
| Validación en tiempo real | ❌ Síncrono |
RabbitMQ: Conceptos Fundamentales
RabbitMQ es un message broker que implementa el protocolo AMQP (Advanced Message Queuing Protocol).
Arquitectura
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ │
│ Producer ──► Exchange ──► Binding ──► Queue ──► Consumer │
│ │ │ │
│ ┌─────┴─────┐ ┌─────┴─────┐ │
│ │ Routing │ │ Message │ │
│ │ Rules │ │ Storage │ │
│ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Componentes Principales
| Componente | Descripción |
|---|---|
| Producer | Aplicación que envía mensajes |
| Exchange | Recibe mensajes y enruta a colas |
| Queue | Almacena mensajes hasta ser consumidos |
| Binding | Regla que conecta Exchange a Queue |
| Consumer | Aplicación que recibe y procesa mensajes |
| Virtual Host | Aislamiento lógico (multi-tenancy) |
Tipos de Exchange
┌─────────────────────────────────────────────────────────────────────┐
│ Exchange Types │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ DIRECT: Routing key exacto │
│ ┌──────────┐ │
│ │ Exchange │──── routing_key="order.created" ────► Queue A │
│ └──────────┘ │
│ │
│ FANOUT: Broadcast a todas las colas │
│ ┌──────────┐────────────────────────────────────► Queue A │
│ │ Exchange │────────────────────────────────────► Queue B │
│ └──────────┘────────────────────────────────────► Queue C │
│ │
│ TOPIC: Pattern matching con wildcards │
│ ┌──────────┐ │
│ │ Exchange │──── "order.*" ─────────────────────► Queue A │
│ └──────────┘──── "order.#" ─────────────────────► Queue B │
│ (* = una palabra, # = cero o más) │
│ │
│ HEADERS: Match por headers del mensaje │
│ ┌──────────┐ │
│ │ Exchange │──── x-match: all, type: order ─────► Queue A │
│ └──────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Estructura del Proyecto
messaging-service/
├── src/
│ ├── config/
│ │ ├── rabbitmq.config.ts
│ │ └── index.ts
│ │
│ ├── infrastructure/
│ │ ├── messaging/
│ │ │ ├── RabbitMQConnection.ts
│ │ │ ├── RabbitMQPublisher.ts
│ │ │ ├── RabbitMQConsumer.ts
│ │ │ ├── MessageSerializer.ts
│ │ │ └── index.ts
│ │ │
│ │ └── health/
│ │ └── RabbitMQHealthCheck.ts
│ │
│ ├── domain/
│ │ ├── events/
│ │ │ ├── DomainEvent.ts
│ │ │ ├── OrderCreatedEvent.ts
│ │ │ ├── OrderPaidEvent.ts
│ │ │ ├── UserRegisteredEvent.ts
│ │ │ └── index.ts
│ │ │
│ │ └── interfaces/
│ │ ├── IMessagePublisher.ts
│ │ ├── IMessageConsumer.ts
│ │ └── IMessageHandler.ts
│ │
│ ├── application/
│ │ ├── handlers/
│ │ │ ├── OrderCreatedHandler.ts
│ │ │ ├── OrderPaidHandler.ts
│ │ │ ├── UserRegisteredHandler.ts
│ │ │ └── index.ts
│ │ │
│ │ └── services/
│ │ └── EventDispatcher.ts
│ │
│ ├── presentation/
│ │ └── consumers/
│ │ ├── OrderConsumer.ts
│ │ ├── NotificationConsumer.ts
│ │ └── index.ts
│ │
│ └── shared/
│ ├── errors/
│ │ ├── MessageProcessingError.ts
│ │ └── RetryableError.ts
│ │
│ └── utils/
│ ├── retry.ts
│ └── logger.ts
│
├── docker/
│ ├── docker-compose.yml
│ └── rabbitmq/
│ ├── definitions.json
│ └── rabbitmq.conf
│
├── tests/
│ ├── unit/
│ │ └── handlers/
│ ├── integration/
│ │ └── messaging/
│ └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md
Implementación
1. Configuración de RabbitMQ
// src/config/rabbitmq.config.tsexport interface RabbitMQConfig { host: string; port: number; username: string; password: string; vhost: string; heartbeat: number; prefetch: number; reconnectDelay: number; maxReconnectAttempts: number;} export const rabbitmqConfig: RabbitMQConfig = { host: process.env.RABBITMQ_HOST || 'localhost', port: parseInt(process.env.RABBITMQ_PORT || '5672'), username: process.env.RABBITMQ_USER || 'guest', password: process.env.RABBITMQ_PASS || 'guest', vhost: process.env.RABBITMQ_VHOST || '/', heartbeat: 60, prefetch: 10, reconnectDelay: 5000, maxReconnectAttempts: 10,}; export const exchanges: Record<string, ExchangeConfig> = { orders: { name: 'orders.exchange', type: 'topic', durable: true, autoDelete: false, }, notifications: { name: 'notifications.exchange', type: 'fanout', durable: true, autoDelete: false, }, deadLetter: { name: 'dlx.exchange', type: 'direct', durable: true, autoDelete: false, },}; export const queues: Record<string, QueueConfig> = { orderCreated: { name: 'orders.created.queue', durable: true, exclusive: false, autoDelete: false, deadLetterExchange: 'dlx.exchange', deadLetterRoutingKey: 'orders.created.dlq', messageTtl: 86400000, // 24 horas }, orderPaid: { name: 'orders.paid.queue', durable: true, exclusive: false, autoDelete: false, deadLetterExchange: 'dlx.exchange', deadLetterRoutingKey: 'orders.paid.dlq', }, notifications: { name: 'notifications.queue', durable: true, exclusive: false, autoDelete: false, maxLength: 10000, }, deadLetter: { name: 'dead-letter.queue', durable: true, exclusive: false, autoDelete: false, },};2. Conexión con RabbitMQ
// src/infrastructure/messaging/RabbitMQConnection.tsimport amqp, { Connection, Channel, ConfirmChannel } from 'amqplib';import { EventEmitter } from 'events'; export class RabbitMQConnection extends EventEmitter { private connection: Connection | null = null; private channel: ConfirmChannel | null = null; private reconnectAttempts = 0; private isConnecting = false; constructor( private readonly config: RabbitMQConfig, private readonly logger: Logger ) { super(); } async connect(): Promise<void> { if (this.isConnecting) return; this.isConnecting = true; try { const url = this.buildConnectionUrl(); this.connection = await amqp.connect(url, { heartbeat: this.config.heartbeat, }); this.connection.on('error', (err) => { this.logger.error('RabbitMQ connection error', { error: err.message }); this.emit('error', err); }); this.connection.on('close', () => { this.logger.warn('RabbitMQ connection closed'); this.emit('disconnected'); this.scheduleReconnect(); }); this.channel = await this.connection.createConfirmChannel(); await this.channel.prefetch(this.config.prefetch); this.reconnectAttempts = 0; this.isConnecting = false; this.emit('connected'); this.logger.info('Connected to RabbitMQ'); } catch (error) { this.isConnecting = false; this.scheduleReconnect(); throw error; } } private scheduleReconnect(): void { if (this.reconnectAttempts >= this.config.maxReconnectAttempts) { this.logger.error('Max reconnect attempts reached'); this.emit('maxReconnectAttempts'); return; } this.reconnectAttempts++; const delay = this.config.reconnectDelay * this.reconnectAttempts; this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, { delayMs: delay, }); setTimeout(() => this.connect(), delay); } getChannel(): ConfirmChannel { if (!this.channel) throw new Error('Channel not initialized'); return this.channel; } async close(): Promise<void> { if (this.channel) await this.channel.close(); if (this.connection) await this.connection.close(); this.logger.info('RabbitMQ connection closed gracefully'); } isConnected(): boolean { return this.connection !== null && this.channel !== null; }}3. Domain Events
// src/domain/events/DomainEvent.tsexport interface DomainEvent<T = unknown> { eventId: string; eventType: string; aggregateId: string; aggregateType: string; timestamp: Date; version: number; payload: T; metadata: EventMetadata;} export interface EventMetadata { correlationId: string; causationId?: string; userId?: string; traceId?: string; spanId?: string;} // src/domain/events/OrderCreatedEvent.tsexport interface OrderCreatedPayload { orderId: string; customerId: string; items: Array<{ productId: string; quantity: number; price: number; }>; totalAmount: number; currency: string; shippingAddress: { street: string; city: string; state: string; zipCode: string; country: string; };} export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> { static readonly EVENT_TYPE = 'order.created'; constructor(payload: OrderCreatedPayload, metadata: EventMetadata) { super( OrderCreatedEvent.EVENT_TYPE, payload.orderId, 'Order', payload, metadata ); }}4. Consumer con Retry y Dead Letter
// src/infrastructure/messaging/RabbitMQConsumer.tsexport class RabbitMQConsumer { private channel: ConfirmChannel | null = null; private handlers: Map<string, IMessageHandler> = new Map(); constructor( private readonly connection: RabbitMQConnection, private readonly serializer: MessageSerializer, private readonly logger: Logger, private readonly options: ConsumerOptions ) { this.connection.on('connected', () => { this.channel = this.connection.getChannel(); }); } registerHandler(eventType: string, handler: IMessageHandler): void { this.handlers.set(eventType, handler); this.logger.info(`Handler registered for event type: ${eventType}`); } async start(): Promise<void> { if (!this.channel) throw new Error('Channel not available'); await this.channel.consume( this.options.queue, (msg) => this.handleMessage(msg), { noAck: this.options.noAck ?? false } ); this.logger.info(`Consumer started for queue: ${this.options.queue}`); } private async handleMessage(msg: ConsumeMessage | null): Promise<void> { if (!msg || !this.channel) return; const startTime = Date.now(); const eventType = msg.properties.type; try { const event = this.serializer.deserialize<DomainEvent>(msg.content); const handler = this.handlers.get(eventType); if (!handler) { this.logger.warn(`No handler found for event type: ${eventType}`); this.channel.ack(msg); return; } await handler.handle(event); this.channel.ack(msg); this.logger.info('Message processed successfully', { eventType, durationMs: Date.now() - startTime, }); } catch (error) { await this.handleError(msg, error as Error); } } private async handleError(msg: ConsumeMessage, error: Error): Promise<void> { const retryCount = this.getRetryCount(msg); const maxRetries = this.options.maxRetries ?? 3; if (error instanceof RetryableError && retryCount < maxRetries) { // Será reencolado con delay via dead letter exchange this.channel!.nack(msg, false, false); } else { // Enviar a dead letter queue permanentemente this.channel!.nack(msg, false, false); this.logger.warn('Message sent to dead letter queue', { reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error', }); } }}Patrones de Mensajería
Saga Pattern
// src/application/sagas/OrderSaga.tsinterface SagaStep<T> { name: string; execute: (data: T) => Promise<void>; compensate: (data: T) => Promise<void>;} export class OrderSaga { private steps: SagaStep<OrderSagaData>[] = []; private executedSteps: SagaStep<OrderSagaData>[] = []; async execute(data: OrderSagaData): Promise<void> { this.logger.info('Starting order saga', { orderId: data.orderId }); for (const step of this.steps) { try { await step.execute(data); this.executedSteps.push(step); } catch (error) { await this.compensate(data); throw error; } } } private async compensate(data: OrderSagaData): Promise<void> { // Compensar en orden inverso for (const step of this.executedSteps.reverse()) { try { await step.compensate(data); } catch (error) { this.logger.error(`Compensation failed for step: ${step.name}`); } } }}Outbox Pattern
// src/infrastructure/outbox/OutboxProcessor.tsexport class OutboxProcessor { private isRunning = false; constructor( private readonly repository: OutboxRepository, private readonly publisher: IMessagePublisher, private readonly intervalMs: number = 1000 ) {} start(): void { if (this.isRunning) return; this.isRunning = true; setInterval(() => this.processMessages(), this.intervalMs); } private async processMessages(): Promise<void> { const messages = await this.repository.getUnprocessed(100); for (const message of messages) { try { const event = JSON.parse(message.payload); await this.publisher.publish( `${message.aggregateType.toLowerCase()}.exchange`, message.eventType, event ); await this.repository.markAsProcessed(message.id); } catch (error) { await this.repository.incrementRetry(message.id); } } }}Observabilidad
Métricas Prometheus
// src/infrastructure/metrics/MessagingMetrics.tsexport class MessagingMetrics { private readonly messagesPublished: Counter; private readonly messagesConsumed: Counter; private readonly messagesFailed: Counter; private readonly messageProcessingDuration: Histogram; recordPublish(exchange: string, routingKey: string, eventType: string): void { this.messagesPublished.labels(exchange, routingKey, eventType).inc(); } recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void { this.messagesConsumed.labels(queue, eventType, status).inc(); } recordProcessingDuration(queue: string, eventType: string, durationMs: number): void { this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000); }}Checklist de Producción
Configuración
- Credenciales seguras (no hardcoded)
- Virtual hosts separados por ambiente
- Prefetch configurado adecuadamente
- Heartbeat habilitado
- TLS configurado
Resiliencia
- Dead Letter Queues configuradas
- Retry policies implementadas
- Circuit breaker para publicación
- Reconnect automático
- Graceful shutdown
Observabilidad
- Métricas Prometheus
- Logs estructurados
- Alertas para colas llenas
- Alertas para mensajes en DLQ
- Tracing distribuido
Conclusión
La comunicación asíncrona con RabbitMQ es fundamental para construir microservicios resilientes y escalables. Los puntos clave son:
- Desacoplamiento: Servicios no necesitan estar disponibles simultáneamente
- Resiliencia: Dead Letter Queues y retry policies garantizan que los mensajes no se pierdan
- Escalabilidad: Consumers pueden escalarse independientemente
- Observabilidad: Métricas y logs permiten identificar problemas rápidamente
En el próximo artículo, exploraremos cómo implementar observabilidad completa con OpenTelemetry, Prometheus y Grafana para tus microservicios: Observabilidad con OpenTelemetry.
¿Te gustó el contenido? ¡Tu contribución ayuda a mantener todo online y gratuito!
0737160d-e98f-4a65-8392-5dba70e7ff3e