ai-stream-chat
约 1651 字大约 6 分钟
2025-10-09
概述
ai-chat-stream 是一个基于 Deno、Supabase 和 OpenAI SDK 的流式聊天 API。它通过调用阿里云 DashScope(通义千问)API 提供实时的流式聊天响应,支持完整的 OpenAI 兼容格式。
接口定义
ChatMessage 接口
聊天消息结构定义。
interface ChatMessage {
role: "user" | "assistant" | "system";
content: string;
}ChatRequest 接口
聊天请求的主体结构。
interface ChatRequest {
messages: ChatMessage[];
}环境变量配置
使用此 API 前需要配置以下环境变量:
# 阿里云 DashScope 配置
DASHSCOPE_BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
DASHSCOPE_API_KEY="your-dashscope-api-key"
DASHSCOPE_CHAT_MODEL="qwen-max" # 或其他支持的模型核心函数详解
validateEnvVars
功能: 验证必要的环境变量是否已配置
参数: 无
返回值: void
验证的环境变量:
DASHSCOPE_BASE_URL: DashScope API 基础URLDASHSCOPE_API_KEY: DashScope API 密钥DASHSCOPE_CHAT_MODEL: 使用的聊天模型
抛出错误: 当缺少任何必需的环境变量时抛出详细错误信息
validateChatRequest
功能: 验证聊天请求的合法性
参数:
body: unknown- 待验证的请求体
返回值: ChatRequest - 验证通过的请求对象
验证规则:
- 必填字段检查:
messages - 消息数组非空检查
- 每条消息的字段完整性检查:
role,content - 角色有效性检查:
user,assistant,system - 内容非空检查
抛出错误:
- 缺少必填字段时抛出详细错误信息
- 消息格式错误时提供具体位置和原因
callDashScopeStreamAPI
功能: 使用 OpenAI SDK 调用 DashScope 流式 API
参数:
messages: ChatMessage[]- 聊天消息历史
返回值: Promise<{ stream: AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk> }> - 流式响应对象
配置参数:
temperature: 0.7- 控制生成随机性(0-1)max_tokens: 2000- 最大生成长度stream: true- 启用流式输出stream_options: { include_usage: true }- 包含使用统计信息
实现细节:
- 从环境变量获取API配置
- 创建OpenAI客户端(指向DashScope)
- 发送流式聊天补全请求
- 返回可迭代的流对象
错误处理:
- API调用失败时抛出错误
- 网络错误时提供详细错误信息
handleOpenAIStreamResponse
功能: 处理 OpenAI SDK 流式响应并转换为 SSE 格式
参数:
stream: AsyncIterable<OpenAI.Chat.Completions.ChatCompletionChunk>- OpenAI SDK 流对象
返回值: Response - SSE 格式的流式响应
处理逻辑:
- 实时处理每个流式chunk
- 提取内容delta并发送SSE事件
- 收集使用统计信息(token计数)
- 处理完成信号和错误情况
- 记录详细的调试信息
SSE事件格式:
{
"content": "生成的文本内容",
"finish_reason": "stop" | "length" | "content_filter" | null
}统计信息:
- 记录prompt tokens、completion tokens、total tokens
- 记录生成内容长度
- 记录模型信息
主处理函数
Deno.serve
功能: 处理HTTP请求的主入口点
支持的HTTP方法: POST
处理流程:
- 处理CORS预检请求
- 验证请求方法
- 验证环境变量
- 解析和验证请求体
- 调用DashScope流式API
- 转换并返回SSE流式响应
使用示例
请求示例
curl -X POST https://your-domain.com/functions/v1/ai-chat-stream \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_SUPABASE_TOKEN" \
-d '{
"messages": [
{
"role": "system",
"content": "你是一个有帮助的AI助手"
},
{
"role": "user",
"content": "请解释一下机器学习的基本概念"
}
]
}'流式响应示例
服务器返回SSE格式的流式响应:
data: {"content":"机器学习","finish_reason":null}
data: {"content":"是","finish_reason":null}
data: {"content":"人工智能","finish_reason":null}
data: {"content":"的一个","finish_reason":null}
data: {"content":"重要分支","finish_reason":null}
data: [DONE]前端处理示例
// 前端处理SSE流的示例
async function streamChat(messages) {
const response = await fetch('/ai-chat-stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getAccessToken()}`
},
body: JSON.stringify({ messages })
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let accumulatedText = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim();
if (data === '[DONE]') {
console.log('流式响应完成');
return accumulatedText;
}
try {
const parsed = JSON.parse(data);
if (parsed.content) {
accumulatedText += parsed.content;
// 实时更新UI
updateChatUI(accumulatedText);
}
} catch (e) {
console.warn('解析JSON失败:', e, '原始数据:', data);
}
}
}
}
} finally {
reader.releaseLock();
}
return accumulatedText;
}错误处理
验证错误 (400)
- 缺少必填字段
- 消息格式错误
- 环境变量未配置
API 错误 (500)
- DashScope API 调用失败
- 网络连接错误
- 流处理错误
错误代码
VALIDATION_ERROR: 请求验证失败ENV_VAR_MISSING: 环境变量缺失API_CALL_FAILED: AI API 调用失败STREAM_ERROR: 流处理错误
流式协议详解
SSE 事件格式
每个数据块遵循以下格式:
{
"content": "增量文本内容",
"finish_reason": "停止原因或null"
}finish_reason 可能值
stop: API返回完整消息length: 达到max_tokens限制content_filter: 内容被过滤null: 流还在继续
完成信号
流结束时发送特殊事件:
data: [DONE]性能监控
Token 使用统计
API 会记录并输出以下统计信息:
{
prompt_tokens: 85, // 输入token数量
completion_tokens: 215, // 输出token数量
total_tokens: 300, // 总token数量
generated_content_length: 643, // 生成内容长度(字符)
model: "qwen-max" // 使用的模型
}流式特性
- 低延迟: 首个token快速返回
- 实时性: 逐词或逐句返回
- 可中断: 客户端可随时中止流
- 效率高: 减少用户等待时间
最佳实践
前端实现建议
class StreamChat {
constructor() {
this.controller = null;
this.accumulatedText = '';
}
// 开始流式聊天
async start(messages) {
try {
const response = await fetch('/ai-chat-stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getAccessToken()}`
},
body: JSON.stringify({ messages })
});
await this.processStream(response);
} catch (error) {
console.error('流式聊天错误:', error);
this.onError(error);
}
}
// 处理流式响应
async processStream(response) {
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
this.processChunk(chunk);
}
} finally {
reader.releaseLock();
}
}
// 处理单个数据块
processChunk(chunk) {
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim();
if (data === '[DONE]') {
this.onComplete(this.accumulatedText);
return;
}
try {
const parsed = JSON.parse(data);
if (parsed.content) {
this.accumulatedText += parsed.content;
this.onData(parsed.content, this.accumulatedText);
}
} catch (e) {
console.warn('解析失败:', e);
}
}
}
}
// 中止流
abort() {
if (this.controller) {
this.controller.abort();
}
}
// 事件回调(需要实现)
onData(content, accumulatedText) {}
onComplete(finalText) {}
onError(error) {}
}错误处理建议
// 重试机制
async function streamWithRetry(messages, maxRetries = 3) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await streamChat(messages);
} catch (error) {
lastError = error;
console.warn(`尝试 ${attempt} 失败:`, error);
if (attempt < maxRetries) {
// 指数退避重试
await new Promise(resolve =>
setTimeout(resolve, 1000 * Math.pow(2, attempt - 1))
);
}
}
}
throw lastError;
}安全考虑
- 认证: 依赖 Supabase JWT 认证
- 输入验证: 严格的请求参数验证
- 内容过滤: 依赖 DashScope 的内容过滤机制
- 速率限制: 建议在网关层面添加速率限制
调试技巧
启用详细日志
// 在前端监控流事件
const eventSource = new EventSource('/ai-chat-stream');
eventSource.onmessage = (event) => {
console.log('SSE事件:', event.data);
};
eventSource.onerror = (error) => {
console.error('SSE错误:', error);
};服务器端监控
API 会输出详细的调试信息:
- 每个chunk的处理情况
- Token 使用统计
- 错误堆栈信息
- 性能指标
版权所有
版权归属:Evoliant
许可证:MIT