Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@
"dependencies": {
"@apphosting/build": "^0.1.6",
"@apphosting/common": "^0.0.8",
"@electric-sql/pglite": "^0.3.3",
"@electric-sql/pglite-tools": "^0.2.8",
"@electric-sql/pglite": "^0.3.14",
"@electric-sql/pglite-tools": "^0.2.19",
"@google-cloud/cloud-sql-connector": "^1.3.3",
"@google-cloud/pubsub": "^5.2.0",
"@inquirer/prompts": "^7.10.1",
Expand Down
72 changes: 32 additions & 40 deletions src/emulator/dataconnect/pgliteServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class PostgresServer {
await db.query("DEALLOCATE ALL");
}
const response = await db.execProtocolRaw(data);
for await (const message of extendedQueryPatch.filterResponse(data, response)) {
for await (const message of extendedQueryPatch.getMessages(data, response)) {
yield message;
}

Expand All @@ -75,7 +75,7 @@ export class PostgresServer {
},
});

const extendedQueryPatch: PGliteExtendedQueryPatch = new PGliteExtendedQueryPatch(connection);
const extendedQueryPatch: PGlitePatch = new PGlitePatch(connection);
socket.on("end", () => {
logger.debug("Postgres client disconnected");
});
Expand Down Expand Up @@ -279,52 +279,44 @@ export async function fromNodeSocket(socket: net.Socket, options?: PostgresConne
return new PostgresConnection({ readable: rs, writable: ws }, opts);
}

export class PGliteExtendedQueryPatch {
isExtendedQuery = false;
eqpErrored = false;
pgliteDebugLog: fs.WriteStream;

constructor(public connection: PostgresConnection) {
this.pgliteDebugLog = fs.createWriteStream("pglite-debug.log");
const CODE_TO_FRONTEND_MESSAGE: Record<number, string> = {};
for (const key in FrontendMessageCode) {
if (Object.prototype.hasOwnProperty.call(FrontendMessageCode, key)) {
CODE_TO_FRONTEND_MESSAGE[FrontendMessageCode[key as keyof typeof FrontendMessageCode]] = key;
}
}

async *filterResponse(message: Uint8Array, response: Uint8Array) {
// 'Parse' indicates the start of an extended query
const pipelineStartMessages: number[] = [
FrontendMessageCode.Parse,
FrontendMessageCode.Bind,
FrontendMessageCode.Close,
];
const decoded = decoder.write(message as any as Buffer);
const CODE_TO_BACKEND_MESSAGE: Record<number, string> = {};
for (const key in BackendMessageCode) {
if (Object.prototype.hasOwnProperty.call(BackendMessageCode, key)) {
CODE_TO_BACKEND_MESSAGE[BackendMessageCode[key as keyof typeof BackendMessageCode]] = key;
}
}

this.pgliteDebugLog.write("Front: " + decoded);
function codeToFrontendMessageName(code: number): string {
return CODE_TO_FRONTEND_MESSAGE[code] || `UNKNOWN_FRONTEND_CODE_${code}`;
}

if (pipelineStartMessages.includes(message[0])) {
this.isExtendedQuery = true;
}
function codeTogBackendMessageName(code: number): string {
return CODE_TO_BACKEND_MESSAGE[code] || `UNKNOWN_BACKEND_CODE_${code}`;
}
export class PGlitePatch {
pgliteDebugLog: fs.WriteStream;

// 'Sync' indicates the end of an extended query
if (message[0] === FrontendMessageCode.Sync) {
this.isExtendedQuery = false;
this.eqpErrored = false;
}
constructor(public connection: PostgresConnection) {
this.pgliteDebugLog = fs.createWriteStream("pglite-debug.log");
}

async *getMessages(request: Uint8Array, response: Uint8Array) {
this.pgliteDebugLog.write(
`\n[-> ${codeToFrontendMessageName(request[0])}] ` + decoder.write(request as any as Buffer),
);
// A PGlite response can contain multiple messages
// https://www.postgresql.org/docs/current/protocol-message-formats.html
for await (const bm of getMessages(response)) {
// After an ErrorMessage in extended query protocol, we should throw away messages until the next Sync
// (per https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY:~:text=When%20an%20error,for%20each%20Sync.))
if (this.eqpErrored) {
continue;
}
if (this.isExtendedQuery && bm[0] === BackendMessageCode.ErrorMessage) {
this.eqpErrored = true;
}
// Filter out incorrect `ReadyForQuery` messages during the extended query protocol
if (this.isExtendedQuery && bm[0] === BackendMessageCode.ReadyForQuery) {
this.pgliteDebugLog.write("Filtered: " + decoder.write(bm as any as Buffer));
continue;
}
this.pgliteDebugLog.write("Sent: " + decoder.write(bm as any as Buffer));
this.pgliteDebugLog.write(
`\n[<- ${codeTogBackendMessageName(bm[0])}] ${decoder.write(bm as any as Buffer)}`,
);
yield bm;
}
}
Expand Down
Loading