Skip to content
← System Design · advanced · 36 min · 25 / 26

Case Study: Live Streaming Platform

Design and build a production live streaming system with RTMP ingest, real-time transcoding, HLS delivery, chat integration, and viewer scaling.

live streamingRTMPreal-time transcodingHLSviewer scalinglow latency

What Makes Live Streaming Hard?

Live streaming combines the hardest problems in distributed systems: real-time processing (no do-overs — if you drop a frame, it’s gone), massive concurrent viewers (Twitch peaks at 30M+ simultaneous viewers), low latency (2-10 seconds from camera to screen), adaptive quality (viewers on 5G and 3G watch the same stream), and chat synchronization (chat messages should align with what’s happening on screen).

Real-World Analogy

Like a live TV broadcast of a sports match — cameras capture the action, the production truck mixes feeds, and the signal goes to millions of viewers simultaneously with minimal delay.

Think of it like broadcasting live TV, but every viewer can choose their own quality level, and the broadcast infrastructure must scale from 0 to millions of viewers in seconds when a popular streamer goes live. Unlike VOD where you process the entire video before anyone watches, live streaming must transcode, package, and distribute each second of video as it’s created.

Live Streaming Architecture
Broadcaster
RTMP Ingest
--->
Edge Ingest
Nearest PoP
--->
Transcoder
Multi-Quality
v
Origin Server
HLS Segments
--->
CDN Edge
Global Delivery
--->
Viewers
HLS Player

Requirements

  • Functional: RTMP ingest from OBS/Streamlabs, real-time transcoding to multiple qualities, HLS delivery, stream key authentication, live chat, viewer count, stream recording (DVR), go-live/end-stream lifecycle
  • Non-functional: Glass-to-glass latency under 5 seconds, support 100K+ concurrent viewers per stream, 99.95% uptime, auto-scaling CDN
  • Scale: 10K concurrent streams, 50M concurrent viewers total

The Live Streaming Pipeline

Ingest

Broadcasters send video from OBS, Streamlabs, or mobile apps using RTMP (Real-Time Messaging Protocol). RTMP is used for ingest (not delivery) because it provides low-latency, reliable, bidirectional communication with wide encoder support. The stream connects to the nearest Point of Presence (PoP) to minimize upload latency. The stream key serves as authentication — it’s a secret token the broadcaster gets from the platform.

Real-Time Transcoding

As RTMP data arrives, it’s transcoded into multiple quality levels in real-time. Unlike VOD (where you can split and parallelize), live transcoding is sequential — you must process segment N before segment N+1. Each output quality must have keyframe-aligned segments so players can switch between qualities at any segment boundary without visual artifacts.

HLS Packaging

Transcoded segments are packaged as HLS — .ts video segments (typically 2-4 seconds each) with .m3u8 playlists. The playlist uses a sliding window that keeps only the last N segments (e.g., last 30 seconds). New segments are appended, old ones are removed. The EXT-X-MEDIA-SEQUENCE tag tells players which segment number comes next.

CDN Distribution

Segments are pushed to CDN origin servers, which propagate to edge nodes worldwide. When 100K viewers request the same segment, only one request reaches the origin — the CDN serves cached copies from edge nodes. This is why HLS (HTTP-based) won over RTMP for delivery: HTTP content is trivially cacheable by existing CDN infrastructure.

Low Latency vs Ultra-Low Latency

ApproachLatencyHow It WorksUse Case
Standard HLS10-30s3 segments × 6s + bufferVOD-like live (sports replays)
Low-Latency HLS (LL-HLS)2-5sPartial segments + blocking playlistInteractive streams (Twitch)
WebRTCunder 1sPeer-to-peer, no segmentsVideo calls, auctions

Standard HLS has high latency because the player buffers 3 segments before playing (to handle network jitter). LL-HLS solves this with partial segments — instead of waiting for a full 6-second segment, the player can start playing after receiving just 200ms of data. The playlist uses blocking reload — the player’s request blocks at the CDN until the next partial segment is ready, eliminating polling delay.

Live Chat at Scale

Chat is deceptively complex at live-streaming scale. A popular stream with 100K viewers generates thousands of chat messages per second. The chat system must: fan out messages to all viewers in the room (100K WebSocket connections), rate limit senders (slow mode: 1 message every 3 seconds), synchronize with video (chat timestamps align with stream time), and handle moderation (ban, timeout, delete messages).

Building the Live Streaming Platform

import http from "node:http";
import crypto from "node:crypto";

