PGMQ 消息队列模块
约 849 字大约 3 分钟
2025-08-13
概述
PGMQ (PostgreSQL Message Queue) 是基于 PostgreSQL 的消息队列系统,为 Evoliant 平台提供异步任务处理能力。主要配合 AI Worker 系统 处理 AI 相关任务。
队列设计
队列定义
-- 启用 PGMQ 扩展
CREATE EXTENSION IF NOT EXISTS pgmq;
-- AI 任务处理队列
SELECT pgmq.create('ai_tasks');
-- AI 任务死信队列
SELECT pgmq.create('ai_tasks_dlq');队列说明
| 队列名 | 用途 | 描述 |
|---|---|---|
ai_tasks | AI 任务处理 | 存储待处理的 AI 任务请求 |
ai_tasks_dlq | 死信队列 | 存储处理失败的 AI 任务 |
客户端函数
pgmq_send()
发送消息到指定队列:
CREATE OR REPLACE FUNCTION pgmq_send(queue_name TEXT, message JSONB)
RETURNS BIGINT
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
msg_id BIGINT;
BEGIN
SELECT pgmq.send(queue_name, message) INTO msg_id;
RETURN msg_id;
END;
$$;参数说明:
queue_name: 目标队列名称message: JSON 格式的消息内容
返回值: 消息 ID (BIGINT)
pgmq_read()
从队列读取消息:
CREATE OR REPLACE FUNCTION pgmq_read(queue_name TEXT, vt INTEGER DEFAULT 30, qty INTEGER DEFAULT 1)
RETURNS TABLE(msg_id BIGINT, read_ct INTEGER, enqueued_at TIMESTAMP WITH TIME ZONE, message JSONB)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RETURN QUERY
SELECT * FROM pgmq.read(queue_name, vt, qty);
END;
$$;参数说明:
queue_name: 源队列名称vt: 可见性超时时间(秒),默认 30 秒qty: 读取消息数量,默认 1 条
pgmq_delete()
删除已处理的消息:
CREATE OR REPLACE FUNCTION pgmq_delete(queue_name TEXT, msg_id BIGINT)
RETURNS BOOLEAN
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
result BOOLEAN;
BEGIN
SELECT pgmq.delete(queue_name, msg_id) INTO result;
RETURN result;
END;
$$;参数说明:
queue_name: 目标队列名称msg_id: 要删除的消息 ID
RLS 策略
AI 任务队列策略
-- 启用行级安全策略
ALTER TABLE pgmq.q_ai_tasks ENABLE ROW LEVEL SECURITY;
-- 认证用户可以发送 AI 任务
CREATE POLICY "Users can send AI tasks" ON pgmq.q_ai_tasks
FOR INSERT TO authenticated WITH CHECK (true);
-- 工作进程可以读取 AI 任务
CREATE POLICY "Workers can read AI tasks" ON pgmq.q_ai_tasks
FOR SELECT TO authenticated, anon USING (true);
-- 工作进程可以更新任务状态
CREATE POLICY "Workers can update AI tasks" ON pgmq.q_ai_tasks
FOR UPDATE TO authenticated, anon USING (true);
-- 工作进程可以删除已处理任务
CREATE POLICY "Workers can delete processed AI tasks" ON pgmq.q_ai_tasks
FOR DELETE TO authenticated, anon USING (true);死信队列策略
-- 启用行级安全策略
ALTER TABLE pgmq.q_ai_tasks_dlq ENABLE ROW LEVEL SECURITY;
-- 系统可以将失败任务转移到死信队列
CREATE POLICY "System can send failed tasks to DLQ" ON pgmq.q_ai_tasks_dlq
FOR INSERT TO authenticated, anon WITH CHECK (true);
-- 管理员可以查看死信队列
CREATE POLICY "Admins can read DLQ tasks" ON pgmq.q_ai_tasks_dlq
FOR SELECT TO authenticated USING (true);
-- 管理员可以处理死信队列
CREATE POLICY "Admins can manage DLQ tasks" ON pgmq.q_ai_tasks_dlq
FOR ALL TO authenticated USING (true);权限配置
基础权限
-- Schema 和表访问权限
GRANT USAGE ON SCHEMA pgmq TO anon, authenticated;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA pgmq TO anon, authenticated;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA pgmq TO anon, authenticated;
-- PGMQ 核心函数权限
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgmq TO anon, authenticated;客户端函数权限
-- 客户端函数执行权限
GRANT EXECUTE ON FUNCTION pgmq_send(TEXT, JSONB) TO anon, authenticated;
GRANT EXECUTE ON FUNCTION pgmq_read(TEXT, INTEGER, INTEGER) TO anon, authenticated;
GRANT EXECUTE ON FUNCTION pgmq_delete(TEXT, BIGINT) TO anon, authenticated;使用示例
发送 AI 任务
-- 发送聊天完成任务
SELECT pgmq_send('ai_tasks', '{
"type": "chat_completion",
"messages": [{"role": "user", "content": "Hello"}],
"user_id": "123e4567-e89b-12d3-a456-426614174000"
}');读取和处理任务
-- 读取任务
SELECT * FROM pgmq_read('ai_tasks', 60, 1);
-- 删除已处理任务
SELECT pgmq_delete('ai_tasks', 12345);死信队列处理
-- 将失败任务转移到死信队列
SELECT pgmq_send('ai_tasks_dlq', message)
FROM pgmq_read('ai_tasks', 30, 1)
WHERE read_ct > 3;相关文档
- AI Worker 系统 - PGMQ 的主要使用者,处理 AI 任务队列
版权所有
版权归属:Evoliant
许可证:MIT