Concurrency Patterns in Go

By George Aristy / llorllale

Building upon the previous talk, Golang Concurrency Primitives.

Topics

  • The Done channel pattern
  • The Fan-In pattern
  • The Fan-Out pattern
  • Sharding
  • Bounded Parallelism
  • Backpressure
  • Deadlocks (stay tuned)

References

  • Learning Go: An Idiomatic Approach to Real-World Go Programming, Jon Bodner
  • Cloud Native Go: Building Reliable Services in Unreliable Environments, Matthew A. Titmus
  • The Go Blog: Go Concurrency Patterns: Pipelines and cancellation (https://go.dev/blog/pipelines)

The Done Channel Pattern

Provides a way to send a "stop" or "done" signal to a goroutine.
Pattern:

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() { // launch sub goroutine
        select {
        case result <- ...:  // write to result channel
        case <-done:
            // abort when `done` is closed
        }
    }()

    // wait for result or timeout, whichever occurs first
    select {
    case r := <-result:
        // process result
    case <-time.After(time.Second):
        // handle timeout
    }

    close(done) // signal the sub goroutine to abort, if it is still running
}
                    
Common mistake: the unit of work is not actually cancelled after the intended duration (playground )

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"
}

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        select {
        case result <- request():
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")
        }
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    close(done)

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    
Output:

request timeout

Program exited.
                    
What is happening:

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"
}

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        // the channel operands and send expressions of each `case`
        // are evaluated exactly once, in source order, on entering the select
        // see https://go.dev/ref/spec#Select_statements
        select {
        case result <- request(): // `request()` evaluation takes ~5s
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")
        }
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    close(done)

    time.Sleep(100 * time.Millisecond) // allow sub goroutine to print cancel msg
}
                    
Fix: refactor the send case into a plain receive statement by evaluating request() in a separate goroutine (playground ).

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"
}

func main() {
    tmp := make(chan interface{})
    go func() {
        tmp <- request()
    }()

    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        select {
        case t := <-tmp:
            result <- t
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")
        }
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    close(done)

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    
Version using context.Context (playground ):

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"
}

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    tmp := make(chan interface{})
    go func() {
        tmp <- request()
    }()

    result := make(chan interface{})

    go func() {
        select {
        case t := <-tmp:
            result <- t
            fmt.Println("request completed")
        case <-ctx.Done():
            fmt.Println("request canceled: ", ctx.Err())
        }
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    
When the 3rd party API supports context.Context (playground ):

// request supports context (blackbox)
func request(ctx context.Context) (interface{}, error) {
    select {
    case <-time.After(5 * time.Second):
        return "Hello, World!", nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    result := make(chan interface{})

    go func() {
        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)
            return
        }

        result <- r
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    
Lesson: prefer APIs that support context.Context; otherwise wrap them in a function that does.
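
For example, the context-free request() from the slides above could be wrapped in a small adapter. A minimal sketch (requestWithContext is a name invented here, not an existing API):

// requestWithContext is a hypothetical wrapper around the context-free
// request() shown above. It runs the blocking call in its own goroutine and
// returns early when ctx is cancelled. The channel is buffered so the
// background goroutine can still finish and exit after a cancellation.
func requestWithContext(ctx context.Context) (interface{}, error) {
    tmp := make(chan interface{}, 1) // buffered: the sender never blocks

    go func() {
        tmp <- request()
    }()

    select {
    case r := <-tmp:
        return r, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}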

Things to keep an eye on

Check if context is cancelled before invoking operation:

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    result := make(chan interface{})

    go func() {
        if ctx.Err() != nil { // abort if context is already cancelled
            return
        }

        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)
            return
        }

        result <- r
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    

Things to keep an eye on

Clean up your channels and goroutines:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Set a buffer on the channel.
    // This way, even if parent goroutine is gone (timeout, etc.)
    // and no longer listening on this channel, the sub goroutine
    // won't block when writing.
    result := make(chan interface{}, 1)

    go func() {
        // Close channel when done.
        // Not closed in parent goroutine to avoid writing to a
        // closed channel (panic).
        defer close(result)

        if ctx.Err() != nil { // abort if context is already cancelled
            return
        }

        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)
            return
        }

        result <- r // won't block
    }()

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")
    }

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
                    

The Fan-In Pattern

Multiplex multiple input channels onto one output channel.
Example:

func FanIn(sources ...<-chan int) <-chan int {
    dest := make(chan int) // return destination channel

    var wg sync.WaitGroup
    wg.Add(len(sources))

    // multiplex each source into dest
    for _, src := range sources {
        go func(src <-chan int) {
            defer wg.Done()

            for v := range src {
                dest <- v
            }
        }(src)
    }

    // close dest when all sources are closed
    go func() {
        wg.Wait()
        close(dest)
    }()

    return dest
}
                    
Usage (playground ):

func main() {
    src := sources()
    dest := FanIn(src...) // sources coalesced into a single channel

    for r := range dest {
        fmt.Println(r)
    }
}
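
The usage above relies on a sources() helper defined in the playground; a hypothetical stand-in that starts a few producers might look like this:

// sources is a hypothetical stand-in for the playground helper: it starts
// three producers that each send a handful of values and then close their channel.
func sources() []<-chan int {
    srcs := make([]<-chan int, 0, 3)

    for i := 0; i < 3; i++ {
        ch := make(chan int)
        srcs = append(srcs, ch)

        go func(ch chan<- int, base int) {
            defer close(ch)

            for j := 0; j < 5; j++ {
                ch <- base*10 + j
            }
        }(ch, i)
    }

    return srcs
}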
                    

The Fan-Out Pattern

Distribute messages from one input channel into multiple output channels.
Example:

func FanOut(src <-chan int, n int) []<-chan int {
    dests := make([]<-chan int, 0)

    for i := 0; i < n; i++ {
        dest := make(chan int)
        dests = append(dests, dest)

        go func() {
            defer close(dest)

            for v := range src {
                dest <- v
            }
        }()
    }

    return dests
}
                    
Usage (playground ):

func main() {
    src := source()
    dests := FanOut(src, 5)

    var wg sync.WaitGroup
    wg.Add(len(dests))

    for _, dest := range dests {
        go func(dest <-chan int) {
            defer wg.Done()

            for v := range dest {
                fmt.Println(v)
            }
        }(dest)
    }

    wg.Wait()
}
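
Here too, source() is defined in the playground; a hypothetical stand-in producing a short run of values:

// source is a hypothetical stand-in for the playground helper: a single
// producer that emits a run of values and then closes the channel.
func source() <-chan int {
    ch := make(chan int)

    go func() {
        defer close(ch)

        for i := 0; i < 20; i++ {
            ch <- i
        }
    }()

    return ch
}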
                    

Sharding

Partition a data structure to localize the effects of read/write locks.
Example:

type Shard[V any] struct {
    sync.RWMutex
    m map[string]V
}

type ShardedMap[V any] []*Shard[V]

func NewShardedMap[V any](nShards int) ShardedMap[V] {
    shards := make([]*Shard[V], nShards)

    for i := range shards {
        shards[i] = &Shard[V]{m: make(map[string]V)}
    }

    return shards
}

func (m ShardedMap[V]) Get(key string) V {
    shard := m.shard(key)
    shard.RLock()
    defer shard.RUnlock()

    return shard.m[key]
}

func (m ShardedMap[V]) Set(key string, value V) {
    shard := m.shard(key)
    shard.Lock()
    defer shard.Unlock()

    shard.m[key] = value
}
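
The methods above call a shard(key) helper, and the usage below calls Keys(); neither is shown on this slide. A minimal sketch of both, assuming the key is hashed with hash/fnv to pick its shard:

// shard maps a key to its shard; hashing with FNV-1a (hash/fnv) is one
// reasonable choice, assumed here for illustration.
func (m ShardedMap[V]) shard(key string) *Shard[V] {
    h := fnv.New32a()
    h.Write([]byte(key))

    return m[int(h.Sum32())%len(m)]
}

// Keys gathers the keys from every shard, read-locking each shard in turn.
func (m ShardedMap[V]) Keys() []string {
    keys := make([]string, 0)

    for _, shard := range m {
        shard.RLock()

        for k := range shard.m {
            keys = append(keys, k)
        }

        shard.RUnlock()
    }

    return keys
}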
                    
Usage (playground ):

func main() {
    m := NewShardedMap[int](5)

    m.Set("alpha", 1)
    m.Set("beta", 2)
    m.Set("gamma", 3)

    fmt.Println(m.Get("alpha"))
    fmt.Println(m.Get("beta"))
    fmt.Println(m.Get("gamma"))

    keys := m.Keys()
    for _, k := range keys {
        fmt.Println(k)
    }
}
                    

Bounded Parallelism

Limit concurrent processing of an unknown number of inputs.
Example:

func Process(concurrency int, data []int) {
    sem := make(chan struct{}, concurrency)
    var wg sync.WaitGroup
    wg.Add(len(data))

    for _, d := range data {
        sem <- struct{}{} // blocks until capacity is freed up

        go func(d int) {
            defer wg.Done()

            doProcess(d)
            <-sem // free up capacity
        }(d)
    }

    wg.Wait()
}
                    
Usage (playground ):

func main() {
    data := getData()

    Process(3, data)
}
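
getData() and doProcess() are defined in the playground; hypothetical stand-ins could be as simple as:

// Hypothetical stand-ins for the playground helpers.
func getData() []int {
    data := make([]int, 10)
    for i := range data {
        data[i] = i
    }

    return data
}

func doProcess(d int) {
    time.Sleep(100 * time.Millisecond) // simulate work
    fmt.Println("processed", d)
}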
                    

Backpressure

Limit the amount of concurrent work and reject extra. Example:

type PressureGauge struct {
    ch chan struct{}
}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)

    for i := 0; i < limit; i++ {
        ch <- struct{}{}
    }

    return &PressureGauge{ch: ch}
}

func (p *PressureGauge) Do(fn func()) error {
    select {
    case <-p.ch:
        fn()
        p.ch <- struct{}{}
        return nil
    default:
        return errors.New("out of capacity")
    }
}
                    

Backpressure

Usage (playground ):

func runServer(pg *PressureGauge) {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        err := pg.Do(func() {
            w.Write([]byte(expensiveOperation()))
        })
        if err != nil {
            w.WriteHeader(http.StatusTooManyRequests)
            w.Write([]byte("too many requests"))
        }
    })
    http.ListenAndServe(":8080", nil)
}
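
expensiveOperation() is defined in the playground; a hypothetical stand-in that simulates slow work:

// expensiveOperation is a hypothetical stand-in for the handler's real work.
func expensiveOperation() string {
    time.Sleep(2 * time.Second) // simulate a slow call

    return "Hello, World!"
}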
                    

Questions?