@@ -37,18 +37,29 @@ export class RealtimeClient {
3737 this . #registerCommands( ) ;
3838 }
3939
40- async streamRun ( url : URL | string , environment : RealtimeEnvironment , runId : string ) {
41- return this . #streamRunsWhere( url , environment , `id='${ runId } '` ) ;
40+ async streamRun (
41+ url : URL | string ,
42+ environment : RealtimeEnvironment ,
43+ runId : string ,
44+ clientVersion ?: string
45+ ) {
46+ return this . #streamRunsWhere( url , environment , `id='${ runId } '` , clientVersion ) ;
4247 }
4348
44- async streamBatch ( url : URL | string , environment : RealtimeEnvironment , batchId : string ) {
45- return this . #streamRunsWhere( url , environment , `"batchId"='${ batchId } '` ) ;
49+ async streamBatch (
50+ url : URL | string ,
51+ environment : RealtimeEnvironment ,
52+ batchId : string ,
53+ clientVersion ?: string
54+ ) {
55+ return this . #streamRunsWhere( url , environment , `"batchId"='${ batchId } '` , clientVersion ) ;
4656 }
4757
4858 async streamRuns (
4959 url : URL | string ,
5060 environment : RealtimeEnvironment ,
51- params : RealtimeRunsParams
61+ params : RealtimeRunsParams ,
62+ clientVersion ?: string
5263 ) {
5364 const whereClauses : string [ ] = [ `"runtimeEnvironmentId"='${ environment . id } '` ] ;
5465
@@ -58,16 +69,21 @@ export class RealtimeClient {
5869
5970 const whereClause = whereClauses . join ( " AND " ) ;
6071
61- return this . #streamRunsWhere( url , environment , whereClause ) ;
72+ return this . #streamRunsWhere( url , environment , whereClause , clientVersion ) ;
6273 }
6374
64- async #streamRunsWhere( url : URL | string , environment : RealtimeEnvironment , whereClause : string ) {
65- const electricUrl = this . #constructElectricUrl( url , whereClause ) ;
75+ async #streamRunsWhere(
76+ url : URL | string ,
77+ environment : RealtimeEnvironment ,
78+ whereClause : string ,
79+ clientVersion ?: string
80+ ) {
81+ const electricUrl = this . #constructElectricUrl( url , whereClause , clientVersion ) ;
6682
67- return this . #performElectricRequest( electricUrl , environment ) ;
83+ return this . #performElectricRequest( electricUrl , environment , clientVersion ) ;
6884 }
6985
70- #constructElectricUrl( url : URL | string , whereClause : string ) : URL {
86+ #constructElectricUrl( url : URL | string , whereClause : string , clientVersion ?: string ) : URL {
7187 const $url = new URL ( url . toString ( ) ) ;
7288
7389 const electricUrl = new URL ( `${ this . options . electricOrigin } /v1/shape` ) ;
@@ -77,36 +93,42 @@ export class RealtimeClient {
7793 electricUrl . searchParams . set ( key , value ) ;
7894 } ) ;
7995
80- // const electricParams = ["shape_id", "live", "offset", "columns", "cursor"];
81-
82- // electricParams.forEach((param) => {
83- // if ($url.searchParams.has(param) && $url.searchParams.get(param)) {
84- // electricUrl.searchParams.set(param, $url.searchParams.get(param)!);
85- // }
86- // });
87-
8896 electricUrl . searchParams . set ( "where" , whereClause ) ;
8997 electricUrl . searchParams . set ( "table" , 'public."TaskRun"' ) ;
9098
99+ if ( ! clientVersion ) {
100+ // If the client version is not provided, that means we're using an older client
101+ // This means the client will be sending shape_id instead of handle
102+ electricUrl . searchParams . set ( "handle" , electricUrl . searchParams . get ( "shape_id" ) ?? "" ) ;
103+ }
104+
91105 return electricUrl ;
92106 }
93107
94- async #performElectricRequest( url : URL , environment : RealtimeEnvironment ) {
108+ async #performElectricRequest(
109+ url : URL ,
110+ environment : RealtimeEnvironment ,
111+ clientVersion ?: string
112+ ) {
95113 const shapeId = extractShapeId ( url ) ;
96114
97115 logger . debug ( "[realtimeClient] request" , {
98116 url : url . toString ( ) ,
99117 } ) ;
100118
119+ const rewriteResponseHeaders : Record < string , string > = clientVersion
120+ ? { }
121+ : { "electric-handle" : "electric-shape-id" , "electric-offset" : "electric-chunk-last-offset" } ;
122+
101123 if ( ! shapeId ) {
102124 // If the shapeId is not present, we're just getting the initial value
103- return longPollingFetch ( url . toString ( ) ) ;
125+ return longPollingFetch ( url . toString ( ) , { } , rewriteResponseHeaders ) ;
104126 }
105127
106128 const isLive = isLiveRequestUrl ( url ) ;
107129
108130 if ( ! isLive ) {
109- return longPollingFetch ( url . toString ( ) ) ;
131+ return longPollingFetch ( url . toString ( ) , { } , rewriteResponseHeaders ) ;
110132 }
111133
112134 const requestId = randomUUID ( ) ;
@@ -148,7 +170,7 @@ export class RealtimeClient {
148170
149171 try {
150172 // ... (rest of your existing code for the long polling request)
151- const response = await longPollingFetch ( url . toString ( ) ) ;
173+ const response = await longPollingFetch ( url . toString ( ) , { } , rewriteResponseHeaders ) ;
152174
153175 // Decrement the counter after the long polling request is complete
154176 await this . #decrementConcurrency( environment . id , requestId ) ;
0 commit comments