Skip to content

Implemented one-off query support for TypeScript SDK #2960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/codegen/src/typescript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ export class {table_handle} {{
writeln!(out, "return this.tableCache.iter();");
});
writeln!(out, "}}");
writeln!(out);
writeln!(out, "remoteQuery(ctx: DbContext, filters: string): Promise<Iterable<{row_type}>> {{");
out.with_indent(|out| {
writeln!(out, "return this.tableCache.remoteQuery(ctx, filters);");
});
writeln!(out, "}}");
writeln!(out);

for (unique_field_ident, unique_field_type_use) in
iter_unique_cols(module.typespace_for_generate(), &schema, product_def)
Expand Down
7 changes: 5 additions & 2 deletions sdks/typescript/packages/sdk/src/client_cache.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { DbContext } from './db_context.ts';
import type { TableRuntimeTypeInfo } from './spacetime_module.ts';
import { TableCache } from './table_cache.ts';

Expand All @@ -6,9 +7,11 @@ export class ClientCache {
* The tables in the database.
*/
tables: Map<string, TableCache>;
private ctx: DbContext;

constructor() {
constructor(ctx: DbContext) {
this.tables = new Map();
this.ctx = ctx;
}

/**
Expand All @@ -35,7 +38,7 @@ export class ClientCache {
): TableCache<RowType> {
let table: TableCache;
if (!this.tables.has(tableTypeInfo.tableName)) {
table = new TableCache<RowType>(tableTypeInfo);
table = new TableCache<RowType>(this.ctx, tableTypeInfo);
this.tables.set(tableTypeInfo.tableName, table);
} else {
table = this.tables.get(tableTypeInfo.tableName)!;
Expand Down
200 changes: 155 additions & 45 deletions sdks/typescript/packages/sdk/src/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import type { Event } from './event.ts';
import {
type ErrorContextInterface,
type EventContextInterface,
type QueryEventContextInterface,
type ReducerEventContextInterface,
type SubscriptionEventContextInterface,
} from './event_context.ts';
Expand All @@ -39,6 +40,7 @@ import type { Identity } from './identity.ts';
import type {
IdentityTokenMessage,
Message,
QueryResolvedMessage,
SubscribeAppliedMessage,
UnsubscribeAppliedMessage,
} from './message_types.ts';
Expand All @@ -53,6 +55,11 @@ import {
import { deepEqual, toPascalCase } from './utils.ts';
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts';
import type { WebsocketTestAdapter } from './websocket_test_adapter.ts';
import {
QueryHandleImpl,
QueryManager,
type QueryEvent,
} from './query_builder_impl.ts';
import {
SubscriptionBuilderImpl,
SubscriptionHandleImpl,
Expand Down Expand Up @@ -94,6 +101,9 @@ export type {
export type ConnectionEvent = 'connect' | 'disconnect' | 'connectError';
export type CallReducerFlags = 'FullUpdate' | 'NoSuccessNotify';

type QueryEventCallback = (
ctx: QueryEventContextInterface
) => void;
type ReducerEventCallback<ReducerArgs extends any[] = any[]> = (
ctx: ReducerEventContextInterface,
...args: ReducerArgs
Expand Down Expand Up @@ -177,6 +187,7 @@ export class DbConnectionImpl<
#onApplied?: SubscriptionEventCallback;
#remoteModule: RemoteModule;
#messageQueue = Promise.resolve();
#queryManager = new QueryManager();
#subscriptionManager = new SubscriptionManager();

// These fields are not part of the public API, but in a pinch you
Expand Down Expand Up @@ -216,7 +227,7 @@ export class DbConnectionImpl<
let connectionId = this.connectionId.toHexString();
url.searchParams.set('connection_id', connectionId);

this.clientCache = new ClientCache();
this.clientCache = new ClientCache(this);
this.db = this.#remoteModule.dbViewConstructor(this);
this.setReducerFlags = this.#remoteModule.setReducerFlagsConstructor();
this.reducers = this.#remoteModule.reducersConstructor(
Expand Down Expand Up @@ -259,6 +270,25 @@ export class DbConnectionImpl<
return queryId;
};

registerQuery(
handle: QueryHandleImpl<DBView, Reducers, SetReducerFlags>,
handleEmitter: EventEmitter<QueryEvent, QueryEventCallback>,
querySql: string,
): number {
const queryId = this.#getNextQueryId();
this.#queryManager.queries.set(queryId, {
handle,
emitter: handleEmitter,
});
this.#sendMessage(
ClientMessage.OneOffQuery({
queryString: querySql,
messageId: new Uint8Array(new Uint32Array([queryId]).buffer),
})
);
return queryId;
}

// NOTE: This is very important!!! This is the actual function that
// gets called when you call `connection.subscriptionBuilder()`.
// The `subscriptionBuilder` function which is generated, just shadows
Expand Down Expand Up @@ -303,49 +333,49 @@ export class DbConnectionImpl<
);
}

#parseRowList(
type: 'insert' | 'delete',
tableName: string,
rowList: BsatnRowList
): Operation[] {
const buffer = rowList.rowsData;
const reader = new BinaryReader(buffer);
const rows: Operation[] = [];
const rowType = this.#remoteModule.tables[tableName]!.rowType;
const primaryKeyInfo =
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
while (reader.offset < buffer.length + buffer.byteOffset) {
const initialOffset = reader.offset;
const row = rowType.deserialize(reader);
let rowId: ComparablePrimitive | undefined = undefined;
if (primaryKeyInfo !== undefined) {
rowId = primaryKeyInfo.colType.intoMapKey(
row[primaryKeyInfo.colName]
);
} else {
// Get a view of the bytes for this row.
const rowBytes = buffer.subarray(
initialOffset - buffer.byteOffset,
reader.offset - buffer.byteOffset
);
// Convert it to a base64 string, so we can use it as a map key.
const asBase64 = fromByteArray(rowBytes);
rowId = asBase64;
}

rows.push({
type,
rowId,
row,
});
}
return rows;
}

// This function is async because we decompress the message async
async #processParsedMessage(
message: ServerMessage
): Promise<Message | undefined> {
const parseRowList = (
type: 'insert' | 'delete',
tableName: string,
rowList: BsatnRowList
): Operation[] => {
const buffer = rowList.rowsData;
const reader = new BinaryReader(buffer);
const rows: Operation[] = [];
const rowType = this.#remoteModule.tables[tableName]!.rowType;
const primaryKeyInfo =
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
while (reader.offset < buffer.length + buffer.byteOffset) {
const initialOffset = reader.offset;
const row = rowType.deserialize(reader);
let rowId: ComparablePrimitive | undefined = undefined;
if (primaryKeyInfo !== undefined) {
rowId = primaryKeyInfo.colType.intoMapKey(
row[primaryKeyInfo.colName]
);
} else {
// Get a view of the bytes for this row.
const rowBytes = buffer.subarray(
initialOffset - buffer.byteOffset,
reader.offset - buffer.byteOffset
);
// Convert it to a base64 string, so we can use it as a map key.
const asBase64 = fromByteArray(rowBytes);
rowId = asBase64;
}

rows.push({
type,
rowId,
row,
});
}
return rows;
};

const parseTableUpdate = async (
rawTableUpdate: RawTableUpdate
): Promise<CacheTableUpdate> => {
Expand All @@ -366,10 +396,10 @@ export class DbConnectionImpl<
decompressed = update.value;
}
operations = operations.concat(
parseRowList('insert', tableName, decompressed.inserts)
this.#parseRowList('insert', tableName, decompressed.inserts)
);
operations = operations.concat(
parseRowList('delete', tableName, decompressed.deletes)
this.#parseRowList('delete', tableName, decompressed.deletes)
);
}
return {
Expand Down Expand Up @@ -480,9 +510,14 @@ export class DbConnectionImpl<
}

case 'OneOffQueryResponse': {
throw new Error(
`TypeScript SDK never sends one-off queries, but got OneOffQueryResponse ${message}`
);
const queryResolvedMessage: QueryResolvedMessage = {
tag: 'QueryResolved',
messageId: message.value.messageId,
error: message.value.error,
tables: message.value.tables,
totalHostExecutionDuration: message.value.totalHostExecutionDuration,
};
return queryResolvedMessage;
}

case 'SubscribeMultiApplied': {
Expand Down Expand Up @@ -537,6 +572,25 @@ export class DbConnectionImpl<
this.isActive = true;
}

#applyTableState(
tableStates: clientApi.OneOffTable[],
eventContext: EventContextInterface
): Map<TableCache, PendingCallback[]> {
let tables: Map<TableCache, PendingCallback[]> = new Map();
for (let tableState of tableStates) {
// Get table information for the table being updated
const tableName = tableState.tableName;
const tableTypeInfo = this.#remoteModule.tables[tableName]!;
const table = new TableCache(eventContext, tableTypeInfo);
const newCallbacks = table.applyOperations(
this.#parseRowList('insert', tableState.tableName, tableState.rows),
eventContext,
);
tables.set(table, newCallbacks);
}
return tables;
}

#applyTableUpdates(
tableUpdates: CacheTableUpdate[],
eventContext: EventContextInterface
Expand Down Expand Up @@ -700,6 +754,61 @@ export class DbConnectionImpl<
this.#emitter.emit('connect', this, this.identity, this.token);
break;
}
case 'QueryResolved': {
if (message.messageId?.length != 4) {
stdbLogger(
'error',
`Received QueryResolved with invalid messageId ${message.messageId}.`
);
break;
}
const queryId = new DataView(message.messageId.buffer, message.messageId.byteOffset, 4).getUint32(0, true);
const query = this.#queryManager.queries.get(queryId);
if (query === undefined) {
stdbLogger(
'error',
`Received QueryResolved for unknown queryId ${queryId}.`
);
break;
}
if (message.error !== undefined) {
const error = Error(message.error);
const event: Event<never> = { tag: 'Error', value: error };
const eventContext = this.#remoteModule.eventContextConstructor(
this,
event
);
const errorContext = {
...eventContext,
event: error,
};
this.#queryManager.queries
.get(queryId)
?.emitter.emit(
'error',
errorContext,
error,
message.totalHostExecutionDuration
);
} else {
const event: Event<never> = { tag: 'QueryResolved' };
const eventContext = this.#remoteModule.eventContextConstructor(
this,
event
);
const { event: _, ...queryEventContext } = eventContext;
const tables = this.#applyTableState(message.tables, eventContext);
query?.emitter.emit(
'resolved',
queryEventContext,
tables.keys().reduce((map, table) =>
map.set(table.name(), table), new Map()),
message.totalHostExecutionDuration
);
}
this.#queryManager.queries.delete(queryId);
break;
}
case 'SubscribeApplied': {
const subscription = this.#subscriptionManager.subscriptions.get(
message.queryId
Expand Down Expand Up @@ -784,6 +893,7 @@ export class DbConnectionImpl<
emitter.emit('error', errorContext, error);
});
}
break;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sdks/typescript/packages/sdk/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ReducerEvent, ReducerInfoType } from './reducer_event';

export type Event<Reducer extends ReducerInfoType> =
| { tag: 'Reducer'; value: ReducerEvent<Reducer> }
| { tag: 'QueryResolved' }
| { tag: 'SubscribeApplied' }
| { tag: 'UnsubscribeApplied' }
| { tag: 'Error'; value: Error }
Expand Down
8 changes: 8 additions & 0 deletions sdks/typescript/packages/sdk/src/event_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ export interface EventContextInterface<
event: Event<Reducer>;
}

export interface QueryEventContextInterface<
DBView = any,
Reducers = any,
SetReducerFlags = any,
> extends DbContext<DBView, Reducers, SetReducerFlags> {
/** No event is provided **/
}

export interface ReducerEventContextInterface<
DBView = any,
Reducers = any,
Expand Down
Loading