// ===========================================
// 1. TYPES
// ===========================================
type StreamStatus = "idle" | "live" | "ended";

interface Stream {
  id: string;
  title: string;
  streamKey: string;
  status: StreamStatus;
  startedAt: string | null;
  endedAt: string | null;
  broadcasterId: string;
  quality: string[];
  viewerCount: number;
  chatEnabled: boolean;
  recordingEnabled: boolean;
}

interface Segment {
  streamId: string;
  quality: string;
  sequenceNum: number;
  duration: number; // seconds
  createdAt: number;
  data: string; // placeholder for actual .ts data
}

interface ChatMessage {
  id: string;
  streamId: string;
  userId: string;
  username: string;
  content: string;
  timestamp: number;
  streamTime: number; // seconds since stream start
}

interface ViewerSession {
  userId: string;
  streamId: string;
  quality: string;
  connectedAt: number;
  lastHeartbeat: number;
}

// ===========================================
// 2. STREAM MANAGER
// ===========================================
class StreamManager {
  private streams = new Map<string, Stream>();
  private streamsByKey = new Map<string, string>(); // streamKey -> streamId

  create(broadcasterId: string, title: string): Stream {
    const stream: Stream = {
      id: crypto.randomUUID().slice(0, 8),
      title,
      streamKey: `live_${crypto.randomBytes(16).toString("hex")}`,
      status: "idle",
      startedAt: null, endedAt: null,
      broadcasterId,
      quality: ["1080p", "720p", "480p", "360p"],
      viewerCount: 0,
      chatEnabled: true, recordingEnabled: true,
    };
    this.streams.set(stream.id, stream);
    this.streamsByKey.set(stream.streamKey, stream.id);
    return stream;
  }

  authenticate(streamKey: string): Stream | null {
    const streamId = this.streamsByKey.get(streamKey);
    if (!streamId) return null;
    return this.streams.get(streamId) || null;
  }

  goLive(streamId: string): Stream {
    const stream = this.streams.get(streamId);
    if (!stream) throw new Error("Stream not found");
    if (stream.status === "live") throw new Error("Already live");
    stream.status = "live";
    stream.startedAt = new Date().toISOString();
    console.log(`[STREAM] ${stream.id} is LIVE — ${stream.title}`);
    return stream;
  }

  endStream(streamId: string): Stream {
    const stream = this.streams.get(streamId);
    if (!stream) throw new Error("Stream not found");
    if (stream.status !== "live") throw new Error("Not live");
    stream.status = "ended";
    stream.endedAt = new Date().toISOString();
    console.log(`[STREAM] ${stream.id} ended`);
    return stream;
  }

  get(id: string): Stream | null {
    return this.streams.get(id) || null;
  }

  getLive(): Stream[] {
    return [...this.streams.values()].filter(s => s.status === "live");
  }
}

// ===========================================
// 3. SEGMENT BUFFER (Sliding Window HLS)
// ===========================================
class SegmentBuffer {
  private segments = new Map<string, Segment[]>(); // "streamId:quality" -> segments
  private readonly windowSize = 15; // keep last 15 segments (~30-60 seconds)
  private sequenceCounters = new Map<string, number>();

  addSegment(streamId: string, quality: string, duration: number): Segment {
    const key = `${streamId}:${quality}`;
    const seqNum = (this.sequenceCounters.get(key) || 0) + 1;
    this.sequenceCounters.set(key, seqNum);

    const segment: Segment = {
      streamId, quality, sequenceNum: seqNum,
      duration, createdAt: Date.now(),
      data: `segment_${seqNum}_${quality}.ts`,
    };

    const list = this.segments.get(key) || [];
    list.push(segment);

    // Sliding window: remove old segments
    while (list.length > this.windowSize) list.shift();
    this.segments.set(key, list);

    return segment;
  }

  getSegments(streamId: string, quality: string): Segment[] {
    return this.segments.get(`${streamId}:${quality}`) || [];
  }

  generatePlaylist(streamId: string, quality: string): string {
    const segments = this.getSegments(streamId, quality);
    if (segments.length === 0) return "";

    const firstSeq = segments[0].sequenceNum;
    let playlist = "#EXTM3U\n#EXT-X-VERSION:3\n";
    playlist += `#EXT-X-TARGETDURATION:4\n`;
    playlist += `#EXT-X-MEDIA-SEQUENCE:${firstSeq}\n\n`;

    for (const seg of segments) {
      playlist += `#EXTINF:${seg.duration.toFixed(3)},\n`;
      playlist += `${seg.data}\n`;
    }
    return playlist;
  }

