Concurrency Primitives in Go

By George Aristy / llorllale

Goroutines

  • Core concept in Go's concurrency model.
  • Can efficiently run 1000s of goroutines (for work not bound by CPU)

How do they work?

M:N Scheduler





Schedulers
  • Go runtime: schedules goroutines (M) unto threads (N).
  • OS: schedules threads unto CPU cores.

Benefits
  • Speed: no expensive system calls to create threads.
  • Memory: small stack per-goroutine; can grow as needed.
  • Context Switching: entirely within the process (faster).
  • Scheduling: optimized for Go.
  • Excellent overview: The Scheduler Saga

Example: scheduling

A goroutine is blocked by another:

Example: scheduling

Some threads created by the Go runtime have been freed up:

Example: scheduling

Go runtime schedules goroutine to a free thread:

How do I run goroutines?

How do I run goroutines?

Use keyword go followed by an expression (typically a function call):

                        func main() {
                            go func() {
                                fmt.Println("Hello goroutine!")
                            }()
                        }
                    

Channels

What are channels?

What are channels?

  • Blocking bounded queues
  • Channels have a type
  • Concurrency-safe
  • Can allow either reads or writes (or both)


Fun Fact

Channels are a struct, hchan, under the hood. Access to hchan's buffer is guarded by a mutex.

Purpose

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.
- The Spec

How do I use channels?

Create one using the make keyword:

                        func main() {
                            unbuffered := make(chan int)
                            buffered := make(chan int, 3)
                        }
                    

How do I use channels?

Read and write using the <- operator:

                        // Which of the following compiles?
                        func main() {
                            readWrite := make(chan int)
                            readOnly := make(<-chan int)
                            writeOnly := make(chan<- int)

                            readWrite <- 1 // OK
                            <-readWrite    // OK
                            readOnly <- 1  // compilation error
                            <-readOnly     // OK
                            writeOnly <- 1 // OK
                            <-writeOnly    // compilation error
                        }
                    

How do I use channels?

Read in a loop using the range keyword:

                        func main() {
                            ch := make(chan string, 3)

                            ch <- "Hello"
                            ch <- "Fellow"
                            ch <- "Gophers"

                            close(ch)

                            for s := range ch { // exits when channel is closed
                                fmt.Println(s)
                            }

                            // Output:
                            //  Hello
                            //  Fellow
                            //  Gophers
                        }
                    

How do I use channels?

Read from the first available channel using the select statement:

                        select {
                        case <-ch1:
                            doSomething()
                        case <-ch2:
                            doSomethingElse()
                        ...
                        case <-chN:
                            doTheNthThing()
                        }
                    

How do I use channels?

select example:

                        func main() {
                            rand.Seed(time.Now().Unix())

                            ch1 := make(chan int)
                            ch2 := make(chan int)

                            go func() { // write to ch1 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch1 <- 1
                                close(ch1)
                            }()

                            go func() { // write to ch2 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch2 <- 2
                                close(ch2)
                            }()

                            select { // Output: either 1 or 2
                            case v := <-ch1:
                                fmt.Println(v)
                            case v := <-ch2:
                                fmt.Println(v)
                            }
                        }
                    

How do I use channels?

select example:

                        func main() {
                            rand.Seed(time.Now().Unix())

                            ch1 := make(chan int)
                            ch2 := make(chan int)

                            go func() { // write to ch1 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch1 <- 1
                                close(ch1)
                            }()

                            go func() { // write to ch2 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch2 <- 2
                                close(ch2)
                            }()

                            select { // Output: either 1 or 2
                            case v := <-ch1:
                                fmt.Println(v)
                            case v := <-ch2:
                                fmt.Println(v)
                            }
                        }
                    

How do I use channels?

