-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathparser.js
More file actions
256 lines (216 loc) · 9.4 KB
/
parser.js
File metadata and controls
256 lines (216 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// parser.js
import * as arrow from 'https://esm.sh/apache-arrow';
import * as parquet from 'https://cdn.jsdelivr.net/npm/parquet-wasm@0.7.0/esm/parquet_wasm.js';
// Module-level flag: set once parquet-wasm's WASM module has been initialized by
// processParquetStream, so later calls in this module skip the initialization step.
let wasmInitialized = false;
/**
 * Finds the name of the column that holds WKB geometry in a row.
 *
 * Well-known column names are tried in priority order — "geometry" first, then
 * alternatives such as "geom" or "wkb" — and the first one whose value is a
 * Uint8Array is returned.
 *
 * @param {Object} row - Plain-object row (e.g. the result of an Arrow row's `toJSON()`).
 * @returns {string|null} The matching column name, or null when no candidate holds binary data.
 */
function findGeometryColumnName(row) {
  const candidates = ['geometry', 'geom', 'wkb_geometry', 'shape', 'wkb', 'geo_shape'];
  const match = candidates.find((column) => row[column] instanceof Uint8Array);
  return match === undefined ? null : match;
}
/**
 * Copies an Arrow row's plain-object form into a new object, cloning binary
 * (Uint8Array) values so they no longer share memory with the Arrow table.
 *
 * @param {Object} jsonData - Row object as returned by an Arrow row's `toJSON()`.
 * @returns {Object} New object with the same keys; Uint8Array values are copied.
 */
function copyRowValues(jsonData) {
  const row = {};
  for (const [key, value] of Object.entries(jsonData)) {
    row[key] = value instanceof Uint8Array ? new Uint8Array(value) : value;
  }
  return row;
}

/**
 * Derives a GeoJSON geometry for one row.
 *
 * When `geometryColumnName` names a column holding binary data, that WKB is parsed and
 * its result is used as-is (a failed parse does not fall back to lon/lat). Otherwise a
 * Point is built from numeric `lon`/`lat` columns.
 *
 * @param {Object} row - Row produced by copyRowValues.
 * @param {string|null} geometryColumnName - Detected WKB column, or null.
 * @returns {Object|null} GeoJSON geometry, or null when none can be derived.
 */
function rowGeometry(row, geometryColumnName) {
  const wkb = geometryColumnName ? row[geometryColumnName] : undefined;
  if (wkb instanceof Uint8Array) {
    // parseWKB takes an ArrayBuffer, so cut out exactly this value's bytes.
    return parseWKB(wkb.buffer.slice(wkb.byteOffset, wkb.byteOffset + wkb.byteLength));
  }
  if (isValidCoordinate(row.lon) && isValidCoordinate(row.lat)) {
    return { type: 'Point', coordinates: [row.lon, row.lat] };
  }
  return null;
}

/**
 * Streams a Parquet file as batches of GeoJSON features.
 *
 * Each Parquet record batch is decoded through Arrow and converted into GeoJSON
 * Features. Geometry comes from the WKB column found by findGeometryColumnName,
 * detected from the first row that actually holds one (so leading null geometries are
 * tolerated); otherwise numeric `lon`/`lat` columns are used. Rows with no derivable
 * geometry are skipped and do not count toward `limit`.
 *
 * Posts a `status` message via `self.postMessage`, so this presumably runs inside a
 * Web Worker — confirm against the caller.
 *
 * The underlying stream reader is cancelled whenever iteration ends before the file is
 * exhausted: when the limit is hit, when the consumer stops iterating early, or when an
 * error is thrown.
 *
 * @param {*} file - Input accepted by parquet-wasm's `ParquetFile.fromFile` (presumably a File/Blob).
 * @param {number} [limit=Infinity] - Maximum number of features to emit.
 * @yields {{features: Object[], totalProcessed: number, limitReached: boolean}} One entry per
 *   record batch read. `limitReached` is true on the final batch when a row was left
 *   unprocessed because the limit had already been hit.
 */
export async function* processParquetStream(file, limit = Infinity) {
  if (!wasmInitialized) {
    // As per the parquet-wasm README, await the default export to initialize
    await parquet.default();
    wasmInitialized = true;
  }
  // Send an initial status message from the worker once it's ready to process
  self.postMessage({ type: 'status', payload: { message: "WASM modules initialized. Reading file...", batchCount: 0, totalProcessed: 0 } });

  const parquetFile = await parquet.ParquetFile.fromFile(file);
  const recordBatchStream = await parquetFile.stream();
  const reader = recordBatchStream.getReader();

  let totalProcessed = 0;
  let geometryColumnName = null; // Detected lazily from the first row that has WKB data.
  let streamExhausted = false;

  try {
    while (true) {
      const { done, value: wasmRecordBatch } = await reader.read();
      if (done) {
        streamExhausted = true;
        break;
      }

      // This `intoIPCStream` call consumes `wasmRecordBatch`, freeing its memory.
      const table = arrow.tableFromIPC(wasmRecordBatch.intoIPCStream());
      const batchFeatures = [];
      let limitReached = false;

      for (const rowObject of table.toArray()) {
        if (totalProcessed >= limit) {
          limitReached = true;
          break;
        }

        const row = copyRowValues(rowObject.toJSON());
        // Keep probing until some row has binary data in a known geometry column;
        // checking only the first row would miss files whose first geometry is null.
        if (!geometryColumnName) {
          geometryColumnName = findGeometryColumnName(row);
        }

        const geometry = rowGeometry(row, geometryColumnName);
        if (!geometry) {
          continue;
        }

        // Properties are the row minus its geometry columns.
        const properties = { ...row };
        if (geometryColumnName) {
          delete properties[geometryColumnName];
        }
        delete properties.geometry;
        delete properties.geometry_bbox;

        batchFeatures.push({
          type: 'Feature',
          id: totalProcessed, // Sequential ID, unique within this stream
          geometry,
          properties: sanitizeProperties(properties), // Sanitize for BigInts
        });
        totalProcessed++;
      }

      yield { features: batchFeatures, totalProcessed, limitReached };

      if (limitReached) {
        break;
      }
    }
  } finally {
    // Runs on normal completion, on limit, on consumer `break`/`return()`, and on errors.
    if (!streamExhausted) {
      // Stop reading the rest of the file. If the stream already errored, cancel() rejects
      // with that same error, which is already propagating — so the rejection is ignored.
      await reader.cancel().catch(() => {});
    }
    reader.releaseLock();
  }
}
/**
 * Checks whether a value can be used as a GeoJSON coordinate ordinate.
 *
 * Only finite numbers qualify: non-numbers (including null/undefined and BigInt), NaN and
 * ±Infinity are rejected, since JSON cannot represent non-finite numbers
 * (JSON.stringify turns them into null).
 *
 * @param {*} value - Candidate ordinate (e.g. a row's `lon` or `lat`).
 * @returns {boolean} True when `value` is a finite number.
 */
function isValidCoordinate(value) {
  return Number.isFinite(value);
}
/**
 * Parses a Well-Known Binary (WKB) geometry into a GeoJSON geometry object.
 *
 * Handles Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon and
 * GeometryCollection, in either byte order. Type codes may be plain 2D WKB, ISO WKB with
 * Z/M/ZM dimensions (1000/2000/3000 offsets) or PostGIS EWKB (Z/M/SRID flag bits).
 * Every member of a multi-geometry or collection is read with its own header (byte order
 * and type), as the WKB format requires. Positions are always emitted as [x, y]: Z and M
 * ordinates are consumed so later coordinates stay aligned, but are not included.
 *
 * @param {ArrayBuffer|ArrayBufferView} wkbBuffer - Bytes of exactly one WKB geometry.
 * @returns {Object|null} GeoJSON geometry, or null when the geometry type is unsupported
 *   (logged via console.warn) or the bytes are malformed/truncated (logged via console.error).
 */
function parseWKB(wkbBuffer) {
  try {
    const view = ArrayBuffer.isView(wkbBuffer)
      ? new DataView(wkbBuffer.buffer, wkbBuffer.byteOffset, wkbBuffer.byteLength)
      : new DataView(wkbBuffer);
    let offset = 0;

    const readUint32 = (littleEndian) => {
      const value = view.getUint32(offset, littleEndian);
      offset += 4;
      return value;
    };

    // Reads one position and skips any Z/M ordinates that follow x and y.
    const readPosition = (littleEndian, extraOrdinates) => {
      const x = view.getFloat64(offset, littleEndian);
      const y = view.getFloat64(offset + 8, littleEndian);
      offset += 8 * (2 + extraOrdinates);
      return [x, y];
    };

    // Reads a count-prefixed list of positions (LineString body or polygon ring).
    const readPositionList = (littleEndian, extraOrdinates) => {
      const count = readUint32(littleEndian);
      const positions = [];
      for (let i = 0; i < count; i++) {
        positions.push(readPosition(littleEndian, extraOrdinates));
      }
      return positions;
    };

    // Reads a count-prefixed list of rings (Polygon body).
    const readRings = (littleEndian, extraOrdinates) => {
      const count = readUint32(littleEndian);
      const rings = [];
      for (let i = 0; i < count; i++) {
        rings.push(readPositionList(littleEndian, extraOrdinates));
      }
      return rings;
    };

    // Reads a count-prefixed list of complete nested geometries; null if any member is unsupported.
    const readMembers = (littleEndian) => {
      const count = readUint32(littleEndian);
      const members = [];
      for (let i = 0; i < count; i++) {
        const member = readGeometry();
        if (member === null) {
          return null;
        }
        members.push(member);
      }
      return members;
    };

    // Builds a Multi* geometry whose members must all be of `memberType`.
    const readMulti = (type, memberType, littleEndian) => {
      const members = readMembers(littleEndian);
      if (members === null) {
        return null;
      }
      if (members.some((member) => member.type !== memberType)) {
        console.warn(`Invalid WKB ${type}: every member must be a ${memberType}`);
        return null;
      }
      return { type, coordinates: members.map((member) => member.coordinates) };
    };

    // Reads one geometry (header and body) starting at `offset`.
    const readGeometry = () => {
      const littleEndian = view.getUint8(offset) === 1;
      offset += 1;
      const rawType = readUint32(littleEndian);
      // EWKB flag bits: 0x80000000 = Z, 0x40000000 = M, 0x20000000 = SRID follows.
      const typeCode = rawType & 0x1fffffff;
      // ISO WKB: thousands digit 1 = Z, 2 = M, 3 = ZM.
      const isoDimension = Math.floor(typeCode / 1000);
      const hasZ = (rawType & 0x80000000) !== 0 || isoDimension === 1 || isoDimension === 3;
      const hasM = (rawType & 0x40000000) !== 0 || isoDimension === 2 || isoDimension === 3;
      if ((rawType & 0x20000000) !== 0) {
        offset += 4; // Skip the embedded SRID.
      }
      const extraOrdinates = (hasZ ? 1 : 0) + (hasM ? 1 : 0);
      const geomType = typeCode % 1000;

      switch (geomType) {
        case 1:
          return { type: 'Point', coordinates: readPosition(littleEndian, extraOrdinates) };
        case 2:
          return { type: 'LineString', coordinates: readPositionList(littleEndian, extraOrdinates) };
        case 3:
          return { type: 'Polygon', coordinates: readRings(littleEndian, extraOrdinates) };
        case 4:
          return readMulti('MultiPoint', 'Point', littleEndian);
        case 5:
          return readMulti('MultiLineString', 'LineString', littleEndian);
        case 6:
          return readMulti('MultiPolygon', 'Polygon', littleEndian);
        case 7: {
          const geometries = readMembers(littleEndian);
          return geometries === null ? null : { type: 'GeometryCollection', geometries };
        }
        default:
          console.warn(`Unsupported WKB geometry type: ${geomType}`);
          return null;
      }
    };

    return readGeometry();
  } catch (error) {
    // Typically a RangeError from reading past the end of a truncated buffer.
    console.error('Error parsing WKB:', error);
    return null;
  }
}
/**
 * Converts BigInt values to strings, in place, so the properties are JSON-serializable
 * (JSON.stringify throws a TypeError on BigInt).
 *
 * Nested objects and arrays are walked recursively. Binary views (typed arrays, DataView)
 * are skipped: walking them index by index can never change anything (non-BigInt typed
 * arrays hold no BigInts, and BigInt typed arrays coerce assigned strings back to BigInt),
 * yet it is slow for large binary columns. Each object is visited at most once, so shared
 * or circular references cannot cause unbounded recursion.
 *
 * @param {Object} properties - Feature properties; mutated in place.
 * @returns {Object} The same `properties` value.
 */
function sanitizeProperties(properties) {
  const visited = new WeakSet();

  const visit = (node) => {
    if (typeof node !== 'object' || node === null || ArrayBuffer.isView(node) || visited.has(node)) {
      return;
    }
    visited.add(node);
    for (const key in node) {
      const value = node[key];
      if (typeof value === 'bigint') {
        node[key] = value.toString();
      } else {
        visit(value);
      }
    }
  };

  visit(properties);
  return properties;
}