  generateMasterPlaylist(streamId: string, qualities: string[]): string {
    const bandwidths: Record<string, number> = {
      "1080p": 5000000, "720p": 2500000, "480p": 1000000, "360p": 500000,
    };
    const resolutions: Record<string, string> = {
      "1080p": "1920x1080", "720p": "1280x720", "480p": "854x480", "360p": "640x360",
    };

    let manifest = "#EXTM3U\n#EXT-X-VERSION:3\n\n";
    for (const q of qualities) {
      manifest += `#EXT-X-STREAM-INF:BANDWIDTH=${bandwidths[q] || 1000000},RESOLUTION=${resolutions[q] || "640x360"}\n`;
      manifest += `${q}/playlist.m3u8\n\n`;
    }
    return manifest;
  }
}

// ===========================================
// 4. VIEWER MANAGER
// ===========================================
class ViewerManager {
  private viewers = new Map<string, ViewerSession>(); // viewerId -> session
  private readonly timeout = 30000; // 30s heartbeat timeout

  join(userId: string, streamId: string, quality: string): void {
    this.viewers.set(`${userId}:${streamId}`, {
      userId, streamId, quality, connectedAt: Date.now(), lastHeartbeat: Date.now(),
    });
  }

  leave(userId: string, streamId: string): void {
    this.viewers.delete(`${userId}:${streamId}`);
  }

  heartbeat(userId: string, streamId: string): void {
    const key = `${userId}:${streamId}`;
    const session = this.viewers.get(key);
    if (session) session.lastHeartbeat = Date.now();
  }

  getCount(streamId: string): number {
    const now = Date.now();
    let count = 0;
    for (const [, session] of this.viewers) {
      if (session.streamId === streamId && now - session.lastHeartbeat < this.timeout) count++;
    }
    return count;
  }
}

// ===========================================
// 5. LIVE CHAT
// ===========================================
class LiveChat {
  private messages = new Map<string, ChatMessage[]>(); // streamId -> messages
  private rateLimiter = new Map<string, number>(); // "userId:streamId" -> lastMessage timestamp
  private readonly slowModeMs = 3000; // 1 message per 3 seconds
  private readonly maxHistory = 200;

  send(streamId: string, userId: string, username: string, content: string, streamStartTime: number): ChatMessage {
    const key = `${userId}:${streamId}`;
    const lastMsg = this.rateLimiter.get(key) || 0;
    if (Date.now() - lastMsg < this.slowModeMs) {
      throw new Error("Slow mode: wait before sending another message");
    }

    const msg: ChatMessage = {
      id: crypto.randomUUID().slice(0, 8),
      streamId, userId, username, content,
      timestamp: Date.now(),
      streamTime: streamStartTime > 0 ? (Date.now() - streamStartTime) / 1000 : 0,
    };

    const list = this.messages.get(streamId) || [];
    list.push(msg);
    if (list.length > this.maxHistory) list.shift();
    this.messages.set(streamId, list);
    this.rateLimiter.set(key, Date.now());

    return msg;
  }

  getRecent(streamId: string, limit = 50): ChatMessage[] {
    const list = this.messages.get(streamId) || [];
    return list.slice(-limit);
  }
}

// ===========================================
// 6. STREAM HEALTH MONITOR
// ===========================================
class HealthMonitor {
  private metrics = new Map<string, { bitrate: number; fps: number; droppedFrames: number; lastUpdate: number }>();

  update(streamId: string, bitrate: number, fps: number, droppedFrames: number): void {
    this.metrics.set(streamId, { bitrate, fps, droppedFrames, lastUpdate: Date.now() });

    if (bitrate < 500000) console.log(`[HEALTH] ⚠ Stream ${streamId}: low bitrate ${(bitrate/1000).toFixed(0)}kbps`);
    if (fps < 20) console.log(`[HEALTH] ⚠ Stream ${streamId}: low FPS ${fps}`);
    if (droppedFrames > 100) console.log(`[HEALTH] ⚠ Stream ${streamId}: ${droppedFrames} dropped frames`);
  }

  get(streamId: string) { return this.metrics.get(streamId); }
}

// ===========================================
// 7. TRANSCODER SIMULATOR
// ===========================================
class LiveTranscoder {
  private segmentBuffer: SegmentBuffer;
  private intervals = new Map<string, ReturnType<typeof setInterval>>();

  constructor(buffer: SegmentBuffer) { this.segmentBuffer = buffer; }

