|
1 | | -import asyncio |
2 | 1 | import json |
| 2 | +import logging |
3 | 3 | from typing import AsyncGenerator |
| 4 | +import jsonpatch |
4 | 5 |
|
5 | 6 |
|
6 | 7 | async def transform_delta_stream(input_stream) -> AsyncGenerator[str, None]: |
7 | | - current_message = None |
8 | | - current_event = None |
| 8 | + current_documents = {} |
| 9 | + next_is_delta = False |
| 10 | + current_c = None |
| 11 | + current_o = None |
| 12 | + current_p = None |
9 | 13 |
|
10 | 14 | async for line in input_stream: |
11 | 15 | line = line.decode("utf-8").strip() |
12 | 16 | if not line: |
13 | 17 | continue |
14 | 18 |
|
15 | | - # 处理事件类型行 |
16 | 19 | if line.startswith("event: "): |
17 | | - current_event = line[7:] |
| 20 | + next_is_delta = True |
18 | 21 | continue |
19 | 22 |
|
20 | | - # 处理数据行 |
21 | | - if line.startswith("data: "): |
| 23 | + if line.startswith("data: ") and next_is_delta: |
22 | 24 | data = line[6:] |
23 | 25 |
|
24 | | - # 处理特殊结束标记 [DONE],直接原样返回 |
25 | 26 | if data == "[DONE]": |
26 | 27 | yield line |
27 | 28 | continue |
28 | 29 |
|
29 | 30 | try: |
30 | 31 | json_data = json.loads(data) |
31 | 32 |
|
32 | | - # 1. 处理完整的消息数据 |
33 | | - if "message" in json_data: |
34 | | - current_message = json_data["message"] |
35 | | - yield f'data: {json.dumps({"message": current_message})}' |
| 33 | + if 'c' in json_data: |
| 34 | + current_c = json_data['c'] |
| 35 | + if 'v' in json_data and isinstance(json_data['v'], dict): |
| 36 | + current_documents[current_c] = json_data['v'] |
| 37 | + yield f'data: {json.dumps(current_documents[current_c])}' |
36 | 38 | continue |
37 | 39 |
|
38 | | - # 2. 处理delta编码的消息 |
39 | | - if current_event == "delta": |
40 | | - # 2.1 处理包含完整消息的v字段 |
41 | | - if "v" in json_data and isinstance(json_data["v"], dict) and "message" in json_data["v"]: |
42 | | - current_message = json_data["v"]["message"] |
43 | | - yield f'data: {json.dumps({"message": current_message})}' |
44 | | - continue |
| 40 | + if 'v' in json_data: |
| 41 | + if 'p' in json_data: |
| 42 | + current_p = json_data['p'] |
| 43 | + if 'o' in json_data: |
| 44 | + current_o = json_data['o'] |
| 45 | + if current_c is not None and current_c in current_documents: |
| 46 | + apply_patch(current_documents, current_p, current_o, json_data['v'], current_c) |
45 | 47 |
|
46 | | - # 2.2 处理简单的文本增量 {"v": "text"} |
47 | | - if "v" in json_data and isinstance(json_data["v"], str): |
48 | | - if current_message is None: |
49 | | - current_message = { |
50 | | - "content": { |
51 | | - "content_type": "text", |
52 | | - "parts": [""] |
53 | | - } |
54 | | - } |
55 | | - # 确保content和parts结构存在 |
56 | | - if "content" not in current_message: |
57 | | - current_message["content"] = {"content_type": "text", "parts": [""]} |
58 | | - if "parts" not in current_message["content"]: |
59 | | - current_message["content"]["parts"] = [""] |
60 | | - if not current_message["content"]["parts"]: |
61 | | - current_message["content"]["parts"].append("") |
62 | | - # 追加文本到第一个part |
63 | | - current_message["content"]["parts"][0] += json_data["v"] |
64 | | - yield f'data: {json.dumps({"message": current_message})}' |
65 | | - continue |
66 | | - |
67 | | - # 2.3 处理patch操作 |
68 | | - if "p" in json_data and "o" in json_data: |
69 | | - if current_message is None: |
70 | | - current_message = {} |
71 | | - current_message = apply_patch(current_message, json_data) |
72 | | - yield f'data: {json.dumps({"message": current_message})}' |
73 | | - continue |
74 | | - |
75 | | - # 3. 不是delta事件的数据,直接输出 |
76 | | - if isinstance(json_data, dict) and "v" in json_data and isinstance(json_data["v"], |
77 | | - dict) and "message" in json_data[ |
78 | | - "v"]: |
79 | | - yield f'data: {json.dumps({"message": json_data["v"]["message"]})}' |
80 | | - else: |
81 | | - # 其他情况直接原样返回 |
82 | | - yield line |
83 | | - |
84 | | - except json.JSONDecodeError: |
85 | | - # 处理简单的字符串数据(如"v1"),直接返回原行 |
| 48 | + yield f'data: {json.dumps(current_documents[current_c])}' |
| 49 | + except: |
86 | 50 | yield line |
87 | | - |
88 | | - # 输出最后的消息(如果有) |
89 | | - if current_message is not None: |
90 | | - yield f'data: {json.dumps({"message": current_message})}' |
91 | | - |
92 | | - |
93 | | -def apply_patch(message: dict, patch: dict) -> dict: |
94 | | - """应用单个patch操作到消息""" |
95 | | - path = patch["p"] |
96 | | - operation = patch["o"] |
97 | | - value = patch.get("v", None) |
98 | | - |
99 | | - # 处理根路径 |
100 | | - if path == "": |
101 | | - if operation == "add": |
102 | | - return value |
103 | | - elif operation == "patch": |
104 | | - for sub_patch in value: |
105 | | - message = apply_patch(message, sub_patch) |
106 | | - return message |
107 | | - return message |
108 | | - |
109 | | - # 分割路径 |
110 | | - parts = [p for p in path.split("/") if p] |
111 | | - target = message |
112 | | - |
113 | | - # 导航到目标位置 |
114 | | - for part in parts[:-1]: |
115 | | - if part not in target: |
116 | | - target[part] = {} |
117 | | - target = target[part] |
118 | | - |
119 | | - last_part = parts[-1] |
120 | | - |
121 | | - # 应用操作 |
122 | | - if operation == "replace": |
123 | | - target[last_part] = value |
124 | | - elif operation == "append": |
125 | | - if last_part not in target: |
126 | | - target[last_part] = [] |
127 | | - if isinstance(target[last_part], list): |
128 | | - target[last_part].append(value) |
129 | | - elif isinstance(target[last_part], str): |
130 | | - target[last_part] += str(value) |
131 | | - elif operation == "truncate": |
132 | | - if isinstance(target[last_part], (list, str)): |
133 | | - target[last_part] = target[last_part][:value] |
134 | | - elif operation == "add": |
135 | | - target[last_part] = value |
136 | | - elif operation == "remove": |
137 | | - if last_part in target: |
138 | | - del target[last_part] |
139 | | - |
140 | | - return message |
141 | | - |
142 | | - |
143 | | -async def main(): |
144 | | - # 模拟输入流 |
145 | | - input_stream = [ |
146 | | - 'event: delta_encoding', |
147 | | - 'data: "v1"', |
148 | | - '', |
149 | | - 'event: delta', |
150 | | - 'data: {"p": "", "o": "add", "v": {"message": {"id": "1471f310-cd55-4f98-be10-a8a0ab0cf19e", "author": {"role": "system", "name": null, "metadata": {}}, "create_time": null, "update_time": null, "content": {"content_type": "text", "parts": [""]}, "status": "finished_successfully", "end_turn": true, "weight": 0.0, "metadata": {"is_visually_hidden_from_conversation": true}, "recipient": "all", "channel": null}, "conversation_id": "680225d7-a0d8-8004-aaba-00083cfb2ae3", "error": null}, "c": 0}', |
151 | | - '', |
152 | | - 'event: delta', |
153 | | - 'data: {"v": {"message": {"id": "906d0092-95ba-4517-a3ef-081d64a86a88", "author": {"role": "user", "name": null, "metadata": {}}, "create_time": 1744971224.028, "update_time": null, "content": {"content_type": "text", "parts": ["\u753b\u4e00\u53ea\u732b"]}, "status": "finished_successfully", "end_turn": null, "weight": 1.0, "metadata": {"serialization_metadata": {"custom_symbol_offsets": []}, "request_id": "9323641f7a5008e0-SJC", "message_source": null, "timestamp_": "absolute", "message_type": null}, "recipient": "all", "channel": null}, "conversation_id": "680225d7-a0d8-8004-aaba-00083cfb2ae3", "error": null}, "c": 1}', |
154 | | - '', |
155 | | - 'data: {"v": {"message": {"id": "02e534af-11f5-4e74-ac37-71f65e845332", "author": {"role": "assistant", "name": null, "metadata": {}}, "create_time": 1744971224.343261, "update_time": null, "content": {"content_type": "text", "parts": [""]}, "status": "in_progress", "end_turn": null, "weight": 1.0, "metadata": {"citations": [], "content_references": [], "message_type": "next", "model_slug": "gpt-4o", "default_model_slug": "auto", "parent_id": "906d0092-95ba-4517-a3ef-081d64a86a88", "model_switcher_deny": [{"slug": "o1-mini", "context": "regenerate", "reason": "unsupported_tool_use", "is_available": false, "description": "\u6b64\u6a21\u578b\u4e0d\u652f\u6301\u4f7f\u7528\u5de5\u5177\u3002"}, {"slug": "o1-mini", "context": "conversation", "reason": "unsupported_tool_use", "is_available": false, "description": "\u6b64\u6a21\u578b\u4e0d\u652f\u6301\u4f7f\u7528\u5de5\u5177\u3002"}]}, "recipient": "t2uay3k.sj1i4kz", "channel": null}, "conversation_id": "680225d7-a0d8-8004-aaba-00083cfb2ae3", "error": null}, "c": 2}', |
156 | | - '', |
157 | | - 'data: [DONE]' |
158 | | - ] |
159 | | - # 处理并输出转换后的流 |
160 | | - async for complete_json in transform_delta_stream(input_stream): |
161 | | - print(complete_json) |
162 | | - |
163 | | - |
164 | | -if __name__ == "__main__": |
165 | | - asyncio.run(main()) |
| 51 | + else: |
| 52 | + yield line |
| 53 | + |
| 54 | +def apply_patch(document, p, o, v, c): |
| 55 | + if o == 'patch': |
| 56 | + for patch in v: |
| 57 | + apply_patch(document, patch['p'], patch['o'], patch['v'], c) |
| 58 | + |
| 59 | + elif o == 'append': |
| 60 | + current_value = jsonpatch.JsonPointer(p).get(document[c]) |
| 61 | + if isinstance(current_value, list): |
| 62 | + new_value = current_value + [v] if not isinstance(v, list) else current_value + v |
| 63 | + patch = jsonpatch.JsonPatch([{ |
| 64 | + 'op': 'replace', |
| 65 | + 'path': p, |
| 66 | + 'value': new_value |
| 67 | + }]) |
| 68 | + elif isinstance(current_value, str): |
| 69 | + patch = jsonpatch.JsonPatch([{ |
| 70 | + 'op': 'replace', |
| 71 | + 'path': p, |
| 72 | + 'value': current_value + str(v) |
| 73 | + }]) |
| 74 | + else: |
| 75 | + patch = jsonpatch.JsonPatch([{ |
| 76 | + 'op': 'replace', |
| 77 | + 'path': p, |
| 78 | + 'value': v |
| 79 | + }]) |
| 80 | + |
| 81 | + document[c] = patch.apply(document[c]) |
| 82 | + else: |
| 83 | + patch = jsonpatch.JsonPatch([{ |
| 84 | + 'op': o, |
| 85 | + 'path': p, |
| 86 | + 'value': v |
| 87 | + }]) |
| 88 | + document[c] = patch.apply(document[c]) |
0 commit comments