select example:

                        func main() {
                            rand.Seed(time.Now().Unix())

                            ch1 := make(chan int)
                            ch2 := make(chan int)

                            go func() { // write to ch1 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch1 <- 1
                                close(ch1)
                            }()

                            go func() { // write to ch2 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch2 <- 2
                                close(ch2)
                            }()

                            select { // Output: either 1 or 2
                            case v := <-ch1:
                                fmt.Println(v)
                            case v := <-ch2:
                                fmt.Println(v)
                            }
                        }
                    

How do I use channels?

select example:

                        func main() {
                            rand.Seed(time.Now().Unix())

                            ch1 := make(chan int)
                            ch2 := make(chan int)

                            go func() { // write to ch1 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch1 <- 1
                                close(ch1)
                            }()

                            go func() { // write to ch2 after random delay
                                delay := time.Duration(rand.Intn(2)) * time.Second
                                time.Sleep(delay)
                                ch2 <- 2
                                close(ch2)
                            }()

                            select { // Output: either 1 or 2
                            case v := <-ch1:
                                fmt.Println(v)
                            case v := <-ch2:
                                fmt.Println(v)
                            }
                        }
                    

Example: producer and consumer

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data {
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }
                        }
                    

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data {
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }
                        }
                    

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data {
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }
                        }
                    

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data {
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }
                        }
                    

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data { // exits when data is closed
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }

                            // Output:
                            //  Hello
                            //  Fellow
                            //  Gophers!
                            //  bye!
                        }
                    

Example: producer and consumer


                        func main() { // "main" goroutine
                            data := make(chan string) // data channel

                            go func() { // producer
                                for _, s := range []string{"Hello", "Fellow", "Gophers!"} {
                                    data <- s
                                }

                                close(data)
                            }()

                            done := make(chan struct{}) // done channel

                            go func() { // consumer
                                for s := range data { // exits when data is closed
                                    fmt.Println(s)
                                }

                                close(done)
                            }()

                            select {
                            case <-done:
                                fmt.Println("bye!")
                            case <-time.After(time.Second):
                                fmt.Println("timeout!")
                            }

                            // Output:
                            //  Hello
                            //  Fellow
                            //  Gophers!
                            //  bye!
                        }
                    

⚠️️ Warning ⚠️


  • Reading from a nil channel will hang forever
  • Writing to a closed channel will panic
  • Closing a closed channel will panic

The sync package

Contains useful APIs to synchronize work across goroutines.

sync.WaitGroup

sync.WaitGroup

Use when one goroutine needs to wait for several others to complete.

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.WaitGroup

Example: "main" waits for three workers to finish

                        func main() { // "main" goroutine
                            var wg sync.WaitGroup

                            work := func(delay time.Duration) {
                                defer wg.Done() // signal this work is "done"
                                time.Sleep(delay)
                            }

                            wg.Add(3) // # of workers known beforehand

                            go work(time.Second)
                            go work(500 * time.Millisecond)
                            go work(700 * time.Millisecond)

                            wg.Wait() // blocks until all three are done

                            fmt.Println("done!")
                        }
                    

sync.Once

sync.Once

Use to defer some initialization code that should run only once and not necessarily on every program execution.

sync.Once

Example:

                        var once sync.Once

                        func expensiveInit() {
                            once.Do(func() {
                                fmt.Println("initialized!")
                            })
                        }

                        func main() {
                            expensiveInit() // prints "initialized!"
                            expensiveInit() // does nothing
                            expensiveInit() // does nothing
                        }
                    

sync.Once

Example:

                        var once sync.Once

                        func expensiveInit() {
                            once.Do(func() {
                                fmt.Println("initialized!")
                            })
                        }

                        func main() {
                            expensiveInit() // prints "initialized!"
                            expensiveInit() // does nothing
                            expensiveInit() // does nothing
                        }
                    

sync.Once

Example:

                        var once sync.Once

                        func expensiveInit() {
                            once.Do(func() {
                                fmt.Println("initialized!")
                            })
                        }

                        func main() {
                            expensiveInit() // prints "initialized!"
                            expensiveInit() // does nothing
                            expensiveInit() // does nothing
                        }
                    

sync.Once