  startTranscoding(streamId: string, qualities: string[]): void {
    // Simulate generating segments every 2 seconds
    const interval = setInterval(() => {
      for (const quality of qualities) {
        this.segmentBuffer.addSegment(streamId, quality, 2.0);
      }
    }, 2000);
    this.intervals.set(streamId, interval);
    console.log(`[TRANSCODE] Started for stream ${streamId} — ${qualities.length} qualities`);
  }

  stopTranscoding(streamId: string): void {
    const interval = this.intervals.get(streamId);
    if (interval) clearInterval(interval);
    this.intervals.delete(streamId);
  }
}

// ===========================================
// 8. HTTP SERVER
// ===========================================
const streams = new StreamManager();
const segmentBuffer = new SegmentBuffer();
const viewers = new ViewerManager();
const chat = new LiveChat();
const health = new HealthMonitor();
const transcoder = new LiveTranscoder(segmentBuffer);

function parseBody(req: http.IncomingMessage): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    req.on("data", (c) => chunks.push(c));
    req.on("end", () => {
      try { resolve(JSON.parse(Buffer.concat(chunks).toString())); }
      catch { reject(new Error("Invalid JSON")); }
    });
  });
}

function json(res: http.ServerResponse, status: number, data: unknown): void {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(data));
}

const server = http.createServer(async (req, res) => {
  const url = new URL(req.url || "/", `http://${req.headers.host}`);
  const method = req.method || "GET";

  try {
    // POST /api/streams — Create stream
    if (url.pathname === "/api/streams" && method === "POST") {
      const body = await parseBody(req) as any;
      const stream = streams.create(body.broadcasterId || "anon", body.title || "Untitled");
      json(res, 201, stream); return;
    }

    // POST /api/streams/:id/start — Go live
    const startMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/start$/);
    if (startMatch && method === "POST") {
      const body = await parseBody(req) as any;
      const stream = streams.authenticate(body.streamKey);
      if (!stream || stream.id !== startMatch[1]) {
        json(res, 401, { error: "Invalid stream key" }); return;
      }
      const live = streams.goLive(stream.id);
      transcoder.startTranscoding(stream.id, live.quality);
      json(res, 200, live); return;
    }

    // POST /api/streams/:id/end — End stream
    const endMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/end$/);
    if (endMatch && method === "POST") {
      transcoder.stopTranscoding(endMatch[1]);
      const stream = streams.endStream(endMatch[1]);
      json(res, 200, stream); return;
    }

    // GET /api/streams/:id/master.m3u8 — HLS master playlist
    const masterMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/master\.m3u8$/);
    if (masterMatch && method === "GET") {
      const stream = streams.get(masterMatch[1]);
      if (!stream || stream.status !== "live") { json(res, 404, { error: "Stream not live" }); return; }
      const manifest = segmentBuffer.generateMasterPlaylist(stream.id, stream.quality);
      res.writeHead(200, { "Content-Type": "application/vnd.apple.mpegurl" });
      res.end(manifest); return;
    }

    // GET /api/streams/:id/:quality/playlist.m3u8 — Variant playlist
    const variantMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/(\w+)\/playlist\.m3u8$/);
    if (variantMatch && method === "GET") {
      const playlist = segmentBuffer.generatePlaylist(variantMatch[1], variantMatch[2]);
      res.writeHead(200, { "Content-Type": "application/vnd.apple.mpegurl" });
      res.end(playlist); return;
    }

    // GET /api/streams/:id/viewers
    const viewerMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/viewers$/);
    if (viewerMatch && method === "GET") {
      json(res, 200, { count: viewers.getCount(viewerMatch[1]) }); return;
    }

    // POST /api/streams/:id/chat — Send chat message
    const chatMatch = url.pathname.match(/^\/api\/streams\/([^/]+)\/chat$/);
    if (chatMatch && method === "POST") {
      const body = await parseBody(req) as any;
      const stream = streams.get(chatMatch[1]);
      const startTime = stream?.startedAt ? new Date(stream.startedAt).getTime() : 0;
      const msg = chat.send(chatMatch[1], body.userId || "anon", body.username || "Anonymous", body.content, startTime);
      json(res, 201, msg); return;
    }

    // GET /api/streams/:id/chat
    if (chatMatch && method === "GET") {
      const limit = parseInt(url.searchParams.get("limit") || "50");
      json(res, 200, { messages: chat.getRecent(chatMatch[1], limit) }); return;
    }

    // GET /api/streams/live — List live streams
    if (url.pathname === "/api/streams/live" && method === "GET") {
      const live = streams.getLive().map(s => ({
        ...s, viewerCount: viewers.getCount(s.id), streamKey: undefined,
      }));
      json(res, 200, { streams: live }); return;
    }

    // GET /api/streams/:id
    const streamMatch = url.pathname.match(/^\/api\/streams\/([^/]+)$/);
    if (streamMatch && method === "GET") {
      const stream = streams.get(streamMatch[1]);
      if (!stream) { json(res, 404, { error: "Stream not found" }); return; }
      json(res, 200, { ...stream, viewerCount: viewers.getCount(stream.id), streamKey: undefined }); return;
    }

    if (url.pathname === "/health") { json(res, 200, { status: "ok" }); return; }
    json(res, 404, { error: "Not found" });
  } catch (err: any) {
    json(res, 400, { error: err.message || "Internal server error" });
  }
});

