Can Higress support resuming interrupted SSE streams? #2560
undertaker86001 started this conversation in Ideas
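Source of request: community users
Description: Can Higress resume an interrupted SSE stream? For example, while a large-model service is streaming events back through Higress, the client loses its connection to the gateway because of a network problem and reconnects some time later. Can the gateway then continue sending the event stream from the offset at which the previous connection dropped?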
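Current support for resuming SSE streams in Higress
Answer: Higress does not currently support this. In SSE scenarios it cannot continue sending an event stream from the offset at which the previous connection dropped.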
Last-Event-ID in SSE (Server-Sent Events) stream resumption: meaning and role
Last-Event-ID is an HTTP request header defined in the SSE standard for resuming an interrupted stream. It works as follows:

1. SSE event structure
In the SSE protocol, each event can carry a unique ID. The Higress code defines a StreamEvent struct that contains an Id field.

2. Setting the event ID
In an SSE stream, an event typically looks like this:
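As an illustration, the hypothetical Go helper formatSSEEvent below (not part of Higress) writes one event in the SSE wire format. The event consists of an optional id: line, an optional event: line, one data: line per line of payload, and a blank line that ends the event. The value on the id: line is what a client later echoes back in Last-Event-ID.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSEEvent is a hypothetical helper (not Higress code) that renders
// one event in the SSE wire format. Empty id/event values are omitted.
func formatSSEEvent(id, event, data string) string {
	var b strings.Builder
	if id != "" {
		b.WriteString("id: " + id + "\n")
	}
	if event != "" {
		b.WriteString("event: " + event + "\n")
	}
	// A multi-line payload is sent as one data: line per line.
	for _, line := range strings.Split(data, "\n") {
		b.WriteString("data: " + line + "\n")
	}
	// A blank line terminates the event.
	b.WriteString("\n")
	return b.String()
}

func main() {
	// Prints:
	// id: 42
	// event: message
	// data: {"choices":[{"delta":{"content":"Hi"}}]}
	// (followed by an empty line)
	fmt.Print(formatSSEEvent("42", "message", `{"choices":[{"delta":{"content":"Hi"}}]}`))
}
```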
As the code shows, the SetValue method in model.go:407-420 parses the event ID: when it encounters an id: field, it stores the value in StreamEvent.Id.

3. How Last-Event-ID works
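When the client's SSE connection to the server drops, the client reconnects and sends a Last-Event-ID HTTP request header. The value of that header is the ID of the last event the client successfully received. On receiving this header, the server should resume sending from the event that follows that ID.

4. Current limitations in Higress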
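The ExtractStreamingEvents function in Higress's provider.go:710-785 can parse SSE events and extract their IDs, but two gaps remain:
Stateless processing: the onStreamingResponseBody function in main.go:237-289 is stateless and does not check or handle the Last-Event-ID header.
No persistence: the current implementation does not persist events, so it cannot replay them based on Last-Event-ID.

5. Key points for implementing Last-Event-ID support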
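To implement Last-Event-ID support in Higress, you need:
Request header check: add a check for the Last-Event-ID header in the onHttpRequestHeader function in main.go:74-142.
Event persistence: extend the existing cache mechanism to store SSE events in ID order.
Resume logic: based on the value of Last-Event-ID, find the corresponding position in storage and replay the events that follow it.

Summary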
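Last-Event-ID is the core mechanism in the SSE standard for resuming an interrupted stream: it lets the client tell the server which event to continue from. The current Higress code can already parse event IDs, but it still needs additional persistence and state management to implement this feature fully.

1. Limitations of the current implementation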
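Stateless processing: the AI Gateway's streaming-response logic is stateless. It only processes the data chunk it has just received and does not maintain client connection state.
No Last-Event-ID support: nothing in the current code handles Last-Event-ID.
No event persistence: the StreamEvent struct defines an Id field, but the event stream is never persisted to storage, and the client's consumption position is not tracked.
Real-time forwarding only: the current ExtractStreamingEvents function only parses SSE events and forwards them in real time, with no mechanism for saving state.

2. Detailed implementation plan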
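Higress's current SSE implementation is designed mainly for real-time streaming and lacks the state retention and event persistence that resumption requires. Implementing the feature would mean substantially extending the existing ai-proxy and ai-cache plugins with event persistence, state management, and resume logic. That work involves trade-offs between performance, storage cost, and completeness of functionality. To implement SSE stream resumption, you can try the following approach: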
2.1 Architecture design
graph TB
    subgraph "Client layer"
        Client["AI client"]
        LastEventID["Last-Event-ID header"]
    end
    subgraph "Higress gateway layer"
        EventBuffer["Event buffering service"]
        SSEHandler["SSE handler"]
        StateManager["Connection state management"]
    end
    subgraph "Storage layer"
        Redis["Redis event store"]
        EventLog["Event log"]
    end
    subgraph "AI service layer"
        AIService["Large-model service"]
        StreamResponse["Streaming response"]
    end
    Client --> SSEHandler
    Client --> LastEventID
    SSEHandler --> EventBuffer
    SSEHandler --> StateManager
    EventBuffer --> Redis
    StateManager --> Redis
    AIService --> StreamResponse
    StreamResponse --> EventBuffer

2.2 Core component implementation
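1. Event buffering and persistence service
Extend the existing ai-cache plugin with SSE event buffering, assigning a session_id to each request session.

2. Last-Event-ID handling mechanism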
Extend the request-handling logic of the ai-proxy plugin: check the Last-Event-ID header in onHttpRequestHeader [4].

3. Connection state management
2.3 Implementation steps
Step 1: Extend the StreamEvent struct
Modify the existing event struct to carry more metadata:
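Step 2: Implement the event persistence layer
Building on the existing ai-cache plugin implementation [5], create an event persistence service.

Step 3: Modify streaming response handling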
Extend the existing onStreamingResponseBody function.

Step 4: Implement the resume logic
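Read the Last-Event-ID header, locate that event in storage, and replay the events that follow it.

2.4 Configuration example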
3. Technical considerations