Master Go concurrency patterns using goroutines, channels, and pipeline architectures with proper cancellation
Generate idiomatic Go code for concurrent pipelines built from goroutines and channels, including fan-out/fan-in and proper cancellation handling, following the patterns described on the official Go blog.
This skill helps you design and implement Go concurrency patterns following best practices from the official Go blog. It covers pipeline architectures, goroutine management, channel communication, fan-out/fan-in patterns, and graceful cancellation using context or done channels.
When the user requests help with Go concurrency, pipelines, or goroutine patterns:
1. **Understand the Requirements**
- Identify the data flow: source → stages → sink
- Determine if fan-out (parallelization) or fan-in (merging) is needed
- Ask about cancellation requirements (timeout, explicit cancel, early exit)
- Clarify if buffered channels are appropriate for the use case
2. **Design the Pipeline Architecture**
- Define stages as functions that receive from inbound channels and send to outbound channels
- First stage (source/producer): only outbound channels
- Middle stages: both inbound and outbound channels
- Last stage (sink/consumer): only inbound channels
- Each stage should close its outbound channels when done
3. **Implement Core Patterns**
**Basic Stage Pattern:**
```go
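// stage receives values from in, applies process to each one, and sends the
// results on the returned channel, which it closes once in is drained.
// process is a placeholder for the stage's real work; generics need Go 1.18+.
func stage[T any](in <-chan T, process func(T) T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			out <- process(v)
		}
	}()
	return out
}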
```
**Fan-Out (Parallel Workers):**
- Multiple goroutines read from the same channel
- Distributes work for CPU/IO parallelization
**Fan-In (Merge Results):**
- Use sync.WaitGroup to coordinate multiple input channels
- Close output channel only after all inputs are drained
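The fan-out and fan-in bullets above can be combined in one function. The following is a minimal sketch, not a definitive implementation: `squareAll` and its `workers` parameter are hypothetical names, and cancellation is deliberately left out, so it assumes the caller drains the output channel completely.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (hypothetical name) fans out to `workers` goroutines that all
// read from the same input channel, then fans their results back in on a
// single output channel.
func squareAll(in <-chan int, workers int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			// Each value from in is received by exactly one worker.
			for n := range in {
				out <- n * n
			}
		}()
	}
	// Close out only after every worker has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 4; i++ {
			in <- i
		}
	}()

	sum := 0
	for v := range squareAll(in, 3) {
		sum += v
	}
	fmt.Println(sum) // 1 + 4 + 9 + 16 = 30
}
```

Results arrive in no particular order, so this shape suits order-independent reductions such as a sum.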
4. **Add Cancellation Support**
- Accept a `done <-chan struct{}` or `context.Context` parameter in all pipeline functions
- Use `select` statements to check cancellation:
```go
select {
case out <- v:
case <-done:
	return
}
```
- Call `defer close(done)` in `main` so that every return path broadcasts cancellation to all stages
- Alternative: Use context.Context with context.WithCancel for more control
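As a rough illustration of the `context.Context` alternative, the sketch below replaces the done channel with `ctx.Done()`. The function names `count` and `firstN` are hypothetical and exist only for this example; the `context` calls themselves are from the standard library.

```go
package main

import (
	"context"
	"fmt"
)

// count (hypothetical) emits 1, 2, 3, ... until ctx is cancelled.
func count(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 1; ; n++ {
			select {
			case out <- n:
			case <-ctx.Done():
				return // the consumer stopped early; exit instead of leaking
			}
		}
	}()
	return out
}

// firstN (hypothetical) reads n values from an unbounded source and then
// cancels the context so the producer goroutine can exit.
func firstN(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs on every return path

	var got []int
	for v := range count(ctx) {
		got = append(got, v)
		if len(got) >= n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // [1 2 3]
}
```

Swapping `context.WithCancel` for `context.WithTimeout` would give the same stage a deadline without changing its body.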
5. **Handle Resource Cleanup**
- Ensure all goroutines can exit (avoid blocking indefinitely)
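- Close a channel only from the sending side, and only after all sends on it have completed; with multiple senders, close it from one goroutine that first waits on a `sync.WaitGroup`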
- Use defer statements for cleanup
- Avoid goroutine leaks by ensuring cancellation paths exist
6. **Choose Buffering Strategy**
- Unbuffered channels (default): synchronous, backpressure
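- Buffered channels: decouple sender and receiver; a good fit when the number of values to send is known when the channel is created
- Avoid buffer sizes that are only correct for an exact value count or a particular receiver behavior; if those assumptions change, goroutines can block forever, so prefer explicit cancellation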
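One case where the buffer size is genuinely known is a source stage that is handed all of its values up front. A minimal sketch under that assumption (the name `genBuffered` is hypothetical):

```go
package main

import "fmt"

// genBuffered (hypothetical) sizes the buffer to len(nums), so every send
// completes immediately and no goroutine is needed to feed the channel.
func genBuffered(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n // never blocks: the buffer has room for every value
	}
	close(out)
	return out
}

func main() {
	// A closed buffered channel still delivers its buffered values in order.
	for v := range genBuffered(2, 3, 5) {
		fmt.Println(v)
	}
}
```

The size here is derived from the input rather than hard-coded, which is what keeps it from being fragile.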
7. **Follow Best Practices**
- Stages close outbound channels when all sends are done
- Stages keep receiving until inbound channels are closed or cancellation unblocks the senders
- Use `range` loops over channels for clean iteration
- Send on channels only before closing, never after: a send on a closed channel panics
- Broadcast cancellation by closing the done channel
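Closing works as a broadcast because a receive on a closed channel returns immediately for every receiver, however many there are. The sketch below illustrates this; `releaseAll` is a hypothetical helper written only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseAll (hypothetical) starts n goroutines that block on done, closes
// done once, and reports how many goroutines observed the close.
func releaseAll(n int) int {
	done := make(chan struct{})
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		released int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}
	close(done) // a single close wakes every waiting goroutine
	wg.Wait()
	return released
}

func main() {
	fmt.Println(releaseAll(5)) // 5
}
```

Sending a value on `done` instead would wake only one receiver, which is why the pattern closes the channel rather than sending on it.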
8. **Generate Complete, Runnable Code**
- Include all imports (sync, fmt, context if used)
- Provide a complete main function demonstrating the pipeline
- Add comments explaining the concurrency flow
- Show example output
**User Request:** "Create a pipeline that reads numbers, squares them in parallel, and sums the results with cancellation support"
**Expected Output:**
```go
package main

import (
	"fmt"
	"sync"
)

// gen converts a list of integers to a channel
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sq squares numbers from input channel
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// merge fans-in multiple channels
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done)

	in := gen(done, 1, 2, 3, 4, 5)

	// Fan-out: two sq workers
	c1 := sq(done, in)
	c2 := sq(done, in)

	// Fan-in: merge results
	sum := 0
	for n := range merge(done, c1, c2) {
		sum += n
	}
	fmt.Println("Sum of squares:", sum) // 55
}
```
Based on "Go Concurrency Patterns: Pipelines and cancellation" from the official Go blog (https://go.dev/blog/pipelines)