SSE 流式数据处理代码分析

2025-05-27

SSE 流式数据处理代码分析

由于 AI 的特性和为了方便计费,在服务端基本上都是使用 SSE(Server-Send Event) 发送数据给到客户端,每条数据就是一个Token。而客户端处理这些数据的时候,当然也希望能立刻看到AI的输出,所以基本都是用流来处理 SSE。

typescript

什么是 SSE

简单说一下SSE是如何发送数据的。

  1. 首先,SSE响应的Content-Type为text/event-stream,这是SSE的规范。
  2. 每个通知以文本块形式发送,是用 UTF-8 编码,并以一对换行符(LF/CRLF)结尾,以冒号开头的内容会被识别为注释且会被忽略。
  3. 每个SSE消息由四个字段组成:eventdataidretry,具体可以查阅MDN的开发文档

Stream 概述

本文分析的代码来自 openai-node(v4.103.0)streaming.ts及其依赖,这个文件主要是把以下两种流整合成可异步迭代对象。

  1. 服务器发送事件(SSE)
  2. 普通可读流(ReadableStream)。

Stream 类的使用

首先,先看看这个 Stream 是怎么使用的。

1. 从SSE响应创建流

若需处理SSE格式的HTTP响应(如来自服务端的实时事件流),调用 Stream.fromSSEResponse()

typescript

2. 从普通可读流创建流

若数据是简单的换行分隔的JSON流,调用 Stream.fromReadableStream()

typescript

解析: Stream 类的核心思路

现在你已经了解了怎么调用这个类了,接下来看看这个类的核心逻辑。

1. Stream

首先,这个类的主要目标是从流中获取数据并提供统一的异步迭代接口。以下是其接口定义:

typescript

2. Stream 类的构造

接下来我们先看 Stream 是怎么构造的。 Stream 类的构造函数接受一个迭代器函数 iterator 和一个 AbortController 实例。具体的实现在静态构造方法,这两个方法主要做了这些事情:

  1. 构造一个generator函数,这个函数返回一个异步迭代器。
  2. 在函数中读取 ReadableStream 或者 Response 中的数据。
  3. 将函数作为参数,创建一个 Stream 实例。

以下只分析fromReadableStream, 原理是一样的,感兴趣可以自行分析fromSSEResponse,源码不长,直接贴上来。

typescript

这部分源码主要逻辑是这样:

  1. 读取 ReadableStream 中的数据。先把 ReadableStream 转换为 AsyncIterable的形式。
  2. 这个流中的数据以 Uint8Array 的形式返回,我们需要将其转换为字符串,所以 LineDecoder 的作用就是把流中的数据分割并返回可读内容。
  3. 调用可迭代对象,把流的输出用 JSON.parse 解析为对象。

3. LineDecoder 解析 SSE 数据

进一步,我们来看这个行解码器的解码操作做了什么事情。

typescript

上述代码主要做了一下事情:

  1. 合并当前数据与上一次的剩余数据。
  2. 查找换行符。
  3. 处理 \r 回车。

我们可以用写一个测试函数来模拟一下:

typescript

jcode

可以看到,decode操作是保证了每次读取的内容都是完整的一行内容,针对网络数据返回不稳定的特性,同时,根据sse的规则,提取数据中对应的文本内容。

4. 异步迭代器

Stream 通过实现 AsyncIterable 的接口,实现了异步迭代的功能。

typescript

5. 其他方法

除了异步迭代器外,Stream 还提供了 teetoReadableStream 方法,用于复制流和转换为可读流。这部分代码过于简单,这里就不贴了。

总结

OpenAI 的 Stream 类通过模块化设计,将SSE协议解析、流的分块处理、异步迭代等功能进行封装,既可作为独立库使用,也可轻松集成到现有请求逻辑中。如果觉得本文写得不错,不妨点个赞支持一下吧。