const PORT = parseInt(process.env.PORT || "3000");
server.listen(PORT, () => console.log(`Live Streaming on http://localhost:${PORT}`));
process.on("SIGTERM", () => server.close());
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

// ===========================================
// 1. TYPES
// ===========================================
type Stream struct {
	ID           string   `json:"id"`
	Title        string   `json:"title"`
	StreamKey    string   `json:"streamKey,omitempty"`
	Status       string   `json:"status"`
	StartedAt    *string  `json:"startedAt,omitempty"`
	EndedAt      *string  `json:"endedAt,omitempty"`
	BroadcasterID string  `json:"broadcasterId"`
	Quality      []string `json:"quality"`
	ViewerCount  int      `json:"viewerCount"`
	ChatEnabled  bool     `json:"chatEnabled"`
}

type Segment struct {
	StreamID    string  `json:"streamId"`
	Quality     string  `json:"quality"`
	SequenceNum int     `json:"sequenceNum"`
	Duration    float64 `json:"duration"`
	Data        string  `json:"data"`
}

type ChatMessage struct {
	ID         string `json:"id"`
	StreamID   string `json:"streamId"`
	UserID     string `json:"userId"`
	Username   string `json:"username"`
	Content    string `json:"content"`
	Timestamp  int64  `json:"timestamp"`
	StreamTime float64 `json:"streamTime"`
}

// ===========================================
// 2. STREAM MANAGER
// ===========================================
type StreamManager struct {
	mu           sync.RWMutex
	streams      map[string]*Stream
	streamsByKey map[string]string
	counter      int
}

func NewStreamManager() *StreamManager {
	return &StreamManager{streams: make(map[string]*Stream), streamsByKey: make(map[string]string)}
}

func (sm *StreamManager) Create(broadcasterID, title string) *Stream {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.counter++
	keyBytes := make([]byte, 16)
	rand.Read(keyBytes)
	s := &Stream{
		ID: fmt.Sprintf("stream_%d", sm.counter), Title: title,
		StreamKey: "live_" + hex.EncodeToString(keyBytes), Status: "idle",
		BroadcasterID: broadcasterID, Quality: []string{"1080p", "720p", "480p", "360p"},
		ChatEnabled: true,
	}
	sm.streams[s.ID] = s
	sm.streamsByKey[s.StreamKey] = s.ID
	return s
}

func (sm *StreamManager) Auth(key string) *Stream {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	if id, ok := sm.streamsByKey[key]; ok { return sm.streams[id] }
	return nil
}

func (sm *StreamManager) GoLive(id string) (*Stream, error) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	s := sm.streams[id]
	if s == nil { return nil, fmt.Errorf("not found") }
	if s.Status == "live" { return nil, fmt.Errorf("already live") }
	s.Status = "live"
	now := time.Now().UTC().Format(time.RFC3339)
	s.StartedAt = &now
	log.Printf("[STREAM] %s is LIVE — %s", s.ID, s.Title)
	return s, nil
}

func (sm *StreamManager) EndStream(id string) (*Stream, error) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	s := sm.streams[id]
	if s == nil { return nil, fmt.Errorf("not found") }
	s.Status = "ended"
	now := time.Now().UTC().Format(time.RFC3339)
	s.EndedAt = &now
	return s, nil
}

func (sm *StreamManager) Get(id string) *Stream {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return sm.streams[id]
}

func (sm *StreamManager) GetLive() []*Stream {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	var result []*Stream
	for _, s := range sm.streams {
		if s.Status == "live" { result = append(result, s) }
	}
	return result
}

// ===========================================
// 3. SEGMENT BUFFER
// ===========================================
type SegmentBuffer struct {
	mu       sync.RWMutex
	segments map[string][]Segment // "streamId:quality" -> segments
	seqNums  map[string]int
}

