SSE 流式数据处理代码分析
由于 AI 的特性和为了方便计费,在服务端基本上都是使用 SSE(Server-Send Event) 发送数据给到客户端,每条数据就是一个Token。而客户端处理这些数据的时候,当然也希望能立刻看到AI的输出,所以基本都是用流来处理 SSE。
typescript
什么是 SSE
简单说一下SSE是如何发送数据的。
- 首先,SSE响应的Content-Type为
text/event-stream,这是SSE的规范。 - 每个通知以文本块形式发送,是用
UTF-8编码,并以一对换行符(LF/CRLF)结尾,以冒号开头的内容会被识别为注释且会被忽略。 - 每个SSE消息由四个字段组成:
event、data、id和retry,具体可以查阅MDN的开发文档
Stream 概述
本文分析的代码来自 openai-node(v4.103.0) 的 streaming.ts及其依赖,这个文件主要是把以下两种流整合成可异步迭代对象。
- 服务器发送事件(SSE)
- 普通可读流(ReadableStream)。
Stream 类的使用
首先,先看看这个 Stream 是怎么使用的。
1. 从SSE响应创建流
若需处理SSE格式的HTTP响应(如来自服务端的实时事件流),调用 Stream.fromSSEResponse():
typescript
2. 从普通可读流创建流
若数据是简单的换行分隔的JSON流,调用 Stream.fromReadableStream():
typescript
解析: Stream 类的核心思路
现在你已经了解了怎么调用这个类了,接下来看看这个类的核心逻辑。
1. Stream 类
首先,这个类的主要目标是从流中获取数据并提供统一的异步迭代接口。以下是其接口定义:
typescript
2. Stream 类的构造
接下来我们先看 Stream 是怎么构造的。 Stream 类的构造函数接受一个迭代器函数 iterator 和一个 AbortController 实例。具体的实现在静态构造方法,这两个方法主要做了这些事情:
- 构造一个generator函数,这个函数返回一个异步迭代器。
- 在函数中读取
ReadableStream或者Response中的数据。 - 将函数作为参数,创建一个
Stream实例。
以下只分析fromReadableStream, 原理是一样的,感兴趣可以自行分析fromSSEResponse,源码不长,直接贴上来。
typescript
这部分源码主要逻辑是这样:
- 读取
ReadableStream中的数据。先把ReadableStream转换为AsyncIterable的形式。 - 这个流中的数据以
Uint8Array的形式返回,我们需要将其转换为字符串,所以LineDecoder的作用就是把流中的数据分割并返回可读内容。 - 调用可迭代对象,把流的输出用
JSON.parse解析为对象。
3. LineDecoder 解析 SSE 数据
进一步,我们来看这个行解码器的解码操作做了什么事情。
typescript
上述代码主要做了一下事情:
- 合并当前数据与上一次的剩余数据。
- 查找换行符。
- 处理 \r 回车。
我们可以用写一个测试函数来模拟一下:
typescript
可以看到,decode操作是保证了每次读取的内容都是完整的一行内容,针对网络数据返回不稳定的特性,同时,根据sse的规则,提取数据中对应的文本内容。
4. 异步迭代器
Stream 通过实现 AsyncIterable 的接口,实现了异步迭代的功能。
typescript
5. 其他方法
除了异步迭代器外,Stream 还提供了 tee 和 toReadableStream 方法,用于复制流和转换为可读流。这部分代码过于简单,这里就不贴了。
总结
OpenAI 的 Stream 类通过模块化设计,将SSE协议解析、流的分块处理、异步迭代等功能进行封装,既可作为独立库使用,也可轻松集成到现有请求逻辑中。如果觉得本文写得不错,不妨点个赞支持一下吧。