
Pub/Sub Patterns

Topics, consumer groups, fan-out, filtering, and the delivery guarantees that determine what your subscribers can depend on.

pub/sub · consumer groups · fan-out · Kafka · SNS · delivery guarantees

Real-World Analogy

A radio broadcast: the station (publisher) transmits once on a frequency (topic). Anyone with a receiver tuned to that frequency (subscriber) gets it — the station doesn’t know or care how many people are listening. A consumer group is like a household with one radio: everyone in the household hears the same broadcast once, not once per person.

Core Concepts

Publisher: Produces events to a topic. Knows nothing about subscribers.

Topic: Named channel. Messages are sent to a topic, not to a specific subscriber.

Subscriber: Consumes events from one or more topics. Declares interest, receives matching events.

Consumer Group: Multiple instances of the same subscriber sharing the processing load. Each message is delivered to exactly one member of the group.

Topic: order-events

┌─────────────────────────────────────┐
│     Consumer Group: notifications   │   ← one instance processes each message
│  [notification-service-1]           │
│  [notification-service-2]           │
│  [notification-service-3]           │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│     Consumer Group: analytics       │   ← separate group gets its own copy
│  [analytics-service-1]              │
└─────────────────────────────────────┘

Two consumer groups on the same topic get independent copies of every message. Three instances within the same group share the load — each message goes to one of them.
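The behavior in the diagram can be modeled in a few lines. This is a minimal in-memory sketch, not how any real broker is implemented: the `InMemoryTopic` class and the round-robin choice of group member are assumptions made for illustration (Kafka, for instance, assigns partitions to members rather than rotating per message).

```typescript
// Minimal in-memory model of a topic with consumer groups (illustrative only).
type Handler = (message: string) => void;

class InMemoryTopic {
  // groupId -> handlers of the members in that group
  private groups = new Map<string, Handler[]>();
  // groupId -> how many messages the group has received so far
  private cursors = new Map<string, number>();

  subscribe(groupId: string, handler: Handler): void {
    const members = this.groups.get(groupId) ?? [];
    members.push(handler);
    this.groups.set(groupId, members);
  }

  publish(message: string): void {
    // Every group gets the message; within a group, exactly one member does.
    this.groups.forEach((members, groupId) => {
      const count = this.cursors.get(groupId) ?? 0;
      members[count % members.length](message); // round-robin stand-in
      this.cursors.set(groupId, count + 1);
    });
  }
}

const received: Record<string, string[]> = {
  'notifications-1': [],
  'notifications-2': [],
  'notifications-3': [],
  'analytics-1': [],
};

const orderEvents = new InMemoryTopic();
for (const member of ['notifications-1', 'notifications-2', 'notifications-3']) {
  orderEvents.subscribe('notifications', (m) => received[member].push(m));
}
orderEvents.subscribe('analytics', (m) => received['analytics-1'].push(m));

for (const id of ['order-1', 'order-2', 'order-3']) {
  orderEvents.publish(id);
}

console.log(received);
```

After three publishes, the single analytics member has seen all three messages, while the three notification members have seen one each.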

Delivery Guarantees

Every pub/sub system makes a choice about what it guarantees:

At-most-once: Message delivered zero or one times. Can be lost. Fastest. Use for: metrics, telemetry, real-time dashboards where dropping a point is acceptable.

At-least-once: Message delivered one or more times. May be duplicated. Most common. Use for: anything that can be made idempotent (most business events).

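Exactly-once: Each message takes effect exactly once. Most expensive, and in practice it is usually built from at-least-once delivery plus idempotent or transactional processing rather than guaranteed by the transport alone. Use for: financial transactions, inventory deductions where duplicates cause real harm.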
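The difference between the first two guarantees often comes down to when the consumer acknowledges a message relative to processing it. The sketch below assumes a toy queue (`ToyQueue`) and a simulated crash (`crashOn`); both are hypothetical and exist only to make the ordering visible.

```typescript
// Toy queue: a message stays at the head until it is acknowledged.
class ToyQueue {
  private pending: string[];

  constructor(messages: string[]) {
    this.pending = [...messages];
  }

  peek(): string | undefined {
    return this.pending[0];
  }

  ack(): void {
    this.pending.shift();
  }
}

type Mode = 'at-most-once' | 'at-least-once';

// Runs one consumer lifetime. `crashOn` simulates a crash between the
// acknowledge step and the processing step for that message.
function consume(queue: ToyQueue, mode: Mode, processed: string[], crashOn?: string): void {
  let message = queue.peek();
  while (message !== undefined) {
    if (mode === 'at-most-once') {
      queue.ack();                      // ack first...
      if (message === crashOn) return;  // ...so a crash here loses the message
      processed.push(message);
    } else {
      processed.push(message);          // process first...
      if (message === crashOn) return;  // ...so a crash here causes redelivery
      queue.ack();
    }
    message = queue.peek();
  }
}

function runWithCrash(mode: Mode): string[] {
  const queue = new ToyQueue(['a', 'b', 'c']);
  const processed: string[] = [];
  consume(queue, mode, processed, 'b'); // first lifetime crashes on 'b'
  consume(queue, mode, processed);      // restart and drain the queue
  return processed;
}

const atMostOnce = runWithCrash('at-most-once');   // ['a', 'c']
const atLeastOnce = runWithCrash('at-least-once'); // ['a', 'b', 'b', 'c']
console.log({ atMostOnce, atLeastOnce });
```

Acknowledging first drops `b`; acknowledging last processes `b` twice, which is why at-least-once consumers need to be idempotent.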

Most systems offer at-least-once and require consumers to handle deduplication:

async function handleOrderPlaced(event: EventEnvelope<OrderPlaced>): Promise<void> {
  // Idempotent: check if we already processed this event
  const alreadyProcessed = await db.processedEvents.exists(event.id);
  if (alreadyProcessed) {
    logger.info({ eventId: event.id }, 'Duplicate event, skipping');
    return;
  }

  await db.transaction(async (tx) => {
    // Process the event
    await tx.notifications.create({ userId: event.data.userId, type: 'order-placed' });

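    // Mark as processed in the same transaction as the side effect.
    // processedEvents.id must be unique: if two deliveries race past the check
    // above, the second insert fails and rolls back its duplicate notification.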
    await tx.processedEvents.insert({ id: event.id, processedAt: new Date() });
  });
}

Fan-Out

One event → many subscribers, each doing different work:

OrderPlaced
  ├── notifications-service: send confirmation email
  ├── inventory-service: reserve items
  ├── analytics-service: update sales dashboard
  ├── fraud-service: check for suspicious patterns
  └── loyalty-service: award points

Each subscriber processes the event independently, fails independently, and scales independently. Adding a new subscriber (e.g., a new loyalty program) requires zero changes to the order service.

Implementing fan-out with SNS + SQS (AWS):

import * as crypto from 'crypto';
import { SNS, SQS } from 'aws-sdk';

const sns = new SNS();
const sqs = new SQS();

// Publisher: sends to SNS topic
async function publishOrderPlaced(order: Order): Promise<void> {
  await sns.publish({
    TopicArn: process.env.ORDER_EVENTS_TOPIC_ARN!,
    Message: JSON.stringify({
      id: crypto.randomUUID(),
      type: 'OrderPlaced',
      version: 1,
      timestamp: new Date().toISOString(),
      data: {
        orderId: order.id,
        userId: order.userId,
        totalAmount: order.totalAmount,
      },
    }),
    MessageAttributes: {
      eventType: {
        DataType: 'String',
        StringValue: 'OrderPlaced',
      },
    },
  }).promise();
}

// Each subscriber has its own SQS queue subscribed to the SNS topic
// SNS automatically delivers to all subscribed queues
// Subscribers poll their own queue independently
async function processNotificationQueue(): Promise<void> {
  while (true) {
    const { Messages } = await sqs.receiveMessage({
      QueueUrl: process.env.NOTIFICATIONS_QUEUE_URL!,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 20, // long polling
    }).promise();

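    for (const message of Messages ?? []) {
      try {
        // Without raw message delivery, the SQS body is an SNS envelope
        // whose Message field holds the published JSON string
        const event = JSON.parse(JSON.parse(message.Body!).Message);
        await handleOrderPlaced(event);

        // Delete only after successful processing (at-least-once)
        await sqs.deleteMessage({
          QueueUrl: process.env.NOTIFICATIONS_QUEUE_URL!,
          ReceiptHandle: message.ReceiptHandle!,
        }).promise();
      } catch (err) {
        // Leave the message in the queue: it becomes visible again after the
        // visibility timeout, and a redrive policy can move it to a DLQ
        logger.error({ messageId: message.MessageId, err }, 'Failed to process message');
      }
    }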
  }
}
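The double `JSON.parse` in the poller works because SNS wraps the published payload in a notification envelope before placing it on the SQS queue. If raw message delivery is enabled on the subscription, the body is the payload itself and the second parse would fail. A hedged sketch of a helper that tolerates both shapes follows; `unwrapSqsBody` and `isSnsEnvelope` are hypothetical names, and the sample ARN and message ID are placeholders.

```typescript
// Subset of the SNS notification envelope that this sketch relies on.
interface SnsEnvelope {
  Type: string;
  MessageId: string;
  TopicArn: string;
  Message: string; // the published payload, still a JSON string
}

function isSnsEnvelope(value: unknown): value is SnsEnvelope {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return candidate.Type === 'Notification' && typeof candidate.Message === 'string';
}

// Returns the published event whether or not raw message delivery is enabled.
function unwrapSqsBody(body: string): unknown {
  const parsed: unknown = JSON.parse(body);
  return isSnsEnvelope(parsed) ? JSON.parse(parsed.Message) : parsed;
}

const published = JSON.stringify({ type: 'OrderPlaced', data: { orderId: 'order_123' } });

// Body as delivered with the envelope (raw message delivery disabled).
const wrappedBody = JSON.stringify({
  Type: 'Notification',
  MessageId: 'example-message-id',
  TopicArn: 'arn:aws:sns:us-east-1:123456789012:order-events',
  Message: published,
});

// Body as delivered with raw message delivery enabled.
const rawBody = published;

const fromWrapped = unwrapSqsBody(wrappedBody);
const fromRaw = unwrapSqsBody(rawBody);
console.log(fromWrapped, fromRaw);
```

The check is a heuristic: an event that itself has `Type: 'Notification'` and a string `Message` field would be misread as an envelope, so a real system would pick one delivery mode per subscription and parse accordingly.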

Message Filtering

Subscribers can filter to only receive events they care about — no need to receive and discard irrelevant events:

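// SNS filter policy: only receive OrderPlaced events with amount > $100
const filterPolicy = {
  eventType: ['OrderPlaced'],
  orderAmount: [{ numeric: ['>', 100] }],
  // By default, filter policies match message attributes, not the JSON body,
  // so publish every filterable field as a message attribute
  // (payload-based filtering requires setting FilterPolicyScope to MessageBody)
};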

// Publish with filterable attributes
await sns.publish({
  TopicArn: TOPIC_ARN,
  Message: JSON.stringify(event),
  MessageAttributes: {
    eventType: { DataType: 'String', StringValue: 'OrderPlaced' },
    orderAmount: { DataType: 'Number', StringValue: String(order.totalAmount) },
    plan: { DataType: 'String', StringValue: user.plan },
  },
}).promise();

// Subscription filter: VIP orders to a dedicated queue
// {
//   "plan": ["enterprise", "pro"],
//   "orderAmount": [{ "numeric": [">=", 1000] }]
// }
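The matching rules behind a policy like the one above are: every key in the policy must match (AND), and within a key any listed value is enough (OR). The following is a simplified sketch of those semantics for exact-match and numeric conditions only. `matchesPolicy` and its types are hypothetical, written for illustration; SNS evaluates policies on the service side and supports more operators than shown here.

```typescript
type NumericCondition = { numeric: (string | number)[] };
type PolicyValue = string | number | NumericCondition;
type FilterPolicy = Record<string, PolicyValue[]>;
type Attributes = Record<string, string | number>;

function matchesNumeric(value: number, condition: (string | number)[]): boolean {
  // Conditions come as (operator, operand) pairs, e.g. ['>=', 1000] or ['>', 0, '<=', 100].
  for (let i = 0; i < condition.length; i += 2) {
    const operator = condition[i];
    const operand = Number(condition[i + 1]);
    const ok =
      operator === '=' ? value === operand :
      operator === '>' ? value > operand :
      operator === '>=' ? value >= operand :
      operator === '<' ? value < operand :
      operator === '<=' ? value <= operand :
      false;
    if (!ok) return false;
  }
  return true;
}

function matchesPolicy(policy: FilterPolicy, attributes: Attributes): boolean {
  // Every key in the policy must match (AND)...
  return Object.keys(policy).every((key) => {
    const actual = attributes[key];
    if (actual === undefined) return false;
    // ...and any one of the listed values is enough (OR).
    return policy[key].some((expected) =>
      typeof expected === 'object'
        ? typeof actual === 'number' && matchesNumeric(actual, expected.numeric)
        : expected === actual,
    );
  });
}

const vipPolicy: FilterPolicy = {
  plan: ['enterprise', 'pro'],
  orderAmount: [{ numeric: ['>=', 1000] }],
};

const proBigOrder = matchesPolicy(vipPolicy, { plan: 'pro', orderAmount: 1500 });
const freeBigOrder = matchesPolicy(vipPolicy, { plan: 'free', orderAmount: 5000 });
const enterpriseSmallOrder = matchesPolicy(vipPolicy, { plan: 'enterprise', orderAmount: 999 });
console.log({ proBigOrder, freeBigOrder, enterpriseSmallOrder });
```

Only the first message reaches the VIP queue: the second fails the `plan` condition and the third fails the `orderAmount` condition.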

Kafka: Durable, Ordered, Replayable

Kafka is not just a message queue — it’s a persistent log. Messages are retained (configurable, often 7-30 days) and consumers can replay from any offset. This changes what’s possible:

import { Kafka, Consumer, Producer } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092'],
});

// Producer
const producer: Producer = kafka.producer();
await producer.connect();

await producer.send({
  topic: 'order-events',
  messages: [{
    key: order.userId,         // partition by user — ordering per user guaranteed
    value: JSON.stringify(event),
    headers: { eventType: 'OrderPlaced' },
  }],
});

// Consumer
const consumer: Consumer = kafka.consumer({ groupId: 'notifications-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'order-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());

    try {
      await handleEvent(event);
      // KafkaJS marks the offset as processed once this handler resolves and
      // auto-commits it; on restart, the consumer resumes from the committed offset
    } catch (err) {
      // Rethrow so the offset is not committed: the message will be redelivered
      logger.error({ event, err }, 'Failed to process event');
      throw err;
    }
  },
});

Kafka’s key properties:

  • Ordering: Messages with the same partition key are strictly ordered
  • Durability: Messages persisted to disk, replicated across brokers
  • Replay: Consumers can seek to any offset and reprocess history
  • Throughput: Hundreds of thousands to millions of messages/second per cluster, depending on message size, partition count, and hardware
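Replay follows from the log model: reading does not remove records, so a consumer is just a position (offset) in the log that can be moved backwards. The toy `ToyLog` class below is an assumption made for illustration and ignores partitions, retention, and persistence.

```typescript
// Toy append-only log: reads never delete, so any offset can be read again.
class ToyLog {
  private records: string[] = [];

  // Appends a record and returns its offset.
  append(record: string): number {
    this.records.push(record);
    return this.records.length - 1;
  }

  // Returns every record at or after the given offset.
  readFrom(offset: number): string[] {
    return this.records.slice(offset);
  }
}

const orderLog = new ToyLog();
['created', 'paid', 'shipped'].forEach((record) => orderLog.append(record));

// Normal consumption: read from the committed offset, then advance it.
let committedOffset = 0;
const firstPass = orderLog.readFrom(committedOffset);
committedOffset += firstPass.length;

orderLog.append('delivered');
const newOnly = orderLog.readFrom(committedOffset); // only the record added since the commit

// Replay: move the position back to 0 and reprocess the full history.
const replayed = orderLog.readFrom(0);

console.log({ firstPass, newOnly, replayed });
```

In KafkaJS the equivalent of moving the position is `consumer.seek`, which takes a topic, partition, and offset.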

When Kafka over SNS/SQS:

  • Need message ordering within a partition
  • Need to replay events (fix a bug in a consumer, reprocess historical data)
  • Need to share events across teams/systems with different retention needs
  • Throughput exceeds what managed queues handle economically

Ordering Guarantees

Ordering is only guaranteed within a partition (Kafka) or within a message group of a FIFO queue (SQS FIFO). Ordering across partitions or across message groups is not guaranteed.

// Kafka: partition by user ID for per-user ordering
await producer.send({
  topic: 'user-events',
  messages: [{
    key: userId,   // all events for this user go to the same partition → ordered
    value: JSON.stringify(event),
  }],
});

// If you need global ordering: use a single partition
// Trade-off: single partition = one active consumer per group = limited throughput

For most business events, per-entity ordering (all events for order_123 in order) is sufficient and achievable. Global ordering across all events usually isn’t needed and isn’t worth the throughput cost.
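Per-entity ordering works because the partition is derived from the key: the same key always hashes to the same partition, and a partition preserves append order. A minimal sketch of that mapping follows. The FNV-1a hash is a stand-in chosen for brevity (Kafka's default partitioner hashes keys with murmur2), and `partitionFor` is a hypothetical helper.

```typescript
// Stand-in hash (32-bit FNV-1a). Kafka's default partitioner uses murmur2 instead.
function hashKey(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value an unsigned 32-bit integer
  }
  return hash;
}

function partitionFor(key: string, partitionCount: number): number {
  return hashKey(key) % partitionCount;
}

const partitions: string[][] = [[], [], []];
const events = [
  { key: 'user-1', name: 'OrderPlaced' },
  { key: 'user-2', name: 'OrderPlaced' },
  { key: 'user-1', name: 'OrderPaid' },
  { key: 'user-1', name: 'OrderShipped' },
];

// Append each event to the partition chosen by its key.
for (const event of events) {
  partitions[partitionFor(event.key, partitions.length)].push(`${event.key}:${event.name}`);
}

// All user-1 events sit in one partition, in the order they were produced.
const user1Partition = partitions[partitionFor('user-1', partitions.length)];
const user1Events = user1Partition.filter((entry) => entry.startsWith('user-1:'));
console.log(user1Events);
```

Because the partition depends on `partitionCount`, adding partitions to a topic can move a key to a different partition, so the per-key ordering guarantee only holds while the partition count stays fixed.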

Dead-Letter Topics

Messages that fail processing after retries go to a dead-letter topic for investigation:

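// Consumer with a dead-letter topic
// Assumes dlqProducer is a connected Producer created with kafka.producer()
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const MAX_ATTEMPTS = 3;

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        // Parse inside the try block so malformed payloads also reach the DLQ
        const event = JSON.parse(message.value!.toString());
        await handleEvent(event);
        return;
      } catch (err) {
        if (attempt === MAX_ATTEMPTS) {
          // Send to the DLQ after the final failure
          await dlqProducer.send({
            topic: `${topic}.dlq`,
            messages: [{
              key: message.key,
              value: message.value,
              headers: {
                ...message.headers,
                'x-original-topic': topic,
                'x-original-partition': String(partition),
                'x-original-offset': message.offset,
                'x-failure-reason': String(err),
                'x-failed-at': new Date().toISOString(),
              },
            }],
          });
          return; // don't rethrow: let the consumer continue
        }
        // Exponential backoff: 2s, then 4s. The partition is blocked while waiting
        await sleep(1000 * Math.pow(2, attempt));
      }
    }
  },
});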

Monitor DLQ depth — a growing DLQ signals a consumer bug or a schema mismatch between producer and consumer.
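The retry loop above sleeps `1000 * 2^attempt` milliseconds between attempts, so it is worth working out how long a failing message holds up its partition before it is dead-lettered. A small sketch of that arithmetic follows; `backoffDelaysMs` and the `capMs` ceiling are assumptions added for illustration and are not part of the consumer code.

```typescript
// Delays slept between attempts, using baseMs * 2^attempt with an upper cap.
function backoffDelaysMs(maxAttempts: number, baseMs = 1000, capMs = 30000): number[] {
  const delays: number[] = [];
  // No sleep follows the final attempt: that failure goes straight to the dead-letter topic.
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(Math.min(capMs, baseMs * Math.pow(2, attempt)));
  }
  return delays;
}

const delays = backoffDelaysMs(3);                            // [2000, 4000]
const totalWaitMs = delays.reduce((sum, ms) => sum + ms, 0);  // 6000
const longer = backoffDelaysMs(7);                            // later delays hit the cap

console.log({ delays, totalWaitMs, longer });
```

With three attempts the partition waits six seconds in total, plus handler time. Raising the attempt count grows the wait quickly, and a handler that blocks for longer than the consumer's session timeout risks the consumer being removed from its group, so long retry schedules are usually better served by a separate retry topic than by sleeping in place.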