Skip to content

Commit 02d3e18

Browse files
authored
Support Parquet ZSTD and BigInt JSON (#54)
## Summary
- Register ZSTD support for Parquet via zlib
- Normalize BigInt values to JSON-safe numbers or strings
- Update Parquet tests and README

## Testing
- `yarn test --runTestsByPath __tests__/parquet_reader.test.js`
1 parent bbce9e6 commit 02d3e18

File tree

7 files changed

+190
-9
lines changed

7 files changed

+190
-9
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
---
"node-es-transformer": patch
---

Add ZSTD support for Parquet ingestion and normalize BigInt values during indexing

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ Choose **one** of these sources:
344344
- **`sourceFormat`** (`'ndjson' | 'csv' | 'parquet' | 'arrow'`): Format for file/stream sources. Default: `'ndjson'`.
345345
- `arrow` expects Arrow IPC file/stream payloads.
346346
- `parquet` stream sources are currently buffered in memory before row iteration (file sources remain streaming by row cursor).
347+
- `parquet` supports ZSTD-compressed files when running on Node.js 22+ (uses the built-in zlib zstd implementation).
348+
- `parquet` INT64 values are normalized for JSON: safe-range values become numbers, larger values become strings.
347349
- **`csvOptions`** (object): CSV parser options (delimiter, quote, columns, etc.) used when `sourceFormat: 'csv'`.
348350

349351
#### Client Configuration

__tests__/parquet_reader.test.js

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,32 @@ const path = require('path');
44

55
const retry = require('async-retry');
66
const parquet = require('@dsnp/parquetjs');
7+
const zlib = require('zlib');
const { PARQUET_COMPRESSION_METHODS } = require('@dsnp/parquetjs/dist/lib/compression');

// parquetjs ships without a ZSTD codec; wire one up from Node's built-in
// zlib zstd bindings so the tests can write and read ZSTD parquet files.
function registerZstdCompression() {
  if (PARQUET_COMPRESSION_METHODS.ZSTD) {
    // A codec is already registered (e.g. by another module) — nothing to do.
    return;
  }

  const hasNativeZstd =
    typeof zlib.zstdCompressSync === 'function' &&
    typeof zlib.zstdDecompressSync === 'function';

  if (!hasNativeZstd) {
    // Fail fast: these tests cannot run without a zstd-capable Node.js build.
    throw new Error('ZSTD compression requires Node.js with zstd support.');
  }

  PARQUET_COMPRESSION_METHODS.ZSTD = {
    deflate: (value) => zlib.zstdCompressSync(value),
    inflate: (value) => zlib.zstdDecompressSync(value),
  };
}

registerZstdCompression();
733

834
const transformer = require('../dist/node-es-transformer.cjs');
935
const deleteIndex = require('./utils/delete_index');
@@ -13,13 +39,16 @@ const client = getElasticsearchClient();
1339

1440
// Target Elasticsearch index names, one per test scenario in this suite.
const indexes = {
  single: 'parquet_file_reader_single',
  noTransform: 'parquet_file_reader_no_transform',
  wildcard: 'parquet_file_reader_wildcard',
  stream: 'parquet_stream_reader',
  zstd: 'parquet_file_reader_zstd',
};

// Scratch directory for generated parquet fixtures; removed in afterAll.
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'node-es-transformer-parquet-'));
const sampleFile1 = path.join(tempDir, 'sample_data_parquet_1.parquet');
const sampleFile2 = path.join(tempDir, 'sample_data_parquet_2.parquet');
// ZSTD-compressed fixture used by the dedicated compression test.
const sampleFileZstd = path.join(tempDir, 'sample_data_parquet_zstd.parquet');
2352

2453
async function runTransformerAndWait(options) {
2554
const { events } = await transformer(options);
@@ -30,12 +59,16 @@ async function runTransformerAndWait(options) {
3059
});
3160
}
3261

