Go并发模式:pipeline与fan-out/fan-in

Go的goroutine和channel天生适合构建并发数据处理流水线。pipeline、fan-out、fan-in是三个最基础也最实用的并发模式。

Pipeline模式

Pipeline就是把处理流程拆成多个阶段(stage),每个阶段通过channel连接,数据像流水线一样依次流过。

最简单的例子

package main

import "fmt"

// generate is stage 1 of the pipeline. It starts a goroutine that sends each
// value of nums, in order, on the returned channel and closes the channel
// after the last value has been sent.
func generate(nums ...int) <-chan int {
    ch := make(chan int)
    go func(values []int) {
        // Closing signals downstream range loops that no more data is coming.
        defer close(ch)
        for i := 0; i < len(values); i++ {
            ch <- values[i]
        }
    }(nums)
    return ch
}

// square is stage 2 of the pipeline. It starts a goroutine that receives
// integers from in, sends the square of each one on the returned channel, and
// closes that channel once in has been closed and drained.
func square(in <-chan int) <-chan int {
    squared := make(chan int)
    go func() {
        defer close(squared)
        for {
            v, ok := <-in
            if !ok {
                // in was closed by the upstream stage: nothing left to do.
                return
            }
            squared <- v * v
        }
    }()
    return squared
}

// toString is stage 3 of the pipeline. It starts a goroutine that formats
// every integer received from in as "result: <n>", sends the text on the
// returned channel, and closes that channel once in has been drained.
func toString(in <-chan int) <-chan string {
    lines := make(chan string)
    go func() {
        defer close(lines)
        for v := range in {
            line := fmt.Sprintf("result: %d", v)
            lines <- line
        }
    }()
    return lines
}

func main() {
    // 链接pipeline: generate -> square -> toString
    ch := toString(square(generate(2, 3, 4, 5)))
    for s := range ch {
        fmt.Println(s)
    }
}

每个stage都是独立的goroutine,通过channel解耦。上游close channel,下游的range循环自动结束。

设计原则

  • 每个stage启动一个goroutine
  • 通过channel传递数据
  • stage函数接收<-chan T(只能接收的channel),返回<-chan T(只能接收的channel);作为数据源的第一个stage没有输入channel
  • 生产者负责close channel

Fan-Out:并行处理

当pipeline中某个stage成为瓶颈时,可以启动多个goroutine同时处理,这就是Fan-Out。

package main

import (
    "crypto/md5"
    "fmt"
    "sync"
)

// heavyCompute simulates an expensive computation. It starts a goroutine that
// computes the MD5 digest of every string received from in, sends
// "<input> -> <hex digest>" on the returned channel, and closes that channel
// once in has been closed and drained.
func heavyCompute(in <-chan string) <-chan string {
    results := make(chan string)
    go func() {
        defer close(results)
        for {
            s, ok := <-in
            if !ok {
                return
            }
            sum := md5.Sum([]byte(s))
            results <- fmt.Sprintf("%s -> %x", s, sum)
        }
    }()
    return results
}

// fanOut starts n heavyCompute workers and returns their output channels, one
// per worker.
//
// All workers receive from the same in channel, so each item is taken by
// whichever worker is ready next. No separate dispatcher goroutine is needed:
// a slow worker cannot hold up items destined for the others, and every
// worker stops on its own once in is closed and drained.
//
// Values of n below 1 are treated as 1 so that in is always consumed.
func fanOut(in <-chan string, n int) []<-chan string {
    if n < 1 {
        n = 1
    }
    channels := make([]<-chan string, n)
    for i := range channels {
        channels[i] = heavyCompute(in)
    }
    return channels
}

Fan-In:合并结果

Fan-In把多个channel的输出合并到一个channel中:

// fanIn merges the values from all of channels into the single returned
// channel. One goroutine per input forwards its values; the returned channel
// is closed after every input channel has been closed and drained. The
// relative order of values coming from different inputs is not defined.
func fanIn(channels ...<-chan string) <-chan string {
    out := make(chan string)
    var pending sync.WaitGroup

    for _, src := range channels {
        pending.Add(1)
        go func(src <-chan string) {
            defer pending.Done()
            for v := range src {
                out <- v
            }
        }(src)
    }

    // Close out only after every forwarding goroutine has finished, so no
    // goroutine can send on a closed channel.
    go func() {
        defer close(out)
        pending.Wait()
    }()

    return out
}

完整示例:文件处理流水线

将上面的模式组合起来,实现一个文件处理流水线:

package main

import (
    "context"
    "crypto/sha256"
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
    "sync"
)

// walkFiles is stage 1: it walks the tree rooted at root in a goroutine and
// sends the path of every non-directory entry on the returned channel. The
// channel is closed when the walk finishes or when ctx is cancelled while a
// path is waiting to be sent.
func walkFiles(ctx context.Context, root string) <-chan string {
    paths := make(chan string)

    visit := func(path string, d fs.DirEntry, err error) error {
        // Entries that cannot be read are skipped so the walk keeps going.
        if err != nil {
            return nil
        }
        if d.IsDir() {
            return nil
        }
        select {
        case <-ctx.Done():
            // A non-nil error makes WalkDir stop the walk.
            return ctx.Err()
        case paths <- path:
            return nil
        }
    }

    go func() {
        defer close(paths)
        // The walk's result is deliberately discarded: this stage has no
        // error channel, and closing paths is the only completion signal.
        _ = filepath.WalkDir(root, visit)
    }()
    return paths
}

