By George Aristy / llorllale
func main() {
result := make(chan interface{})
done := make(chan struct{})
go func() { // launch sub goroutine
select {
case result <- ...: // write to result channel
case <-done:
// abort when `done` is closed
}
}()
// wait for result or timeout, whichever occurs first
select {
case r := <-result:
// process result
case <-time.After(time.Second):
// handle timeout
}
close(done) // signal the sub goroutine to abort
}
// request does not support context
func request() interface{} {
time.Sleep(5 * time.Second) // simulate expensive operation
return "Hello, World!"
}
func main() {
result := make(chan interface{})
done := make(chan struct{})
go func() {
select {
case result <- request():
fmt.Println("request completed")
case <-done:
fmt.Println("request canceled")
}
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
close(done)
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
Output:
request timeout
Program exited.
// request does not support context
func request() interface{} {
time.Sleep(5 * time.Second) // simulate expensive operation
return "Hello, World!"
}
func main() {
result := make(chan interface{})
done := make(chan struct{})
go func() {
// the channel operands and the right-hand sides of send statements in each `case`
// are evaluated exactly once, in source order, on entry to the `select`
// see https://go.dev/ref/spec#Select_statements
select {
case result <- request(): // `request()` evaluation takes ~5s
fmt.Println("request completed")
case <-done:
fmt.Println("request canceled")
}
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
close(done)
time.Sleep(100 * time.Millisecond) // allow sub goroutine to print cancel msg
}
The fix is to invoke request() in another goroutine (playground):
// request does not support context
func request() interface{} {
time.Sleep(5 * time.Second) // simulate expensive operation
return "Hello, World!"
}
func main() {
tmp := make(chan interface{})
go func() {
tmp <- request()
}()
result := make(chan interface{})
done := make(chan struct{})
go func() {
select {
case t := <-tmp:
result <- t
fmt.Println("request completed")
case <-done:
fmt.Println("request canceled")
}
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
close(done)
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
The same pattern can be implemented with a context.Context (playground):
// request does not support context
func request() interface{} {
time.Sleep(5 * time.Second) // simulate expensive operation
return "Hello, World!"
}
func main() {
ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
defer cancel() // best practice, but don't rely on it for timely cancellation
tmp := make(chan interface{})
go func() {
tmp <- request()
}()
result := make(chan interface{})
go func() {
select {
case t := <-tmp:
result <- t
fmt.Println("request completed")
case <-ctx.Done():
fmt.Println("request canceled: ", ctx.Err())
}
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
cancel() // important! cancel now; don't defer
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
Things are simpler when request itself accepts a context.Context (playground):
// request supports context (blackbox)
func request(ctx context.Context) (interface{}, error) {
select {
case <-time.After(5 * time.Second):
return "Hello, World!", nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
defer cancel() // best practice, but don't rely on it for timely cancellation
result := make(chan interface{})
go func() {
r, err := request(ctx)
if err != nil {
fmt.Println("request error: ", err)
return
}
result <- r
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
cancel() // important! cancel now; don't defer
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
Prefer functions that accept a context.Context; otherwise wrap them in a function that does.
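For example, the context-unaware request() from earlier could be wrapped using the same tmp-channel trick shown above. This is only a sketch; the name requestWithContext is my own, not part of the original API:
// Sketch: adapts the blocking, context-unaware request() into a
// context-aware function. requestWithContext is an assumed name.
func requestWithContext(ctx context.Context) (interface{}, error) {
	tmp := make(chan interface{}, 1) // buffered so the inner goroutine never blocks forever
	go func() {
		tmp <- request()
	}()
	select {
	case v := <-tmp:
		return v, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}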
func main() {
ctx, cancel := context.WithCancel(context.Background()) // or context.WithDeadline, or context.WithTimeout
defer cancel() // best practice, but don't rely on it for timely cancellation
result := make(chan interface{})
go func() {
if ctx.Err() != nil { // abort if context is already cancelled
return
}
r, err := request(ctx)
if err != nil {
fmt.Println("request error: ", err)
return
}
result <- r
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
cancel() // important! cancel now; don't defer
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set a buffer on the channel.
// This way, even if parent goroutine is gone (timeout, etc.)
// and no longer listening on this channel, the sub goroutine
// won't block when writing.
result := make(chan interface{}, 1)
go func() {
// Close channel when done.
// Not closed in parent goroutine to avoid writing to a
// closed channel (panic).
defer close(result)
if ctx.Err() != nil { // abort if context is already cancelled
return
}
r, err := request(ctx)
if err != nil {
fmt.Println("request error: ", err)
return
}
result <- r // won't block
}()
select {
case r := <-result:
fmt.Println("result: ", r)
case <-time.After(time.Second):
fmt.Println("request timeout")
}
cancel() // important! cancel now; don't defer
time.Sleep(100 * time.Millisecond) // allow time for sub goroutine to print cancel msg
}
func FanIn(sources ...<-chan int) <-chan int {
dest := make(chan int) // return destination channel
var wg sync.WaitGroup
wg.Add(len(sources))
// multiplex each source into dest
for _, src := range sources {
go func(src <-chan int) {
defer wg.Done()
for v := range src {
dest <- v
}
}(src)
}
// close dest when all sources are closed
go func() {
wg.Wait()
close(dest)
}()
return dest
}
func main() {
src := sources()
dest := FanIn(src...) // sources coalesced into a single channel
for r := range dest {
fmt.Println(r)
}
}
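The sources() helper above is not part of the pattern; a minimal stand-in that produces a few finite channels might look like this (assumed, for illustration only):
// Assumed helper for the FanIn example: returns several channels,
// each emitting a handful of values before closing.
func sources() []<-chan int {
	srcs := make([]<-chan int, 0)
	for i := 0; i < 3; i++ {
		ch := make(chan int)
		go func(base int) {
			defer close(ch)
			for j := 0; j < 5; j++ {
				ch <- base*10 + j
			}
		}(i)
		srcs = append(srcs, ch)
	}
	return srcs
}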
func FanOut(src <-chan int, n int) []<-chan int {
dests := make([]<-chan int, 0)
for i := 0; i < n; i++ {
dest := make(chan int)
dests = append(dests, dest)
go func() {
defer close(dest)
for v := range src {
dest <- v
}
}()
}
return dests
}
func main() {
src := source()
dests := FanOut(src, 5)
var wg sync.WaitGroup
wg.Add(len(dests))
for _, dest := range dests {
go func(dest <-chan int) {
defer wg.Done()
for v := range dest {
fmt.Println(v)
}
}(dest)
}
wg.Wait()
}
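Likewise, source() is just a placeholder; a minimal assumed implementation could be:
// Assumed helper for the FanOut example: a single channel that emits
// a fixed number of values and then closes.
func source() <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 20; i++ {
			ch <- i
		}
	}()
	return ch
}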
type Shard[V any] struct {
sync.RWMutex
m map[string]V
}
type ShardedMap[V any] []*Shard[V]
func NewShardedMap[V any](nShards int) ShardedMap[V] {
shards := make([]*Shard[V], nShards)
for i := range shards {
shards[i] = &Shard[V]{m: make(map[string]V)}
}
return shards
}
func (m ShardedMap[V]) Get(key string) V {
shard := m.shard(key)
shard.RLock()
defer shard.RUnlock()
return shard.m[key]
}
func (m ShardedMap[V]) Set(key string, value V) {
shard := m.shard(key)
shard.Lock()
defer shard.Unlock()
shard.m[key] = value
}
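// The listing relies on two helpers not shown above: shard, which picks a
// shard by hashing the key, and Keys, which collects the keys from every
// shard. These are minimal, assumed implementations (using hash/fnv):
func (m ShardedMap[V]) shard(key string) *Shard[V] {
	h := fnv.New32a()
	h.Write([]byte(key))
	return m[int(h.Sum32()%uint32(len(m)))]
}
func (m ShardedMap[V]) Keys() []string {
	keys := make([]string, 0)
	for _, shard := range m {
		shard.RLock()
		for k := range shard.m {
			keys = append(keys, k)
		}
		shard.RUnlock()
	}
	return keys
}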
func main() {
m := NewShardedMap[int](5)
m.Set("alpha", 1)
m.Set("beta", 2)
m.Set("gamma", 3)
fmt.Println(m.Get("alpha"))
fmt.Println(m.Get("beta"))
fmt.Println(m.Get("gamma"))
keys := m.Keys()
for _, k := range keys {
fmt.Println(k)
}
}
func Process(concurrency int, data []int) {
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
wg.Add(len(data))
for _, d := range data {
sem <- struct{}{} // blocks until capacity is freed up
go func(d int) {
defer wg.Done()
doProcess(d)
<-sem // free up capacity
}(d)
}
wg.Wait()
}
func main() {
data := getData()
Process(3, data)
}
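getData() and doProcess() are placeholders for real work; minimal assumed stand-ins could be:
// Assumed helpers for the bounded-concurrency example above.
func getData() []int {
	data := make([]int, 20)
	for i := range data {
		data[i] = i
	}
	return data
}
func doProcess(d int) {
	time.Sleep(100 * time.Millisecond) // simulate work
	fmt.Println("processed", d)
}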
type PressureGauge struct {
ch chan struct{}
}
func New(limit int) *PressureGauge {
ch := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
ch <- struct{}{}
}
return &PressureGauge{ch: ch}
}
func (p *PressureGauge) Do(fn func()) error {
select {
case <-p.ch:
fn()
p.ch <- struct{}{}
return nil
default:
return errors.New("out of capacity")
}
}
func runServer(pg *PressureGauge) {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
err := pg.Do(func() {
w.Write([]byte(expensiveOperation()))
})
if err != nil {
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte("too many requests"))
}
})
http.ListenAndServe(":8080", nil)
}
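To try the rate limiter end to end, something along these lines would do; expensiveOperation is only a stand-in for real work, and the limit of 10 is arbitrary:
// Assumed wiring for the rate-limiter example above.
func expensiveOperation() string {
	time.Sleep(2 * time.Second) // simulate slow work
	return "done"
}
func main() {
	runServer(New(10)) // allow at most 10 concurrent expensive operations
}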