func NewSegmentBuffer() *SegmentBuffer {
	return &SegmentBuffer{segments: make(map[string][]Segment), seqNums: make(map[string]int)}
}

func (sb *SegmentBuffer) Add(streamID, quality string, duration float64) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	key := streamID + ":" + quality
	sb.seqNums[key]++
	seg := Segment{StreamID: streamID, Quality: quality, SequenceNum: sb.seqNums[key],
		Duration: duration, Data: fmt.Sprintf("segment_%d_%s.ts", sb.seqNums[key], quality)}
	list := sb.segments[key]
	list = append(list, seg)
	if len(list) > 15 { list = list[len(list)-15:] }
	sb.segments[key] = list
}

func (sb *SegmentBuffer) GetPlaylist(streamID, quality string) string {
	sb.mu.RLock()
	defer sb.mu.RUnlock()
	segs := sb.segments[streamID+":"+quality]
	if len(segs) == 0 { return "" }
	var b strings.Builder
	fmt.Fprintf(&b, "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:4\n#EXT-X-MEDIA-SEQUENCE:%d\n\n", segs[0].SequenceNum)
	for _, s := range segs {
		fmt.Fprintf(&b, "#EXTINF:%.3f,\n%s\n", s.Duration, s.Data)
	}
	return b.String()
}

func (sb *SegmentBuffer) GetMasterPlaylist(streamID string, qualities []string) string {
	bw := map[string]int{"1080p": 5000000, "720p": 2500000, "480p": 1000000, "360p": 500000}
	res := map[string]string{"1080p": "1920x1080", "720p": "1280x720", "480p": "854x480", "360p": "640x360"}
	var b strings.Builder
	b.WriteString("#EXTM3U\n#EXT-X-VERSION:3\n\n")
	for _, q := range qualities {
		fmt.Fprintf(&b, "#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%s\n%s/playlist.m3u8\n\n", bw[q], res[q], q)
	}
	return b.String()
}

// ===========================================
// 4. LIVE CHAT
// ===========================================
type LiveChat struct {
	mu       sync.Mutex
	messages map[string][]ChatMessage
	rateLimit map[string]int64
	counter   int
}

func NewLiveChat() *LiveChat {
	return &LiveChat{messages: make(map[string][]ChatMessage), rateLimit: make(map[string]int64)}
}

func (lc *LiveChat) Send(streamID, userID, username, content string, startTime int64) (*ChatMessage, error) {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	key := userID + ":" + streamID
	if last, ok := lc.rateLimit[key]; ok && time.Now().UnixMilli()-last < 3000 {
		return nil, fmt.Errorf("slow mode active")
	}
	lc.counter++
	msg := ChatMessage{
		ID: fmt.Sprintf("msg_%d", lc.counter), StreamID: streamID,
		UserID: userID, Username: username, Content: content,
		Timestamp: time.Now().UnixMilli(),
	}
	if startTime > 0 { msg.StreamTime = float64(time.Now().UnixMilli()-startTime) / 1000 }
	list := lc.messages[streamID]
	list = append(list, msg)
	if len(list) > 200 { list = list[len(list)-200:] }
	lc.messages[streamID] = list
	lc.rateLimit[key] = time.Now().UnixMilli()
	return &msg, nil
}

func (lc *LiveChat) GetRecent(streamID string, limit int) []ChatMessage {
	lc.mu.Lock()
	defer lc.mu.Unlock()
	msgs := lc.messages[streamID]
	start := len(msgs) - limit
	if start < 0 { start = 0 }
	return msgs[start:]
}

// ===========================================
// 5. TRANSCODER + VIEWER MANAGER
// ===========================================
type LiveTranscoder struct {
	mu      sync.Mutex
	buffer  *SegmentBuffer
	timers  map[string]*time.Ticker
}

func NewTranscoder(buf *SegmentBuffer) *LiveTranscoder {
	return &LiveTranscoder{buffer: buf, timers: make(map[string]*time.Ticker)}
}

func (t *LiveTranscoder) Start(streamID string, qualities []string) {
	ticker := time.NewTicker(2 * time.Second)
	t.mu.Lock()
	t.timers[streamID] = ticker
	t.mu.Unlock()
	go func() {
		for range ticker.C {
			for _, q := range qualities { t.buffer.Add(streamID, q, 2.0) }
		}
	}()
}

func (t *LiveTranscoder) Stop(streamID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ticker, ok := t.timers[streamID]; ok { ticker.Stop(); delete(t.timers, streamID) }
}

