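Go's goroutines and channels are a natural fit for building concurrent data-processing pipelines. Pipeline, fan-out, and fan-in are three of the most basic and most practical concurrency patterns.
The Pipeline pattern
A pipeline splits processing into multiple stages connected by channels, so data flows through the stages in order, like items on an assembly line.
A minimal example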
package main

import "fmt"

// Stage 1: generate emits the given numbers, then closes its channel.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Stage 2: square emits the square of every number it receives.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Stage 3: toString formats every number as a string for output.
func toString(in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		for n := range in {
			out <- fmt.Sprintf("result: %d", n)
		}
		close(out)
	}()
	return out
}

func main() {
	// Wire up the pipeline: generate -> square -> toString
	ch := toString(square(generate(2, 3, 4, 5)))
	for s := range ch {
		fmt.Println(s)
	}
}
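Each stage runs in its own goroutine, and the channels decouple the stages from one another. When an upstream stage closes its channel, the downstream range loop ends automatically.
Design principles
- Each stage starts one goroutine
- Data is passed between stages over channels
- A stage function takes a <-chan T (receive-only) and returns a <-chan T (receive-only)
- The producer is responsible for closing the channel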
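Fan-Out: parallel processing
When one stage of a pipeline becomes the bottleneck, you can start several goroutines to process its input at the same time. This is fan-out.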
package main

import (
	"crypto/md5"
	"fmt"
	"sync"
)

// heavyCompute simulates an expensive computation.
func heavyCompute(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		for s := range in {
			hash := md5.Sum([]byte(s))
			out <- fmt.Sprintf("%s -> %x", s, hash)
		}
		close(out)
	}()
	return out
}

// fanOut starts n workers and returns their output channels.
func fanOut(in <-chan string, n int) []<-chan string {
	channels := make([]<-chan string, n)
	// A dispatcher is needed to distribute the input across the workers.
	inputs := make([]chan string, n)
	for i := 0; i < n; i++ {
		inputs[i] = make(chan string)
		channels[i] = heavyCompute(inputs[i])
	}
	go func() {
		// Round-robin: item i goes to worker i%n.
		i := 0
		for item := range in {
			inputs[i%n] <- item
			i++
		}
		// The input is exhausted: close every worker's input so the workers finish.
		for _, ch := range inputs {
			close(ch)
		}
	}()
	return channels
}
Fan-In: merging results
Fan-in merges the output of several channels into a single channel:
func fanIn(channels ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	merged := make(chan string)
	// output forwards every value from one input channel to merged.
	output := func(ch <-chan string) {
		defer wg.Done()
		for val := range ch {
			merged <- val
		}
	}
	wg.Add(len(channels))
	for _, ch := range channels {
		go output(ch)
	}
	// Close the output channel once all input channels have been closed.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
Complete example: a file-processing pipeline
Combining the patterns above gives a file-processing pipeline:
package main

import (
	"context"
	"crypto/sha256"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
)

// Stage 1: walk the directory tree and emit file paths.
func walkFiles(ctx context.Context, root string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
			// Skip entries that cannot be read, and skip directories themselves.
			if err != nil || d.IsDir() {
				return nil
			}
			select {
			case out <- path:
			case <-ctx.Done():
				// Returning an error stops the walk.
				return ctx.Err()
			}
			return nil
		})
	}()
	return out
}
// Stage 2: compute file hashes (an expensive operation, well suited to fan-out).
type FileHash struct {
	Path string
	Hash string
}

func hashFile(ctx context.Context, paths <-chan string) <-chan FileHash {
	out := make(chan FileHash)
	go func() {
		defer close(out)
		for path := range paths {
			// Non-blocking check: stop before reading the next file if cancelled.
			select {
			case <-ctx.Done():
				return
			default:
			}
			data, err := os.ReadFile(path)
			if err != nil {
				// Unreadable files are skipped silently.
				continue
			}
			hash := sha256.Sum256(data)
			select {
			case out <- FileHash{Path: path, Hash: fmt.Sprintf("%x", hash)}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
// Fan-in: merge the worker outputs into one channel.
func merge(ctx context.Context, channels ...<-chan FileHash) <-chan FileHash {
	var wg sync.WaitGroup
	merged := make(chan FileHash)
	output := func(ch <-chan FileHash) {
		defer wg.Done()
		for val := range ch {
			select {
			case merged <- val:
			case <-ctx.Done():
				return
			}
		}
	}
	wg.Add(len(channels))
	for _, ch := range channels {
		go output(ch)
	}
	// Close merged once every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	root := "."
	if len(os.Args) > 1 {
		root = os.Args[1]
	}
	// Pipeline: walkFiles -> fan-out(hashFile x 4) -> fan-in -> collect
	paths := walkFiles(ctx, root)
	// Fan-out: 4 workers compute hashes in parallel
	numWorkers := 4
	hashChannels := make([]<-chan FileHash, numWorkers)
	// Distribute the input
	dispatchers := make([]chan string, numWorkers)
	for i := 0; i < numWorkers; i++ {
		dispatchers[i] = make(chan string)
		hashChannels[i] = hashFile(ctx, dispatchers[i])
	}
	go func() {
		// Always close the worker inputs on exit so the workers can finish.
		defer func() {
			for _, d := range dispatchers {
				close(d)
			}
		}()
		i := 0
		for path := range paths {
			// Guard the send: a worker that has already exited on
			// cancellation would otherwise block this goroutine forever.
			select {
			case dispatchers[i%numWorkers] <- path:
			case <-ctx.Done():
				return
			}
			i++
		}
	}()
	// Fan-in
	results := merge(ctx, hashChannels...)
	// Collect the results
	count := 0
	for fh := range results {
		fmt.Printf("%s %s\n", fh.Hash[:16], fh.Path)
		count++
	}
	fmt.Printf("\nProcessed %d files\n", count)
}
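Context cancellation propagation
The code above already shows how context is used. The core principles:
- Every stage takes a context.Context parameter
- Channel operations use select to also watch ctx.Done()
- Once the context is cancelled, every downstream goroutine can observe it and exit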
select {
case out <- value:
	// sent normally
case <-ctx.Done():
	return // cancelled upstream, exit
}
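This pattern avoids goroutine leaks: as long as every blocking channel operation is guarded this way, all goroutines can exit cleanly even when the pipeline is terminated early.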
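Use cases
- ETL data processing: extract -> clean -> transform -> load
- Log analysis: read log files -> parse -> filter -> aggregate
- Web crawling: discover URLs -> download -> parse -> store
- File processing: walk files -> read contents -> compute/transform -> output
The pipeline pattern keeps the structure of a concurrent program clear and readable: each stage has a single responsibility and is easy to test and replace.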