33-
async function createParquetFile(filePath, rows) {
62+
// Builds a parquet schema field definition, attaching a compression codec
// only when one is provided (falsy values leave the field uncompressed).
function buildField(type, compression) {
  const field = { type };
  if (compression) {
    field.compression = compression;
  }
  return field;
}
65+
66+
async function createParquetFile(filePath, rows, compression) {
3467
const schema = new parquet.ParquetSchema({
35-
the_index: { type: 'INT64' },
36-
code: { type: 'INT64' },
37-
url: { type: 'UTF8' },
38-
text: { type: 'UTF8' },
68+
the_index: buildField('INT64', compression),
69+
code: buildField('INT64', compression),
70+
url: buildField('UTF8', compression),
71+
text: buildField('UTF8', compression),
3972
});
4073

4174
const writer = await parquet.ParquetWriter.openFile(schema, filePath);
@@ -87,12 +120,33 @@ describe('indexes parquet sources', () => {
87120
text: 'parquet-five',
88121
},
89122
]);
123+
124+
await createParquetFile(
125+
sampleFileZstd,
126+
[
127+
{
128+
the_index: 6,
129+
code: 600,
130+
url: 'https://example.com/p6',
131+
text: 'parquet-zstd-one',
132+
},
133+
{
134+
the_index: 7,
135+
code: 700,
136+
url: 'https://example.com/p7',
137+
text: 'parquet-zstd-two',
138+
},
139+
],
140+
'ZSTD',
141+
);
90142
});
91143

92144
afterAll(async () => {
  // Drop every index this suite created, then release the client connection
  // and the temporary fixture directory.
  for (const indexName of Object.values(indexes)) {
    await deleteIndex(client, indexName)();
  }
  await client.close();
  fs.rmSync(tempDir, { recursive: true, force: true });
});
@@ -132,6 +186,69 @@ describe('indexes parquet sources', () => {
132186
});
133187
});
134188