type ViewerManager struct {
	mu      sync.Mutex
	viewers map[string]int64 // "userId:streamId" -> lastHeartbeat
}

func NewViewerManager() *ViewerManager {
	return &ViewerManager{viewers: make(map[string]int64)}
}

func (vm *ViewerManager) Join(userID, streamID string) {
	vm.mu.Lock(); defer vm.mu.Unlock()
	vm.viewers[userID+":"+streamID] = time.Now().UnixMilli()
}

func (vm *ViewerManager) Count(streamID string) int {
	vm.mu.Lock(); defer vm.mu.Unlock()
	count := 0; cutoff := time.Now().UnixMilli() - 30000
	for k, ts := range vm.viewers {
		if strings.HasSuffix(k, ":"+streamID) && ts > cutoff { count++ }
	}
	return count
}

// ===========================================
// 6. HTTP SERVER
// ===========================================
func writeJSON(w http.ResponseWriter, status int, data interface{}) {
	w.Header().Set("Content-Type", "application/json"); w.WriteHeader(status)
	json.NewEncoder(w).Encode(data)
}

func main() {
	sm := NewStreamManager()
	buf := NewSegmentBuffer()
	chat := NewLiveChat()
	tc := NewTranscoder(buf)
	vm := NewViewerManager()

	startP := regexp.MustCompile(`^/api/streams/([^/]+)/start$`)
	endP := regexp.MustCompile(`^/api/streams/([^/]+)/end$`)
	masterP := regexp.MustCompile(`^/api/streams/([^/]+)/master\.m3u8$`)
	variantP := regexp.MustCompile(`^/api/streams/([^/]+)/(\w+)/playlist\.m3u8$`)
	viewerP := regexp.MustCompile(`^/api/streams/([^/]+)/viewers$`)
	chatP := regexp.MustCompile(`^/api/streams/([^/]+)/chat$`)
	streamP := regexp.MustCompile(`^/api/streams/([^/]+)$`)

	mux := http.NewServeMux()

	mux.HandleFunc("/api/streams", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost { writeJSON(w, 405, map[string]string{"error": "Method not allowed"}); return }
		var body struct{ BroadcasterID, Title string }
		json.NewDecoder(r.Body).Decode(&body)
		s := sm.Create(body.BroadcasterID, body.Title)
		writeJSON(w, 201, s)
	})

	mux.HandleFunc("/api/streams/live", func(w http.ResponseWriter, _ *http.Request) {
		live := sm.GetLive()
		writeJSON(w, 200, map[string]interface{}{"streams": live})
	})

	mux.HandleFunc("/api/streams/", func(w http.ResponseWriter, r *http.Request) {
		if m := startP.FindStringSubmatch(r.URL.Path); m != nil && r.Method == http.MethodPost {
			var body struct{ StreamKey string `json:"streamKey"` }
			json.NewDecoder(r.Body).Decode(&body)
			s := sm.Auth(body.StreamKey)
			if s == nil || s.ID != m[1] { writeJSON(w, 401, map[string]string{"error": "Invalid key"}); return }
			live, err := sm.GoLive(s.ID)
			if err != nil { writeJSON(w, 400, map[string]string{"error": err.Error()}); return }
			tc.Start(s.ID, live.Quality)
			writeJSON(w, 200, live); return
		}
		if m := endP.FindStringSubmatch(r.URL.Path); m != nil && r.Method == http.MethodPost {
			tc.Stop(m[1])
			s, err := sm.EndStream(m[1])
			if err != nil { writeJSON(w, 400, map[string]string{"error": err.Error()}); return }
			writeJSON(w, 200, s); return
		}
		if m := masterP.FindStringSubmatch(r.URL.Path); m != nil {
			s := sm.Get(m[1])
			if s == nil || s.Status != "live" { writeJSON(w, 404, map[string]string{"error": "Not live"}); return }
			w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
			w.Write([]byte(buf.GetMasterPlaylist(s.ID, s.Quality))); return
		}
		if m := variantP.FindStringSubmatch(r.URL.Path); m != nil {
			w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
			w.Write([]byte(buf.GetPlaylist(m[1], m[2]))); return
		}
		if m := viewerP.FindStringSubmatch(r.URL.Path); m != nil {
			writeJSON(w, 200, map[string]int{"count": vm.Count(m[1])}); return
		}
		if m := chatP.FindStringSubmatch(r.URL.Path); m != nil {
			if r.Method == http.MethodPost {
				var body struct{ UserID, Username, Content string }
				json.NewDecoder(r.Body).Decode(&body)
				s := sm.Get(m[1])
				var startMs int64
				if s != nil && s.StartedAt != nil {
					if t, err := time.Parse(time.RFC3339, *s.StartedAt); err == nil { startMs = t.UnixMilli() }
				}
				msg, err := chat.Send(m[1], body.UserID, body.Username, body.Content, startMs)
				if err != nil { writeJSON(w, 429, map[string]string{"error": err.Error()}); return }
				writeJSON(w, 201, msg); return
			}
			limit := 50
			if l := r.URL.Query().Get("limit"); l != "" { limit, _ = strconv.Atoi(l) }
			writeJSON(w, 200, map[string]interface{}{"messages": chat.GetRecent(m[1], limit)}); return
		}
		if m := streamP.FindStringSubmatch(r.URL.Path); m != nil {
			s := sm.Get(m[1])
			if s == nil { writeJSON(w, 404, map[string]string{"error": "Not found"}); return }
			s.ViewerCount = vm.Count(s.ID)
			writeJSON(w, 200, s); return
		}
		writeJSON(w, 404, map[string]string{"error": "Not found"})
	})

	mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
		writeJSON(w, 200, map[string]string{"status": "ok"})
	})

	port := os.Getenv("PORT"); if port == "" { port = "3000" }
	srv := &http.Server{Addr: ":" + port, Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second}
	go func() {
		log.Printf("Live Streaming on http://localhost:%s", port)
		if err := srv.ListenAndServe(); err != http.ErrServerClosed { log.Fatal(err) }
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	srv.Close()
}

