@@ -151,10 +151,10 @@ function createFallbackJson(hexValue: string): string {
151151}
152152
153153// Main preprocessing function - transforms keys in place
154- export function preprocessCSV (
154+ export async function preprocessCSV (
155155 content : string ,
156156 options : PreprocessOptions ,
157- ) : string {
157+ ) : Promise < string > {
158158 const delimiter = options . delimiter || "\t" ;
159159 const decoder = options . protoDecoder || protoDecoder ; // Use provided decoder or fallback to global
160160 const { headers, rows } = parseDelimited ( content , delimiter ) ;
@@ -194,15 +194,10 @@ export function preprocessCSV(
194194
195195 // Check for proto columns
196196 if ( options . decodeProtos ) {
197- if (
198- columnName === "config" ||
199- columnName === "descriptor" ||
200- columnName === "payload" ||
201- columnName === "progress" ||
202- columnName === "value"
203- ) {
204- const mapping = findProtoType ( options . tableName , header ) ;
205- protoColumns . set ( index , mapping ?. protoType || null ) ;
197+ const mapping = findProtoType ( options . tableName , header ) ;
198+ if ( mapping ) {
199+ protoColumns . set ( index , mapping . protoType ) ;
200+ console . log ( `📋 Proto mapping: ${ options . tableName } .${ header } -> ${ mapping . protoType } ` ) ;
206201 }
207202 }
208203 } ) ;
@@ -214,8 +209,8 @@ export function preprocessCSV(
214209
215210 // Process rows - transform values in place
216211
217- const processedRows = rows . map ( ( row ) => {
218- return row . map ( ( value , colIndex ) => {
212+ const processedRows = await Promise . all ( rows . map ( async ( row , rowIndex ) => {
213+ return Promise . all ( row . map ( async ( value , colIndex ) => {
219214 // Transform key columns
220215 if ( keyColumns . has ( colIndex ) ) {
221216 // Handle null/empty differently from \x (which is a valid empty key)
@@ -230,15 +225,51 @@ export function preprocessCSV(
230225 if ( protoType !== undefined && options . decodeProtos ) {
231226 // Handle dynamic proto type resolution for job_info table
232227 if ( protoType === "dynamic:job_info" && infoKeyColumnIndex >= 0 ) {
233- const infoKey = row [ infoKeyColumnIndex ] ;
228+ const infoKey = row [ infoKeyColumnIndex ] ?. trim ( ) ;
234229 if ( infoKey === "legacy_payload" ) {
235230 protoType = "cockroach.sql.jobs.jobspb.Payload" ;
236231 } else if ( infoKey === "legacy_progress" ) {
237232 protoType = "cockroach.sql.jobs.jobspb.Progress" ;
233+ } else if ( infoKey ?. includes ( "cockroach.sql.jobs.jobspb.TraceData" ) ) {
234+ // Handle TraceData specially
235+ if ( infoKey . includes ( ".binpb#_final" ) ) {
236+ // This is the final chunk - try to decode, but silently fail to hex wrapper
237+ protoType = "cockroach.sql.jobs.jobspb.TraceData" ;
238+ } else {
239+ // This is a partial chunk - go straight to hex wrapper without trying to decode
240+ if ( value && value !== "\\N" && value !== "NULL" ) {
241+ return createFallbackJson ( value ) ;
242+ }
243+ return value ;
244+ }
245+ } else if (
246+ infoKey ?. startsWith ( "~dsp-diag-url-" ) ||
247+ infoKey ?. startsWith ( "~node-processor-progress-" )
248+ ) {
249+ // These info_key types contain string values, not protobuf
250+ // If hex-encoded, decode and wrap in JSON
251+ if ( value && value !== "\\N" && value !== "NULL" ) {
252+ if ( value . startsWith ( "\\x" ) ) {
253+ return createFallbackJson ( value ) ;
254+ }
255+ // If it's already a plain string, wrap it in JSON for consistency
256+ return JSON . stringify ( { value : value } ) ;
257+ }
258+ return value ;
238259 } else {
239- // Unknown info_key type - use fallback JSON wrapper
260+ // Unknown info_key type
240261 if ( value && value !== "\\N" && value !== "NULL" ) {
241- return createFallbackJson ( value ) ;
262+ // Only warn and use fallback for hex-encoded values
263+ if ( value . startsWith ( "\\x" ) ) {
264+ const rowData = headers . map ( ( h , i ) => `${ h } =${ row [ i ] ?. substring ( 0 , 50 ) } ${ row [ i ] ?. length > 50 ? '...' : '' } ` ) . join ( ', ' ) ;
265+ console . warn (
266+ `⚠️ Unknown job_info.info_key type "${ infoKey } " with hex value:\n` +
267+ ` Row ${ rowIndex + 1 } : {${ rowData } }`
268+ ) ;
269+ return createFallbackJson ( value ) ;
270+ }
271+ // If it's not hex, it's probably already a string - return as-is
272+ return value ;
242273 }
243274 return value ;
244275 }
@@ -254,9 +285,10 @@ export function preprocessCSV(
254285 // If we have an explicit proto mapping, decode it
255286 if ( protoType && protoType !== "dynamic:job_info" ) {
256287 const currentColumnName = headers [ colIndex ] ;
288+ const isTraceData = protoType === "cockroach.sql.jobs.jobspb.TraceData" ;
257289 try {
258290 const bytes = hexToBytes ( value ) ;
259- const decoded = decoder . decode ( bytes , protoType ) ;
291+ const decoded = await decoder . decodeAsync ( bytes , protoType ) ;
260292
261293 // Don't use fallback for job_info - if the specific proto fails, leave as hex
262294 // The fallback was incorrectly decoding Progress data as SpanConfig
@@ -265,11 +297,56 @@ export function preprocessCSV(
265297 // Return as compact JSON string
266298 return JSON . stringify ( decoded . decoded ) ;
267299 } else {
268- console . warn ( `❌ Proto decode failed for ${ currentColumnName } :` , decoded . error ) ;
300+ // Decoding failed - use fallback
301+ const hexSample = value . substring ( 0 , 100 ) + ( value . length > 100 ? '...' : '' ) ;
302+ const rowData = headers . map ( ( h , i ) => `${ h } =${ row [ i ] ?. substring ( 0 , 50 ) } ${ row [ i ] ?. length > 50 ? '...' : '' } ` ) . join ( ', ' ) ;
303+
304+ if ( isTraceData ) {
305+ // TraceData decode failures - log to understand why
306+ console . warn (
307+ `⚠️ TraceData decode failed for ${ currentColumnName } :\n` +
308+ ` Row ${ rowIndex + 1 } : {${ rowData } }\n` +
309+ ` Hex (first 100 chars): ${ hexSample } \n` +
310+ ` Error: ${ decoded . error } `
311+ ) ;
312+ } else {
313+ // Show more details for debugging for other proto types
314+ console . warn (
315+ `❌ Proto decode failed for ${ currentColumnName } (${ protoType } ):\n` +
316+ ` Row ${ rowIndex + 1 } : {${ rowData } }\n` +
317+ ` Hex (first 100 chars): ${ hexSample } \n` +
318+ ` Error details:\n${ decoded . error } `
319+ ) ;
320+ }
321+ // Return hex-wrapped JSON for all failed decodes
322+ return createFallbackJson ( value ) ;
269323 }
270324 } catch ( err ) {
271- console . warn ( `❌ Proto decode exception for ${ currentColumnName } :` , err ) ;
272- // Don't let protobuf errors stop processing of subsequent rows
325+ // Decode exception - use fallback
326+ const hexSample = value . substring ( 0 , 100 ) + ( value . length > 100 ? '...' : '' ) ;
327+ const rowData = headers . map ( ( h , i ) => `${ h } =${ row [ i ] ?. substring ( 0 , 50 ) } ${ row [ i ] ?. length > 50 ? '...' : '' } ` ) . join ( ', ' ) ;
328+ const errorDetails = err instanceof Error
329+ ? `${ err . name } : ${ err . message } \n${ err . stack } `
330+ : String ( err ) ;
331+
332+ if ( isTraceData ) {
333+ // TraceData decode exceptions - log to understand why
334+ console . warn (
335+ `⚠️ TraceData decode exception for ${ currentColumnName } :\n` +
336+ ` Row ${ rowIndex + 1 } : {${ rowData } }\n` +
337+ ` Hex (first 100 chars): ${ hexSample } \n` +
338+ ` Exception: ${ errorDetails } `
339+ ) ;
340+ } else {
341+ console . warn (
342+ `❌ Proto decode exception for ${ currentColumnName } (${ protoType } ):\n` +
343+ ` Row ${ rowIndex + 1 } : {${ rowData } }\n` +
344+ ` Hex (first 100 chars): ${ hexSample } \n` +
345+ ` Error details:\n${ errorDetails } `
346+ ) ;
347+ }
348+ // Return hex-wrapped JSON for all exceptions
349+ return createFallbackJson ( value ) ;
273350 }
274351 }
275352 }
@@ -318,8 +395,8 @@ export function preprocessCSV(
318395 }
319396
320397 return value ;
321- } ) ;
322- } ) ;
398+ } ) ) ;
399+ } ) ) ;
323400
324401 // Reconstruct the CSV with transformed values
325402 const processedLines = [
0 commit comments