从 Redis + Channel 到 Redis Stream:一次 Go 任务队列方案的重新理解
在讨论 PostgreSQL queue 方案之后,我又回头看了一下之前自己设想过的 Go + Redis 任务架构。
最早我脑子里的方案大概是:
POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控制并发
这个方案很直观,也很符合 Go 的写法。
用户请求进来后,API 生成一个 task_id,把任务状态写进 Redis,然后把任务塞进 Go 的 channel,由后台 goroutine 消费执行。并发控制就靠一个固定大小的 channel 或 worker pool,比如最多 20 个 goroutine 同时执行任务。
这个设计在单进程里确实很舒服。
但当我开始思考多进程、多容器、worker 崩溃恢复、全局并发控制这些问题时,我发现它还不够完整。
更下一阶段的 Redis 方案应该是:
POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复
这篇文章就是整理这个理解过程。
一、最初方案:Redis 存状态 + Go channel 控并发
最开始的方案大概是这样:
Gin / Go API
↓
POST /tasks
↓
生成 task_id
↓
Redis 记录任务状态
↓
taskChan <- task
↓
goroutine worker pool 执行任务
Redis 里负责存状态:
task:{task_id}
比如:
status=pending
request={...}
result={...}
error=
created_at=...
updated_at=...
Go 进程内部有一个 channel:
taskChan := make(chan Task, 1000)
再启动固定数量的 goroutine:
for i := 0; i < 20; i++ {
go worker(taskChan)
}
这样就可以做到:
最多 20 个任务同时执行
这个模型非常容易理解。
API 负责接任务,Redis 负责存状态,goroutine 负责执行,channel 负责进程内排队和并发控制。
对于单进程服务来说,它甚至是一个很好的方案。
二、这个方案为什么看起来很舒服?
因为它充分利用了 Go 的优势。
Go 的 goroutine 很轻,channel 又天然适合做进程内任务分发。
整个模型看起来非常清爽:
请求进来
↓
写 Redis
↓
丢给 channel
↓
goroutine 执行
而且并发控制也很简单。
比如我希望最多同时执行 20 个任务,那我就启动 20 个 worker goroutine。
如果任务主要是 LLM 请求、HTTP 调用、图片下载这类 I/O 型任务,Go 的 goroutine 确实很适合。
单进程下,这个方案没有明显问题。
三、真正的问题:channel 是进程内的
问题出在这里:
channel 是进程内的。
这句话非常关键。
Go channel 只能管理当前这个 Go 进程里的任务。
如果我只有一个 API 进程:
api-1:20 个 goroutine
那确实最多 20 并发。
但如果我把 API 扩成 4 个容器:
api-1:20 个 goroutine
api-2:20 个 goroutine
api-3:20 个 goroutine
api-4:20 个 goroutine
真实并发就变成了:
20 × 4 = 80
这时候所谓的“最多 20 并发”就不是全局的,而只是单进程内的。
所以 channel 控制的是:
进程内并发
不是:
全局并发
这是旧方案最大的边界。
四、任务状态也会分裂
旧方案还有另一个问题:状态容易分裂。
任务提交后,状态在 Redis 里:
Redis:
task:{id} status=pending
但任务本身可能已经进入了 Go 进程内的 channel:
Go channel:
Task{id}
任务执行中,又存在于某个 goroutine 的内存里:
goroutine:
正在处理 task_id
于是系统里出现了三个地方:
Redis 里有任务状态
channel 里有排队任务
goroutine 里有执行中任务
如果进程不挂,这当然没问题。
但如果进程挂了,channel 里的任务就没了。
Redis 里可能还显示:
status=pending
或者:
status=running
这时候就必须额外设计恢复逻辑,比如:
服务启动时扫描 Redis 里的 pending 任务
running 超时后重置为 pending
重新塞回 channel
这当然可以做。
但做到这里就会发现:我其实已经在手写一个任务队列系统了。
五、旧方案适合什么场景?
旧方案不是不好。
它适合这些场景:
单进程服务
任务不太重要
任务丢失可以接受
只需要保护当前进程不要打爆下游
状态只做短期展示
内部工具或一次性任务
比如临时爬虫、内部批处理、小型 webhook 处理、实验性服务,这个模型很爽。
但如果是一个正式的任务系统:
用户提交任务
必须返回 task_id
后续要查询状态
任务不能因为进程重启就丢
多 worker 要共同协作
全局并发必须受控
失败要可恢复
那么 Redis + channel + goroutine 就不够完整了。
它解决的是进程内并发,不是系统级任务调度。
六、下一阶段方案:Redis Stream 入队
如果继续用 Redis,但希望架构更完整,我会把任务队列从 Go channel 提升到 Redis Stream。
新的结构是:
POST 创建任务
↓
Redis Hash 存任务状态
↓
Redis Stream 入队
↓
Worker Consumer Group 消费
↓
Redis Lua 控制全局并发
↓
XAUTOCLAIM 做崩溃恢复
这时 channel 不再是系统级任务队列。
真正的任务队列变成:
Redis Stream
例如:
tasks:stream
提交任务时:
XADD tasks:stream * task_id abc123
Worker 使用 consumer group 消费:
XREADGROUP GROUP workers worker-1 COUNT 1 BLOCK 1000 STREAMS tasks:stream >
这时候任务不再只存在某个 Go 进程的内存里,而是进入了 Redis 的 Stream。
worker 可以是多个进程、多个容器,大家都从同一个 Stream 里取任务。
这就比 channel 更接近真正的分布式队列。
七、Redis Hash 存任务状态
任务状态仍然可以用 Redis Hash 存。
每个任务一个 key:
task:{task_id}
字段可以是:
status
task_type
request
result
error
attempts
created_at
updated_at
locked_until
例如:
HSET task:abc123 \
status pending \
task_type veo \
request '{...}' \
attempts 0 \
created_at 1710000000
查询任务时:
HGETALL task:abc123
这样可以保持原来的接口体验:
GET /tasks/{task_id}
仍然返回:
task_id
status
result
error
也就是说,Redis Hash 负责状态,Redis Stream 负责队列。
八、Redis Stream 比 channel 强在哪里?
Redis Stream 的关键优势是:
队列不在进程内
支持多 worker 消费
支持 consumer group
支持 pending message
支持 ack
支持崩溃后的重新领取
这就比 channel 更适合跨进程任务队列。
在 Redis Stream 里,worker 取走任务后,如果没有确认完成,这条消息不会彻底消失,而是进入 Pending Entries List,也就是 PEL。
worker 完成任务后,需要:
XACK tasks:stream workers message_id
这表示:
这个任务已经处理完成。
如果 worker 崩溃,没有执行 XACK,这条消息还在 Redis 的 pending 列表里。
后续可以被其他 worker 重新 claim。
这就是它比进程内 channel 更可靠的地方。
九、全局并发控制不能靠 channel
如果使用 Redis Stream,只解决了“任务队列”的问题。
但还没解决:
全局最多同时执行 N 个任务
因为多个 worker 都可以从 Stream 里读取任务。
如果没有全局并发控制,worker 多了以后,仍然可能同时执行太多任务。
所以需要一个所有 worker 都能看到的全局 running 记录。
Redis 里可以用 Sorted Set:
tasks:running
score 放 locked_until 时间戳:
ZADD tasks:running 1710000300 abc123
这样可以做两件事:
1. 用 ZCARD 统计当前 running 数量
2. 用 ZRANGEBYSCORE 找出超时 running 任务
但这里有一个关键问题:
检查 running 数量 + 加入 running 集合
这两个动作必须是原子的。
否则多个 worker 同时执行时可能超额。
十、为什么需要 Redis Lua?
假设最大并发是 20。
如果用 Go 普通命令写:
ZCARD tasks:running
如果 count < 20:
ZADD tasks:running locked_until task_id
HSET task:{id} status running
就可能出现竞态。
例如:
worker A 看到 running=19
worker B 也看到 running=19
worker A 加入一个任务
worker B 也加入一个任务
最后 running=21
这就突破了全局并发上限。
Redis 单条命令是原子的,但多条命令组合起来不是自动原子的。
这时候就需要 Lua。
Redis Lua 的意思是:
把一小段 Lua 脚本发给 Redis,让 Redis 在服务器内部一次性执行多条命令。
它不是用 Lua 写整个项目。
主语言还是 Go。
Lua 只负责关键的原子逻辑。
比如:
local running_key = KEYS[1]
local task_key = KEYS[2]
local task_id = ARGV[1]
local now = tonumber(ARGV[2])
local locked_until = tonumber(ARGV[3])
local max_running = tonumber(ARGV[4])
redis.call("ZREMRANGEBYSCORE", running_key, "-inf", now)
local running_count = redis.call("ZCARD", running_key)
if running_count >= max_running then
return 0
end
redis.call("ZADD", running_key, locked_until, task_id)
redis.call("HSET", task_key,
"status", "running",
"locked_until", locked_until,
"updated_at", now
)
return 1
这段脚本做了几件事:
1. 清理过期 running 任务
2. 查询当前 running 数量
3. 判断是否达到最大并发
4. 如果没满,把任务加入 running
5. 更新任务状态为 running
6. 返回是否成功拿到执行槽
Redis 执行 Lua 脚本时,中间不会插入其他客户端命令。
所以这整段逻辑是原子的。
这就相当于 Redis 版的:
事务 + 锁
十一、Worker 消费流程
比较完整的 worker 流程应该是:
1. 从 Redis Stream 读取一条任务消息
2. 调用 Lua 脚本尝试获取全局执行槽
3. 如果没拿到槽位,稍后再试
4. 如果拿到槽位,把任务状态标记为 running
5. 执行 LLM 调用
6. 成功后写 completed + result
7. 失败后写 failed + error
8. 从 tasks:running 移除 task_id
9. XACK Stream 消息
伪代码大概是:
for {
msg := readFromStream()
ok := acquireSlotByLua(taskID)
if !ok {
sleep()
continue
}
result, err := processTask(taskID)
if err != nil {
markFailed(taskID, err)
} else {
markCompleted(taskID, result)
}
releaseSlot(taskID)
ackStreamMessage(msg.ID)
}
这里的重点是:
Stream 负责队列
Lua + running zset 负责全局并发
Hash 负责任务状态
这三个职责不能混在一起。
十二、worker 崩溃恢复:XAUTOCLAIM
Redis Stream 有一个重要能力:
XAUTOCLAIM
它用来处理 worker 崩溃后的 pending message。
假设 worker-1 读走了一条任务:
XREADGROUP 读取成功
但 worker-1 执行过程中挂了,没有执行:
XACK
那么这条消息会停留在 consumer group 的 pending 列表里。
其他 worker 可以用:
XAUTOCLAIM tasks:stream workers worker-2 30000 0 COUNT 10
大概含义是:
把 idle 超过 30000ms 的 pending 消息,转移给 worker-2 继续处理。
这就解决了:
worker 读走任务后崩溃,消息没人处理
的问题。
不过 Redis Stream 的恢复比 PostgreSQL queue 稍微复杂,因为你还要同时维护:
Stream pending message
task:{id} Hash 状态
tasks:running ZSET
所以恢复逻辑一般要配套处理:
1. XAUTOCLAIM 抢回长时间未 ack 的消息
2. 清理 tasks:running 中 locked_until 过期的任务
3. 根据 attempts 判断重新执行还是标记 failed
4. 保持 task:{id} 的状态一致
可以做,但一定要设计清楚。
十三、新旧方案的核心差异
旧方案:
POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控并发
优点:
简单
开发快
单进程很好用
Go 写起来很顺
缺点:
channel 只控制当前进程
多容器后并发会放大
任务可能藏在进程内存里
进程崩溃后需要额外恢复
Redis 状态和 channel 队列是两份东西
新方案:
POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复
优点:
队列在 Redis,不在进程内
多个 worker 可以共同消费
全局并发可以通过 Lua 控制
worker 崩溃后消息可以重新 claim
更适合多进程、多容器
缺点:
实现复杂度更高
需要理解 Redis Stream / Consumer Group / PEL
需要写 Lua 保证原子性
状态一致性要自己维护
查询统计能力不如 PostgreSQL
十四、和 PostgreSQL queue 的对比
Redis Stream 方案和 PostgreSQL queue 都可以做任务系统。
但它们的气质不同。
PostgreSQL queue 更像:
任务账本
它擅长:
状态持久化
复杂查询
结果落库
失败排查
事务一致性
任务历史
Redis Stream 更像:
高速调度器
它擅长:
高吞吐
轻量队列
短期状态
快速分发
内存型消息处理
如果任务是 LLM 调用、视频生成、图片处理这种:
任务时间较长
需要 task_id 查询
需要保存 result/error
需要失败追踪
任务量不是每秒几十万
PostgreSQL queue 会更稳。
如果任务是:
极高吞吐
短任务
短期状态
对复杂查询要求不高
更偏消息流处理
Redis Stream 会很漂亮。
十五、这次我对 channel 的重新定位
这次讨论后,我对 Go channel 的定位更清楚了。
channel 很好,但它应该是:
进程内并发工具
而不是:
系统级任务队列
它适合:
当前进程内部任务分发
worker 内部 pipeline
goroutine 之间通信
局部背压
但如果要做跨进程、跨容器、可恢复的任务系统,队列最好放在所有 worker 都能访问的地方。
比如:
PostgreSQL
Redis Stream
RabbitMQ
Kafka
NATS
而不是某个进程里的 channel。
更准确的分层应该是:
channel:
进程内协调工具
Redis Hash:
短期任务状态
Redis Stream:
分布式任务队列
Redis Lua:
原子并发控制
XAUTOCLAIM:
崩溃恢复
PostgreSQL:
任务事实来源和长期账本
十六、如果真的用 Go + Redis,我会怎么设计
如果要做一个相对优雅的 Go + Redis 版本,我会这样拆:
cmd/api
cmd/worker
internal/taskstore
internal/queue
internal/limiter
internal/processor
internal/llm
各模块职责:
taskstore:
读写 task:{id}
markPending
markRunning
markCompleted
markFailed
queue:
XADD
XREADGROUP
XACK
XAUTOCLAIM
limiter:
Lua acquireSlot
releaseSlot
heartbeat
cleanupExpiredRunning
processor:
根据 task_type 调用不同任务逻辑
llm:
封装外部 LLM 请求
Redis key 设计:
task:{task_id} Hash,存任务状态
tasks:stream Stream,存任务消息
tasks:running ZSET,存 running 任务和过期时间
这套就比 Redis + channel 更像一个完整任务系统。
十七、最后的理解
我现在对这两套 Go + Redis 方案的理解是:
旧方案:
POST 创建任务
Redis 存状态
Go goroutine 直接处理
进程内 channel 控并发
它是一个很好的单进程异步模型。
新方案:
POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复
它才更像一个可扩展的分布式任务队列模型。
最核心的区别不在于用了不用 Redis,而在于:
队列到底在哪里?
并发控制到底在哪里?
任务恢复到底靠什么?
旧方案的答案是:
队列在进程内 channel
并发控制在进程内 channel
恢复需要额外扫描 Redis
新方案的答案是:
队列在 Redis Stream
并发控制在 Redis Lua + running zset
恢复靠 Redis Stream pending + XAUTOCLAIM
这就是层级上的差别。
结语
Go 的 goroutine 和 channel 很强,但它们首先是进程内并发工具。
如果只是单进程、轻量任务、临时系统,用:
Redis 存状态 + channel 控并发 + goroutine 执行
完全可以,而且写起来很爽。
但如果目标是多 worker、多容器、全局并发、崩溃恢复,那么更完整的 Redis 方案应该升级为:
Redis Stream + Consumer Group + Lua + XAUTOCLAIM
也就是:
POST 创建任务
Redis Stream 入队
Worker 从队列消费
Redis Lua 控全局并发
XAUTOCLAIM 做崩溃恢复
这套方案比旧方案复杂,但边界更清楚。
而这次真正的收获是:
channel 不是不能用,而是不能把它当成分布式任务队列。它应该留在进程内部;系统级队列和全局并发控制,应该放在所有 worker 都能共同访问的地方。
陕公网安备61011302002223号