|  | 
|  | 1 | +--- | 
|  | 2 | +title: vercel-ai 部分源码阅读,看看如何实现 node 高并发 | 
|  | 3 | +date: 2025-03-16T15:24:38Z | 
|  | 4 | +slug: post-51 | 
|  | 5 | +author: coderPerseus:https://github.com/coderPerseus | 
|  | 6 | +tags: [] | 
|  | 7 | +--- | 
|  | 8 | + | 
|  | 9 | +## 背景 | 
|  | 10 | + | 
|  | 11 | +在开发 AI 工具的时候,自己使用 fetch 调用 AI 的 api ,然后发现在并发的场景下会出现问题,经过询问得知社区的开源项目: https://github.com/vercel/ai ,直接能够解决我的问题,使用下来确实是这样,于是打算研究一下它是如何实现的 | 
|  | 12 | + | 
|  | 13 | +## 目的 | 
|  | 14 | + | 
|  | 15 | +找到源码中 streamText 的实现,然后找到它是如何支持并发请求的 | 
|  | 16 | + | 
|  | 17 | +## streamText 功能解析 | 
|  | 18 | + | 
|  | 19 | +这个 API 的官方文档: https://sdk.vercel.ai/docs/reference/ai-sdk-core/stream-text | 
|  | 20 | + | 
|  | 21 | +它接受的输入: | 
|  | 22 | +- model:使用的语言模型 | 
|  | 23 | +- system:将作为提示部分内容的系统消息。 | 
|  | 24 | +- tools: 可供模型访问和调用的工具集。模型需支持工具调用功能。 | 
|  | 25 | +-  prompt:简单文本提示。`prompt` 和 `messages` 参数不可同时使用。 | 
|  | 26 | +- messages: 消息列表。`prompt` 和 `messages` 参数不可同时使用。 | 
|  | 27 | +- 还有很多。。。 | 
|  | 28 | +它返回的输出: | 
|  | 29 | +- **textStream**:一个仅返回生成文本增量的文本流。您可将其作为 AsyncIterable 或 ReadableStream 使用。当发生错误时,该流会抛出错误。 | 
|  | 30 | +- 还有很多。。。 | 
|  | 31 | +除了上面的参数还有很多参数,但是我们的目的是搞清楚它发送请求的逻辑,所以就不一一列举了! | 
|  | 32 | +## 源码阅读 | 
|  | 33 | + | 
|  | 34 | +**这里我的目标是找到如何发送请求,处理数据,然后如何返回内容的** | 
|  | 35 | + | 
|  | 36 | +1)安装源码 | 
|  | 37 | +```bash | 
|  | 38 | +git clone [email protected] :vercel/ai.git | 
|  | 39 | +``` | 
|  | 40 | +安装好后,我使用 Cursor 打开 | 
|  | 41 | + | 
|  | 42 | +2)找到源码地址 | 
|  | 43 | + | 
|  | 44 | +我知道我是从 ai 里面导出的 streamText ,所以它肯定在 ai 目录下,于是我找到了,它在这里:`vercel/ai/packages/ai/core/generate-text/stream-text.ts` 。但是居然高达 1600 行,我还是去玩游戏吧!(那是不可能的) | 
|  | 45 | + | 
|  | 46 | +可以看到这里返回的是一个 DefaultStreamTextResult 的实例,这里代码逻辑过于复杂,借助 AI,我找到了相关代码部分: | 
|  | 47 | + | 
|  | 48 | +这里可以看到它调用了 模型自己的 doStream 方法来获取结果,streamText 是负责准备请求数据,然后调用 model.doStream() => 处理返回的数据 | 
|  | 49 | + | 
|  | 50 | +所以我们去看模型自己如何写 doStream 方法,这里我看的是 openai ,我们去:`packages/openai/src/openai-chat-language-model.ts` ,这里我们可以看到: | 
|  | 51 | + | 
|  | 52 | +这里调用 @ai-sdk/provider-utils 的 postJsonToApi 方法,我们继续去`packages/provider-utils/src/post-to-api.ts` 然后可以看到 postJsonToApi 返回 PostToApi ,所以我们看这个方法: | 
|  | 53 | + | 
|  | 54 | + | 
|  | 55 | +这就是我们要找的方法,我们详细看一下: | 
|  | 56 | +它接收: | 
|  | 57 | + - url:目标API的URL | 
|  | 58 | + - headers:请求头信息 | 
|  | 59 | + - body:请求体(包含content和values) | 
|  | 60 | + - successfulResponseHandler:成功响应处理器 | 
|  | 61 | + - failedResponseHandler:失败响应处理器 | 
|  | 62 | + - abortSignal:中止信号 | 
|  | 63 | + - fetch = getOriginalFetch(): fetch实现(默认为全局fetch) | 
|  | 64 | +它返回:将 response 传给 successfulResponseHandler。也就是说它返回的是 successfulResponseHandler 函数执行的返回 | 
|  | 65 | +然后: | 
|  | 66 | +1)发送 http 请求 | 
|  | 67 | +2)提取、处理响应头 | 
|  | 68 | +3)错误处理,这里通过多步处理然后抛出错误 | 
|  | 69 | +4)成功处理,也通过 try catch 保证成功处理过程中错误能抛出 | 
|  | 70 | +5)网络错误处理,处理 fetch 失败,并且设置可重试 | 
|  | 71 | + | 
|  | 72 | +代码解析: | 
|  | 73 | +- 异步非阻塞:使用 async/await 实现异步操作,不会阻塞事件循环 | 
|  | 74 | +- 事件驱动:使用 promise ,当网络请求完成通过事件回调处理响应 | 
|  | 75 | +- 错误处理链:精细的错误处理机制,保证失败也能获取到精确的错误信息,或者重试 | 
|  | 76 | + | 
|  | 77 | +## successfulResponseHandler | 
|  | 78 | + | 
|  | 79 | +我们去 openai 看到传入了createEventSourceResponseHandler 函数给successfulResponseHandler ,createEventSourceResponseHandler 在 packages/provider-utils/src/response-handler.ts ,代码如下: | 
|  | 80 | + | 
|  | 81 | + | 
|  | 82 | +createEventSourceResponseHandler 是一个工厂函数,用于创建处理服务器发送事件(Server-Sent Events, SSE)格式响应的处理器,这里使用 [ReadableStream.PipeThrough()](https://developer.mozilla.org/zh-CN/docs/Web/API/ReadableStream/pipeThrough) 方法来进行数据处理 | 
|  | 83 | +- TextDecoderStram:将二进制转为文本 | 
|  | 84 | +- EventSourceParseStram:解析 SSE 格式 | 
|  | 85 | +- TransformStram:处理每个事件数据 | 
|  | 86 | +- 通过 [enqueue()](https://developer.mozilla.org/zh-CN/docs/Web/API/ReadableStreamDefaultController/enqueue) 方法将给定数据块送入到关联的流中 | 
|  | 87 | +这种流式处理方式使得函数可以高效处理大量数据,而不必等待所有数据接收完毕。 | 
|  | 88 | + | 
|  | 89 | +函数优点: | 
|  | 90 | +- **非阻塞处理**:使用流式处理,不会阻塞事件循环 | 
|  | 91 | +- **增量处理**:可以立即处理每个到达的数据块,无需等待完整响应 | 
|  | 92 | +- **内存效率**:避免将整个响应加载到内存中 | 
|  | 93 | +- **类型安全**:通过Zod模式确保每个数据块符合预期格式 | 
|  | 94 | +- **错误隔离**:单个数据块解析错误不会影响整个流程 | 
|  | 95 | + | 
|  | 96 | + | 
|  | 97 | +## stream-text 流程图 | 
|  | 98 | + | 
|  | 99 | +```mermaid | 
|  | 100 | +flowchart TD | 
|  | 101 | +    A[用户调用 streamText] --> B[初始化参数和设置] | 
|  | 102 | +    B --> C[创建流处理管道] | 
|  | 103 | +    C --> D[执行 streamStep 函数] | 
|  | 104 | +    D --> E[调用 model.doStream 请求API] | 
|  | 105 | +    E --> F[处理流式响应] | 
|  | 106 | +    F --> G{是否需要后续步骤?} | 
|  | 107 | +    G -->|是| H[更新消息历史] | 
|  | 108 | +    H --> D | 
|  | 109 | +    G -->|否| I[关闭流并返回结果] | 
|  | 110 | +``` | 
|  | 111 | + | 
|  | 112 | +初始化: | 
|  | 113 | +```mermaid | 
|  | 114 | +flowchart TD | 
|  | 115 | +    A[用户调用 streamText] --> B[验证参数] | 
|  | 116 | +    B --> C[初始化承诺对象] | 
|  | 117 | +    C --> D[创建可拼接流] | 
|  | 118 | +    D --> E[设置流转换器] | 
|  | 119 | +    E --> F[准备遥测和重试设置] | 
|  | 120 | +    F --> G[标准化提示信息] | 
|  | 121 | +``` | 
|  | 122 | +API 请求处理: | 
|  | 123 | +```mermaid | 
|  | 124 | +flowchart TD | 
|  | 125 | +    A[streamStep 函数] --> B[准备提示信息] | 
|  | 126 | +    B --> C[准备工具选择] | 
|  | 127 | +    C --> D[应用模型设置] | 
|  | 128 | +    D --> E[使用retry函数包装请求] | 
|  | 129 | +    E --> F["model.doStream 发起API请求"] | 
|  | 130 | +     | 
|  | 131 | +    subgraph "实际API调用" | 
|  | 132 | +    F --> G[provider包中的doStream实现] | 
|  | 133 | +    G --> H[调用 postToApi/postJsonToApi] | 
|  | 134 | +    H --> I[发起实际HTTP请求] | 
|  | 135 | +    end | 
|  | 136 | +     | 
|  | 137 | +    I --> J[返回流式响应] | 
|  | 138 | +``` | 
|  | 139 | +流处理管道: | 
|  | 140 | +```mermaid | 
|  | 141 | +flowchart TD | 
|  | 142 | +    A[获取API响应流] --> B[应用工具转换] | 
|  | 143 | +    B --> C[创建事件处理器] | 
|  | 144 | +     | 
|  | 145 | +    subgraph "转换流" | 
|  | 146 | +    C --> D[处理文本增量] | 
|  | 147 | +    C --> E[处理推理内容] | 
|  | 148 | +    C --> F[处理工具调用] | 
|  | 149 | +    C --> G[处理步骤完成] | 
|  | 150 | +    end | 
|  | 151 | +     | 
|  | 152 | +    H[合并所有转换流] --> I[输出到最终结果] | 
|  | 153 | +``` | 
|  | 154 | +多步处理与工具调用: | 
|  | 155 | +```mermaid | 
|  | 156 | +flowchart TD | 
|  | 157 | +    A[接收步骤结果] --> B{检查结束原因} | 
|  | 158 | +    B -->|工具调用| C[执行工具] | 
|  | 159 | +    B -->|长度限制| D[继续生成] | 
|  | 160 | +    B -->|完成| E[结束流] | 
|  | 161 | +     | 
|  | 162 | +    C --> F[处理工具结果] | 
|  | 163 | +    F --> G[准备下一步输入] | 
|  | 164 | +    G --> H[执行下一步] | 
|  | 165 | +     | 
|  | 166 | +    D --> I[继续当前消息] | 
|  | 167 | +    I --> J[执行下一步] | 
|  | 168 | +``` | 
|  | 169 | +数据输出处理: | 
|  | 170 | +```mermaid | 
|  | 171 | +flowchart TD | 
|  | 172 | +    A[流处理完成] --> B[解析最终结果] | 
|  | 173 | +    B --> C[分离不同类型数据] | 
|  | 174 | +     | 
|  | 175 | +    subgraph "输出流转换" | 
|  | 176 | +    C --> D[textStream] | 
|  | 177 | +    C --> E[fullStream] | 
|  | 178 | +    C --> F[dataStream] | 
|  | 179 | +    end | 
|  | 180 | +     | 
|  | 181 | +    G[用户选择输出格式] --> H[返回相应格式数据] | 
|  | 182 | +``` | 
|  | 183 | +数据流动过程 | 
|  | 184 | +用户输入 → 格式化为模型可以理解的提示 | 
|  | 185 | +API 请求 → 使用 postToApi 函数发起 HTTP 请求 | 
|  | 186 | +流式返回 → 通过 TransformStream 处理返回的数据块 | 
|  | 187 | +转换处理 → 根据类型(文本、工具调用、推理等)处理每个数据块 | 
|  | 188 | +结果整合 → 将所有处理后的数据整合为最终结果 | 
|  | 189 | +多步处理机制 | 
|  | 190 | +代码支持多步骤处理,通过以下方式: | 
|  | 191 | +处理工具调用结果并根据需要发起新的 API 请求 | 
|  | 192 | +当模型输出被截断时(finishReason 为 "length")可以继续生成 | 
|  | 193 | +维护消息历史,将前面步骤的输出作为后续步骤的输入的一部分 | 
|  | 194 | +这种设计使 SDK 能够支持复杂的对话和工具使用场景,实现更智能的交互体验。 | 
|  | 195 | + | 
|  | 196 | +--- | 
|  | 197 | +此文自动发布于:<a href="https://github.com/coderPerseus/blog/issues/51" target="_blank">github issues</a> | 
0 commit comments