Concurrency Patterns in Go

By George Aristy / llorllale

Building upon previous talk on Golang Concurrency Primitives.


  • The Done channel pattern
  • The Fan-In pattern
  • The Fan-Out pattern
  • Sharding
  • Bounded Parallelism
  • Backpressure
  • Deadlocks (stay tuned)


  • Learning Go: An Idiomatic Approach to Real-World Go Programming ()
  • Cloud-Native Go: Building Reliable Services in Unreliable Environments ()
  • The Go Blog: Go Concurrency Patterns: Pipelines and cancellation ()

The Done Channel Pattern

Provides a way to send a "stop" or "done" signal to a goroutine.

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() { // launch sub goroutine
        select {
        case result <- ...:  // write to result channel
        case <-done:
            // abort when `done` is closed

    // wait for result or timeout, whichever occurs first
    select {
    case r := <-result:
        // process result
    case <-time.After(time.Second):
        // handle timeout

    close(done) // cleanup `done` channel
Common mistake: not cancelling unit of work after the intended duration (playground )

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        select {
        case result <- request():
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")


    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg

request timeout

Program exited.
What is happening:

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"

func main() {
    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        // each `case` stmt is evaluated once in source order
        // see
        select {
        case result <- request(): // `request()` evaluation takes ~5s
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")


    time.Sleep(100 * time.Millisecond) // allow sub goroutine to print cancel msg
Fix: refactor case statement into a pure "receive statement" by evaluating request() in another goroutine (playground ).

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"

func main() {
    tmp := make(chan interface{})
    go func() {
        tmp <- request()

    result := make(chan interface{})
    done := make(chan struct{})

    go func() {
        select {
        case t := <-tmp:
            result <- t
            fmt.Println("request completed")
        case <-done:
            fmt.Println("request canceled")

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")


    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
Version using context.Context (playground ):

// request does not support context
func request() interface{} {
    time.Sleep(5 * time.Second) // simulate expensive operation
    return "Hello, World!"

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    tmp := make(chan interface{})
    go func() {
        tmp <- request()

    result := make(chan interface{})

    go func() {
        select {
        case t := <-tmp:
            result <- t
            fmt.Println("request completed")
        case <-ctx.Done():
            fmt.Println("request canceled: ", ctx.Err())

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
When the 3rd party API supports context.Context (playground ):

// request supports context (blackbox)
func request(ctx context.Context) (interface{}, error) {
    select {
    case <-time.After(5 * time.Second):
        return "Hello, World!", nil
    case <-ctx.Done():
        return nil, ctx.Err()

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    result := make(chan interface{})

    go func() {
        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)

        result <- r

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
Lesson: prefer APIs that support context.Context; otherwise wrap them in a function that does.

Things to keep an eye on

Check if context is cancelled before invoking operation:

func main() {
    ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
    defer cancel()                                          // best practice, but don't rely on it for timely cancellation

    result := make(chan interface{})

    go func() {
        if ctx.Err() != nil { // abort if context is already cancelled

        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)

        result <- r

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg

Things to keep an eye on

Cleanup your channels and goroutines:

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Set a buffer on the channel.
    // This way, even if parent goroutine is gone (timeout, etc.)
    // and no longer listening on this channel, the sub goroutine
    // won't block when writing.
    result := make(chan interface{}, 1)

    go func() {
        // Close channel when done.
        // Not closed in parent goroutine to avoid  writing to a
        // closed channel (panic).
        defer close(result)

        if ctx.Err() != nil { // abort if context is already cancelled

        r, err := request(ctx)
        if err != nil {
            fmt.Println("request error: ", err)

        result <- r // won't block

    select {
    case r := <-result:
        fmt.Println("result: ", r)
    case <-time.After(time.Second):
        fmt.Println("request timeout")

    cancel() // important! cancel now; don't defer

    time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg

The Fan-In Pattern

Multiplex multiple input channels onto one output channel.

func FanIn(sources ...<-chan int) <-chan int {
    dest := make(chan int) // return destination channel

    var wg sync.WaitGroup

    // multiplex each source into dest
    for _, src := range sources {
        go func(src <-chan int) {
            defer wg.Done()

            for v := range src {
                dest <- v

    // close dest when all sources are closed
    go func() {

    return dest
Usage (playground ):

func main() {
    src := sources()
    dest := FanIn(src...) // sources coalesced into a single channel

    for r := range dest {

The Fan-Out Pattern

Distribute messages from one input channel into multiple output channels.

func FanOut(src <-chan int, n int) []<-chan int {
    dests := make([]<-chan int, 0)

    for i := 0; i < n; i++ {
        dest := make(chan int)
        dests = append(dests, dest)

        go func() {
            defer close(dest)

            for v := range src {
                dest <- v

    return dests
Usage (playground ):

func main() {
    src := source()
    dests := FanOut(src, 5)

    var wg sync.WaitGroup

    for _, dest := range dests {
        go func(dest <-chan int) {
            defer wg.Done()

            for v := range dest {



Partition a data structure to localize the effects of read/write locks.

type Shard[V any] struct {
    m map[string]V

type ShardedMap[V any] []*Shard[V]

func NewShardedMap[V any](nShards int) ShardedMap[V] {
    shards := make([]*Shard[V], nShards)

    for i := range shards {
        shards[i] = &Shard[V]{m: make(map[string]V)}

    return shards

func (m ShardedMap[V]) Get(key string) V {
    shard := m.shard(key)
    defer shard.RUnlock()

    return shard.m[key]

func (m ShardedMap[V]) Set(key string, value V) {
    shard := m.shard(key)
    defer shard.Unlock()

    shard.m[key] = value
Usage (playground ):

func main() {
    m := NewShardedMap[int](5)

    m.Set("alpha", 1)
    m.Set("beta", 2)
    m.Set("gamma", 3)


    keys := m.Keys()
    for _, k := range keys {

Bounded Parallelism

Limit concurrent processing of an unknown number of inputs.

func Process(concurrency int, data []int) {
    sem := make(chan struct{}, concurrency)
    var wg sync.WaitGroup

    for _, d := range data {
        sem <- struct{}{} // blocks until capacity is freed up

        go func(d int) {
            defer wg.Done()

            <-sem // free up capacity

Usage (playground ):

func main() {
	data := getData()

	Process(3, data)


Limit the amount of concurrent work and reject extra. Example:

type PressureGauge struct {
    ch chan struct{}

func New(limit int) *PressureGauge {
    ch := make(chan struct{}, limit)

    for i := 0; i < limit; i++ {
        ch <- struct{}{}

    return &PressureGauge{ch: ch}

func (p *PressureGauge) Do(fn func()) error {
    select {
    case <
        fn() <- struct{}{}
        return nil
        return errors.New("out of capacity")


Usage (playground ):

func runServer(pg *PressureGauge) {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {

        err := pg.Do(func() {
        if err != nil {
            w.Write([]byte("too many requests"))
    http.ListenAndServe(":8080", nil)
