概览
约 852 字大约 3 分钟
2025-08-12
面向异步队列的任务处理子系统,包含通用抽象、生命周期管理与最小化约定。
架构组成
BaseWorker(app/worker/base_worker.py)- 抽象基类,封装通用职责:连接池管理、队列监听、消息读取/删除、错误与死信处理、生命周期控制。
- 默认使用队列:
ai_tasks,死信队列:ai_tasks_dlq,可见性超时:1800 秒。 - 需要子类实现:
should_process_task(task_data)与process_task(task_data)。
WorkerManager(app/worker/worker_manager.py)- 负责 Worker 的创建、启动、停止、状态查询。
- 支持按环境变量动态配置 Worker 的类型与数量。
- 提供
register_worker_class(name, cls)用于运行时注册新 Worker 类型。
系统在 FastAPI 生命周期中自动启动与停止(见 app/main.py 中的 lifespan)。
关键能力
- 队列消费:通过
pgmq.read/delete/send从 PostgreSQL/pgmq 读取与确认消息,并写入死信队列。 - 任务路由:每个 Worker 通过
should_process_task判断是否处理某条消息(避免“误领”)。 - 错误兜底:
process_task抛出异常时自动写入死信队列并清理原消息。 - 优雅停止:
stop()触发后,完成当前处理后退出;管理器统一取消与回收任务。 - 状态可观测:
WorkerManager.get_worker_status()返回管理器与各 Worker 的关键运行信息。
配置
- 环境变量
WorkerManager 启动和运行时会读取以下环境变量进行配置。
AI_TASK_QUEUE_NAME:AI 任务的主队列名称(默认值:ai_tasks)。AI_DEAD_LETTER_QUEUE:用于存放处理失败任务的死信队列名称(默认值:ai_tasks_dlq)。WORKER_COUNT:要启动的 Worker 进程总数(默认值:2)。WORKER_TYPES:要启动的 Worker 类型,使用逗号分隔。例如type1,type2(默认值:example)。QUEUE_POLLING_INTERVAL:从队列中拉取新任务的轮询间隔时间,单位为秒(默认值:5)。TASK_VISIBILITY_TIMEOUT:任务从队列中取出后的可见性超时时间,单位为秒。在此期间任务对其他消费者不可见,如果 Worker 未在此时间内完成并删除任务,任务将重新变为可见状态(默认值:1800)。TASK_TIMEOUT:单个任务的处理逻辑允许执行的最大时间,单位为秒。超过此时间 Worker 会中断任务(默认值:1800)。目前未启用这项配置,将视情况决定。连接池:通过
app.utils.supabase_config.get_db_pool()获取asyncpg连接池。
快速使用(自定义 Worker)
- 新建自定义 Worker(继承
BaseWorker,实现两个抽象方法):
from typing import Dict, Any
from app.worker.base_worker import BaseWorker
class MyWorker(BaseWorker):
async def should_process_task(self, task_data: Dict[str, Any]) -> bool:
return task_data.get("type") == "my_task"
async def process_task(self, task_data: Dict[str, Any]) -> None:
# 编写具体处理逻辑
...- 在
WorkerManager中注册类型(名称用于WORKER_TYPES):
from app.worker.worker_manager import worker_manager
from path.to.my_worker import MyWorker
worker_manager.register_worker_class("my_worker", MyWorker)- 配置环境变量并启动应用:
WORKER_COUNT=3 WORKER_TYPES=my_worker,my_worker,my_worker应用启动后,lifespan 会自动调用 worker_manager.start(),在关闭时调用 stop()。
运行与监控
- 在运行中可调用
WorkerManager.get_worker_status()获取:是否运行、已启动 Worker 数量、类型、每个 Worker 的worker_id/queue_name/dead_letter_queue/visibility_timeout等。 - 外部服务可调用
/workers/status端点获取。
约定与边界
- Worker用来处理长任务,请合理划分任务,尽可能在Supabase后端中完成(典型的Supabase边缘函数运行时间不超过120秒)。
should_process_task尽量快速判定并保持幂等;process_task失败即抛异常以触发死信逻辑。- 队列消息建议为 JSON,可包含
type作为路由字段与必要的业务上下文。
版权所有
版权归属:Evoliant
许可证:MIT