@@ -16,10 +16,12 @@ import {
1616 PostgresConnection ,
1717 type PostgresConnectionOptions ,
1818 FrontendMessageCode ,
19+ BackendMessageCode ,
1920} from "pg-gateway" ;
2021import { logger } from "../../logger" ;
2122import { hasMessage , FirebaseError } from "../../error" ;
2223import { moveAll } from "../../fsutils" ;
24+ import { StringDecoder } from "node:string_decoder" ;
2325
2426export const TRUNCATE_TABLES_SQL = `
2527DO $do$
3436END
3537$do$;` ;
3638
39+ const decoder = new StringDecoder ( ) ;
40+ const pgliteDebugLog = fs . createWriteStream ( "pglite-debug.log" ) ;
41+
3742export class PostgresServer {
3843 private baseDataDirectory ?: string ;
3944 private importPath ?: string ;
@@ -46,7 +51,7 @@ export class PostgresServer {
4651 const getDb = this . getDb . bind ( this ) ;
4752
4853 const server = net . createServer ( async ( socket ) => {
49- await fromNodeSocket ( socket , {
54+ const connection = await fromNodeSocket ( socket , {
5055 serverVersion : "17.4 (PGlite 0.3.3)" ,
5156 auth : { method : "trust" } ,
5257
@@ -62,13 +67,16 @@ export class PostgresServer {
6267 await db . query ( "DEALLOCATE ALL" ) ;
6368 }
6469 const response = await db . execProtocolRaw ( data ) ;
65-
66- for await ( const message of getMessages ( response ) ) {
70+ for await ( const message of extendedQueryPatch . filterResponse ( data , response ) ) {
6771 yield message ;
6872 }
73+
74+ // Extended query patch removes the extra Ready for Query messages that
75+ // pglite wrongly sends.
6976 } ,
7077 } ) ;
7178
79+ const extendedQueryPatch : PGliteExtendedQueryPatch = new PGliteExtendedQueryPatch ( connection ) ;
7280 socket . on ( "end" , ( ) => {
7381 logger . debug ( "Postgres client disconnected" ) ;
7482 } ) ;
@@ -247,7 +255,7 @@ export class PostgresServer {
247255 constructor ( args : { dataDirectory ?: string ; importPath ?: string ; debug ?: boolean } ) {
248256 this . baseDataDirectory = args . dataDirectory ;
249257 this . importPath = args . importPath ;
250- this . debug = args . debug ? 5 : 0 ;
258+ this . debug = args . debug ? 1 : 0 ;
251259 }
252260}
253261
@@ -271,3 +279,51 @@ export async function fromNodeSocket(socket: net.Socket, options?: PostgresConne
271279
272280 return new PostgresConnection ( { readable : rs , writable : ws } , opts ) ;
273281}
282+
283+ export class PGliteExtendedQueryPatch {
284+ isExtendedQuery = false ;
285+ eqpErrored = false ;
286+
287+ constructor ( public connection : PostgresConnection ) { }
288+
289+ async * filterResponse ( message : Uint8Array , response : Uint8Array ) {
290+ // 'Parse' indicates the start of an extended query
291+ const pipelineStartMessages : number [ ] = [
292+ FrontendMessageCode . Parse ,
293+ FrontendMessageCode . Bind ,
294+ FrontendMessageCode . Close ,
295+ ] ;
296+ const decoded = decoder . write ( message as any as Buffer ) ;
297+
298+ pgliteDebugLog . write ( "Front: " + decoded ) ;
299+
300+ if ( pipelineStartMessages . includes ( message [ 0 ] ) ) {
301+ this . isExtendedQuery = true ;
302+ }
303+
304+ // 'Sync' indicates the end of an extended query
305+ if ( message [ 0 ] === FrontendMessageCode . Sync ) {
306+ this . isExtendedQuery = false ;
307+ this . eqpErrored = false ;
308+ }
309+
310+ // A PGlite response can contain multiple messages
311+ for await ( const bm of getMessages ( response ) ) {
312+ // After an ErrorMessage in extended query protocol, we should throw away messages until the next Sync
313+ // (per https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY:~:text=When%20an%20error,for%20each%20Sync.))
314+ if ( this . eqpErrored ) {
315+ continue ;
316+ }
317+ if ( this . isExtendedQuery && bm [ 0 ] === BackendMessageCode . ErrorMessage ) {
318+ this . eqpErrored = true ;
319+ }
320+ // Filter out incorrect `ReadyForQuery` messages during the extended query protocol
321+ if ( this . isExtendedQuery && bm [ 0 ] === BackendMessageCode . ReadyForQuery ) {
322+ pgliteDebugLog . write ( "Filtered: " + decoder . write ( bm as any as Buffer ) ) ;
323+ continue ;
324+ }
325+ pgliteDebugLog . write ( "Sent: " + decoder . write ( bm as any as Buffer ) ) ;
326+ yield bm ;
327+ }
328+ }
329+ }
0 commit comments