
Commit 1c9b28b

Fix web impl streaming line breaks with large rows (#333)
1 parent 5c08c5e commit 1c9b28b

File tree

5 files changed (+174, -57 lines)


CHANGELOG.md

Lines changed: 6 additions & 0 deletions
````diff
@@ -1,5 +1,11 @@
 # 1.7.0 (Common, Node.js, Web)
 
+## Bug fixes
+
+- (Web only) Fixed an issue where streaming large datasets could provide corrupted results. See [#333](https://github.com/ClickHouse/clickhouse-js/pull/333) (PR) for more details.
+
+## New features
+
 - (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example:
 
 ```ts
````
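The changelog's own `parseColumnType` example is cut off at the hunk boundary above. As a rough sketch of how such a call might look — the import path and the printed structure are assumptions, not taken from this diff:

```ts
import { parseColumnType } from '@clickhouse/client-common'

// The result is an AST-like description of the type: for a wrapped type such
// as Nullable(Int32), it describes both the wrapper and the inner type.
const ast = parseColumnType('Nullable(Int32)')
console.log(JSON.stringify(ast, null, 2))
```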
New file: the `genLargeStringsDataset` test helper (imported in the tests below as `@test/utils/datasets`)

Lines changed: 37 additions & 0 deletions

```diff
@@ -0,0 +1,37 @@
+import type { ClickHouseClient } from '@clickhouse/client-common'
+import { fakerRU } from '@faker-js/faker'
+import { createTableWithFields } from '@test/fixtures/table_with_fields'
+
+export async function genLargeStringsDataset<Stream = unknown>(
+  client: ClickHouseClient<Stream>,
+  {
+    rows,
+    words,
+  }: {
+    rows: number
+    words: number
+  },
+): Promise<{
+  table: string
+  values: { id: number; sentence: string; timestamp: string }[]
+}> {
+  const table = await createTableWithFields(
+    client as ClickHouseClient,
+    `sentence String, timestamp String`,
+  )
+  const values = [...new Array(rows)].map((_, id) => ({
+    id,
+    // it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
+    sentence: fakerRU.lorem.sentence(words),
+    timestamp: new Date().toISOString(),
+  }))
+  await client.insert({
+    table,
+    values,
+    format: 'JSONEachRow',
+  })
+  return {
+    table,
+    values,
+  }
+}
```
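The fixture uses `fakerRU` on purpose: as the inline comment notes, non-ASCII text makes the original bug easier to trigger, because a multi-byte UTF-8 character can land on a chunk boundary, and decoding each chunk independently corrupts it. A minimal standalone sketch of that failure mode (not part of the PR):

```ts
// 'я' encodes to two bytes in UTF-8 (0xd1 0x8f); if a chunk boundary falls
// between them, decoding each piece on its own yields replacement characters.
const bytes = new TextEncoder().encode('я')
const decoder = new TextDecoder('utf-8')
const broken = decoder.decode(bytes.slice(0, 1)) + decoder.decode(bytes.slice(1)) // '��'
const intact = decoder.decode(bytes) // 'я'
console.log({ broken, intact })
```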

packages/client-node/__tests__/integration/node_streaming_e2e.test.ts

Lines changed: 5 additions & 37 deletions
```diff
@@ -3,10 +3,9 @@ import {
   type ClickHouseClient,
   type ClickHouseSettings,
 } from '@clickhouse/client-common'
-import { fakerRU } from '@faker-js/faker'
 import { createSimpleTable } from '@test/fixtures/simple_table'
-import { createTableWithFields } from '@test/fixtures/table_with_fields'
 import { createTestClient, guid } from '@test/utils'
+import { genLargeStringsDataset } from '@test/utils/datasets'
 import { tableFromIPC } from 'apache-arrow'
 import { Buffer } from 'buffer'
 import Fs from 'fs'
@@ -152,40 +151,9 @@ describe('[Node.js] streaming e2e', () => {
   // Here we generate a large enough dataset to break into multiple chunks while streaming,
   // effectively testing the implementation of incomplete rows handling
   describe('should correctly process multiple chunks', () => {
-    async function generateData({
-      rows,
-      words,
-    }: {
-      rows: number
-      words: number
-    }): Promise<{
-      table: string
-      values: { id: number; sentence: string; timestamp: string }[]
-    }> {
-      const table = await createTableWithFields(
-        client as ClickHouseClient,
-        `sentence String, timestamp String`,
-      )
-      const values = [...new Array(rows)].map((_, id) => ({
-        id,
-        // it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
-        sentence: fakerRU.lorem.sentence(words),
-        timestamp: new Date().toISOString(),
-      }))
-      await client.insert({
-        table,
-        values,
-        format: 'JSONEachRow',
-      })
-      return {
-        table,
-        values,
-      }
-    }
-
     describe('large amount of rows', () => {
       it('should work with .json()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 10000,
           words: 10,
         })
@@ -199,7 +167,7 @@ describe('[Node.js] streaming e2e', () => {
       })
 
       it('should work with .stream()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 10000,
           words: 10,
         })
@@ -222,7 +190,7 @@ describe('[Node.js] streaming e2e', () => {
 
     describe("rows that don't fit into a single chunk", () => {
      it('should work with .json()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 5,
           words: 10000,
         })
@@ -236,7 +204,7 @@ describe('[Node.js] streaming e2e', () => {
       })
 
       it('should work with .stream()', async () => {
-        const { table, values } = await generateData({
+        const { table, values } = await genLargeStringsDataset(client, {
           rows: 5,
           words: 10000,
         })
```
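For context, the `.stream()` variants above consume the Node.js result roughly as follows. This is a sketch, not test code from the repository; `collectStreamedRows` is a hypothetical helper that assumes a client plus the table created by `genLargeStringsDataset`:

```ts
import type { ClickHouseClient } from '@clickhouse/client-common'

// Hypothetical helper: stream a JSONEachRow result and collect the parsed rows.
async function collectStreamedRows(
  client: ClickHouseClient,
  table: string,
): Promise<unknown[]> {
  const rs = await client.query({
    query: `SELECT * FROM ${table} ORDER BY id ASC`,
    format: 'JSONEachRow',
  })
  const received: unknown[] = []
  // the Node.js client emits batches of rows; each row exposes text and json()
  for await (const rows of rs.stream() as AsyncIterable<{ json<T>(): T }[]>) {
    for (const row of rows) {
      received.push(row.json())
    }
  }
  return received
}
```

The tests then compare the collected rows against the originally inserted values.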

packages/client-web/__tests__/integration/web_select_streaming.test.ts

Lines changed: 78 additions & 1 deletion
```diff
@@ -1,13 +1,21 @@
 import type { ClickHouseClient, Row } from '@clickhouse/client-common'
 import { createTestClient } from '@test/utils'
+import { genLargeStringsDataset } from '@test/utils/datasets'
 
 describe('[Web] SELECT streaming', () => {
   let client: ClickHouseClient<ReadableStream<Row[]>>
   afterEach(async () => {
     await client.close()
   })
   beforeEach(async () => {
-    client = createTestClient()
+    client = createTestClient({
+      // It is required to disable keep-alive to allow for larger inserts
+      // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
+      // If contentLength is non-null and httpRequest’s keepalive is true, then:
+      // <...>
+      // If the sum of contentLength and inflightKeepaliveBytes is greater than 64 kibibytes, then return a network error.
+      keep_alive: { enabled: false },
+    })
   })
 
   describe('consume the response only once', () => {
@@ -199,6 +207,75 @@ describe('[Web] SELECT streaming', () => {
       ])
     })
   })
+
+  // See https://github.com/ClickHouse/clickhouse-js/issues/171 for more details
+  // Here we generate a large enough dataset to break into multiple chunks while streaming,
+  // effectively testing the implementation of incomplete rows handling
+  describe('should correctly process multiple chunks', () => {
+    describe('large amount of rows', () => {
+      it('should work with .json()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 10000,
+          words: 10,
+        })
+        const result = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.json())
+        expect(result).toEqual(values)
+      })
+
+      it('should work with .stream()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 10000,
+          words: 10,
+        })
+        const stream = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.stream())
+
+        const result = await rowsJsonValues(stream)
+        expect(result).toEqual(values)
+      })
+    })
+
+    describe("rows that don't fit into a single chunk", () => {
+      it('should work with .json()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 5,
+          words: 10000,
+        })
+        const result = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.json())
+        expect(result).toEqual(values)
+      })
+
+      it('should work with .stream()', async () => {
+        const { table, values } = await genLargeStringsDataset(client, {
+          rows: 5,
+          words: 10000,
+        })
+        const stream = await client
+          .query({
+            query: `SELECT * FROM ${table} ORDER BY id ASC`,
+            format: 'JSONEachRow',
+          })
+          .then((r) => r.stream())
+
+        const result = await rowsJsonValues(stream)
+        expect(result).toEqual(values)
+      })
+    })
+  })
 })
 
 async function rowsJsonValues<T = unknown>(
```
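The `keep_alive: { enabled: false }` setting in `beforeEach` works around the Fetch spec behavior quoted in the comment: with `keepalive` enabled, request bodies over 64 KiB are rejected, which would break the large test inserts. A minimal sketch of the same configuration in application code — the URL is a placeholder, assuming the web package's `createClient` entry point:

```ts
import { createClient } from '@clickhouse/client-web'

// With keepalive enabled, fetch() rejects request bodies larger than 64 KiB,
// so large browser-side inserts need keep-alive disabled on the client.
const client = createClient({
  url: 'http://localhost:8123', // placeholder ClickHouse URL
  keep_alive: { enabled: false },
})
```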

packages/client-web/src/result_set.ts

Lines changed: 48 additions & 19 deletions
```diff
@@ -9,10 +9,12 @@ import type {
 import {
   isNotStreamableJSONFamily,
   isStreamableJSONFamily,
+  validateStreamFormat,
 } from '@clickhouse/client-common'
-import { validateStreamFormat } from '@clickhouse/client-common'
 import { getAsText } from './utils'
 
+const NEWLINE = 0x0a as const
+
 export class ResultSet<Format extends DataFormat | unknown>
   implements BaseResultSet<ReadableStream<Row[]>, Format>
 {
@@ -67,40 +69,67 @@ export class ResultSet<Format extends DataFormat | unknown>
     this.markAsConsumed()
     validateStreamFormat(this.format)
 
-    let decodedChunk = ''
+    let incompleteChunks: Uint8Array[] = []
+    let totalIncompleteLength = 0
     const decoder = new TextDecoder('utf-8')
     const transform = new TransformStream({
       start() {
         //
       },
-      transform: (chunk, controller) => {
+      transform: (chunk: Uint8Array, controller) => {
         if (chunk === null) {
           controller.terminate()
         }
-        decodedChunk += decoder.decode(chunk)
         const rows: Row[] = []
-        // eslint-disable-next-line no-constant-condition
-        while (true) {
-          const idx = decodedChunk.indexOf('\n')
-          if (idx !== -1) {
-            const text = decodedChunk.slice(0, idx)
-            decodedChunk = decodedChunk.slice(idx + 1)
+        let idx: number
+        let lastIdx = 0
+        do {
+          // an unescaped newline character denotes the end of a row
+          idx = chunk.indexOf(NEWLINE, lastIdx)
+          // there is no complete row in the rest of the current chunk
+          if (idx === -1) {
+            // to be processed during the next transform iteration
+            const incompleteChunk = chunk.slice(lastIdx)
+            incompleteChunks.push(incompleteChunk)
+            totalIncompleteLength += incompleteChunk.length
+            // send the extracted rows to the consumer, if any
+            if (rows.length > 0) {
+              controller.enqueue(rows)
+            }
+          } else {
+            let text: string
+            if (incompleteChunks.length > 0) {
+              const completeRowBytes = new Uint8Array(
+                totalIncompleteLength + idx,
+              )
+
+              // using the incomplete chunks from the previous iterations
+              let offset = 0
+              incompleteChunks.forEach((incompleteChunk) => {
+                completeRowBytes.set(incompleteChunk, offset)
+                offset += incompleteChunk.length
+              })
+              // finalize the row with the current chunk slice that ends with a newline
+              const finalChunk = chunk.slice(0, idx)
+              completeRowBytes.set(finalChunk, offset)
+
+              // reset the incomplete chunks
+              incompleteChunks = []
+              totalIncompleteLength = 0
+
+              text = decoder.decode(completeRowBytes)
+            } else {
+              text = decoder.decode(chunk.slice(lastIdx, idx))
+            }
             rows.push({
               text,
               json<T>(): T {
                 return JSON.parse(text)
               },
             })
-          } else {
-            if (rows.length) {
-              controller.enqueue(rows)
-            }
-            break
+            lastIdx = idx + 1 // skipping newline character
           }
-        }
-      },
-      flush() {
-        decodedChunk = ''
+        } while (idx !== -1)
       },
     })
 
```
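The core idea of the fix: instead of decoding every incoming chunk immediately and splitting the resulting string, the transform now looks for newline bytes and decodes only complete rows, buffering whatever trails the last newline until the next chunk arrives. A simplified standalone sketch of the same technique (not the library code):

```ts
const NEWLINE = 0x0a

// Splits a byte stream into decoded lines, buffering the incomplete tail so
// that rows (and multi-byte UTF-8 characters) spanning chunks stay intact.
function splitLines(): TransformStream<Uint8Array, string> {
  let tail = new Uint8Array(0)
  const decoder = new TextDecoder('utf-8')
  return new TransformStream({
    transform(chunk, controller) {
      // prepend the leftover bytes from the previous chunk
      const buf = new Uint8Array(tail.length + chunk.length)
      buf.set(tail, 0)
      buf.set(chunk, tail.length)
      let start = 0
      let idx: number
      while ((idx = buf.indexOf(NEWLINE, start)) !== -1) {
        controller.enqueue(decoder.decode(buf.slice(start, idx)))
        start = idx + 1
      }
      tail = buf.slice(start) // incomplete row; wait for more bytes
    },
  })
}
```

Unlike this sketch, the actual implementation avoids re-copying the buffered tail on every chunk: it keeps the incomplete slices in an array and concatenates them only once a terminating newline is found.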
