@@ -21,6 +21,7 @@ interface BatchProcessOptions {
2121 sourcePath ?: string ;
2222 nodeId ?: number ;
2323 forceInsert ?: boolean ; // Force INSERT mode (table already exists)
24+ ignoreErrors ?: boolean ; // Use relaxed CSV parsing to load partial data
2425}
2526
2627const BATCH_SIZE_ROWS = 10000 ; // Max rows per batch
@@ -129,6 +130,7 @@ function tryReplaceWithPrettyKey(value: string): string {
129130 return value ;
130131 }
131132
133+ // Handle hex keys (start with \x)
132134 if ( value === "\\x" || value . startsWith ( "\\x" ) ) {
133135 try {
134136 const decoded = prettyKey ( value ) ;
@@ -138,16 +140,74 @@ function tryReplaceWithPrettyKey(value: string): string {
138140 }
139141 }
140142
143+ // Handle base64-encoded keys (like oA==, oQ==, etc.)
144+ // Check if it looks like base64: only contains base64 chars and correct padding
145+ if ( / ^ [ A - Z a - z 0 - 9 + / ] * ( = | = = ) ? $ / . test ( value ) && value . length % 4 === 0 && value . length > 0 ) {
146+ try {
147+ // Convert base64 to hex
148+ const binaryString = atob ( value ) ;
149+ const bytes = new Uint8Array ( binaryString . length ) ;
150+ for ( let i = 0 ; i < binaryString . length ; i ++ ) {
151+ bytes [ i ] = binaryString . charCodeAt ( i ) ;
152+ }
153+ const hexStr = Array . from ( bytes )
154+ . map ( ( b ) => b . toString ( 16 ) . padStart ( 2 , "0" ) )
155+ . join ( "" ) ;
156+
157+ const decoded = prettyKey ( hexStr ) ;
158+ // Only use the decoded version if it's different from the hex
159+ if ( decoded . pretty !== hexStr ) {
160+ return decoded . pretty ;
161+ }
162+ } catch {
163+ // If decoding fails, return original value
164+ }
165+ }
166+
141167 return value ;
142168}
143169
170+ // Recursively decode base64 keys in JSON objects
171+ function decodeKeysInJson ( obj : unknown ) : unknown {
172+ if ( typeof obj === "string" ) {
173+ // If it's a string that looks like base64, try to decode it as a key
174+ return tryReplaceWithPrettyKey ( obj ) ;
175+ } else if ( Array . isArray ( obj ) ) {
176+ return obj . map ( decodeKeysInJson ) ;
177+ } else if ( obj !== null && typeof obj === "object" ) {
178+ const result : Record < string , unknown > = { } ;
179+ for ( const [ key , value ] of Object . entries ( obj ) ) {
180+ // Decode values in key-related fields
181+ if ( key . toLowerCase ( ) . includes ( "key" ) || key === "start" || key === "end" ) {
182+ result [ key ] = typeof value === "string" ? tryReplaceWithPrettyKey ( value ) : decodeKeysInJson ( value ) ;
183+ } else {
184+ result [ key ] = decodeKeysInJson ( value ) ;
185+ }
186+ }
187+ return result ;
188+ }
189+ return obj ;
190+ }
191+
192+ function decodeJsonKeys ( jsonString : string ) : string {
193+ try {
194+ const parsed = JSON . parse ( jsonString ) ;
195+ const decoded = decodeKeysInJson ( parsed ) ;
196+ return JSON . stringify ( decoded ) ;
197+ } catch {
198+ // If parsing fails, return original
199+ return jsonString ;
200+ }
201+ }
202+
144203async function processRow (
145204 row : string [ ] ,
146205 headers : string [ ] ,
147206 rowIndex : number ,
148207 options : {
149208 tableName : string ;
150209 keyColumns : Set < number > ;
210+ jsonKeyColumns : Set < number > ;
151211 protoColumns : Map < number , string | null > ;
152212 infoKeyColumnIndex : number ;
153213 decodeKeys : boolean ;
@@ -157,6 +217,18 @@ async function processRow(
157217) : Promise < string [ ] > {
158218 const processedRow = await Promise . all (
159219 row . map ( async ( value , colIndex ) => {
220+ // Transform JSON columns that contain base64 keys
221+ if ( options . jsonKeyColumns . has ( colIndex ) && options . decodeKeys ) {
222+ if ( value === "\\N" || value === "NULL" || ! value ) {
223+ return value ;
224+ }
225+ // Check if it looks like JSON
226+ if ( value . startsWith ( "{" ) || value . startsWith ( "[" ) ) {
227+ return decodeJsonKeys ( value ) ;
228+ }
229+ return value ;
230+ }
231+
160232 // Transform key columns
161233 if ( options . keyColumns . has ( colIndex ) && options . decodeKeys ) {
162234 if ( value === "\\N" || value === "NULL" ) {
@@ -242,7 +314,7 @@ export async function preprocessAndLoadInBatches(
242314 content : string ,
243315 options : BatchProcessOptions
244316) : Promise < number > {
245- const { conn, db, tableName, delimiter, sourcePath, nodeId } = options ;
317+ const { conn, db, tableName, delimiter, sourcePath, nodeId, ignoreErrors } = options ;
246318
247319 // Process content in a streaming fashion
248320 let position = 0 ;
@@ -266,6 +338,7 @@ export async function preprocessAndLoadInBatches(
266338 // Identify columns that need special processing
267339 const keyColumns = new Set < number > ( ) ;
268340 const protoColumns = new Map < number , string | null > ( ) ;
341+ const jsonKeyColumns = new Set < number > ( ) ; // JSON columns that may contain base64 keys
269342 let infoKeyColumnIndex = - 1 ;
270343
271344 headers . forEach ( ( header , index ) => {
@@ -276,7 +349,14 @@ export async function preprocessAndLoadInBatches(
276349 keyColumns . add ( index ) ;
277350 }
278351
279- // Check for proto columns
352+ // Check for JSON columns that may contain encoded keys (e.g., rangelog.info)
353+ // These are already JSON from CRDB, not protos that need decoding
354+ if ( options . decodeKeys && columnName === "info" &&
355+ ( tableName . includes ( "rangelog" ) || tableName . includes ( "range_log" ) ) ) {
356+ jsonKeyColumns . add ( index ) ;
357+ }
358+
359+ // Check for proto columns (hex-encoded that need proto->JSON conversion)
280360 if ( options . decodeProtos && options . protoDecoder ) {
281361 if ( columnName === "config" || columnName === "descriptor" ||
282362 columnName === "payload" || columnName === "progress" || columnName === "value" ) {
@@ -347,6 +427,7 @@ export async function preprocessAndLoadInBatches(
347427 const processedRow = await processRow ( row , headers , lineNumber , {
348428 tableName,
349429 keyColumns,
430+ jsonKeyColumns,
350431 protoColumns,
351432 infoKeyColumnIndex,
352433 decodeKeys : options . decodeKeys ,
@@ -383,6 +464,7 @@ export async function preprocessAndLoadInBatches(
383464 nodeId, // Always pass nodeId for both CREATE and INSERT
384465 typeHints,
385466 headers : ! tableCreated ? headers : undefined ,
467+ ignoreErrors,
386468 } ) ;
387469
388470 await conn . query ( sql ) ;
@@ -422,6 +504,7 @@ export async function preprocessAndLoadInBatches(
422504 nodeId, // Always pass nodeId for both CREATE and INSERT
423505 typeHints,
424506 headers : ! tableCreated ? headers : undefined ,
507+ ignoreErrors,
425508 } ) ;
426509
427510 await conn . query ( sql ) ;
0 commit comments