SSE

Creating a chat

Interacting with a large language model is remarkably easy. Taking OpenAI as an example, with the SDK just a few lines of code are enough:

import OpenAI from "openai";
const openai = new OpenAI();
 
const completion = await openai.chat.completions.create({
  model: "gpt-3.5-turbo",
  messages: [
    { role: "system", content: "You are a helpful assistant." },
    { role: "user", content: "你是谁?" },
  ],
});
 
console.log(completion.choices[0].message.content);

Besides the SDK, you can also call the RESTful API directly:

const res = await fetch("https://api.openai.com/v1/chat/completions", {
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${OPENAI_API_KEY}`,
  },
  method: "POST",
  body: JSON.stringify({
    model: "gpt-3.5-turbo",
    messages: [
      { role: "system", content: "You are a helpful assistant." },
      { role: "user", content: "你是谁?" },
    ],
  }),
});
 
const completion = await res.json();
 
console.log(completion.choices[0].message.content);

This style of interaction looks no different from the business APIs we write every day:

the front end and back end still exchange data as JSON.

But as prompts grow longer and more complex,

the completion the model returns grows too, and so does the time it takes to generate it,

leaving users waiting longer and longer…

Streaming output

With stream enabled, the model no longer produces the final result in one go;

instead it emits intermediate results step by step, and the final result is the concatenation of those pieces.

Calling the model in streaming mode returns intermediate results in real time,

cutting the user's reading wait and lowering the risk of request timeouts.

import OpenAI from "openai";
const openai = new OpenAI();
 
const stream = await openai.chat.completions.create({
  model: "gpt-3.5-turbo",
  messages: [
    { role: "user", content: "Say 'double bubble bath' ten times fast." },
  ],
  stream: true,
});
 
for await (const chunk of stream) {
  console.log(chunk);
  console.log(chunk.choices[0].delta);
  console.log("****************");
}

The response is delivered chunk by chunk over an event stream, which the for await loop above iterates; each chunk's delta prints as:

{ role: 'assistant', content: '', refusal: null }
****************
{ content: 'Why' }
****************
{ content: " don't" }
****************
{ content: ' scientists' }
****************
{ content: ' trust' }
****************
{ content: ' atoms' }
****************
{ content: '?\n\n' }
****************
{ content: 'Because' }
****************
{ content: ' they' }
****************
{ content: ' make' }
****************
{ content: ' up' }
****************
{ content: ' everything' }
****************
{ content: '!' }
****************
{}
****************
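
These deltas are what the SDK parses out of the underlying SSE stream: the response carries Content-Type: text/event-stream, each event is a data: line followed by a blank line, and OpenAI terminates the stream with data: [DONE] (payloads abbreviated here):

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Why"}}]}

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" don't"}}]}

data: [DONE]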

Structured output

Although we can get the model to return JSON just by tuning the prompt, for example:

prompt

Parse “江岸区建设大道”, extracting the Chinese province (province), city (city), district (district), and detailed address (address) contained in the text. While keeping the province/city/district hierarchy consistent, return the result as JSON in the format: "{province?: string, city?: string, district?: string, address?: string}"

completion

{
  "id": "chatcmpl-B9MBs8CjcvOU2jLn4n570S5qMJKcT",
  "object": "chat.completion",
  "created": 1741569952,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "根据你提供的文本“江岸区建设大道”,我们可以解析出以下信息:\n\n- **省 (province)**: 湖北省\n- **市 (city)**: 武汉市\n- **区 (district)**: 江岸区\n- **详细地址 (address)**: 建设大道\n\n以下是 JSON 格式的返回结果:\n\n```json\n{\n  \"province\": \"湖北省\",\n  \"city\": \"武汉市\",\n  \"district\": \"江岸区\",\n  \"address\": \"建设大道\"\n}\n```\n\n",
        "refusal": null,
        "annotations": []
      },
      "logprobs": null,
      "finish_reason": "stop"
    }
  ]
}

But a model's output is nondeterministic, and what it returns may not be valid JSON at all.

response_format

Enabling structured output guarantees that the model emits a JSON string in a standard format.

json_object

Supported by most models

const chatCompletion = await openai.chat.completions.create({
  response_format: {
    type: "json_object",
  },
  messages,
  model: "gpt-3.5-turbo",
});
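
One caveat with json_object: OpenAI requires the word "JSON" to appear somewhere in the conversation (typically in the system message), otherwise the request is rejected. A minimal sketch of the messages used above (the wording is only an example):

const messages = [
  {
    role: "system",
    // JSON mode requires that "JSON" be mentioned somewhere in the context
    content: "You are an address parser. Always reply in JSON.",
  },
  { role: "user", content: "江岸区建设大道" },
];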

json_schema

Not supported by all models

For OpenAI, structured outputs are only supported starting with GPT-4o,

and you can now build the JSON schema directly with zod:

import OpenAI from "openai";
import { zodResponseFormat } from "openai/helpers/zod";
import { z } from "zod";
 
const openai = new OpenAI();
 
const Areas = z.object({
  province: z.string(),
  city: z.string(),
  district: z.string(),
});
 
const completion = await openai.beta.chat.completions.parse({
  model: "gpt-4.1",
  messages,
  response_format: zodResponseFormat(Areas, "event"),
});
 
const event = completion.choices[0].message.parsed;
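
completion.choices[0].message.parsed comes back already validated against the schema and typed as z.infer<typeof Areas>; for the address prompt above it would look something like this (values shown for illustration):

console.log(event);
// => { province: "湖北省", city: "武汉市", district: "江岸区" }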

json_repair

Just as the server can never trust the client's input, a large model will never return exactly what you want 100% of the time.

Even after pinning it down to JSON by every means above, it can still get things wrong;

the most common case is hitting the max_tokens limit, where the model returns truncated JSON:

Truncated JSON
{
  'firstName': 'John'
  lastName: “Smith”
  fullName: John Smith,
 
  // TODO: fill in last season
  scores: [ 7.8 6.3 7.1, ],
 
  "about": "John loves a challenge, " +
          "but can quickly lose focus."

So after receiving the model's output, you still need a library like jsonrepair to fix the common problems:

Repaired JSON
{
  "firstName": "John",
  "lastName": "Smith",
  "fullName": "John Smith",
 
  "scores": [7.8, 6.3, 7.1],
 
  "about": "John loves a challenge, but can quickly lose focus."
}

But libraries like this only handle the basic repairs; on more complex nested structures they occasionally fail,

so no matter what, wrap the call in try...catch as a final safety net to keep the program robust:

Whether the repair logic lives on the server or on the client is worth weighing case by case:

on the server, it may demand beefier machines and slow down the API response;

on the client, it takes up the client's memory and can hurt the user experience.

import { jsonrepair } from "jsonrepair";
 
try {
  const json = "{name: 'John'}";
 
  const repaired = jsonrepair(json);
 
  console.log(repaired); // '{"name": "John"}'
} catch (err) {
  // 兜底
  console.error(err);
}
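
In practice, the parse-then-repair-then-fallback chain is often wrapped into a small helper. A minimal sketch (safeParseJSON and its fallback parameter are illustrative, not part of any library):

import { jsonrepair } from "jsonrepair";

// Try strict parsing first, then repairing, and finally a caller-supplied default
export function safeParseJSON<T>(raw: string, fallback: T): T {
  try {
    return JSON.parse(raw) as T;
  } catch {
    try {
      return JSON.parse(jsonrepair(raw)) as T;
    } catch {
      // Even the repaired string failed to parse: fall back to the safety net
      return fallback;
    }
  }
}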

Wrapping the API

service

On the server (this example uses Next.js), wrap the OpenAI call and response handling into a utility module:

/services/chat.ts
import { createParser } from "eventsource-parser";
import { NextRequest } from "next/server";
import fetch from "fetch-with-proxy";
 
const OPENAI_URL = "api.openai.com";
const DEFAULT_PROTOCOL = "https";
const PROTOCOL = process.env.PROTOCOL ?? DEFAULT_PROTOCOL;
const BASE_URL = process.env.BASE_URL ?? OPENAI_URL;
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
 
export const requestOpenai = async (
  path: string,
  { method = "POST", body }: { method?: "POST" | "PUT" | "GET"; body: any }
) => {
  const res = await fetch(`${PROTOCOL}://${BASE_URL}/${path}`, {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${OPENAI_API_KEY}`,
    },
    method: method,
    body: body,
  });
 
  return res;
};
 
const buildSystemPrompt = (nickname: string): string => {
  return `你是小红书助手,我的名字是 "${nickname}",在对话中你可以偶尔使用我的名字来增加对话舒适性。`;
};
 
export const createStream = async (req: NextRequest, onDone?: () => void) => {
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();
  const body = await req.json();
  const { nickname, stream, messages } = body;
  const systemPrompt = buildSystemPrompt(nickname);
 
  const requestMessages = [{ role: "system", content: systemPrompt }].concat(
    messages
  );
 
  const res = await requestOpenai("v1/chat/completions", {
    body: JSON.stringify({
      model: "gpt-3.5-turbo",
      messages: requestMessages,
      stream: stream,
      max_tokens: 2000,
      temperature: 0.9,
      top_p: 1,
      frequency_penalty: 0,
      presence_penalty: 0.3,
    }),
  });
 
  const contentType = res.headers.get("Content-Type") ?? "";
 
  if (!contentType.includes("stream")) {
    let content = await res.text();
 
    /**
     * This replace is critically important: it keeps the OPENAI_KEY from leaking.
     *
     * When your OPENAI_KEY causes an OpenAI error, OpenAI returns an error message like:
     *
     * Incorrect API key provided: sk-*********************************ZXY. You can find your API key at https://platform.openai.com/api-keys
     *
     * Without any processing, once this reaches the catch and the controller sends the response,
     *
     * !!!anyone who calls this endpoint gets the OPENAI_KEY you are using in plain text!!!
     *
     * Incorrect API key provided: sk-*********************************ZXY. You can find your API key at...
     *
     * After processing:
     *
     * Incorrect API key provided: ***. You can find your API key at...
     */
    content = content.replace(/provided:.*. You/, "provided: ***. You");
 
    /**
     * Finally, apply one more layer of wrapping (a matter of personal convention):
     *
     * "ERROR_MESSAGE_BOUNDARY_```json
     * Incorrect API key provided: ***. You can find your API key at...```"
     *
     */
    return "ERROR_MESSAGE_BOUNDARY_```json\n" + content + "```";
  }
 
  const readableStream = new ReadableStream({
    async start(controller) {
      function onParse(event: any) {
        if (event.type === "event") {
          const data = event.data;
          if (data === "[DONE]") {
            onDone?.();
            controller.close();
            return;
          }
 
          try {
            const json = JSON.parse(data);
            const text = json.choices[0].delta.content;
            const queue = encoder.encode(text);
            controller.enqueue(queue);
          } catch (e) {
            controller.error(e);
          }
        }
      }
 
      const parser = createParser(onParse);
 
      for await (const chunk of res.body as any) {
        parser.feed(decoder.decode(chunk));
      }
    },
  });
 
  // Listen for the client cancelling the request
  req.signal.addEventListener("abort", () => {
    // Some providers, e.g. Coze, let you call their "cancel chat" API in this callback
    console.log("req aborted");
  });
 
  return readableStream;
};

controller

With the service in place, define a controller for the client to call:

api/chat/route.ts
import { NextRequest, NextResponse } from "next/server";
import { checkAccessCode } from "@/services/access";
import { createStream } from "@/services/chat";
 
export async function POST(req: NextRequest) {
  try {
    const errors = await checkAccessCode(req);
 
    if (errors) {
      return NextResponse.json(errors[0], errors[1]);
    }
 
    const stream = await createStream(req);
 
    return new NextResponse(stream);
  } catch (error) {
    console.error("[Chat Stream]", error);
 
    /**
     * On success this endpoint returns the model's completion, which the front end renders with <ReactMarkdown />.
     *
     * So wrap the error message in ```json *** ``` too, letting the front end reuse the same rendering logic:
     *
     * error: { message: "Something went wrong", code: 500 }
     * after JSON.stringify: '{\n  "message": "Something went wrong",\n  "code": 500\n}'
     * after markdown wrapping: '```json\n{\n  "message": "Something went wrong",\n  "code": 500\n}\n```'
     */
    return new NextResponse(
      ["```json\n", JSON.stringify(error, null, "  "), "\n```"].join("")
    );
  }
}

With that, a basic /api/chat is ready!
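
You can smoke-test it with a few lines of Node (the port and access code here are assumptions for a local dev setup):

// Assumes `next dev` is running on port 3000 and "test" passes checkAccessCode
const res = await fetch("http://localhost:3000/api/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json", "x-access-code": "test" },
  body: JSON.stringify({
    nickname: "Joe",
    messages: [{ role: "user", content: "Hello" }],
    stream: true,
  }),
});

// Print each streamed chunk as it arrives (res.body is async-iterable in Node 18+)
for await (const chunk of res.body as any) {
  process.stdout.write(new TextDecoder().decode(chunk));
}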

Client-server interaction

Last of all, on the client, use fetch to call the endpoint over SSE with streaming enabled:

requests/chat.ts
import { useUserStore } from "@/store";
 
export interface SendMessage {
  role: "user" | "assistant" | "system";
  content: string;
}
 
export interface ChatRequest {
  nickname: string;
  messages: SendMessage[];
  onMessage: (message: string, done: boolean) => void;
  onError: (error: any, code?: number | string) => void;
  onAbortController?: (abortController: AbortController) => void;
}
 
const TIME_OUT_MS = 30000;
 
export const requestStreamingChat = async (params: ChatRequest) => {
  const { nickname, messages, onError, onMessage, onAbortController } = params;
  const accessCode = useUserStore.getState().accessCode;
 
  if (!accessCode) {
    return onError(new Error("No access code"), 401);
  }
 
  const controller = new AbortController();
  const reqTimeoutId = setTimeout(() => controller.abort(), TIME_OUT_MS);
 
  try {
    const res = await fetch("api/chat", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "x-access-code": accessCode,
      },
      signal: controller.signal,
      body: JSON.stringify({
        nickname,
        messages: messages.map(v => ({
          role: v.role,
          content: v.content,
        })),
        stream: true,
      }),
    });
 
    clearTimeout(reqTimeoutId);
 
    let responseText = "";
 
    const finish = () => {
      onMessage(responseText, true);
      controller.abort();
    };
 
    if (res.ok) {
      const reader = res.body?.getReader();
      const decoder = new TextDecoder();
 
      onAbortController?.(controller);
 
      while (true) {
        const resTimeoutId = setTimeout(() => finish(), TIME_OUT_MS);
        const content = await reader?.read();
 
        clearTimeout(resTimeoutId);
 
        /**
         * At this point we have decoded a chunk's content from the SSE stream.
         *
         * Every model provider structures the value inside its chunks differently;
         *
         * taking /api/chat as the example here, assume the endpoint always returns { value: string }.
         */
        const text = decoder.decode(content?.value);
        responseText += text;
 
        const done = !content || content.done;
        onMessage(responseText, false);
 
        if (done) {
          break;
        }
      }
 
      finish();
    } else if (res.status === 401) {
      console.error("Anauthorized");
      onError(new Error("Anauthorized"), res.status);
    } else {
      console.error("Stream Error", res.body);
      onError(new Error("Stream Error"), res.status);
    }
  } catch (err) {
    console.error("NetWork Error", err);
    onError(err as Error, "Network Error");
  }
};
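
A typical call site wires these callbacks into UI state; a sketch (setAssistantMessage, setLoading, showToast, and abortRef are hypothetical app helpers):

requestStreamingChat({
  nickname: "Joe",
  messages: [{ role: "user", content: "Hello" }],
  onMessage: (message, done) => {
    // message is the full text accumulated so far, so just re-render the bubble
    setAssistantMessage(message);
    if (done) setLoading(false);
  },
  onError: (error, code) => showToast(`${code ?? ""} ${error?.message ?? error}`),
  // Keep the controller around to support a "Stop generating" button
  onAbortController: (controller) => (abortRef.current = controller),
});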

And with that, a boilerplate for the front end and back end talking over SSE, wired up to a large language model, is essentially complete.

