@@ -19,103 +19,123 @@ ALTER TABLE "workflow_execution_logs"
1919 ADD COLUMN IF NOT EXISTS " execution_data" jsonb NOT NULL DEFAULT ' {}' ::jsonb,
2020 ADD COLUMN IF NOT EXISTS " cost" jsonb;-- > statement-breakpoint
2121
-- 2) Backfill top-level cost from legacy numeric columns, tokenBreakdown, and traceSpans aggregates.
--    The backfill runs in batches to avoid large temporary files on big datasets.
DO $$
DECLARE
  v_batch_size integer := 5000;  -- rows per iteration; tune if needed based on dataset size
  v_rows_updated integer := 0;   -- rows touched by the most recent batch
BEGIN
  LOOP
    -- RECURSIVE is required on the WITH clause: the spans CTE references itself
    -- to walk nested children. Non-recursive CTEs may share the same clause.
    WITH RECURSIVE candidate AS (
      -- Next batch of rows that have not been backfilled yet.
      SELECT id
      FROM workflow_execution_logs
      WHERE cost IS NULL
      ORDER BY id
      LIMIT v_batch_size
    ),
    spans AS (
      -- Top-level trace spans. The LEFT JOIN keeps rows without any span (span IS NULL),
      -- so every candidate row reaches the UPDATE below and receives a non-NULL cost;
      -- this is what guarantees the loop terminates.
      SELECT l.id, s.span
      FROM workflow_execution_logs l
      JOIN candidate c ON c.id = l.id
      LEFT JOIN LATERAL jsonb_array_elements(
        CASE
          WHEN jsonb_typeof(l.execution_data -> 'traceSpans') = 'array' THEN l.execution_data -> 'traceSpans'
          ELSE '[]'::jsonb
        END
      ) s(span) ON true
      UNION ALL
      -- Descend into each span's children. Guard on the JSON type: a JSON null or any
      -- non-array value would otherwise make jsonb_array_elements raise an error.
      SELECT spans.id, child.span
      FROM spans
      JOIN LATERAL jsonb_array_elements(
        CASE
          WHEN jsonb_typeof(spans.span -> 'children') = 'array' THEN spans.span -> 'children'
          ELSE '[]'::jsonb
        END
      ) child(span) ON true
    ),
    agg AS (
      -- Per-log totals across all spans (top-level and nested).
      -- Token counts prefer span.cost.tokens and fall back to span.tokens.
      SELECT id,
        SUM(COALESCE((span -> 'cost' ->> 'input')::numeric, 0)) AS agg_input,
        SUM(COALESCE((span -> 'cost' ->> 'output')::numeric, 0)) AS agg_output,
        SUM(COALESCE((span -> 'cost' ->> 'total')::numeric, 0)) AS agg_total,
        SUM(COALESCE((span -> 'cost' -> 'tokens' ->> 'prompt')::numeric, COALESCE((span -> 'tokens' ->> 'prompt')::numeric, 0))) AS agg_tokens_prompt,
        SUM(COALESCE((span -> 'cost' -> 'tokens' ->> 'completion')::numeric, COALESCE((span -> 'tokens' ->> 'completion')::numeric, 0))) AS agg_tokens_completion,
        SUM(COALESCE((span -> 'cost' -> 'tokens' ->> 'total')::numeric, COALESCE((span -> 'tokens' ->> 'total')::numeric, 0))) AS agg_tokens_total
      FROM spans
      GROUP BY id
    ),
    model_rows AS (
      -- One row per span that carries a cost object with a model name.
      SELECT id,
        (span -> 'cost' ->> 'model') AS model,
        COALESCE((span -> 'cost' ->> 'input')::numeric, 0) AS input,
        COALESCE((span -> 'cost' ->> 'output')::numeric, 0) AS output,
        COALESCE((span -> 'cost' ->> 'total')::numeric, 0) AS total,
        COALESCE((span -> 'cost' -> 'tokens' ->> 'prompt')::numeric, 0) AS tokens_prompt,
        COALESCE((span -> 'cost' -> 'tokens' ->> 'completion')::numeric, 0) AS tokens_completion,
        COALESCE((span -> 'cost' -> 'tokens' ->> 'total')::numeric, 0) AS tokens_total
      FROM spans
      WHERE span ? 'cost' AND (span -> 'cost' ->> 'model') IS NOT NULL
    ),
    model_sums AS (
      -- Cost and token totals per (log, model).
      SELECT id,
        model,
        SUM(input) AS input,
        SUM(output) AS output,
        SUM(total) AS total,
        SUM(tokens_prompt) AS tokens_prompt,
        SUM(tokens_completion) AS tokens_completion,
        SUM(tokens_total) AS tokens_total
      FROM model_rows
      GROUP BY id, model
    ),
    models AS (
      -- Fold the per-model totals into one JSON object per log, keyed by model name.
      SELECT id,
        jsonb_object_agg(model, jsonb_build_object(
          'input', input,
          'output', output,
          'total', total,
          'tokens', jsonb_build_object(
            'prompt', tokens_prompt,
            'completion', tokens_completion,
            'total', tokens_total
          )
        )) AS models
      FROM model_sums
      GROUP BY id
    ),
    tb AS (
      -- Legacy tokenBreakdown values; zero is treated as "not recorded".
      SELECT l.id,
        NULLIF((l.execution_data -> 'tokenBreakdown' ->> 'prompt')::numeric, 0) AS prompt,
        NULLIF((l.execution_data -> 'tokenBreakdown' ->> 'completion')::numeric, 0) AS completion
      FROM workflow_execution_logs l
      JOIN candidate c ON c.id = l.id
    )
    UPDATE workflow_execution_logs AS l
    -- Legacy columns win over span aggregates; zero aggregates are dropped.
    -- jsonb_strip_nulls removes absent keys, so the result is at least '{}' and never NULL.
    SET cost = jsonb_strip_nulls(
      jsonb_build_object(
        'total', COALESCE(l.total_cost, NULLIF(agg.agg_total, 0)),
        'input', COALESCE(l.total_input_cost, NULLIF(agg.agg_input, 0)),
        'output', COALESCE(l.total_output_cost, NULLIF(agg.agg_output, 0)),
        'tokens', CASE
          WHEN l.total_tokens IS NOT NULL OR tb.prompt IS NOT NULL OR tb.completion IS NOT NULL OR NULLIF(agg.agg_tokens_total, 0) IS NOT NULL THEN
            jsonb_strip_nulls(
              jsonb_build_object(
                'total', COALESCE(l.total_tokens, NULLIF(agg.agg_tokens_total, 0)),
                'prompt', COALESCE(tb.prompt, NULLIF(agg.agg_tokens_prompt, 0)),
                'completion', COALESCE(tb.completion, NULLIF(agg.agg_tokens_completion, 0))
              )
            )
          ELSE NULL
        END,
        'models', models.models
      )
    )
    FROM agg
    LEFT JOIN models ON models.id = agg.id
    LEFT JOIN tb ON tb.id = agg.id
    WHERE l.id = agg.id;

    GET DIAGNOSTICS v_rows_updated = ROW_COUNT;
    EXIT WHEN v_rows_updated = 0; -- no more rows to backfill
  END LOOP;
END $$;--> statement-breakpoint
119139
120140-- 3) Drop legacy columns now that backfill is complete
121141ALTER TABLE " workflow_execution_logs"
0 commit comments