Example:

                        var once sync.Once

                        func expensiveInit() {
                            once.Do(func() {
                                fmt.Println("initialized!")
                            })
                        }

                        func main() {
                            expensiveInit() // prints "initialized!"
                            expensiveInit() // does nothing
                            expensiveInit() // does nothing
                        }
                    

sync.Mutex & sync.RWMutex

Use to control concurrent reads or writes to a shared data structure.

sync.Mutex & sync.RWMutex

  • sync.Mutex: mutual exclusion lock
  • sync.RWMutex: reader/writer mutual exclusion lock

sync.Mutex & sync.RWMutex

Example: what does the following print?

                        func printLetters(words []string) {
                            n := time.Duration(rand.Intn(100))

                            for _, s := range words {
                                fmt.Print(s + " ")
                                time.Sleep(n * time.Millisecond)
                            }

                            fmt.Println()
                        }

                        func main() {
                            rand.Seed(time.Now().Unix())

                            var wg sync.WaitGroup

                            wg.Add(2)

                            go func() {
                                defer wg.Done()
                                printLetters([]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"})
                            }()

                            go func() {
                                defer wg.Done()
                                printLetters([]string{"k", "l", "m", "n", "o", "p", "q", "r", "s", "t"})
                            }()

                            wg.Wait()
                        }
                    

sync.Mutex & sync.RWMutex

Example: what does the following print?

                        func printLetters(words []string) {
                            n := time.Duration(rand.Intn(100))

                            for _, s := range words {
                                fmt.Print(s + " ")
                                time.Sleep(n * time.Millisecond)
                            }

                            fmt.Println()
                        }

                        func main() {
                            rand.Seed(time.Now().Unix())

                            var wg sync.WaitGroup

                            wg.Add(2)

                            go func() {
                                defer wg.Done()
                                printLetters([]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"})
                            }()

                            go func() {
                                defer wg.Done()
                                printLetters([]string{"k", "l", "m", "n", "o", "p", "q", "r", "s", "t"})
                            }()

                            wg.Wait()
                        }
                    

sync.Mutex & sync.RWMutex



                        k a b c l d e m f g n h i o j
                        p q r s t
                    
Char sequences are not on their own lines.

Let's fix it!

sync.Mutex & sync.RWMutex


                        func printLetters(words []string) {
                            n := time.Duration(rand.Intn(100))

                            for _, s := range words {
                                fmt.Print(s + " ")
                                time.Sleep(n * time.Millisecond)
                            }

                            fmt.Println()
                        }

                        func main() {
                            rand.Seed(time.Now().Unix())

                            var wg sync.WaitGroup
                            var mu sync.Mutex

                            wg.Add(2)

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"})
                            }()

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"k", "l", "m", "n", "o", "p", "q", "r", "s", "t"})
                            }()

                            wg.Wait()
                        }
                    

sync.Mutex & sync.RWMutex


                        func printLetters(words []string) {
                            n := time.Duration(rand.Intn(100))

                            for _, s := range words {
                                fmt.Print(s + " ")
                                time.Sleep(n * time.Millisecond)
                            }

                            fmt.Println()
                        }

                        func main() {
                            rand.Seed(time.Now().Unix())

                            var wg sync.WaitGroup
                            var mu sync.Mutex

                            wg.Add(2)

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"})
                            }()

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"k", "l", "m", "n", "o", "p", "q", "r", "s", "t"})
                            }()

                            wg.Wait()
                        }
                    

sync.Mutex & sync.RWMutex


                        func printLetters(words []string) {
                            n := time.Duration(rand.Intn(100))

                            for _, s := range words {
                                fmt.Print(s + " ")
                                time.Sleep(n * time.Millisecond)
                            }

                            fmt.Println()
                        }

                        func main() {
                            rand.Seed(time.Now().Unix())

                            var wg sync.WaitGroup
                            var mu sync.Mutex

                            wg.Add(2)

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"})
                            }()

                            go func() {
                                defer wg.Done()
                                mu.Lock()         // acquire lock
                                defer mu.Unlock() // release lock when done
                                printLetters([]string{"k", "l", "m", "n", "o", "p", "q", "r", "s", "t"})
                            }()

                            wg.Wait()
                        }
                    

