Messaging Patterns

Saga, inbox/outbox, event-driven choreography vs orchestration — the patterns that make distributed systems reliable despite partial failure.

Tags: saga, outbox, choreography, orchestration, idempotency, transactional messaging

Real-World Analogy

A multi-department approval process: you submit a request, it moves through HR, Finance, and Legal sequentially (orchestration — a coordinator tracks state), or each department gets a copy and acts independently while publishing their own decisions (choreography — no central coordinator). The saga pattern handles what happens when Finance approves but Legal rejects: compensate what already happened.

The Dual-Write Problem

A common reliability mistake: writing to a database AND publishing an event as two separate operations, with nothing making the pair atomic.

// WRONG: dual write, database first
async function createOrderDbFirst(order: Order) {
  await db.insert('orders', order);          // succeeds
  await kafka.publish('orders', order);      // crash before this completes → event never sent
}

// WRONG: dual write, broker first
async function createOrderPublishFirst(order: Order) {
  await kafka.publish('orders', order);      // succeeds
  await db.insert('orders', order);          // crash before this completes → event sent but no DB record
}

If the process crashes between the two operations, you have an inconsistency: DB and message broker are out of sync.
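The failure window can be reproduced with a small in-memory model. This is only a sketch: `dbRows` and `brokerTopic` are hypothetical stand-ins for the database table and the broker topic, and the crash is simulated with a thrown error.

```typescript
type DemoOrder = { id: string; total: number };

// Performs the two writes in sequence, optionally "crashing" between them.
function dualWrite(
  order: DemoOrder,
  dbRows: DemoOrder[],
  brokerTopic: DemoOrder[],
  crashBetweenWrites: boolean,
): void {
  dbRows.push(order); // write 1: the database insert commits
  if (crashBetweenWrites) {
    throw new Error('simulated crash before publish');
  }
  brokerTopic.push(order); // write 2: never reached when the crash happens
}

const dbRows: DemoOrder[] = [];
const brokerTopic: DemoOrder[] = [];

try {
  dualWrite({ id: 'o-1', total: 42 }, dbRows, brokerTopic, true);
} catch {
  // The process is gone; nothing is left to retry the publish.
}

// Orders that exist in the database but were never announced to consumers.
const orphanedOrders = dbRows.filter(
  (row) => !brokerTopic.some((evt) => evt.id === row.id),
);
```

After the simulated crash, `orphanedOrders` contains `o-1`: the order is stored, but no downstream service will ever learn about it.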

Outbox Pattern

Write the event to the database in the same transaction as the business data. A separate process reads undelivered events and publishes them.

-- outbox table
CREATE TABLE outbox (
  id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  topic       TEXT NOT NULL,
  key         TEXT,
  payload     JSONB NOT NULL,
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  published_at TIMESTAMPTZ   -- NULL until delivered
);

// Atomic: business write + event write in one transaction
async function createOrder(order: Order) {
  await db.transaction(async (tx) => {
    await tx.query(
      'INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3)',
      [order.id, order.customerId, order.total]
    );
    
    await tx.query(
      'INSERT INTO outbox (topic, key, payload) VALUES ($1, $2, $3)',
      ['orders', order.id, JSON.stringify({ event: 'order.created', ...order })]
    );
  });
}

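// Outbox publisher: runs separately, polls for undelivered events
async function publishOutbox() {
  while (true) {
    // Row locks from FOR UPDATE SKIP LOCKED are held only until the
    // transaction ends, so claim, publish, and mark inside one transaction.
    await db.transaction(async (tx) => {
      const rows = await tx.query(
        `SELECT * FROM outbox
         WHERE published_at IS NULL
         ORDER BY created_at
         LIMIT 100
         FOR UPDATE SKIP LOCKED`
      );

      for (const row of rows.rows) {
        await kafka.publish(row.topic, { key: row.key, value: row.payload });

        await tx.query(
          'UPDATE outbox SET published_at = NOW() WHERE id = $1',
          [row.id]
        );
      }
    });

    await sleep(1000);
  }
}

FOR UPDATE SKIP LOCKED lets multiple publisher instances run side by side: rows locked by one publisher's open transaction are skipped by the others. The lock only lasts as long as that transaction, which is why the SELECT, the publish, and the UPDATE must share one. Delivery is still at-least-once: if a publisher crashes after the broker accepts a message but before published_at is committed, the row is published again, so consumers must be idempotent (see Inbox Pattern below).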
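One consequence of a polling publisher is worth making concrete: a crash after the broker accepts a message but before the row is marked published causes that row to be sent again on the next poll. The sketch below models this in memory; `OutboxRow`, `relayOnce`, and `brokerLog` are hypothetical names, and the crash is simulated with an early return.

```typescript
interface OutboxRow {
  id: string;
  payload: string;
  publishedAt: number | null; // null until delivered
}

// One polling pass over the outbox. `crashAfterPublish` simulates the process
// dying after the broker accepted a message but before the row was marked.
function relayOnce(
  outbox: OutboxRow[],
  brokerLog: string[],
  crashAfterPublish: boolean,
): void {
  for (const row of outbox) {
    if (row.publishedAt !== null) continue; // already delivered
    brokerLog.push(row.id);                 // publish to the broker
    if (crashAfterPublish) return;          // simulated crash: mark step skipped
    row.publishedAt = Date.now();           // mark as delivered
  }
}

const outboxRows: OutboxRow[] = [
  { id: 'evt-1', payload: '{}', publishedAt: null },
];
const brokerLog: string[] = [];

relayOnce(outboxRows, brokerLog, true);  // publishes evt-1, then "crashes"
relayOnce(outboxRows, brokerLog, false); // after restart: evt-1 is still unmarked
```

`brokerLog` ends up holding `evt-1` twice. The outbox guarantees that an event is never lost, not that it is sent only once, which is the gap the inbox pattern closes on the consumer side.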

Cleanup: delete published rows after a retention window:

DELETE FROM outbox WHERE published_at < NOW() - INTERVAL '7 days';

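As an alternative to polling, Debezium can act as the publisher: its change data capture (CDC) connector tails the Postgres WAL and publishes outbox rows to Kafka as they are committed, so there is no polling interval to wait out and no publisher query load on the table.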

Inbox Pattern

Prevent duplicate processing when a consumer receives the same message twice (at-least-once delivery):

CREATE TABLE inbox (
  message_id  TEXT PRIMARY KEY,
  topic       TEXT NOT NULL,
  processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

async function handleOrder(msg: KafkaMessage) {
  const messageId = `${msg.topic}-${msg.partition}-${msg.offset}`;
  
  await db.transaction(async (tx) => {
    // Check if already processed
    const result = await tx.query(
      'INSERT INTO inbox (message_id, topic) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING message_id',
      [messageId, msg.topic]
    );
    
    // No rows returned = conflict = already processed
    if (result.rows.length === 0) return;
    
    const order = JSON.parse(msg.value!.toString());
    await processOrder(tx, order);
  });
}

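The ON CONFLICT DO NOTHING combined with RETURNING makes the duplicate check atomic, so no separate SELECT is needed. Because the inbox insert and processOrder share one transaction, a failed handler rolls back the inbox row and the message can be retried. Note that an ID built from topic, partition, and offset only catches redelivery of the same Kafka record. A producer that publishes the same event twice creates two records with different offsets, so catching those requires an ID assigned by the producer.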
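To illustrate the choice of deduplication key, here is a minimal in-memory sketch that keys the inbox on a producer-assigned event ID instead of the broker offset. The `eventId` field is an assumption (for example, the outbox row ID carried in the message), and the `Set` stands in for the inbox table; in a real consumer the inbox insert and the side effect would share one database transaction as shown above.

```typescript
interface IncomingEvent {
  eventId: string; // assumed: assigned by the producer, e.g. the outbox row id
  offset: number;  // broker-assigned; a re-published copy gets a new offset
  orderId: string;
}

class InboxHandler {
  private readonly seen = new Set<string>(); // stands in for the inbox table
  readonly processedOrders: string[] = [];

  // Returns true if the event was processed, false if it was a duplicate.
  handle(evt: IncomingEvent): boolean {
    if (this.seen.has(evt.eventId)) return false;
    this.seen.add(evt.eventId);
    this.processedOrders.push(evt.orderId); // the business side effect
    return true;
  }
}

const inboxHandler = new InboxHandler();
const firstDelivery = inboxHandler.handle({ eventId: 'evt-1', offset: 10, orderId: 'o-1' });
// Same event published twice by the producer: new offset, same eventId.
const secondDelivery = inboxHandler.handle({ eventId: 'evt-1', offset: 11, orderId: 'o-1' });
```

The second delivery is rejected even though its offset differs, which an offset-based key would not detect.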

Choreography

Services react to events from other services — no central coordinator.

OrderService publishes order.created
  → PaymentService (subscribes) charges card, publishes payment.completed
    → FulfillmentService (subscribes) ships order, publishes order.shipped
      → NotificationService (subscribes) sends email

Pros: loose coupling, no coordinator acting as a single point of failure (SPOF), easy to add new services.

Cons: hard to trace a saga across services, hard to answer “what’s the current state of order 123?”, failure recovery requires each service to handle compensating events.

// Each service is autonomous
class PaymentService {
  async onOrderCreated(event: OrderCreated) {
    let payment;
    try {
      payment = await chargeCard(event.customerId, event.total);
    } catch (err) {
      // Only a failed charge is reported as payment.failed
      const reason = err instanceof Error ? err.message : 'unknown';
      await publish('payment.failed', { orderId: event.orderId, reason });
      return;
    }
    // Outside the try: a publish error must not be reported as a failed payment
    await publish('payment.completed', { orderId: event.orderId, paymentId: payment.id });
  }
  
  async onOrderCancelled(event: OrderCancelled) {
    // Compensation: refund if payment was taken
    await refundPayment(event.orderId);
  }
}
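The event chain described in this section can be sketched with a tiny in-memory bus. `InMemoryBus` is a hypothetical stand-in for a real broker, and each subscription plays the role of one service; the handlers are deliberately trivial.

```typescript
type OrderPayload = { orderId: string };
type BusHandler = (payload: OrderPayload) => void;

class InMemoryBus {
  private readonly handlers = new Map<string, BusHandler[]>();
  readonly trace: string[] = []; // every published event type, in order

  subscribe(eventType: string, handler: BusHandler): void {
    const list = this.handlers.get(eventType) ?? [];
    list.push(handler);
    this.handlers.set(eventType, list);
  }

  publish(eventType: string, payload: OrderPayload): void {
    this.trace.push(eventType);
    for (const handler of this.handlers.get(eventType) ?? []) {
      handler(payload);
    }
  }
}

const bus = new InMemoryBus();
const sentEmails: string[] = [];

// Each "service" knows only the event it reacts to and the event it emits.
bus.subscribe('order.created', (e) => bus.publish('payment.completed', e));   // PaymentService
bus.subscribe('payment.completed', (e) => bus.publish('order.shipped', e));   // FulfillmentService
bus.subscribe('order.shipped', (e) => { sentEmails.push(e.orderId); });       // NotificationService

bus.publish('order.created', { orderId: 'o-1' });
```

No component here holds the overall flow: it only exists as the sum of the subscriptions. The `trace` array is a convenience of the in-memory bus; with a real broker, reconstructing it is the tracing problem listed under the cons.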

Orchestration (Saga)

A central coordinator (the saga) tracks the state of a distributed transaction and directs each step.

// Saga state machine
interface OrderSagaState {
  orderId: string;
  step: 'payment' | 'inventory' | 'fulfillment' | 'completed' | 'failed';
  paymentId?: string;
  compensations: Array<() => Promise<void>>;
}

class OrderSaga {
  async execute(order: Order): Promise<void> {
    const state: OrderSagaState = {
      orderId: order.id,
      step: 'payment',
      compensations: [],
    };

    try {
      // Step 1: Payment
      const payment = await paymentClient.charge(order);
      state.paymentId = payment.id;
      state.compensations.push(() => paymentClient.refund(payment.id));

      // Step 2: Reserve inventory
      state.step = 'inventory';
      await inventoryClient.reserve(order.items);
      state.compensations.push(() => inventoryClient.release(order.items));

      // Step 3: Fulfill
      state.step = 'fulfillment';
      await fulfillmentClient.ship(order);
      state.step = 'completed';

    } catch (err) {
      state.step = 'failed';
      // Run compensations in reverse order (copy first: reverse() mutates the array)
      for (const compensate of [...state.compensations].reverse()) {
        // Best-effort: log and continue so one failed compensation does not block the rest
        await compensate().catch(console.error);
      }
      throw err;
    }
  }
}

Pros: clear state, easy to reason about, one place to handle failures.

Cons: the saga coordinator is a SPOF (mitigated by persisting state), tighter coupling to step order.

For durable sagas (survive process restart), persist state to a database:

CREATE TABLE sagas (
  id         UUID PRIMARY KEY,
  type       TEXT NOT NULL,
  state      JSONB NOT NULL,
  status     TEXT NOT NULL DEFAULT 'running',  -- running, completed, failed
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
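One detail matters when moving from the in-memory saga to a persisted one: closures like those in `compensations` cannot be stored in a JSONB column. A possible approach, sketched here under that assumption, is to persist only the names of completed steps and look the compensations up in a registry after a restart. `PersistedSaga`, `compensationsByStep`, and the placeholder compensation bodies are all hypothetical.

```typescript
type StepName = 'payment' | 'inventory' | 'fulfillment';

interface PersistedSaga {
  orderId: string;
  completedSteps: StepName[]; // JSON-serializable, unlike closures
  status: 'running' | 'completed' | 'failed';
}

const sagaLog: string[] = [];

// Registry mapping each step to its compensation. Bodies are placeholders
// that only record what a real client call would do.
const compensationsByStep: Record<StepName, (orderId: string) => void> = {
  payment: (orderId) => { sagaLog.push(`refund:${orderId}`); },
  inventory: (orderId) => { sagaLog.push(`release:${orderId}`); },
  fulfillment: (orderId) => { sagaLog.push(`cancel-shipment:${orderId}`); },
};

// Rebuilds the compensation list from stored state and runs it newest-first.
function compensateSaga(saga: PersistedSaga): PersistedSaga {
  for (const step of [...saga.completedSteps].reverse()) {
    compensationsByStep[step](saga.orderId);
  }
  return { ...saga, completedSteps: [], status: 'failed' };
}

// Simulate a restart: the state column round-trips through JSON.
const storedState = JSON.stringify({
  orderId: 'o-1',
  completedSteps: ['payment', 'inventory'],
  status: 'running',
});
const reloadedSaga = JSON.parse(storedState) as PersistedSaga;
const finalSaga = compensateSaga(reloadedSaga);
```

Inventory is released before the payment is refunded, matching the reverse-order rule of the in-memory version. A production implementation would also persist progress after each compensation so that a second crash does not repeat them.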

Temporal (temporal.io) is a purpose-built durable workflow engine. It handles replay, retries, and state persistence for you, which removes most of the plumbing from a saga implementation.

Competing Consumers

Scale message processing by running multiple worker instances against the same queue:

Queue: [msg1, msg2, msg3, msg4, msg5]
  Worker 1 processes: msg1, msg3, msg5
  Worker 2 processes: msg2, msg4

Works automatically with:

  • RabbitMQ: multiple consumers on the same queue
  • Kafka: multiple consumers in the same consumer group (up to partition count)
  • NATS JetStream: multiple subscribers pulling from the same durable consumer

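The key invariant: each message is handed to one worker at a time. The broker enforces this through acknowledgements (RabbitMQ, JetStream) or partition ownership (Kafka). It is not exactly-once processing: if a worker dies before acknowledging or committing, the message is redelivered to another worker, so handlers should still be idempotent.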
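The "up to partition count" limit for Kafka can be illustrated with a simplified assignment function. This is a sketch only: `assignPartitions` is a hypothetical round-robin model, and Kafka's actual assignors are more elaborate, but the cap on useful consumers is the same.

```typescript
// Round-robin assignment of partitions to the members of one consumer group.
function assignPartitions(
  partitionCount: number,
  consumerIds: string[],
): Map<string, number[]> {
  const assignment = new Map<string, number[]>();
  for (const id of consumerIds) assignment.set(id, []);
  if (consumerIds.length === 0) return assignment;

  for (let partition = 0; partition < partitionCount; partition++) {
    // Each partition has exactly one owner within the group.
    const owner = consumerIds[partition % consumerIds.length];
    assignment.get(owner)!.push(partition);
  }
  return assignment;
}

const groupMembers = ['c1', 'c2', 'c3', 'c4', 'c5'];
const groupAssignment = assignPartitions(3, groupMembers);

// Members that own no partition receive no messages at all.
const idleConsumers = groupMembers.filter(
  (id) => groupAssignment.get(id)!.length === 0,
);
```

With three partitions and five consumers, `c4` and `c5` sit idle. Adding workers beyond the partition count adds standby capacity, not throughput.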

Fan-Out

One event consumed by multiple independent services:

Per-consumer queues (RabbitMQ):

// Exchange with one binding per service
await ch.assertExchange('orders', 'topic', { durable: true });

// Each service gets its own queue
await ch.assertQueue('orders.payment', { durable: true });
await ch.assertQueue('orders.analytics', { durable: true });
await ch.assertQueue('orders.notifications', { durable: true });

await ch.bindQueue('orders.payment', 'orders', 'created');
await ch.bindQueue('orders.analytics', 'orders', 'created');
await ch.bindQueue('orders.notifications', 'orders', 'created');

Kafka: multiple consumer groups automatically achieve fan-out. Each group reads all messages independently.

// payment-service group — reads all messages
const paymentConsumer = kafka.consumer({ groupId: 'payment-service' });

// analytics-service group — reads same messages independently
const analyticsConsumer = kafka.consumer({ groupId: 'analytics-service' });
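Why consumer groups give fan-out for free can be seen in a small model of a log with per-group offsets. `TopicLog` is a hypothetical in-memory stand-in, not a Kafka API: the point is only that reading does not remove records, and each group tracks its own position.

```typescript
class TopicLog {
  private readonly records: string[] = [];
  private readonly offsets = new Map<string, number>(); // groupId -> next offset

  append(record: string): void {
    this.records.push(record);
  }

  // Returns everything the group has not read yet and advances its offset.
  poll(groupId: string): string[] {
    const from = this.offsets.get(groupId) ?? 0;
    const batch = this.records.slice(from);
    this.offsets.set(groupId, this.records.length);
    return batch;
  }
}

const ordersTopic = new TopicLog();
ordersTopic.append('order.created:o-1');
ordersTopic.append('order.created:o-2');

const paymentBatch = ordersTopic.poll('payment-service');     // both records
const analyticsBatch = ordersTopic.poll('analytics-service'); // both records again
const paymentAgain = ordersTopic.poll('payment-service');     // nothing new
```

Each group sees every record once, independently of the others. A new service can be added later with a fresh group ID and, subject to the topic's retention, read the topic from the beginning.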

Poison Pills

A poison pill is a message that fails every time it is processed. If it is simply redelivered, it blocks or starves the messages queued behind it.

Detection:

ch.consume('orders', async (msg) => {
  if (!msg) return;  // null means the consumer was cancelled by the broker
  const headers = msg.properties.headers ?? {};
  const attempt = Number(headers['x-attempt'] ?? 0);
  
  try {
    await processOrder(JSON.parse(msg.content.toString()));
    ch.ack(msg);
  } catch (err) {
    if (attempt >= 3) {
      // Poison pill: move to DLQ with diagnostic headers
      ch.publish('orders.dlx', 'created', msg.content, {
        headers: {
          ...headers,
          'x-failed-reason': err instanceof Error ? err.message : String(err),
          'x-failed-at': new Date().toISOString(),
        }
      });
      ch.ack(msg);
    } else {
      // Retry: send back to this consumer's own queue only. Publishing to the
      // exchange would redeliver to every bound queue. Keep existing headers.
      ch.sendToQueue('orders', msg.content, {
        headers: { ...headers, 'x-attempt': attempt + 1 }
      });
      ch.ack(msg);
    }
  }
});

Always have a DLQ. Without one, a poison pill is either redelivered indefinitely, holding up the messages behind it, or dropped with no record of why it failed.
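The retry-or-dead-letter decision in the handler above can be pulled out into a pure function, which makes the policy easy to test. The sketch below also adds an exponential backoff delay; that delay is an assumption beyond the handler shown here, which retries immediately, and `decideDisposition` and its default values are hypothetical.

```typescript
type Disposition =
  | { action: 'retry'; nextAttempt: number; delayMs: number }
  | { action: 'dead-letter' };

// Decides what to do with a message that has just failed on attempt `attempt`
// (0 for the first delivery), mirroring the `attempt >= 3` check in the handler.
function decideDisposition(
  attempt: number,
  maxAttempts = 3,
  baseDelayMs = 500,
  maxDelayMs = 30000,
): Disposition {
  if (attempt >= maxAttempts) {
    return { action: 'dead-letter' };
  }
  // Double the delay on each attempt, capped at maxDelayMs.
  const delayMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
  return { action: 'retry', nextAttempt: attempt + 1, delayMs };
}

const afterFirstFailure = decideDisposition(0);
const afterThirdFailure = decideDisposition(2);
const afterFourthFailure = decideDisposition(3);
```

Keeping the policy separate from the channel calls means the thresholds can change without touching the consumer, and the delay can be applied with whatever delayed-delivery mechanism the broker setup provides.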