Advanced Concurrency Patterns
Context, sync primitives, worker pools, and the patterns that power Go at scale — from Uber to Cloudflare.
Context: The Cancellation Backbone
context.Context is Go’s mechanism for managing request lifecycles — timeouts, cancellation, and request-scoped values. Every production Go function that does I/O should accept a context as its first argument.
// Context hierarchy: parent cancellation cascades to children
func handleRequest(w http.ResponseWriter, r *http.Request) {
// r.Context() is cancelled when the client disconnects
ctx := r.Context()
// Add a timeout (whichever happens first: client disconnect or 5s)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() // Always call cancel to release resources
// userID is assumed to have been parsed from the request earlier (not shown)
user, err := fetchUser(ctx, userID)
if err != nil {
// Could be: context.DeadlineExceeded (timeout)
// context.Canceled (client disconnected)
// or an actual error
handleError(w, err)
return
}
json.NewEncoder(w).Encode(user)
}
func fetchUser(ctx context.Context, id int) (*User, error) {
// Pass context to database query — if cancelled, query stops
row := db.QueryRowContext(ctx,
"SELECT id, email, name FROM users WHERE id = $1", id,
)
var user User
if err := row.Scan(&user.ID, &user.Email, &user.Name); err != nil {
return nil, err
}
return &user, nil
}
Real-World Analogy
Context is like a walkie-talkie for a construction crew. The foreman (parent context) can tell everyone to stop (cancel). If the foreman’s boss (parent’s parent) cancels the entire project, the foreman’s cancel cascades to all workers too. Each worker checks their walkie-talkie before starting expensive work.
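The cascade described above can be sketched with the standard library alone. The helper names `waitFor`, `cascade`, and `timeout` are made up for this illustration; the sketch only shows which error a child context reports when its parent is cancelled versus when its own deadline passes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor blocks until ctx is done and reports why it ended.
func waitFor(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// cascade cancels a parent context and returns the error observed by a
// child derived from it. The child's one-hour timeout never fires.
func cascade() error {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithTimeout(parent, time.Hour)
	defer cancelChild()

	cancelParent() // the foreman says stop
	return waitFor(child)
}

// timeout returns the error observed when a context's own deadline passes.
func timeout() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	return waitFor(ctx)
}

func main() {
	fmt.Println(errors.Is(cascade(), context.Canceled))         // true
	fmt.Println(errors.Is(timeout(), context.DeadlineExceeded)) // true
}
```

Comparing with `errors.Is` rather than `==` keeps the check working when a caller has wrapped the error with `%w`.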
Context Best Practices
// 1. Always pass context as the first parameter
func GetUser(ctx context.Context, id int) (*User, error) // Good
func GetUser(id int, ctx context.Context) (*User, error) // Bad
// 2. Never store context in a struct
type Service struct {
ctx context.Context // BAD — context is request-scoped, not service-scoped
db *sql.DB
}
// 3. Use context.WithValue sparingly (only for request-scoped data)
type contextKey string
const userIDKey contextKey = "userID"
ctx = context.WithValue(ctx, userIDKey, 42)
// Use the comma-ok form: a missing or mistyped value yields ok == false instead of a panic
userID, ok := ctx.Value(userIDKey).(int)
// 4. Check for cancellation in long loops
for _, item := range largeDataset {
select {
case <-ctx.Done():
return ctx.Err() // Bail out early
default:
}
process(item)
}
sync.Mutex: Protecting Shared State
When goroutines must share state that cannot reasonably be passed over channels, protect it with a mutex:
type SafeCache struct {
mu sync.RWMutex
items map[string]string
}
func NewSafeCache() *SafeCache {
return &SafeCache{
items: make(map[string]string),
}
}
// Multiple readers can hold RLock simultaneously
func (c *SafeCache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
val, ok := c.items[key]
return val, ok
}
// Only one writer at a time (blocks all readers too)
func (c *SafeCache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = value
}
func (c *SafeCache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
Use sync.RWMutex when reads vastly outnumber writes (like a cache). Multiple goroutines can read simultaneously with RLock(), but writing with Lock() is exclusive. For write-heavy workloads, a regular sync.Mutex has less overhead.
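To see the locking rules under load, here is a small self-contained sketch. `HitCounter` and `countConcurrently` are hypothetical names, not part of the cache above. It runs writers and readers side by side and checks that no increment is lost.

```go
package main

import (
	"fmt"
	"sync"
)

// HitCounter is a hypothetical read-mostly structure guarded by an RWMutex.
type HitCounter struct {
	mu   sync.RWMutex
	hits map[string]int
}

func NewHitCounter() *HitCounter {
	return &HitCounter{hits: make(map[string]int)}
}

// Inc takes the exclusive lock because it mutates the map.
func (c *HitCounter) Inc(page string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.hits[page]++
}

// Get takes the shared lock, so many readers can proceed in parallel.
func (c *HitCounter) Get(page string) int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.hits[page]
}

// countConcurrently starts n writers and n readers against one counter and
// returns the final count once all of them have finished.
func countConcurrently(n int) int {
	c := NewHitCounter()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); c.Inc("/home") }()
		go func() { defer wg.Done(); _ = c.Get("/home") }()
	}
	wg.Wait()
	return c.Get("/home")
}

func main() {
	fmt.Println(countConcurrently(1000)) // 1000
}
```

Running a program like this with `go run -race` is a cheap way to confirm that every access to the map really goes through the lock.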
sync.Once: One-Time Initialization
type DBConnection struct {
once sync.Once
db *sql.DB
}
func (c *DBConnection) Get() *sql.DB {
c.once.Do(func() {
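// This runs exactly once, even if 1000 goroutines call Get().
// Note: sql.Open validates its arguments but may not open a connection;
// call db.Ping to verify that the database is reachable.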
db, err := sql.Open("postgres", connectionString)
if err != nil {
log.Fatal(err)
}
c.db = db
})
return c.db
}
errgroup: Goroutines with Error Handling
golang.org/x/sync/errgroup lives outside the standard library, but it is the de facto standard way to run a group of goroutines that return errors:
import "golang.org/x/sync/errgroup"
func fetchAllData(ctx context.Context) (*Dashboard, error) {
g, ctx := errgroup.WithContext(ctx)
var users []*User
var orders []*Order
var metrics *Metrics
// All three run concurrently
g.Go(func() error {
var err error
users, err = fetchUsers(ctx)
return err
})
g.Go(func() error {
var err error
orders, err = fetchOrders(ctx)
return err
})
g.Go(func() error {
var err error
metrics, err = fetchMetrics(ctx)
return err
})
// Wait blocks until all goroutines finish and returns the first non-nil error.
// That first error also cancels ctx, so the remaining calls can stop early.
if err := g.Wait(); err != nil {
return nil, fmt.Errorf("fetching dashboard data: %w", err)
}
return &Dashboard{Users: users, Orders: orders, Metrics: metrics}, nil
}
Real-World Analogy
errgroup is like sending three employees to get different supplies. If any one of them reports a problem (“the store is closed”), you cancel the others and deal with the error. If all three succeed, you have everything you need.
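The "first problem cancels everyone" behaviour can be approximated with the standard library alone. This is a rough sketch of the semantics, not how errgroup is actually implemented, and `runAll`, `demo`, and `errStoreClosed` are names invented for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// runAll is a hypothetical helper: it runs every task concurrently, cancels
// the shared context on the first error, and returns that first error.
func runAll(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // tell the other workers to stop
				})
			}
		}(task)
	}
	wg.Wait()
	return firstErr
}

var errStoreClosed = errors.New("the store is closed")

// demo runs one failing task and one slow task. It reports whether the slow
// task observed the cancellation, plus the error returned by the group.
func demo() (sawCancel bool, err error) {
	failing := func(ctx context.Context) error { return errStoreClosed }
	slow := func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			sawCancel = true
			return ctx.Err()
		case <-time.After(5 * time.Second):
			return nil
		}
	}
	err = runAll(context.Background(), failing, slow)
	return sawCancel, err
}

func main() {
	sawCancel, err := demo()
	fmt.Println(sawCancel, err) // true the store is closed
}
```

In real code the errgroup package is the better choice. The sketch is only meant to show why each task has to watch the derived context for the cancellation to have any effect.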
errgroup with Concurrency Limit
func processImages(ctx context.Context, images []Image) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // Max 10 concurrent goroutines
for _, img := range images {
// Go 1.22+ gives each iteration its own img; on older versions add `img := img` here
g.Go(func() error {
return resizeAndUpload(ctx, img)
})
}
return g.Wait()
}
Semaphore Pattern
Control the maximum number of concurrent operations:
type Semaphore struct {
ch chan struct{}
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{ch: make(chan struct{}, max)}
}
func (s *Semaphore) Acquire() {
s.ch <- struct{}{} // Blocks if buffer is full
}
func (s *Semaphore) Release() {
<-s.ch
}
// Usage: limit concurrent API calls
sem := NewSemaphore(5) // Max 5 concurrent
for _, url := range urls {
// Go 1.22+ gives each iteration its own url; on older versions add `url := url` here
sem.Acquire()
go func() {
defer sem.Release()
fetch(url)
}()
}
Pipeline Pattern
Chain stages where each stage is a goroutine processing a stream:
// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
// Stage 2: Square each number
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
// Stage 3: Filter even numbers
func filterEven(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}
}()
return out
}
// Compose the pipeline
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// generate → square → filterEven
pipeline := filterEven(ctx, square(ctx, generate(ctx, 1, 2, 3, 4, 5)))
for result := range pipeline {
fmt.Println(result) // 4, 16 (squares of 2 and 4)
}
}
Real-World Analogy
A pipeline is like an assembly line in a factory. Station 1 cuts the metal, Station 2 bends it, Station 3 paints it. Each station works on a different piece at the same time. If the factory shuts down (context cancelled), every station stops.
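The "factory shuts down" case is easiest to see with a source that never ends on its own. In the sketch below, `naturals`, `mapStage`, and `firstSquares` are illustrative names. The consumer takes the first few results and then cancels, which lets every upstream goroutine exit instead of leaking.

```go
package main

import (
	"context"
	"fmt"
)

// naturals emits 1, 2, 3, ... until ctx is cancelled.
func naturals(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 1; ; n++ {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// mapStage applies f to every value received from in.
func mapStage(ctx context.Context, in <-chan int, f func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- f(n):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstSquares reads k squares from an endless pipeline, then cancels it.
func firstSquares(k int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // signals both stages to stop once we return

	squares := mapStage(ctx, naturals(ctx), func(n int) int { return n * n })
	results := make([]int, 0, k)
	for len(results) < k {
		v, ok := <-squares
		if !ok {
			break
		}
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(firstSquares(4)) // [1 4 9 16]
}
```

Without the `ctx.Done()` case in each stage, both goroutines would block forever on their sends once the consumer stopped reading.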
Real-World: Rate-Limited API Client
type APIClient struct {
client *http.Client
limiter *rate.Limiter // golang.org/x/time/rate
}
func NewAPIClient(rps int) *APIClient {
return &APIClient{
client: &http.Client{Timeout: 10 * time.Second},
limiter: rate.NewLimiter(rate.Limit(rps), rps), // rps requests per second, burst of rps
}
}
func (c *APIClient) Fetch(ctx context.Context, url string) (*http.Response, error) {
// Wait for rate limiter (respects context cancellation)
if err := c.limiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("rate limiter: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
return c.client.Do(req)
}
// Fetch 1000 URLs at 50 requests/second with 10 concurrent workers
func fetchAll(ctx context.Context, urls []string) []Result {
client := NewAPIClient(50)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make(chan Result, len(urls))
for _, url := range urls {
// Go 1.22+ gives each iteration its own url; on older versions add `url := url` here
g.Go(func() error {
resp, err := client.Fetch(ctx, url)
if err != nil {
results <- Result{URL: url, Err: err}
return nil // Don't cancel other requests
}
defer resp.Body.Close()
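body, err := io.ReadAll(resp.Body)
if err != nil {
// Report a failed read instead of silently returning a partial body
results <- Result{URL: url, Err: err}
return nil
}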
results <- Result{URL: url, Body: body, Status: resp.StatusCode}
return nil
})
}
go func() {
g.Wait()
close(results)
}()
var out []Result
for r := range results {
out = append(out, r)
}
return out
}
Key Takeaways
- Context is mandatory for I/O: always pass context.Context as the first parameter
- defer cancel(): always call cancel on contexts you create to prevent resource leaks
- sync.RWMutex for read-heavy shared state, sync.Mutex for write-heavy
- errgroup is the production standard for concurrent operations with error handling
- Pipelines compose stages as goroutine-driven channels; each stage runs concurrently
- Rate limiting + concurrency limiting are separate concerns; use both in production API clients
- Check ctx.Done() in long-running loops to support cancellation
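As a closing illustration of the point that rate and concurrency limits are separate concerns, here is a standard-library-only sketch. It stands in a `time.Ticker` for a real rate limiter and a buffered channel for the semaphore. `runLimited` and `demoLimits` are hypothetical helpers, and the intervals are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runLimited starts at most one job per interval (rate limit) and keeps at
// most maxInFlight jobs running at once (concurrency limit). It returns how
// many jobs were started and the peak concurrency it observed.
func runLimited(ctx context.Context, jobs, maxInFlight int, interval time.Duration, work func()) (started, peak int) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	sem := make(chan struct{}, maxInFlight)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
	)
loop:
	for i := 0; i < jobs; i++ {
		select {
		case <-ticker.C: // rate limit: wait for the next tick
		case <-ctx.Done():
			break loop
		}
		select {
		case sem <- struct{}{}: // concurrency limit: wait for a free slot
		case <-ctx.Done():
			break loop
		}
		started++
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the job finishes

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			work()

			mu.Lock()
			inFlight--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return started, peak
}

// demoLimits runs six slow jobs with a fast ticker and two slots.
func demoLimits() (started, peak int) {
	return runLimited(context.Background(), 6, 2, time.Millisecond, func() {
		time.Sleep(5 * time.Millisecond)
	})
}

func main() {
	started, peak := demoLimits()
	fmt.Println(started, peak <= 2) // 6 true
}
```

Because each job takes longer than a tick, the semaphore is what actually bounds the work here. With fast jobs and a slow ticker the rate limit would dominate instead, which is why production clients usually need both.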