Saltar al contenidoPedro Farbo

Mensajería con RabbitMQ: Comunicación Asíncrona entre Microservicios

Aprende a implementar comunicación asíncrona entre microservicios usando RabbitMQ, incluyendo patrones de mensajería, dead letter queues, retry policies y observabilidad.

¡Este contenido es gratuito! Ayuda a mantener el proyecto en línea.

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e

En el artículo anterior sobre API Gateway con Kong, vimos cómo gestionar la comunicación síncrona entre clientes y microservicios. Ahora, vamos a explorar la comunicación asíncrona usando RabbitMQ, esencial para sistemas distribuidos resilientes.

¿Por qué Comunicación Asíncrona?

En arquitecturas de microservicios, la comunicación síncrona (HTTP/REST) crea acoplamiento temporal entre servicios. Si el servicio destino está indisponible, la solicitud falla.

Comunicación Síncrona (HTTP):
┌──────────┐     HTTP Request      ┌──────────┐
│ Service  │ ──────────────────►   │ Service  │
│    A     │ ◄──────────────────   │    B     │
└──────────┘     HTTP Response     └──────────┘
     ⚠️ Si B está caído, A falla

Comunicación Asíncrona (Message Broker):
┌──────────┐    Publish     ┌──────────┐    Consume    ┌──────────┐
│ Service  │ ──────────►    │ RabbitMQ │ ──────────►   │ Service  │
│    A     │                │  Broker  │               │    B     │
└──────────┘                └──────────┘               └──────────┘
     ✅ Si B está caído, mensaje queda en la cola

Cuándo Usar Mensajería

EscenarioRecomendación
Operaciones que pueden procesarse después✅ Asíncrono
Notificaciones y emails✅ Asíncrono
Procesamiento de archivos/imágenes✅ Asíncrono
Sincronización de datos entre servicios✅ Asíncrono
Consulta que necesita respuesta inmediata❌ Síncrono
Validación en tiempo real❌ Síncrono

RabbitMQ: Conceptos Fundamentales

RabbitMQ es un message broker que implementa el protocolo AMQP (Advanced Message Queuing Protocol).

Arquitectura

┌─────────────────────────────────────────────────────────────────────┐
│                          RabbitMQ Broker                             │
│                                                                      │
│  Producer ──► Exchange ──► Binding ──► Queue ──► Consumer           │
│                  │                       │                           │
│            ┌─────┴─────┐           ┌─────┴─────┐                    │
│            │  Routing  │           │  Message  │                    │
│            │   Rules   │           │  Storage  │                    │
│            └───────────┘           └───────────┘                    │
└─────────────────────────────────────────────────────────────────────┘

Componentes Principales

ComponenteDescripción
ProducerAplicación que envía mensajes
ExchangeRecibe mensajes y enruta a colas
QueueAlmacena mensajes hasta ser consumidos
BindingRegla que conecta Exchange a Queue
ConsumerAplicación que recibe y procesa mensajes
Virtual HostAislamiento lógico (multi-tenancy)

Tipos de Exchange