// Stage 2: compute file hashes (the expensive step, a good fit for fan-out).

// FileHash pairs a file path with the hex-encoded SHA-256 digest of the
// file's contents.
type FileHash struct {
    Path string
    Hash string
}

// hashFile starts a goroutine that reads every path received from paths,
// hashes the file's contents with SHA-256, and sends a FileHash on the
// returned channel. Files that cannot be read are skipped. The returned
// channel is closed when paths is drained or ctx is cancelled.
func hashFile(ctx context.Context, paths <-chan string) <-chan FileHash {
    results := make(chan FileHash)
    go func() {
        defer close(results)
        for {
            p, ok := <-paths
            if !ok {
                return
            }

            // Non-blocking cancellation check before doing the expensive read.
            select {
            case <-ctx.Done():
                return
            default:
            }

            content, readErr := os.ReadFile(p)
            if readErr != nil {
                continue
            }
            digest := sha256.Sum256(content)
            fh := FileHash{Path: p, Hash: fmt.Sprintf("%x", digest)}

            // Give up on the send if the context is cancelled while waiting
            // for a receiver.
            select {
            case <-ctx.Done():
                return
            case results <- fh:
            }
        }
    }()
    return results
}

// merge is the fan-in step: it forwards every FileHash received from any of
// channels onto the single returned channel. Each forwarding goroutine stops
// when its input is drained or when ctx is cancelled while it is waiting to
// send; the returned channel is closed once all of them have stopped.
func merge(ctx context.Context, channels ...<-chan FileHash) <-chan FileHash {
    out := make(chan FileHash)
    var pending sync.WaitGroup

    pending.Add(len(channels))
    for _, src := range channels {
        go func(src <-chan FileHash) {
            defer pending.Done()
            for fh := range src {
                select {
                case <-ctx.Done():
                    return
                case out <- fh:
                }
            }
        }(src)
    }

    // Close out only after every forwarder has returned, so none of them can
    // send on a closed channel.
    go func() {
        defer close(out)
        pending.Wait()
    }()

    return out
}

// main hashes every file under the directory given as the first command-line
// argument (the current directory when none is given), prints the first 16
// hex characters of each SHA-256 digest next to the file path, and finishes
// with the number of files processed.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    root := "."
    if len(os.Args) > 1 {
        root = os.Args[1]
    }

    // Pipeline: walkFiles -> fan-out(hashFile x numWorkers) -> fan-in -> collect
    paths := walkFiles(ctx, root)

    // Fan-out: every worker receives from the same paths channel, so each path
    // is picked up by whichever worker is free. This avoids a separate
    // dispatcher goroutine, which would have to watch ctx.Done() on every send
    // to avoid leaking after cancellation, and keeps one slow file from
    // holding up paths queued for other workers.
    const numWorkers = 4
    hashChannels := make([]<-chan FileHash, numWorkers)
    for i := range hashChannels {
        hashChannels[i] = hashFile(ctx, paths)
    }

    // Fan-in: merge all worker outputs into one stream.
    results := merge(ctx, hashChannels...)

    // Collect: the loop ends when merge closes its channel, i.e. after every
    // worker has finished.
    count := 0
    for fh := range results {
        // A hex-encoded SHA-256 digest is 64 characters, so [:16] is in range.
        fmt.Printf("%s  %s\n", fh.Hash[:16], fh.Path)
        count++
    }
    fmt.Printf("\nProcessed %d files\n", count)
}

Context取消传播

上面的代码已经展示了context的用法。核心原则:

  1. 每个stage接收context.Context参数
  2. 在channel操作时用select监听ctx.Done()
  3. context被取消后(通常由调用方调用cancel触发),所有监听该ctx的stage goroutine都能感知并退出
select {
case out <- value:
    // Normal path: the value was sent on out.
case <-ctx.Done():
    return  // The context was cancelled: stop waiting to send and exit.
}

这种模式避免了goroutine泄漏——即使pipeline被提前终止,所有goroutine也能正确退出。前提是每一个可能阻塞的channel发送都像上面这样同时监听ctx.Done();只要有一处直接写 ch <- v 而不监听ctx,对应的goroutine在取消后仍可能永久阻塞。

适用场景

  • ETL数据处理:数据抽取 -> 清洗 -> 转换 -> 加载
  • 日志分析:读取日志文件 -> 解析 -> 过滤 -> 聚合
  • 爬虫:URL发现 -> 下载 -> 解析 -> 存储
  • 文件处理:遍历文件 -> 读取内容 -> 计算/转换 -> 输出

Pipeline模式让并发程序的结构清晰可读,每个stage职责单一,易于测试和替换。