工作原理
约 576 字大约 2 分钟
2025-08-12
本模块基于 Supabase/Postgres 的 PGMQ 扩展实现消息队列。应用启动时拉起多个 Worker 并发消费队列消息,每个 Worker 根据任务内容自行路由是否处理,成功即删除消息,失败写入死信队列,保障稳定与可恢复性。
消息流(简要):
- 生产者 → 通过数据库操作即可向pgmq队列中写入消息数据
- Worker 循环 →
pgmq.read('ai_tasks', visibility_timeout, 1) - 路由 →
should_process_task决定处理或放弃 - 处理 →
process_task - 完成 →
pgmq.delete('ai_tasks', msg_id) - 失败 →
pgmq.send('ai_tasks_dlq', dead_letter)+ 删除原消息
关键组件
BaseWorker(
app/worker/base_worker.py)- 维护工作循环:读取→路由→处理→确认/死信。
should_process_task(task):是否由该 Worker 处理。process_task(task):业务处理逻辑(子类实现)。- 可见性超时
visibility_timeout:防止并发重复处理,崩溃时消息自动重现。 - 失败自动写入死信队列
ai_tasks_dlq,携带原始任务与错误信息。
WorkerManager(
app/worker/worker_manager.py)- 按环境变量创建并启动多实例并发消费:
WORKER_COUNT(默认 2)WORKER_TYPES(默认example,可逗号分隔)
- 统一启动/停止、状态查询(配合 API
GET /workers/status)。
- 按环境变量创建并启动多实例并发消费:
应用生命周期(
backend-ai/main.py)- FastAPI
lifespan在应用启动时start(),关闭时优雅stop()。
- FastAPI
负载分配与路由
- 多个 Worker 对同一队列竞争读取。PGMQ 在可见期内只让一位消费者看到该消息,形成天然的负载均衡与水平扩展。
- 若某 Worker
should_process_task返回 False,消息在可见期结束后重新可见,便于被更合适的 Worker 领取。
可见性与幂等
visibility_timeout定义处理窗口。Worker 异常退出或超时未确认,消息会重新回到队列,保证至少一次投递。- 处理成功后显式删除消息;处理逻辑应尽量幂等,避免偶发重试导致副作用重复。
错误与死信
- 处理异常时,构造包含
original_msg_id、原任务、错误信息、worker_id、时间戳与来源队列的记录,发送至ai_tasks_dlq,随后删除原消息,便于离线排查与补偿。
扩展
- 继承
BaseWorker实现两件事:should_process_task(task):声明自己要处理哪些任务。process_task(task):编写处理逻辑。
- 在
WorkerManager注册新类型,或通过WORKER_TYPES组合多种 Worker 并行运行。
版权所有
版权归属:Evoliant
许可证:MIT