|
1 | 1 | /* eslint-disable camelcase */ |
2 | 2 | import type {StreamingChunk} from '../../types/store/streaming'; |
3 | 3 |
|
| 4 | +// interface Headers { |
| 5 | +// contentType?: string; |
| 6 | +// contentLength?: number; |
| 7 | +// } |
| 8 | + |
/**
 * Cursor carried between parseMultipart calls so a growing responseText
 * can be parsed incrementally.
 */
interface MultipartState {
  /** Number of characters of responseText already consumed by prior calls. */
  lastProcessedLength: number;
}
8 | 12 |
|
/**
 * Result of one parseMultipart pass.
 */
interface MultipartResult {
  /** Chunks newly parsed from the unprocessed tail of responseText. */
  chunks: StreamingChunk[];
  /** Updated cursor to thread into the next parseMultipart call. */
  state: MultipartState;
}
13 | 17 |
|
14 | | -function isCompleteChunk(chunk: string): boolean { |
15 | | - // Normalize all possible line ending combinations to \n |
16 | | - const normalizedChunk = chunk.replace(/\r\n/g, '\n').replace(/\r/g, '\n'); |
17 | | - |
18 | | - // Split on double line breaks, accounting for possible multiple \n characters |
19 | | - const sections = normalizedChunk.split(/\n{2,}/); |
20 | | - |
21 | | - // Must have headers and body sections |
22 | | - if (sections.length < 2) { |
23 | | - return false; |
24 | | - } |
25 | | - |
26 | | - const lastSection = sections[sections.length - 1].trim(); |
27 | | - |
28 | | - // Skip empty sections |
29 | | - if (!lastSection) { |
30 | | - return false; |
31 | | - } |
32 | | - |
33 | | - // Must have valid JSON body |
34 | | - try { |
35 | | - JSON.parse(lastSection); |
36 | | - return true; |
37 | | - } catch { |
38 | | - return false; |
39 | | - } |
40 | | -} |
// NOTE(review): parseHeaders (and the commented-out Headers interface above)
// is kept commented out, presumably for future Content-Type/Content-Length
// handling — confirm that plan is still alive, otherwise delete the dead code.
// function parseHeaders(headerLines: string[]): Headers {
//   const headers: Headers = {};
//   for (const line of headerLines) {
//     const [key, value] = line.split(': ');
//     switch (key.toLowerCase()) {
//       case 'content-type':
//         headers.contentType = value;
//         break;
//       case 'content-length':
//         headers.contentLength = Number(value);
//         break;
//     }
//   }
//   return headers;
// }

