Semaphores are like mutexes that allow N threads to access a shared resource instead of just 1. Go’s buffered channels make creating semaphore-like behaviour a doddle. To create a semaphore that allows 10 goroutines to concurrently access a shared resource, it’s simply make(chan struct{}, 10) (swap the 10 for whatever limit you need). The only thing our goroutines need to lock/own is a slot in the buffered channel, so it makes sense to use an empty struct, as it uses zero memory.
To keep the interface to our semaphore clean, we’ll start by hiding the channel in a struct with a sensible name:
type semaphore struct {
	channel chan struct{}
}
At creation time, we’ll want to initialise the semaphore with an underlying channel size, so let’s create a constructor function next:
func newSemaphore(concurrency int) *semaphore {
	return &semaphore{
		channel: make(chan struct{}, concurrency),
	}
}
Next, we’ll want to ask the semaphore to execute something for us. This is achieved by:
- Waiting for the underlying channel to have an empty slot.
- Adding an item into it to signal that something is running.
- Removing the item from it after the function has completed.
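func (s *semaphore) execute(f func()) {
	// Blocks the caller until the channel has an empty slot.
	s.channel <- struct{}{}
	go func() {
		// Free the slot once f has returned (or panicked).
		defer func() {
			<-s.channel
		}()
		f()
	}()
}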
In order to reliably finish our workload, we’ll need to be able to tell the semaphore to wait for all functions to complete. This can be achieved elegantly by using the underlying channel itself:
- Add as many items to the channel as it has capacity for. This ensures there’s nothing else using the semaphore.
- Remove the items so that the semaphore can be reused.
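func (s *semaphore) Wait() {
	// Fill every slot; this only succeeds once all running functions have released theirs.
	for i := 0; i < cap(s.channel); i++ {
		s.channel <- struct{}{}
	}
	// Drain the slots again so the semaphore can be reused.
	for i := 0; i < cap(s.channel); i++ {
		<-s.channel
	}
}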
I’ve created a library that encapsulates this logic into a tested package, and the example below uses it.
Here, we perform 10 operations using a semaphore with 5 slots. You’ll notice from the output that the first 5 operations run together and the next 5 run about a second later, once the first batch has released its slots. Note that I’m calling Wait() after kicking off all of the operations, to wait for them to be completed before allowing the program to continue, removing the need for any other kind of hacky synchronisation work.
package main
import (
	"fmt"
	"time"
	"github.com/codingconcepts/semaphore"
)
func main() {
	s := semaphore.New(5)
	for i := 0; i < 10; i++ {
		j := i
		s.Run(func() {
			fmt.Println(j, time.Now())
			time.Sleep(time.Second)
		})
	}
	s.Wait()
}
$ go run main.go
2 2019-03-25 16:54:00.014916 +0000 GMT m=+0.000370912 <- 1st batch
4 2019-03-25 16:54:00.014909 +0000 GMT m=+0.000363865
1 2019-03-25 16:54:00.014911 +0000 GMT m=+0.000366103
0 2019-03-25 16:54:00.014996 +0000 GMT m=+0.000450888
3 2019-03-25 16:54:00.014968 +0000 GMT m=+0.000423673
6 2019-03-25 16:54:01.019303 +0000 GMT m=+1.004793716 <- 2nd batch
8 2019-03-25 16:54:01.019422 +0000 GMT m=+1.004912804
9 2019-03-25 16:54:01.019457 +0000 GMT m=+1.004947535
7 2019-03-25 16:54:01.019366 +0000 GMT m=+1.004857143
5 2019-03-25 16:54:01.019313 +0000 GMT m=+1.004804887