Design Decisions Explained

Why RTMP for Ingest?

RTMP provides low-latency, reliable, bidirectional streaming with wide encoder support (OBS, Streamlabs, FFmpeg). While newer protocols like SRT offer better error correction, RTMP’s ubiquity makes it the pragmatic choice. Every streaming software supports it out of the box.

Why HLS for Delivery Instead of RTMP to Viewers?

RTMP requires persistent TCP connections and doesn’t work with CDNs (which cache HTTP responses). HLS uses standard HTTP requests for small segment files, making it trivially cacheable. When 100K viewers request the same segment, the CDN serves 99,999 of them from cache. RTMP would require 100K individual connections to your servers.

Why a Sliding Window Manifest?

A live HLS playlist can’t grow forever — a 24-hour stream at 2-second segments would have 43,200 entries. The sliding window keeps only the last N segments (e.g., 15 segments = 30 seconds). New viewers start from the live edge, and the EXT-X-MEDIA-SEQUENCE tag ensures players know the correct segment ordering even as old segments are removed.

Why Keyframe-Aligned Segments?

Video codecs use keyframes (I-frames) as reference points — you can only start decoding from a keyframe. If segments across quality levels don’t align at keyframe boundaries, switching from 720p to 480p mid-segment would produce visual artifacts. Keyframe alignment ensures clean quality switching at every segment boundary.

Why Chat Separate from Video?

Chat and video have different latency profiles. Video has 3-5 second delivery latency (segment buffering). Chat can be near-instant (WebSocket). Coupling them would either delay chat (bad for interaction) or require complex synchronization. Keeping them separate and adding stream timestamps to chat messages lets clients optionally sync them.

Key Takeaways

  • RTMP for ingest + HLS for delivery is the industry standard split — RTMP is low-latency for upload, HLS is CDN-friendly for distribution
  • Keyframe-aligned segments across quality levels enable seamless quality switching mid-stream
  • Sliding window HLS manifests keep the playlist bounded — viewers join at the live edge, DVR users get a longer window
  • Live chat must be rate-limited and decoupled from video delivery — a chat storm shouldn’t affect video quality
  • Stream health monitoring detects encoding issues before viewers notice — dropped frames, bitrate drops, audio desync
  • CDN edge caching is critical — without it, 100K viewers requesting the same segment would overwhelm the origin server

Real-World Usage

  • Twitch handles 30M+ concurrent viewers using RTMP ingest → real-time transcoding → HLS delivery through a global CDN
  • YouTube Live uses RTMP/SRT ingest with automatic quality transcoding and LL-HLS for sub-3-second latency
  • Netflix Live (launched 2024) serves live events to 200M+ subscribers using their existing CDN infrastructure
  • Discord uses WebRTC for screen sharing (sub-1-second latency) but HLS for Go Live streams to larger audiences
  • This architecture supports 10K concurrent streams with 100K+ viewers each, with sub-5-second glass-to-glass latency