diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 6632cbeb694..b2e613eec11 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -11,8 +11,8 @@ "dependencies": { "@apphosting/build": "^0.1.6", "@apphosting/common": "^0.0.8", - "@electric-sql/pglite": "^0.3.3", - "@electric-sql/pglite-tools": "^0.2.8", + "@electric-sql/pglite": "^0.3.14", + "@electric-sql/pglite-tools": "^0.2.19", "@google-cloud/cloud-sql-connector": "^1.3.3", "@google-cloud/pubsub": "^5.2.0", "@inquirer/prompts": "^7.10.1", @@ -962,18 +962,18 @@ } }, "node_modules/@electric-sql/pglite": { - "version": "0.3.3", - "resolved": "https://registry.npmjs.org/@electric-sql/pglite/-/pglite-0.3.3.tgz", - "integrity": "sha512-JrvHOx9q0yvKEby0bK8qzGTVw6K+yEg8enxDWb2IwNKr5XZxRrBb+GNIqoAIP7yXyhRg5jcENWmdHmtnAT87vA==", + "version": "0.3.14", + "resolved": "https://us-npm.pkg.dev/artifact-foundry-prod/ah-3p-staging-npm/@electric-sql/pglite/-/pglite-0.3.14.tgz", + "integrity": "sha512-3DB258dhqdsArOI1fIt7cb9RpUOgcDg5hXWVgVHAeqVQ/qxtFy605QKs4gx6mFq3jWsSPqDN8TgSEsqC3OfV9Q==", "license": "Apache-2.0" }, "node_modules/@electric-sql/pglite-tools": { - "version": "0.2.8", - "resolved": "https://registry.npmjs.org/@electric-sql/pglite-tools/-/pglite-tools-0.2.8.tgz", - "integrity": "sha512-MBWelYjUZThOBrktPU4beuuX4hrUdIPRgfLbTgltLMT6Chh2R7ATxHsT9Nr7L9fXUSYlZCyoIf+n8pis3uoiiw==", + "version": "0.2.19", + "resolved": "https://us-npm.pkg.dev/artifact-foundry-prod/ah-3p-staging-npm/@electric-sql/pglite-tools/-/pglite-tools-0.2.19.tgz", + "integrity": "sha512-Ls4ZcSymnFRlEHtDyO3k9qPXLg7awfRAE3YnXk4WLsint17JBsU4UEX8le9YE8SgPkWNnQC898SqbFGGU/5JUA==", "license": "Apache-2.0", "peerDependencies": { - "@electric-sql/pglite": "0.3.3" + "@electric-sql/pglite": "0.3.14" } }, "node_modules/@emmetio/abbreviation": { @@ -22857,14 +22857,14 @@ } }, "@electric-sql/pglite": { - "version": "0.3.3", - "resolved": "https://registry.npmjs.org/@electric-sql/pglite/-/pglite-0.3.3.tgz", - "integrity": "sha512-JrvHOx9q0yvKEby0bK8qzGTVw6K+yEg8enxDWb2IwNKr5XZxRrBb+GNIqoAIP7yXyhRg5jcENWmdHmtnAT87vA==" + "version": "0.3.14", + "resolved": "https://us-npm.pkg.dev/artifact-foundry-prod/ah-3p-staging-npm/@electric-sql/pglite/-/pglite-0.3.14.tgz", + "integrity": "sha512-3DB258dhqdsArOI1fIt7cb9RpUOgcDg5hXWVgVHAeqVQ/qxtFy605QKs4gx6mFq3jWsSPqDN8TgSEsqC3OfV9Q==" }, "@electric-sql/pglite-tools": { - "version": "0.2.8", - "resolved": "https://registry.npmjs.org/@electric-sql/pglite-tools/-/pglite-tools-0.2.8.tgz", - "integrity": "sha512-MBWelYjUZThOBrktPU4beuuX4hrUdIPRgfLbTgltLMT6Chh2R7ATxHsT9Nr7L9fXUSYlZCyoIf+n8pis3uoiiw==", + "version": "0.2.19", + "resolved": "https://us-npm.pkg.dev/artifact-foundry-prod/ah-3p-staging-npm/@electric-sql/pglite-tools/-/pglite-tools-0.2.19.tgz", + "integrity": "sha512-Ls4ZcSymnFRlEHtDyO3k9qPXLg7awfRAE3YnXk4WLsint17JBsU4UEX8le9YE8SgPkWNnQC898SqbFGGU/5JUA==", "requires": {} }, "@emmetio/abbreviation": { @@ -26646,7 +26646,7 @@ "resolved": "https://registry.npmjs.org/ajv-formats/-/ajv-formats-3.0.1.tgz", "integrity": "sha512-8iUql50EUR+uUcdRQ3HDqa6EVyo3docL8g5WJ3FNcWmu62IbkGUue/pEyLBW8VGKKucTPgqeks4fIU1DA4yowQ==", "requires": { - "ajv": "^8.0.0" + "ajv": "^8.17.1" } }, "ansi-align": { diff --git a/package.json b/package.json index 54647b42399..72aeb930351 100644 --- a/package.json +++ b/package.json @@ -106,8 +106,8 @@ "dependencies": { "@apphosting/build": "^0.1.6", "@apphosting/common": "^0.0.8", - "@electric-sql/pglite": "^0.3.3", - "@electric-sql/pglite-tools": "^0.2.8", + "@electric-sql/pglite": "^0.3.14", + "@electric-sql/pglite-tools": "^0.2.19", "@google-cloud/cloud-sql-connector": "^1.3.3", "@google-cloud/pubsub": "^5.2.0", "@inquirer/prompts": "^7.10.1", diff --git a/src/emulator/dataconnect/pgliteServer.ts b/src/emulator/dataconnect/pgliteServer.ts index 381a91143a4..aa21aed4950 100644 --- a/src/emulator/dataconnect/pgliteServer.ts +++ b/src/emulator/dataconnect/pgliteServer.ts @@ -66,7 +66,7 @@ export class PostgresServer { await db.query("DEALLOCATE ALL"); } const response = await db.execProtocolRaw(data); - for await (const message of extendedQueryPatch.filterResponse(data, response)) { + for await (const message of extendedQueryPatch.getMessages(data, response)) { yield message; } @@ -75,7 +75,7 @@ export class PostgresServer { }, }); - const extendedQueryPatch: PGliteExtendedQueryPatch = new PGliteExtendedQueryPatch(connection); + const extendedQueryPatch: PGlitePatch = new PGlitePatch(connection); socket.on("end", () => { logger.debug("Postgres client disconnected"); }); @@ -279,52 +279,44 @@ export async function fromNodeSocket(socket: net.Socket, options?: PostgresConne return new PostgresConnection({ readable: rs, writable: ws }, opts); } -export class PGliteExtendedQueryPatch { - isExtendedQuery = false; - eqpErrored = false; - pgliteDebugLog: fs.WriteStream; - - constructor(public connection: PostgresConnection) { - this.pgliteDebugLog = fs.createWriteStream("pglite-debug.log"); +const CODE_TO_FRONTEND_MESSAGE: Record = {}; +for (const key in FrontendMessageCode) { + if (Object.prototype.hasOwnProperty.call(FrontendMessageCode, key)) { + CODE_TO_FRONTEND_MESSAGE[FrontendMessageCode[key as keyof typeof FrontendMessageCode]] = key; } +} - async *filterResponse(message: Uint8Array, response: Uint8Array) { - // 'Parse' indicates the start of an extended query - const pipelineStartMessages: number[] = [ - FrontendMessageCode.Parse, - FrontendMessageCode.Bind, - FrontendMessageCode.Close, - ]; - const decoded = decoder.write(message as any as Buffer); +const CODE_TO_BACKEND_MESSAGE: Record = {}; +for (const key in BackendMessageCode) { + if (Object.prototype.hasOwnProperty.call(BackendMessageCode, key)) { + CODE_TO_BACKEND_MESSAGE[BackendMessageCode[key as keyof typeof BackendMessageCode]] = key; + } +} - this.pgliteDebugLog.write("Front: " + decoded); +function codeToFrontendMessageName(code: number): string { + return CODE_TO_FRONTEND_MESSAGE[code] || `UNKNOWN_FRONTEND_CODE_${code}`; +} - if (pipelineStartMessages.includes(message[0])) { - this.isExtendedQuery = true; - } +function codeTogBackendMessageName(code: number): string { + return CODE_TO_BACKEND_MESSAGE[code] || `UNKNOWN_BACKEND_CODE_${code}`; +} +export class PGlitePatch { + pgliteDebugLog: fs.WriteStream; - // 'Sync' indicates the end of an extended query - if (message[0] === FrontendMessageCode.Sync) { - this.isExtendedQuery = false; - this.eqpErrored = false; - } + constructor(public connection: PostgresConnection) { + this.pgliteDebugLog = fs.createWriteStream("pglite-debug.log"); + } + async *getMessages(request: Uint8Array, response: Uint8Array) { + this.pgliteDebugLog.write( + `\n[-> ${codeToFrontendMessageName(request[0])}] ` + decoder.write(request as any as Buffer), + ); // A PGlite response can contain multiple messages + // https://www.postgresql.org/docs/current/protocol-message-formats.html for await (const bm of getMessages(response)) { - // After an ErrorMessage in extended query protocol, we should throw away messages until the next Sync - // (per https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY:~:text=When%20an%20error,for%20each%20Sync.)) - if (this.eqpErrored) { - continue; - } - if (this.isExtendedQuery && bm[0] === BackendMessageCode.ErrorMessage) { - this.eqpErrored = true; - } - // Filter out incorrect `ReadyForQuery` messages during the extended query protocol - if (this.isExtendedQuery && bm[0] === BackendMessageCode.ReadyForQuery) { - this.pgliteDebugLog.write("Filtered: " + decoder.write(bm as any as Buffer)); - continue; - } - this.pgliteDebugLog.write("Sent: " + decoder.write(bm as any as Buffer)); + this.pgliteDebugLog.write( + `\n[<- ${codeTogBackendMessageName(bm[0])}] ${decoder.write(bm as any as Buffer)}`, + ); yield bm; } }