189+
it('should index a parquet file without a transform', async () => {
  // Ingest the first sample file without a transform callback so documents
  // pass through the BigInt-normalization path untouched by user code.
  await runTransformerAndWait({
    fileName: sampleFile1,
    sourceFormat: 'parquet',
    targetIndexName: indexes.noTransform,
    mappings: {
      properties: {
        the_index: { type: 'integer' },
        code: { type: 'integer' },
        url: { type: 'keyword' },
        text: { type: 'keyword' },
      },
    },
    verbose: false,
  });

  await client.indices.refresh({ index: indexes.noTransform });

  // Retry the search until the indexed row is visible.
  await retry(async () => {
    const searchUrl = `${elasticsearchUrl}/${indexes.noTransform}/_search?q=the_index:2`;
    const res = await fetch(searchUrl);
    expect(res.status).toBe(200);

    const body = await res.json();
    expect(body?.hits?.total?.value).toBe(1);
    expect(body?.hits?.hits?.[0]?._source?.text).toBe('parquet-two');
  });
});
216+
217+
it('should index a ZSTD-compressed parquet file', async () => {
  // The ZSTD fixture was written with per-field ZSTD compression in
  // beforeAll; this verifies the reader decompresses it end-to-end.
  await runTransformerAndWait({
    fileName: sampleFileZstd,
    sourceFormat: 'parquet',
    targetIndexName: indexes.zstd,
    mappings: {
      properties: {
        the_index: { type: 'integer' },
        code: { type: 'integer' },
        url: { type: 'keyword' },
        text: { type: 'keyword' },
      },
    },
    // Parquet INT64 columns surface as BigInt; coerce them so the integer
    // mappings accept the values.
    transform: (doc) => ({
      ...doc,
      the_index: Number(doc.the_index),
      code: Number(doc.code),
    }),
    verbose: false,
  });

  await client.indices.refresh({ index: indexes.zstd });

  // Retry the search until the indexed row is visible.
  await retry(async () => {
    const searchUrl = `${elasticsearchUrl}/${indexes.zstd}/_search?q=the_index:6`;
    const res = await fetch(searchUrl);
    expect(res.status).toBe(200);

    const body = await res.json();
    expect(body?.hits?.total?.value).toBe(1);
    expect(body?.hits?.hits?.[0]?._source?.text).toBe('parquet-zstd-one');
  });
});
251+
135252
it('should index parquet files through wildcard patterns', async () => {
136253
await runTransformerAndWait({
137254
fileName: path.join(tempDir, 'sample_data_parquet_*.parquet'),
@@ -162,7 +279,7 @@ describe('indexes parquet sources', () => {
162279
expect(res.status).toBe(200);
163280

164281
const body = await res.json();
165-
expect(body?.count).toBe(5);
282+
expect(body?.count).toBe(7);
166283
});
167284
});
168285

src/_file-reader.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import parquet from '@dsnp/parquetjs';
1+
import parquet from './_parquet';
22
import * as arrow from 'apache-arrow';
33
import fs from 'fs';
44
import { parse } from 'csv-parse';

src/_index-queue.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@ import { DEFAULT_BUFFER_SIZE } from './_constants';
55
const EventEmitter = require('events');
66

77
const parallelCalls = 5;
8+
// JSON.stringify throws on BigInt; these helpers make parquet INT64 values
// JSON-safe before documents are written to the bulk-indexing stream.
const MAX_SAFE_BIGINT = BigInt(Number.MAX_SAFE_INTEGER);
const MIN_SAFE_BIGINT = BigInt(Number.MIN_SAFE_INTEGER);

// Convert a BigInt to a plain number when it fits the IEEE-754 safe integer
// range, otherwise to a decimal string so no precision is lost.
function coerceBigInt(value) {
  const fitsInNumber = value >= MIN_SAFE_BIGINT && value <= MAX_SAFE_BIGINT;
  return fitsInNumber ? Number(value) : value.toString();
}

// JSON.stringify with a replacer that rewrites every BigInt via coerceBigInt;
// all other values serialize unchanged.
function safeStringify(doc) {
  const replacer = (_key, value) => (typeof value === 'bigint' ? coerceBigInt(value) : value);
  return JSON.stringify(doc, replacer);
}
828

929
// a simple helper queue to bulk index documents
1030
export default function indexQueueFactory({
@@ -126,7 +146,7 @@ export default function indexQueueFactory({
126146
throw new Error('Unexpected doc added after indexer should finish.');
127147
}
128148

129-
const canContinue = stream.write(`${JSON.stringify(doc)}\n`);
149+
const canContinue = stream.write(`${safeStringify(doc)}\n`);
130150
if (!canContinue) {
131151
queueEmitter.emit('pause');
132152

src/_parquet.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import parquet from '@dsnp/parquetjs';
import zlib from 'zlib';
import { PARQUET_COMPRESSION_METHODS } from '@dsnp/parquetjs/dist/lib/compression.js';

// parquetjs has no built-in ZSTD codec. Register one backed by Node's zlib
// zstd bindings (Node 22+). On runtimes without zstd we still register the
// codec, but each call throws a descriptive error — so importing this module
// stays safe, and the failure surfaces only when a ZSTD file is actually
// read or written.
function registerZstdCompression() {
  if (PARQUET_COMPRESSION_METHODS.ZSTD) {
    // A codec is already present (registered elsewhere) — leave it alone.
    return;
  }

  const hasNativeZstd =
    typeof zlib.zstdCompressSync === 'function' &&
    typeof zlib.zstdDecompressSync === 'function';

  if (!hasNativeZstd) {
    const unsupported = () => {
      throw new Error('ZSTD compression requires Node.js with zstd support.');
    };
    PARQUET_COMPRESSION_METHODS.ZSTD = {
      deflate: unsupported,
      inflate: unsupported,
    };
    return;
  }

  PARQUET_COMPRESSION_METHODS.ZSTD = {
    deflate: (value) => zlib.zstdCompressSync(value),
    inflate: (value) => zlib.zstdDecompressSync(value),
  };
}

registerZstdCompression();

// Re-export the patched parquetjs instance for the file/stream readers.
export default parquet;

src/_stream-reader.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import parquet from '@dsnp/parquetjs';
1+
import parquet from './_parquet';
22
import * as arrow from 'apache-arrow';
33
import { parse } from 'csv-parse';
44
import es from 'event-stream';

0 commit comments

Comments
 (0)