┌─────────────────────────────────────────────────────────────────────┐
│                        Exchange Types                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  DIRECT: Routing key exacto                                         │
│  ┌──────────┐                                                       │
│  │ Exchange │──── routing_key="order.created" ────► Queue A         │
│  └──────────┘                                                       │
│                                                                      │
│  FANOUT: Broadcast a todas las colas                                │
│  ┌──────────┐────────────────────────────────────► Queue A          │
│  │ Exchange │────────────────────────────────────► Queue B          │
│  └──────────┘────────────────────────────────────► Queue C          │
│                                                                      │
│  TOPIC: Pattern matching con wildcards                              │
│  ┌──────────┐                                                       │
│  │ Exchange │──── "order.*" ─────────────────────► Queue A          │
│  └──────────┘──── "order.#" ─────────────────────► Queue B          │
│               (* = una palabra, # = cero o más)                     │
│                                                                      │
│  HEADERS: Match por headers del mensaje                             │
│  ┌──────────┐                                                       │
│  │ Exchange │──── x-match: all, type: order ─────► Queue A          │
│  └──────────┘                                                       │
└─────────────────────────────────────────────────────────────────────┘

Estructura del Proyecto

messaging-service/
├── src/
│   ├── config/
│   │   ├── rabbitmq.config.ts
│   │   └── index.ts
│   │
│   ├── infrastructure/
│   │   ├── messaging/
│   │   │   ├── RabbitMQConnection.ts
│   │   │   ├── RabbitMQPublisher.ts
│   │   │   ├── RabbitMQConsumer.ts
│   │   │   ├── MessageSerializer.ts
│   │   │   └── index.ts
│   │   │
│   │   └── health/
│   │       └── RabbitMQHealthCheck.ts
│   │
│   ├── domain/
│   │   ├── events/
│   │   │   ├── DomainEvent.ts
│   │   │   ├── OrderCreatedEvent.ts
│   │   │   ├── OrderPaidEvent.ts
│   │   │   ├── UserRegisteredEvent.ts
│   │   │   └── index.ts
│   │   │
│   │   └── interfaces/
│   │       ├── IMessagePublisher.ts
│   │       ├── IMessageConsumer.ts
│   │       └── IMessageHandler.ts
│   │
│   ├── application/
│   │   ├── handlers/
│   │   │   ├── OrderCreatedHandler.ts
│   │   │   ├── OrderPaidHandler.ts
│   │   │   ├── UserRegisteredHandler.ts
│   │   │   └── index.ts
│   │   │
│   │   └── services/
│   │       └── EventDispatcher.ts
│   │
│   ├── presentation/
│   │   └── consumers/
│   │       ├── OrderConsumer.ts
│   │       ├── NotificationConsumer.ts
│   │       └── index.ts
│   │
│   └── shared/
│       ├── errors/
│       │   ├── MessageProcessingError.ts
│       │   └── RetryableError.ts
│       │
│       └── utils/
│           ├── retry.ts
│           └── logger.ts
│
├── docker/
│   ├── docker-compose.yml
│   └── rabbitmq/
│       ├── definitions.json
│       └── rabbitmq.conf
│
├── tests/
│   ├── unit/
│   │   └── handlers/
│   ├── integration/
│   │   └── messaging/
│   └── e2e/
│
├── package.json
├── tsconfig.json
└── README.md

Implementación

1. Configuración de RabbitMQ

typescript
// src/config/rabbitmq.config.tsexport interface RabbitMQConfig {  host: string;  port: number;  username: string;  password: string;  vhost: string;  heartbeat: number;  prefetch: number;  reconnectDelay: number;  maxReconnectAttempts: number;} export const rabbitmqConfig: RabbitMQConfig = {  host: process.env.RABBITMQ_HOST || 'localhost',  port: parseInt(process.env.RABBITMQ_PORT || '5672'),  username: process.env.RABBITMQ_USER || 'guest',  password: process.env.RABBITMQ_PASS || 'guest',  vhost: process.env.RABBITMQ_VHOST || '/',  heartbeat: 60,  prefetch: 10,  reconnectDelay: 5000,  maxReconnectAttempts: 10,}; export const exchanges: Record<string, ExchangeConfig> = {  orders: {    name: 'orders.exchange',    type: 'topic',    durable: true,    autoDelete: false,  },  notifications: {    name: 'notifications.exchange',    type: 'fanout',    durable: true,    autoDelete: false,  },  deadLetter: {    name: 'dlx.exchange',    type: 'direct',    durable: true,    autoDelete: false,  },}; export const queues: Record<string, QueueConfig> = {  orderCreated: {    name: 'orders.created.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.created.dlq',    messageTtl: 86400000, // 24 horas  },  orderPaid: {    name: 'orders.paid.queue',    durable: true,    exclusive: false,    autoDelete: false,    deadLetterExchange: 'dlx.exchange',    deadLetterRoutingKey: 'orders.paid.dlq',  },  notifications: {    name: 'notifications.queue',    durable: true,    exclusive: false,    autoDelete: false,    maxLength: 10000,  },  deadLetter: {    name: 'dead-letter.queue',    durable: true,    exclusive: false,    autoDelete: false,  },};

2. Conexión con RabbitMQ

typescript
// src/infrastructure/messaging/RabbitMQConnection.tsimport amqp, { Connection, Channel, ConfirmChannel } from 'amqplib';import { EventEmitter } from 'events'; export class RabbitMQConnection extends EventEmitter {  private connection: Connection | null = null;  private channel: ConfirmChannel | null = null;  private reconnectAttempts = 0;  private isConnecting = false;   constructor(    private readonly config: RabbitMQConfig,    private readonly logger: Logger  ) {    super();  }   async connect(): Promise<void> {    if (this.isConnecting) return;    this.isConnecting = true;     try {      const url = this.buildConnectionUrl();      this.connection = await amqp.connect(url, {        heartbeat: this.config.heartbeat,      });       this.connection.on('error', (err) => {        this.logger.error('RabbitMQ connection error', { error: err.message });        this.emit('error', err);      });       this.connection.on('close', () => {        this.logger.warn('RabbitMQ connection closed');        this.emit('disconnected');        this.scheduleReconnect();      });       this.channel = await this.connection.createConfirmChannel();      await this.channel.prefetch(this.config.prefetch);       this.reconnectAttempts = 0;      this.isConnecting = false;      this.emit('connected');      this.logger.info('Connected to RabbitMQ');    } catch (error) {      this.isConnecting = false;      this.scheduleReconnect();      throw error;    }  }   private scheduleReconnect(): void {    if (this.reconnectAttempts >= this.config.maxReconnectAttempts) {      this.logger.error('Max reconnect attempts reached');      this.emit('maxReconnectAttempts');      return;    }     this.reconnectAttempts++;    const delay = this.config.reconnectDelay * this.reconnectAttempts;     this.logger.info(`Scheduling reconnect attempt ${this.reconnectAttempts}`, {      delayMs: delay,    });     setTimeout(() => this.connect(), delay);  }   getChannel(): ConfirmChannel {    if (!this.channel) throw new Error('Channel not initialized');    return this.channel;  }   async close(): Promise<void> {    if (this.channel) await this.channel.close();    if (this.connection) await this.connection.close();    this.logger.info('RabbitMQ connection closed gracefully');  }   isConnected(): boolean {    return this.connection !== null && this.channel !== null;  }}

3. Domain Events

typescript
// src/domain/events/DomainEvent.tsexport interface DomainEvent<T = unknown> {  eventId: string;  eventType: string;  aggregateId: string;  aggregateType: string;  timestamp: Date;  version: number;  payload: T;  metadata: EventMetadata;} export interface EventMetadata {  correlationId: string;  causationId?: string;  userId?: string;  traceId?: string;  spanId?: string;} // src/domain/events/OrderCreatedEvent.tsexport interface OrderCreatedPayload {  orderId: string;  customerId: string;  items: Array<{    productId: string;    quantity: number;    price: number;  }>;  totalAmount: number;  currency: string;  shippingAddress: {    street: string;    city: string;    state: string;    zipCode: string;    country: string;  };} export class OrderCreatedEvent extends BaseDomainEvent<OrderCreatedPayload> {  static readonly EVENT_TYPE = 'order.created';   constructor(payload: OrderCreatedPayload, metadata: EventMetadata) {    super(      OrderCreatedEvent.EVENT_TYPE,      payload.orderId,      'Order',      payload,      metadata    );  }}

4. Consumer con Retry y Dead Letter

typescript
// src/infrastructure/messaging/RabbitMQConsumer.tsexport class RabbitMQConsumer {  private channel: ConfirmChannel | null = null;  private handlers: Map<string, IMessageHandler> = new Map();   constructor(    private readonly connection: RabbitMQConnection,    private readonly serializer: MessageSerializer,    private readonly logger: Logger,    private readonly options: ConsumerOptions  ) {    this.connection.on('connected', () => {      this.channel = this.connection.getChannel();    });  }   registerHandler(eventType: string, handler: IMessageHandler): void {    this.handlers.set(eventType, handler);    this.logger.info(`Handler registered for event type: ${eventType}`);  }   async start(): Promise<void> {    if (!this.channel) throw new Error('Channel not available');     await this.channel.consume(      this.options.queue,      (msg) => this.handleMessage(msg),      { noAck: this.options.noAck ?? false }    );     this.logger.info(`Consumer started for queue: ${this.options.queue}`);  }   private async handleMessage(msg: ConsumeMessage | null): Promise<void> {    if (!msg || !this.channel) return;     const startTime = Date.now();    const eventType = msg.properties.type;     try {      const event = this.serializer.deserialize<DomainEvent>(msg.content);      const handler = this.handlers.get(eventType);       if (!handler) {        this.logger.warn(`No handler found for event type: ${eventType}`);        this.channel.ack(msg);        return;      }       await handler.handle(event);      this.channel.ack(msg);       this.logger.info('Message processed successfully', {        eventType,        durationMs: Date.now() - startTime,      });    } catch (error) {      await this.handleError(msg, error as Error);    }  }   private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {    const retryCount = this.getRetryCount(msg);    const maxRetries = this.options.maxRetries ?? 3;     if (error instanceof RetryableError && retryCount < maxRetries) {      // Será reencolado con delay via dead letter exchange      this.channel!.nack(msg, false, false);    } else {      // Enviar a dead letter queue permanentemente      this.channel!.nack(msg, false, false);      this.logger.warn('Message sent to dead letter queue', {        reason: retryCount >= maxRetries ? 'max retries exceeded' : 'non-retryable error',      });    }  }}

Patrones de Mensajería

Saga Pattern

typescript
// src/application/sagas/OrderSaga.tsinterface SagaStep<T> {  name: string;  execute: (data: T) => Promise<void>;  compensate: (data: T) => Promise<void>;} export class OrderSaga {  private steps: SagaStep<OrderSagaData>[] = [];  private executedSteps: SagaStep<OrderSagaData>[] = [];   async execute(data: OrderSagaData): Promise<void> {    this.logger.info('Starting order saga', { orderId: data.orderId });     for (const step of this.steps) {      try {        await step.execute(data);        this.executedSteps.push(step);      } catch (error) {        await this.compensate(data);        throw error;      }    }  }   private async compensate(data: OrderSagaData): Promise<void> {    // Compensar en orden inverso    for (const step of this.executedSteps.reverse()) {      try {        await step.compensate(data);      } catch (error) {        this.logger.error(`Compensation failed for step: ${step.name}`);      }    }  }}

Outbox Pattern

typescript
// src/infrastructure/outbox/OutboxProcessor.tsexport class OutboxProcessor {  private isRunning = false;   constructor(    private readonly repository: OutboxRepository,    private readonly publisher: IMessagePublisher,    private readonly intervalMs: number = 1000  ) {}   start(): void {    if (this.isRunning) return;    this.isRunning = true;    setInterval(() => this.processMessages(), this.intervalMs);  }   private async processMessages(): Promise<void> {    const messages = await this.repository.getUnprocessed(100);     for (const message of messages) {      try {        const event = JSON.parse(message.payload);        await this.publisher.publish(          `${message.aggregateType.toLowerCase()}.exchange`,          message.eventType,          event        );        await this.repository.markAsProcessed(message.id);      } catch (error) {        await this.repository.incrementRetry(message.id);      }    }  }}

Observabilidad

Métricas Prometheus

typescript
// src/infrastructure/metrics/MessagingMetrics.tsexport class MessagingMetrics {  private readonly messagesPublished: Counter;  private readonly messagesConsumed: Counter;  private readonly messagesFailed: Counter;  private readonly messageProcessingDuration: Histogram;   recordPublish(exchange: string, routingKey: string, eventType: string): void {    this.messagesPublished.labels(exchange, routingKey, eventType).inc();  }   recordConsume(queue: string, eventType: string, status: 'success' | 'failed'): void {    this.messagesConsumed.labels(queue, eventType, status).inc();  }   recordProcessingDuration(queue: string, eventType: string, durationMs: number): void {    this.messageProcessingDuration.labels(queue, eventType).observe(durationMs / 1000);  }}

Checklist de Producción

Configuración

  • Credenciales seguras (no hardcoded)
  • Virtual hosts separados por ambiente
  • Prefetch configurado adecuadamente
  • Heartbeat habilitado
  • TLS configurado

Resiliencia

  • Dead Letter Queues configuradas
  • Retry policies implementadas
  • Circuit breaker para publicación
  • Reconnect automático
  • Graceful shutdown

Observabilidad

  • Métricas Prometheus
  • Logs estructurados
  • Alertas para colas llenas
  • Alertas para mensajes en DLQ
  • Tracing distribuido

Conclusión

La comunicación asíncrona con RabbitMQ es fundamental para construir microservicios resilientes y escalables. Los puntos clave son:

  1. Desacoplamiento: Servicios no necesitan estar disponibles simultáneamente
  2. Resiliencia: Dead Letter Queues y retry policies garantizan que los mensajes no se pierdan
  3. Escalabilidad: Consumers pueden escalarse independientemente
  4. Observabilidad: Métricas y logs permiten identificar problemas rápidamente

En el próximo artículo, exploraremos cómo implementar observabilidad completa con OpenTelemetry, Prometheus y Grafana para tus microservicios: Observabilidad con OpenTelemetry.

PF
Sobre el autor

Pedro Farbo

Platform Engineering Lead & Solutions Architect con +10 años de experiencia. CEO de Farbo TSC. Especialista en Microservicios, Kong, Backstage y Cloud.

¿Te gustó el contenido? ¡Tu contribución ayuda a mantener todo online y gratuito!

PIX:0737160d-e98f-4a65-8392-5dba70e7ff3e