11import { chQuery } from "@databuddy/db" ;
2+ import { record , setAttributes } from "../lib/tracing" ;
23import { QueryBuilders } from "./builders" ;
34import { SimpleQueryBuilder } from "./simple-builder" ;
45import type { QueryRequest , SimpleQueryConfig } from "./types" ;
@@ -13,53 +14,68 @@ function getSchemaSignature(config: SimpleQueryConfig): string | null {
1314 return fields ?. length ? fields . map ( ( f ) => `${ f . name } :${ f . type } ` ) . join ( "," ) : null ;
1415}
1516
16- async function runSingle ( req : BatchRequest , opts ?: BatchOptions ) : Promise < BatchResult > {
17+ function runSingle ( req : BatchRequest , opts ?: BatchOptions ) : Promise < BatchResult > {
1718 const config = QueryBuilders [ req . type ] ;
1819 if ( ! config ) {
19- return { type : req . type , data : [ ] , error : `Unknown query type: ${ req . type } ` } ;
20+ return Promise . resolve ( { type : req . type , data : [ ] , error : `Unknown query type: ${ req . type } ` } ) ;
2021 }
2122
22- try {
23- const builder = new SimpleQueryBuilder (
24- config ,
25- { ...req , timezone : opts ?. timezone ?? req . timezone } ,
26- opts ?. websiteDomain
27- ) ;
28- return { type : req . type , data : await builder . execute ( ) } ;
29- } catch ( e ) {
30- return { type : req . type , data : [ ] , error : e instanceof Error ? e . message : "Query failed" } ;
31- }
23+ return record ( `query.single.${ req . type } ` , async ( ) => {
24+ const startTime = performance . now ( ) ;
25+ try {
26+ const builder = new SimpleQueryBuilder (
27+ config ,
28+ { ...req , timezone : opts ?. timezone ?? req . timezone } ,
29+ opts ?. websiteDomain
30+ ) ;
31+ const data = await builder . execute ( ) ;
32+
33+ setAttributes ( {
34+ "query.type" : req . type ,
35+ "query.from" : req . from ,
36+ "query.to" : req . to ,
37+ "query.rows" : data . length ,
38+ "query.duration_ms" : Math . round ( performance . now ( ) - startTime ) ,
39+ } ) ;
40+
41+ return { type : req . type , data } ;
42+ } catch ( e ) {
43+ const error = e instanceof Error ? e . message : "Query failed" ;
44+ setAttributes ( { "query.error" : error } ) ;
45+ return { type : req . type , data : [ ] , error } ;
46+ }
47+ } ) ;
3248}
3349
34- function groupBySchema ( requests : BatchRequest [ ] ) : Map < string , BatchRequest [ ] > {
35- const groups = new Map < string , BatchRequest [ ] > ( ) ;
50+ function groupBySchema ( requests : BatchRequest [ ] ) : Map < string , { index : number ; req : BatchRequest } [ ] > {
51+ const groups = new Map < string , { index : number ; req : BatchRequest } [ ] > ( ) ;
52+
53+ for ( let i = 0 ; i < requests . length ; i ++ ) {
54+ const req = requests [ i ] ;
55+ if ( ! req ) {
56+ continue ;
57+ }
3658
37- for ( const req of requests ) {
3859 const config = QueryBuilders [ req . type ] ;
3960 if ( ! config ) {
4061 continue ;
4162 }
4263
4364 const sig = getSchemaSignature ( config ) || `__solo_${ req . type } ` ;
4465 const list = groups . get ( sig ) || [ ] ;
45- list . push ( req ) ;
66+ list . push ( { index : i , req } ) ;
4667 groups . set ( sig , list ) ;
4768 }
4869
4970 return groups ;
5071}
5172
52- function buildUnionQuery ( requests : BatchRequest [ ] , opts ?: BatchOptions ) {
73+ function buildUnionQuery ( items : { index : number ; req : BatchRequest } [ ] , opts ?: BatchOptions ) {
5374 const queries : string [ ] = [ ] ;
5475 const params : Record < string , unknown > = { } ;
55- const types : string [ ] = [ ] ;
56-
57- for ( let i = 0 ; i < requests . length ; i ++ ) {
58- const req = requests [ i ] ;
59- if ( ! req ) {
60- continue ;
61- }
76+ const indices : number [ ] = [ ] ;
6277
78+ for ( const { index, req } of items ) {
6379 const config = QueryBuilders [ req . type ] ;
6480 if ( ! config ) {
6581 continue ;
@@ -74,74 +90,104 @@ function buildUnionQuery(requests: BatchRequest[], opts?: BatchOptions) {
7490 let { sql, params : queryParams } = builder . compile ( ) ;
7591
7692 for ( const [ key , value ] of Object . entries ( queryParams ) ) {
77- const prefixedKey = `q${ i } _${ key } ` ;
93+ const prefixedKey = `q${ index } _${ key } ` ;
7894 params [ prefixedKey ] = value ;
7995 sql = sql . replaceAll ( `{${ key } :` , `{${ prefixedKey } :` ) ;
8096 }
8197
82- types . push ( req . type ) ;
83- queries . push ( `SELECT ' ${ req . type } ' as __query_type , * FROM (${ sql } )` ) ;
98+ indices . push ( index ) ;
99+ queries . push ( `SELECT ${ index } as __query_idx , * FROM (${ sql } )` ) ;
84100 }
85101
86- return { sql : queries . join ( "\nUNION ALL\n" ) , params, types } ;
102+ return { sql : queries . join ( "\nUNION ALL\n" ) , params, indices } ;
87103}
88104
89105function splitResults (
90- rows : Array < Record < string , unknown > & { __query_type : string } > ,
91- types : string [ ]
92- ) : Map < string , Record < string , unknown > [ ] > {
93- const byType = new Map < string , Record < string , unknown > [ ] > ( types . map ( ( t ) => [ t , [ ] ] ) ) ;
106+ rows : Array < Record < string , unknown > & { __query_idx : number } > ,
107+ indices : number [ ]
108+ ) : Map < number , Record < string , unknown > [ ] > {
109+ const byIndex = new Map < number , Record < string , unknown > [ ] > ( indices . map ( ( i ) => [ i , [ ] ] ) ) ;
94110
95- for ( const { __query_type , ...rest } of rows ) {
96- byType . get ( __query_type ) ?. push ( rest ) ;
111+ for ( const { __query_idx , ...rest } of rows ) {
112+ byIndex . get ( __query_idx ) ?. push ( rest ) ;
97113 }
98114
99- return byType ;
115+ return byIndex ;
100116}
101117
102- export async function executeBatch ( requests : BatchRequest [ ] , opts ?: BatchOptions ) : Promise < BatchResult [ ] > {
118+ export function executeBatch ( requests : BatchRequest [ ] , opts ?: BatchOptions ) : Promise < BatchResult [ ] > {
103119 if ( requests . length === 0 ) {
104- return [ ] ;
105- }
106- if ( requests . length === 1 && requests [ 0 ] ) {
107- return [ await runSingle ( requests [ 0 ] , opts ) ] ;
120+ return Promise . resolve ( [ ] ) ;
108121 }
109122
110- const groups = groupBySchema ( requests ) ;
111- const results : BatchResult [ ] = [ ] ;
123+ return record ( "query.batch" , async ( ) => {
124+ const startTime = performance . now ( ) ;
112125
113- for ( const groupReqs of groups . values ( ) ) {
114- if ( groupReqs . length === 0 ) {
115- continue ;
116- }
126+ setAttributes ( {
127+ "batch.size" : requests . length ,
128+ "batch.types" : requests . map ( ( r ) => r . type ) . join ( "," ) ,
129+ } ) ;
117130
118- if ( groupReqs . length === 1 && groupReqs [ 0 ] ) {
119- results . push ( await runSingle ( groupReqs [ 0 ] , opts ) ) ;
120- continue ;
131+ if ( requests . length === 1 && requests [ 0 ] ) {
132+ return [ await runSingle ( requests [ 0 ] , opts ) ] ;
121133 }
122134
123- try {
124- const { sql, params, types } = buildUnionQuery ( groupReqs , opts ) ;
125- const rawRows = await chQuery ( sql , params ) ;
126- const split = splitResults ( rawRows as Array < Record < string , unknown > & { __query_type : string } > , types ) ;
127-
128- for ( const type of types ) {
129- const config = QueryBuilders [ type ] ;
130- const raw = split . get ( type ) || [ ] ;
131- results . push ( {
132- type,
133- data : config ? applyPlugins ( raw , config , opts ?. websiteDomain ) : raw ,
134- } ) ;
135+ const groups = groupBySchema ( requests ) ;
136+ const results : BatchResult [ ] = Array . from ( { length : requests . length } ) ;
137+ let unionCount = 0 ;
138+ let singleCount = 0 ;
139+
140+ for ( const groupItems of groups . values ( ) ) {
141+ if ( groupItems . length === 0 ) {
142+ continue ;
135143 }
136- } catch {
137- for ( const req of groupReqs ) {
138- results . push ( await runSingle ( req , opts ) ) ;
144+
145+ if ( groupItems . length === 1 && groupItems [ 0 ] ) {
146+ const { index, req } = groupItems [ 0 ] ;
147+ results [ index ] = await runSingle ( req , opts ) ;
148+ singleCount += 1 ;
149+ continue ;
150+ }
151+
152+ try {
153+ const { sql, params, indices } = buildUnionQuery ( groupItems , opts ) ;
154+ const queryStart = performance . now ( ) ;
155+ const rawRows = await chQuery ( sql , params ) ;
156+ const queryDuration = Math . round ( performance . now ( ) - queryStart ) ;
157+
158+ setAttributes ( {
159+ "batch.union.query_count" : indices . length ,
160+ "batch.union.rows" : rawRows . length ,
161+ "batch.union.duration_ms" : queryDuration ,
162+ } ) ;
163+
164+ const split = splitResults ( rawRows as Array < Record < string , unknown > & { __query_idx : number } > , indices ) ;
165+
166+ for ( const { index, req } of groupItems ) {
167+ const config = QueryBuilders [ req . type ] ;
168+ const raw = split . get ( index ) || [ ] ;
169+ results [ index ] = {
170+ type : req . type ,
171+ data : config ? applyPlugins ( raw , config , opts ?. websiteDomain ) : raw ,
172+ } ;
173+ }
174+ unionCount += 1 ;
175+ } catch {
176+ for ( const { index, req } of groupItems ) {
177+ results [ index ] = await runSingle ( req , opts ) ;
178+ singleCount += 1 ;
179+ }
139180 }
140181 }
141- }
142182
143- const resultMap = new Map ( results . map ( ( r ) => [ r . type , r ] ) ) ;
144- return requests . map ( ( req ) => resultMap . get ( req . type ) || { type : req . type , data : [ ] } ) ;
183+ setAttributes ( {
184+ "batch.union_groups" : unionCount ,
185+ "batch.single_queries" : singleCount ,
186+ "batch.duration_ms" : Math . round ( performance . now ( ) - startTime ) ,
187+ } ) ;
188+
189+ return results . map ( ( r , i ) => r || { type : requests [ i ] ?. type || "unknown" , data : [ ] } ) ;
190+ } ) ;
145191}
146192
147193export function areQueriesCompatible ( type1 : string , type2 : string ) : boolean {
0 commit comments