// Multipart lines (headers, boundary lines) are CRLF-delimited.
const CRLF = '\r\n';
41 | 35 |
|
42 | 36 | export function parseMultipart({ |
43 | 37 | responseText, |
44 | | - state = {lastProcessedLength: 0, buffer: ''}, |
| 38 | + state = {lastProcessedLength: 0}, |
45 | 39 | boundary = 'boundary', |
46 | 40 | }: { |
47 | 41 | responseText: string; |
48 | 42 | state?: MultipartState; |
49 | 43 | boundary?: string; |
50 | 44 | }): MultipartResult { |
51 | 45 | // Combine buffer with new data |
52 | | - const newData = state.buffer + responseText.slice(state.lastProcessedLength); |
| 46 | + const newData = responseText.slice(state.lastProcessedLength); |
53 | 47 |
|
54 | 48 | if (!newData) { |
55 | 49 | return {chunks: [], state}; |
56 | 50 | } |
57 | 51 |
|
58 | | - // Split on boundary with double dashes |
59 | | - const boundaryStr = `--${boundary}`; |
60 | | - const parts = newData.split(boundaryStr).filter(Boolean); |
| 52 | + // Split on boundary with double dashes and CRLF |
| 53 | + const boundaryStr = `--${boundary}${CRLF}`; |
| 54 | + const parts = newData.split(boundaryStr); |
61 | 55 |
|
62 | | - let lastCompleteChunkEnd = 0; |
63 | | - let currentPosition = 0; |
| 56 | + let currentPosition = state.lastProcessedLength; |
64 | 57 | const chunks: StreamingChunk[] = []; |
65 | | - let newBuffer = state.buffer; |
66 | 58 |
|
67 | 59 | for (let i = 0; i < parts.length; i++) { |
68 | 60 | const part = parts[i]; |
69 | 61 | const isLastPart = i === parts.length - 1; |
70 | 62 |
|
71 | | - // Skip the final boundary marker |
72 | | - if (part.trim() === '--') { |
73 | | - continue; |
74 | | - } |
| 63 | + // Split part into lines by CRLF |
| 64 | + const lines = part.split(CRLF); |
75 | 65 |
|
76 | | - // Extract JSON content while preserving line endings |
77 | | - const contentMatch = part.match( |
78 | | - /Content-Type: application\/json\r\n\r\n([\s\S]*?)(?:\r\n)?$/, |
79 | | - ); |
80 | | - if (!contentMatch) { |
| 66 | + // Find the empty line that separates headers from content |
| 67 | + const emptyLineIndex = lines.findIndex((line) => line === ''); |
| 68 | + if (emptyLineIndex === -1 || !lines[emptyLineIndex + 1]) { |
81 | 69 | if (isLastPart) { |
82 | | - newBuffer = part; |
83 | 70 | break; |
84 | 71 | } |
85 | 72 | continue; |
86 | 73 | } |
87 | 74 |
|
88 | | - const jsonContent = contentMatch[1]; |
89 | | - |
90 | | - // Calculate accurate position including: |
91 | | - // 1. Boundary length with CRLF |
92 | | - // 2. Content-Type header length with CRLF |
93 | | - // 3. Extra CRLF after headers |
94 | | - // 4. Part content length |
95 | | - const contentTypeHeader = 'Content-Type: application/json\r\n'; |
96 | | - const crlfLength = 2; // \r\n |
97 | | - currentPosition += |
98 | | - boundaryStr.length + |
99 | | - crlfLength + // boundary line |
100 | | - contentTypeHeader.length + // Content-Type header |
101 | | - crlfLength + // Extra CRLF after headers |
102 | | - jsonContent.length + // Actual content |
103 | | - crlfLength; // Final CRLF |
| 75 | + // const headers = parseHeaders(lines.slice(0, emptyLineIndex)); |
104 | 76 |
|
105 | | - // If it's the last part and not a complete chunk, store in buffer |
106 | | - if (isLastPart && !isCompleteChunk(jsonContent)) { |
107 | | - // For partial chunks, we need to store the raw content |
108 | | - // without headers since they will be added again in the next iteration |
109 | | - newBuffer = jsonContent; |
110 | | - break; |
111 | | - } |
| 77 | + const jsonContent = lines[emptyLineIndex + 1]; |
112 | 78 |
|
| 79 | + // if (headers.contentLength) { |
| 80 | + // jsonContent = jsonContent.slice(0, headers.contentLength); |
| 81 | + // } |
| 82 | + |
| 83 | + // if (headers.contentLength && jsonContent.length < headers.contentLength) { |
| 84 | + // newBuffer = jsonContent; |
| 85 | + // break; |
| 86 | + // } |
| 87 | + |
| 88 | + let parsedChunk: StreamingChunk | null = null; |
113 | 89 | try { |
114 | | - const parsedChunk = JSON.parse(jsonContent) as StreamingChunk; |
115 | | - chunks.push(parsedChunk); |
116 | | - lastCompleteChunkEnd = currentPosition; |
117 | | - newBuffer = ''; // Clear buffer after successful chunk parse |
118 | | - } catch { |
119 | | - if (isLastPart) { |
120 | | - newBuffer = jsonContent; |
121 | | - break; |
122 | | - } |
| 90 | + parsedChunk = JSON.parse(jsonContent) as StreamingChunk; |
| 91 | + } catch {} |
| 92 | + |
| 93 | + // If it's the last part and not a complete chunk, store in buffer |
| 94 | + if (!parsedChunk) { |
| 95 | + break; |
123 | 96 | } |
| 97 | + // Track position by adding boundary length and full part length |
| 98 | + chunks.push(parsedChunk); |
| 99 | + currentPosition += boundaryStr.length + part.length; |
124 | 100 | } |
125 | 101 |
|
126 | 102 | // Update state with precise position tracking |
127 | 103 | const newState: MultipartState = { |
128 | 104 | // Only update lastProcessedLength if we actually processed some chunks |
129 | | - lastProcessedLength: |
130 | | - chunks.length > 0 |
131 | | - ? state.lastProcessedLength + lastCompleteChunkEnd |
132 | | - : state.lastProcessedLength, |
133 | | - buffer: newBuffer, |
| 105 | + lastProcessedLength: currentPosition, |
134 | 106 | }; |
135 | 107 |
|
136 | 108 | return {chunks, state: newState}; |
|
0 commit comments