sync.Mutex & sync.RWMutex



                        k l m n o p q r s t
                        a b c d e f g h i j
                    
Char sequences guaranteed to be on their own lines.

sync.Mutex & sync.RWMutex


⚠️ Warning ⚠️
Lock() is not reentrant!

Honorable Mentions

Package sync/atomic


  • Primitive wrappers around CPU register operations (swap, load, store, compare-and-swap)
  • Lockless and often used in implementation of other sync primitives


You probably don't need these.

Package sync/atomic

A few std packages where used:
  • sync
  • database/sql
  • net/http

Package sync/atomic

Example (broken):

                        func main() {
                            var (
                                total int32
                                wg    sync.WaitGroup
                            )

                            wg.Add(10_000)

                            for i := 0; i < 10_000; i++ {
                                go func() {
                                    defer wg.Done()
                                    total++
                                }()
                            }

                            wg.Wait()

                            fmt.Println(total) // example: 9106
                        }
                    

Package sync/atomic

Example (fixed):

                        func main() {
                            var (
                                total int32
                                wg    sync.WaitGroup
                            )

                            wg.Add(10_000)

                            for i := 0; i < 10_000; i++ {
                                go func() {
                                    defer wg.Done()
                                    atomic.AddInt32(&total, 1)
                                }()
                            }

                            wg.Wait()

                            fmt.Println(total) // 10000
                        }
                    

sync.Map


  • Concurrency-safe
  • Optimized for highly concurrent scenarios where keys are only added and read
  • Still cannot bind type parameter at instantiation as of Go 1.18

sync.Map


A regular map guarded by a sync.RWMutex should fulfill most use cases without sacrificing legibility.

sync.Map

Example:

                        func main() {
                            var m sync.Map
                            m.Store("key", 1)
                            m.Store(1, 1)

                            v, ok := m.Load("key")
                            if ok {
                                n := v.(int)
                                fmt.Println(n) // 1
                            }

                            v, ok = m.Load(1)
                            if ok {
                                n := v.(int)
                                fmt.Println(n) // 1
                            }
                        }
                    

errgroup.Group


  • Part of golang.org/x/sync (non-standard)
  • Supplement to sync.WaitGroup
  • "Provides synchronization, error propagation, and Context cancelation"
  • Maintained by Go authors

errgroup.Group

Comparison: error handling with sync.WaitGroup:

func main() {
	timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
	}

	var g sync.WaitGroup

	g.Add(len(urls))

	var client http.Client
	var errors []error
	var mu sync.Mutex

	for _, url := range urls {
		u := url
		go func() {
			defer g.Done()

			req, err := http.NewRequestWithContext(timeout, http.MethodGet, u, nil)
			if err != nil {
				mu.Lock()
				errors = append(errors, err)
				mu.Unlock()

				return
			}

			// Fetch the URL.
			resp, err := client.Do(req)
			if err != nil {
				mu.Lock()
				errors = append(errors, err)
				mu.Unlock()

				return
			}

			_ = resp.Body.Close()
		}()
	}

	// Wait for all HTTP fetches to complete.
	g.Wait()

	if len(errors) == 0 {
		fmt.Println("Successfully fetched all URLs.")
	}
}
                    

errgroup.Group

Comparison: error handling with errgroup.Group:

func main() {
	timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
	}

	g, _ := errgroup.WithContext(timeout)

	var client http.Client

	for _, url := range urls {
		u := url
		go func() {
			g.Go(func() error {
				req, err := http.NewRequestWithContext(timeout, http.MethodGet, u, nil)
				if err != nil {
					return err
				}

				// Fetch the URL.
				resp, err := client.Do(req)
				if err != nil {
					return err
				}

				return resp.Body.Close()
			})
		}()
	}

	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}
                    

Questions?