Skip to content

Commit 2405af3

Browse files
committed
pipeline loading per-node tables for large zips
1 parent 393fcc9 commit 2405af3

File tree

8 files changed

+146
-27
lines changed

8 files changed

+146
-27
lines changed

src/components/ErrorViewer.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ interface ErrorViewerProps {
88
tableName: string;
99
fullTableName?: string;
1010
isPreLoadError?: boolean;
11-
errorFiles?: Array<{ path: string; nodeId?: number; size?: number; isError: boolean }>;
12-
availableFiles?: Array<{ path: string; nodeId?: number; size?: number; isError: boolean }>;
11+
errorFiles?: Array<{ path: string; nodeId: number; size: number; isError: boolean }>;
12+
availableFiles?: Array<{ path: string; nodeId: number; size: number; isError: boolean }>;
1313
}
1414

1515
function ErrorViewer({

src/components/sidebar/TablesView.tsx

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,11 @@ function TablesView() {
9494

9595
if (justLoaded || rowCountJustBecameZero) {
9696
// This table just became empty
97-
console.log(`Table ${table.name} just loaded with 0 rows - delaying move to empty section`);
9897
newRecentlyEmpty.set(table.name, now);
9998
hasChanges = true;
10099

101100
// Set timeout to remove from recently empty after 3 seconds
102101
setTimeout(() => {
103-
console.log(`Moving ${table.name} to empty section after 3s delay`);
104102
setRecentlyEmptyTables((prev) => {
105103
const next = new Map(prev);
106104
next.delete(table.name);
@@ -632,6 +630,12 @@ function TablesView() {
632630
)}
633631
</div>
634632
)}
633+
{/* File progress (multi-node tables) */}
634+
{table.fileProgress && (
635+
<div className="file-progress-text">
636+
Loading file {table.fileProgress.current} of {table.fileProgress.total}
637+
</div>
638+
)}
635639
{/* Chunk progress bar */}
636640
{table.chunkProgress && (
637641
<div className="chunk-progress-bar">
@@ -757,6 +761,12 @@ function TablesView() {
757761
)}
758762
</div>
759763
)}
764+
{/* File progress (multi-node tables) */}
765+
{table.fileProgress && (
766+
<div className="file-progress-text">
767+
Loading file {table.fileProgress.current} of {table.fileProgress.total}
768+
</div>
769+
)}
760770
{table.chunkProgress && (
761771
<div className="chunk-progress-bar">
762772
<div

src/crdb/columnTypeRegistry.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ export const COLUMN_TYPE_HINTS: ColumnTypeHint[] = [
5050
description: "Mixed timestamp formats",
5151
},
5252

53-
// crdb_internal.kv_node_status - tables with complex JSON that break CSV sniffing
53+
// Node IDs should be INTEGER (32-bit) across all tables
54+
{
55+
table: "crdb_internal.node_build_info",
56+
column: "node_id",
57+
duckdbType: "INTEGER",
58+
description: "Node ID as 32-bit integer",
59+
},
5460
{
5561
table: "crdb_internal.kv_node_status",
5662
column: "node_id",

src/services/WorkerManager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,9 @@ export class WorkerManager implements IWorkerManager {
354354
message.chunkProgress as
355355
| { current: number; total: number; percentage: number }
356356
| undefined,
357+
message.fileProgress as
358+
| { current: number; total: number; percentage: number }
359+
| undefined,
357360
);
358361
break;
359362

src/state/AppContext.tsx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,11 @@ export function AppProvider({ children }: { children: ReactNode }) {
542542
total: number;
543543
percentage: number;
544544
},
545+
fileProgress?: {
546+
current: number;
547+
total: number;
548+
percentage: number;
549+
},
545550
) => {
546551
if (!mounted) return;
547552

@@ -556,6 +561,10 @@ export function AppProvider({ children }: { children: ReactNode }) {
556561
if (chunkProgress) {
557562
updates.chunkProgress = chunkProgress;
558563
}
564+
// Add file progress if provided
565+
if (fileProgress) {
566+
updates.fileProgress = fileProgress;
567+
}
559568
break;
560569
case "completed":
561570
updates.loading = false;
@@ -565,6 +574,7 @@ export function AppProvider({ children }: { children: ReactNode }) {
565574
}
566575
updates.deferred = false;
567576
updates.chunkProgress = undefined; // Clear chunk progress
577+
updates.fileProgress = undefined; // Clear file progress
568578
updates.isError = false; // Clear error flag since we successfully loaded
569579
break;
570580
case "error":

src/state/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ export interface TableMeta {
7474
total: number;
7575
percentage: number;
7676
};
77+
fileProgress?: {
78+
// For multi-node tables loading multiple files
79+
current: number;
80+
total: number;
81+
percentage: number;
82+
};
7783
nodeFiles?: Array<{
7884
// For multi-node tables
7985
path: string;
@@ -286,6 +292,11 @@ export interface IWorkerManagerCallbacks {
286292
total: number;
287293
percentage: number;
288294
},
295+
fileProgress?: {
296+
current: number;
297+
total: number;
298+
percentage: number;
299+
},
289300
) => void;
290301
onTableLoadingComplete?: (
291302
success: boolean,

src/styles/components.css

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,6 +2026,14 @@
20262026
}
20272027
}
20282028

2029+
/* File progress text for multi-node tables */
2030+
.file-progress-text {
2031+
font-size: 10px;
2032+
color: var(--text-secondary);
2033+
margin-top: 2px;
2034+
font-style: italic;
2035+
}
2036+
20292037
/* Chunk progress bar for large file loading */
20302038
.chunk-progress-bar {
20312039
position: relative;

src/workers/db.worker.ts

Lines changed: 93 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ function generateCsvReadSql(options: CsvReadOptions): string {
224224
// Add debug_node column for multi-node tables (per-node data like node_queries)
225225
// Skip debug_node for single-file tables (cluster-wide data like cluster_settings)
226226
const selectClause = nodeId !== undefined
227-
? `SELECT ${nodeId} AS debug_node, * FROM read_csv('${fileName}', ${csvParams})`
227+
? `SELECT CAST(${nodeId} AS INTEGER) AS debug_node, * FROM read_csv('${fileName}', ${csvParams})`
228228
: `SELECT * FROM read_csv('${fileName}', ${csvParams})`;
229229

230230
if (operation === 'create') {
@@ -270,7 +270,7 @@ async function loadLargeFileIncrementally(
270270
FROM information_schema.tables
271271
WHERE table_name = '${tableName}' AND table_schema = 'main'
272272
`);
273-
tableExists = checkResult.toArray()[0].count > 0;
273+
tableExists = Number(checkResult.toArray()[0].count) > 0;
274274
} else {
275275
// Drop table if exists (only for non-multi-node tables)
276276
await conn.query(`DROP TABLE IF EXISTS ${quotedTableName}`);
@@ -482,18 +482,18 @@ async function loadLargeFileIncrementally(
482482
const countBeforeResult = await conn.query(
483483
`SELECT COUNT(*) as count FROM ${quotedTableName} WHERE debug_node != ${nodeId}`,
484484
);
485-
const countBefore = countBeforeResult.toArray()[0].count;
485+
const countBefore = Number(countBeforeResult.toArray()[0].count);
486486
const countAfterResult = await conn.query(
487487
`SELECT COUNT(*) as count FROM ${quotedTableName}`,
488488
);
489-
const countAfter = countAfterResult.toArray()[0].count;
489+
const countAfter = Number(countAfterResult.toArray()[0].count);
490490
finalRowCount = countAfter - countBefore;
491491
} else {
492492
// For CREATE operations or single-node tables, get total count
493493
const countResult = await conn.query(
494494
`SELECT COUNT(*) as count FROM ${quotedTableName}`,
495495
);
496-
finalRowCount = countResult.toArray()[0].count;
496+
finalRowCount = Number(countResult.toArray()[0].count);
497497
}
498498

499499
console.log(`✅ Successfully loaded large table with ${finalRowCount} rows`);
@@ -798,8 +798,24 @@ async function startTableLoading(message: StartTableLoadingMessage) {
798798
}
799799

800800
async function loadSingleTableFromMessage(message: LoadSingleTableMessage) {
801-
const { table } = message;
802-
await loadSingleTable(table);
801+
const { table, id } = message;
802+
try {
803+
await loadSingleTable(table);
804+
// Send success response
805+
sendResponse(message, {
806+
type: "loadSingleTableComplete",
807+
id,
808+
success: true,
809+
});
810+
} catch (error) {
811+
// Send error response
812+
sendResponse(message, {
813+
type: "loadSingleTableComplete",
814+
id,
815+
success: false,
816+
error: error instanceof Error ? error.message : "Unknown error",
817+
});
818+
}
803819
}
804820

805821
interface TableInfo {
@@ -852,20 +868,75 @@ async function loadSingleTable(table: TableInfo) {
852868
size,
853869
});
854870

855-
// Handle multi-node tables
871+
// Handle multi-node tables with sliding window parallel loading
856872
if (nodeFiles && nodeFiles.length > 0) {
857-
858873
let totalRowCount = 0;
859-
for (const nodeFile of nodeFiles) {
860-
// Skip error files - don't try to load them into DuckDB
861-
if (nodeFile.isError) {
862-
continue;
874+
let filesProcessed = 0;
875+
const totalFiles = nodeFiles.filter(f => !f.isError).length;
876+
const validFiles = nodeFiles.filter(f => !f.isError);
877+
878+
// Sliding window: limit by file count AND byte size
879+
const MAX_QUEUED_FILES = 20; // Max files in-flight
880+
const MAX_QUEUED_BYTES = 8 * 1024 * 1024; // Max 8MB of compressed data in-flight
881+
const pendingPromises: Array<Promise<{ nodeFile: typeof validFiles[0], response: any }>> = [];
882+
let nextFileIndex = 0;
883+
let queuedBytes = 0;
884+
885+
// Fill initial window (stop when either limit is reached)
886+
while (
887+
nextFileIndex < validFiles.length &&
888+
pendingPromises.length < MAX_QUEUED_FILES &&
889+
queuedBytes < MAX_QUEUED_BYTES
890+
) {
891+
const nodeFile = validFiles[nextFileIndex];
892+
pendingPromises.push(
893+
sendMessageToZipWorker({
894+
type: "readFileChunked",
895+
path: nodeFile.path,
896+
}).then(response => ({ nodeFile, response }))
897+
);
898+
queuedBytes += nodeFile.size;
899+
nextFileIndex++;
900+
}
901+
902+
// Process files in the order they were queued (not completion order), refilling the window after each one
903+
while (pendingPromises.length > 0) {
904+
// Wait for the oldest queued request to finish (FIFO order)
905+
const { nodeFile, response: fileResponse } = await pendingPromises.shift()!;
906+
907+
filesProcessed++;
908+
queuedBytes -= nodeFile.size; // Remove completed file from queue size
909+
910+
// Queue more files to maintain thresholds (stop when either limit is reached)
911+
while (
912+
nextFileIndex < validFiles.length &&
913+
pendingPromises.length < MAX_QUEUED_FILES &&
914+
queuedBytes < MAX_QUEUED_BYTES
915+
) {
916+
const nextFile = validFiles[nextFileIndex];
917+
pendingPromises.push(
918+
sendMessageToZipWorker({
919+
type: "readFileChunked",
920+
path: nextFile.path,
921+
}).then(response => ({ nodeFile: nextFile, response }))
922+
);
923+
queuedBytes += nextFile.size;
924+
nextFileIndex++;
863925
}
864926

865-
// Request file from zip worker
866-
const fileResponse = await sendMessageToZipWorker({
867-
type: "readFileChunked",
868-
path: nodeFile.path,
927+
// Send progress update
928+
self.postMessage({
929+
type: "tableLoadProgress",
930+
tableName,
931+
status: "loading",
932+
nodeId,
933+
originalName,
934+
isError,
935+
fileProgress: {
936+
current: filesProcessed,
937+
total: totalFiles,
938+
percentage: Math.round((filesProcessed / totalFiles) * 100),
939+
},
869940
});
870941

871942
if (!fileResponse.success) {
@@ -897,7 +968,7 @@ async function loadSingleTable(table: TableInfo) {
897968
);
898969
totalRowCount += rowCount;
899970
} else {
900-
// Load from text
971+
// Load from text - DuckDB operations are sequential, but zip worker stays busy
901972
const rowCount = await loadTableFromText(
902973
tableName,
903974
text,
@@ -931,7 +1002,7 @@ async function loadSingleTable(table: TableInfo) {
9311002
const countResult = await conn.query(
9321003
`SELECT COUNT(*) as count FROM ${quotedTableName}`,
9331004
);
934-
const rowCount = countResult.toArray()[0].count;
1005+
const rowCount = Number(countResult.toArray()[0].count);
9351006

9361007
self.postMessage({
9371008
type: "tableLoadProgress",
@@ -1053,7 +1124,7 @@ async function loadTableFromText(
10531124
const countResult = await conn.query(
10541125
`SELECT COUNT(*) as count FROM ${quotedTableName}`,
10551126
);
1056-
return countResult.toArray()[0].count;
1127+
return Number(countResult.toArray()[0].count);
10571128
}
10581129

10591130
try {
@@ -1102,7 +1173,7 @@ async function loadTableFromText(
11021173
FROM information_schema.tables
11031174
WHERE table_name = '${tableName}' AND table_schema = 'main'
11041175
`);
1105-
tableExists = checkResult.toArray()[0].count > 0;
1176+
tableExists = Number(checkResult.toArray()[0].count) > 0;
11061177
}
11071178

11081179
// Create table from CSV with auto-detection or explicit types
@@ -1232,7 +1303,7 @@ async function loadTableFromText(
12321303
const countResult = await conn.query(
12331304
`SELECT COUNT(*) as count FROM ${quotedTableName}`,
12341305
);
1235-
count = countResult.toArray()[0].count;
1306+
count = Number(countResult.toArray()[0].count);
12361307
}
12371308

12381309
loadedTables.add(tableName);

0 commit comments

Comments
 (0)