diff --git a/crates/codegen/src/typescript.rs b/crates/codegen/src/typescript.rs index ce074dcf266..063bf92a1f1 100644 --- a/crates/codegen/src/typescript.rs +++ b/crates/codegen/src/typescript.rs @@ -149,6 +149,13 @@ export class {table_handle} {{ writeln!(out, "return this.tableCache.iter();"); }); writeln!(out, "}}"); + writeln!(out); + writeln!(out, "remoteQuery(ctx: DbContext, filters: string): Promise> {{"); + out.with_indent(|out| { + writeln!(out, "return this.tableCache.remoteQuery(ctx, filters);"); + }); + writeln!(out, "}}"); + writeln!(out); for (unique_field_ident, unique_field_type_use) in iter_unique_cols(module.typespace_for_generate(), &schema, product_def) diff --git a/sdks/typescript/packages/sdk/src/client_cache.ts b/sdks/typescript/packages/sdk/src/client_cache.ts index 94ac8749422..56066796088 100644 --- a/sdks/typescript/packages/sdk/src/client_cache.ts +++ b/sdks/typescript/packages/sdk/src/client_cache.ts @@ -1,3 +1,4 @@ +import type { DbContext } from './db_context.ts'; import type { TableRuntimeTypeInfo } from './spacetime_module.ts'; import { TableCache } from './table_cache.ts'; @@ -6,9 +7,11 @@ export class ClientCache { * The tables in the database. */ tables: Map; + private ctx: DbContext; - constructor() { + constructor(ctx: DbContext) { this.tables = new Map(); + this.ctx = ctx; } /** @@ -35,7 +38,7 @@ export class ClientCache { ): TableCache { let table: TableCache; if (!this.tables.has(tableTypeInfo.tableName)) { - table = new TableCache(tableTypeInfo); + table = new TableCache(this.ctx, tableTypeInfo); this.tables.set(tableTypeInfo.tableName, table); } else { table = this.tables.get(tableTypeInfo.tableName)!; diff --git a/sdks/typescript/packages/sdk/src/db_connection_impl.ts b/sdks/typescript/packages/sdk/src/db_connection_impl.ts index d4ef11f66c0..156eac907c5 100644 --- a/sdks/typescript/packages/sdk/src/db_connection_impl.ts +++ b/sdks/typescript/packages/sdk/src/db_connection_impl.ts @@ -30,6 +30,7 @@ import type { Event } from './event.ts'; import { type ErrorContextInterface, type EventContextInterface, + type QueryEventContextInterface, type ReducerEventContextInterface, type SubscriptionEventContextInterface, } from './event_context.ts'; @@ -39,6 +40,7 @@ import type { Identity } from './identity.ts'; import type { IdentityTokenMessage, Message, + QueryResolvedMessage, SubscribeAppliedMessage, UnsubscribeAppliedMessage, } from './message_types.ts'; @@ -53,6 +55,11 @@ import { import { deepEqual, toPascalCase } from './utils.ts'; import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts'; import type { WebsocketTestAdapter } from './websocket_test_adapter.ts'; +import { + QueryHandleImpl, + QueryManager, + type QueryEvent, +} from './query_builder_impl.ts'; import { SubscriptionBuilderImpl, SubscriptionHandleImpl, @@ -94,6 +101,9 @@ export type { export type ConnectionEvent = 'connect' | 'disconnect' | 'connectError'; export type CallReducerFlags = 'FullUpdate' | 'NoSuccessNotify'; +type QueryEventCallback = ( + ctx: QueryEventContextInterface +) => void; type ReducerEventCallback = ( ctx: ReducerEventContextInterface, ...args: ReducerArgs @@ -177,6 +187,7 @@ export class DbConnectionImpl< #onApplied?: SubscriptionEventCallback; #remoteModule: RemoteModule; #messageQueue = Promise.resolve(); + #queryManager = new QueryManager(); #subscriptionManager = new SubscriptionManager(); // These fields are not part of the public API, but in a pinch you @@ -216,7 +227,7 @@ export class DbConnectionImpl< let connectionId = this.connectionId.toHexString(); url.searchParams.set('connection_id', connectionId); - this.clientCache = new ClientCache(); + this.clientCache = new ClientCache(this); this.db = this.#remoteModule.dbViewConstructor(this); this.setReducerFlags = this.#remoteModule.setReducerFlagsConstructor(); this.reducers = this.#remoteModule.reducersConstructor( @@ -259,6 +270,25 @@ export class DbConnectionImpl< return queryId; }; + registerQuery( + handle: QueryHandleImpl, + handleEmitter: EventEmitter, + querySql: string, + ): number { + const queryId = this.#getNextQueryId(); + this.#queryManager.queries.set(queryId, { + handle, + emitter: handleEmitter, + }); + this.#sendMessage( + ClientMessage.OneOffQuery({ + queryString: querySql, + messageId: new Uint8Array(new Uint32Array([queryId]).buffer), + }) + ); + return queryId; + } + // NOTE: This is very important!!! This is the actual function that // gets called when you call `connection.subscriptionBuilder()`. // The `subscriptionBuilder` function which is generated, just shadows @@ -303,49 +333,49 @@ export class DbConnectionImpl< ); } + #parseRowList( + type: 'insert' | 'delete', + tableName: string, + rowList: BsatnRowList + ): Operation[] { + const buffer = rowList.rowsData; + const reader = new BinaryReader(buffer); + const rows: Operation[] = []; + const rowType = this.#remoteModule.tables[tableName]!.rowType; + const primaryKeyInfo = + this.#remoteModule.tables[tableName]!.primaryKeyInfo; + while (reader.offset < buffer.length + buffer.byteOffset) { + const initialOffset = reader.offset; + const row = rowType.deserialize(reader); + let rowId: ComparablePrimitive | undefined = undefined; + if (primaryKeyInfo !== undefined) { + rowId = primaryKeyInfo.colType.intoMapKey( + row[primaryKeyInfo.colName] + ); + } else { + // Get a view of the bytes for this row. + const rowBytes = buffer.subarray( + initialOffset - buffer.byteOffset, + reader.offset - buffer.byteOffset + ); + // Convert it to a base64 string, so we can use it as a map key. + const asBase64 = fromByteArray(rowBytes); + rowId = asBase64; + } + + rows.push({ + type, + rowId, + row, + }); + } + return rows; + } + // This function is async because we decompress the message async async #processParsedMessage( message: ServerMessage ): Promise { - const parseRowList = ( - type: 'insert' | 'delete', - tableName: string, - rowList: BsatnRowList - ): Operation[] => { - const buffer = rowList.rowsData; - const reader = new BinaryReader(buffer); - const rows: Operation[] = []; - const rowType = this.#remoteModule.tables[tableName]!.rowType; - const primaryKeyInfo = - this.#remoteModule.tables[tableName]!.primaryKeyInfo; - while (reader.offset < buffer.length + buffer.byteOffset) { - const initialOffset = reader.offset; - const row = rowType.deserialize(reader); - let rowId: ComparablePrimitive | undefined = undefined; - if (primaryKeyInfo !== undefined) { - rowId = primaryKeyInfo.colType.intoMapKey( - row[primaryKeyInfo.colName] - ); - } else { - // Get a view of the bytes for this row. - const rowBytes = buffer.subarray( - initialOffset - buffer.byteOffset, - reader.offset - buffer.byteOffset - ); - // Convert it to a base64 string, so we can use it as a map key. - const asBase64 = fromByteArray(rowBytes); - rowId = asBase64; - } - - rows.push({ - type, - rowId, - row, - }); - } - return rows; - }; - const parseTableUpdate = async ( rawTableUpdate: RawTableUpdate ): Promise => { @@ -366,10 +396,10 @@ export class DbConnectionImpl< decompressed = update.value; } operations = operations.concat( - parseRowList('insert', tableName, decompressed.inserts) + this.#parseRowList('insert', tableName, decompressed.inserts) ); operations = operations.concat( - parseRowList('delete', tableName, decompressed.deletes) + this.#parseRowList('delete', tableName, decompressed.deletes) ); } return { @@ -480,9 +510,14 @@ export class DbConnectionImpl< } case 'OneOffQueryResponse': { - throw new Error( - `TypeScript SDK never sends one-off queries, but got OneOffQueryResponse ${message}` - ); + const queryResolvedMessage: QueryResolvedMessage = { + tag: 'QueryResolved', + messageId: message.value.messageId, + error: message.value.error, + tables: message.value.tables, + totalHostExecutionDuration: message.value.totalHostExecutionDuration, + }; + return queryResolvedMessage; } case 'SubscribeMultiApplied': { @@ -537,6 +572,25 @@ export class DbConnectionImpl< this.isActive = true; } + #applyTableState( + tableStates: clientApi.OneOffTable[], + eventContext: EventContextInterface + ): Map { + let tables: Map = new Map(); + for (let tableState of tableStates) { + // Get table information for the table being updated + const tableName = tableState.tableName; + const tableTypeInfo = this.#remoteModule.tables[tableName]!; + const table = new TableCache(eventContext, tableTypeInfo); + const newCallbacks = table.applyOperations( + this.#parseRowList('insert', tableState.tableName, tableState.rows), + eventContext, + ); + tables.set(table, newCallbacks); + } + return tables; + } + #applyTableUpdates( tableUpdates: CacheTableUpdate[], eventContext: EventContextInterface @@ -700,6 +754,61 @@ export class DbConnectionImpl< this.#emitter.emit('connect', this, this.identity, this.token); break; } + case 'QueryResolved': { + if (message.messageId?.length != 4) { + stdbLogger( + 'error', + `Received QueryResolved with invalid messageId ${message.messageId}.` + ); + break; + } + const queryId = new DataView(message.messageId.buffer, message.messageId.byteOffset, 4).getUint32(0, true); + const query = this.#queryManager.queries.get(queryId); + if (query === undefined) { + stdbLogger( + 'error', + `Received QueryResolved for unknown queryId ${queryId}.` + ); + break; + } + if (message.error !== undefined) { + const error = Error(message.error); + const event: Event = { tag: 'Error', value: error }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const errorContext = { + ...eventContext, + event: error, + }; + this.#queryManager.queries + .get(queryId) + ?.emitter.emit( + 'error', + errorContext, + error, + message.totalHostExecutionDuration + ); + } else { + const event: Event = { tag: 'QueryResolved' }; + const eventContext = this.#remoteModule.eventContextConstructor( + this, + event + ); + const { event: _, ...queryEventContext } = eventContext; + const tables = this.#applyTableState(message.tables, eventContext); + query?.emitter.emit( + 'resolved', + queryEventContext, + tables.keys().reduce((map, table) => + map.set(table.name(), table), new Map()), + message.totalHostExecutionDuration + ); + } + this.#queryManager.queries.delete(queryId); + break; + } case 'SubscribeApplied': { const subscription = this.#subscriptionManager.subscriptions.get( message.queryId @@ -784,6 +893,7 @@ export class DbConnectionImpl< emitter.emit('error', errorContext, error); }); } + break; } } } diff --git a/sdks/typescript/packages/sdk/src/event.ts b/sdks/typescript/packages/sdk/src/event.ts index 8be677e47ab..7a2072f764c 100644 --- a/sdks/typescript/packages/sdk/src/event.ts +++ b/sdks/typescript/packages/sdk/src/event.ts @@ -2,6 +2,7 @@ import type { ReducerEvent, ReducerInfoType } from './reducer_event'; export type Event = | { tag: 'Reducer'; value: ReducerEvent } + | { tag: 'QueryResolved' } | { tag: 'SubscribeApplied' } | { tag: 'UnsubscribeApplied' } | { tag: 'Error'; value: Error } diff --git a/sdks/typescript/packages/sdk/src/event_context.ts b/sdks/typescript/packages/sdk/src/event_context.ts index f4c67bc5685..98f8f31af5c 100644 --- a/sdks/typescript/packages/sdk/src/event_context.ts +++ b/sdks/typescript/packages/sdk/src/event_context.ts @@ -12,6 +12,14 @@ export interface EventContextInterface< event: Event; } +export interface QueryEventContextInterface< + DBView = any, + Reducers = any, + SetReducerFlags = any, +> extends DbContext { + /** No event is provided **/ +} + export interface ReducerEventContextInterface< DBView = any, Reducers = any, diff --git a/sdks/typescript/packages/sdk/src/message_types.ts b/sdks/typescript/packages/sdk/src/message_types.ts index 3ce1f9ca982..fc50354bd96 100644 --- a/sdks/typescript/packages/sdk/src/message_types.ts +++ b/sdks/typescript/packages/sdk/src/message_types.ts @@ -1,8 +1,9 @@ -import { ConnectionId } from './connection_id'; -import type { UpdateStatus } from './client_api/index.ts'; -import { Identity } from './identity.ts'; +import type { ConnectionId } from './connection_id'; +import type { OneOffTable, UpdateStatus } from './client_api/index.ts'; +import type { Identity } from './identity.ts'; import type { TableUpdate } from './table_cache.ts'; -import { Timestamp } from './timestamp.ts'; +import type { TimeDuration } from './time_duration.ts'; +import type { Timestamp } from './timestamp.ts'; export type InitialSubscriptionMessage = { tag: 'InitialSubscription'; @@ -36,6 +37,20 @@ export type IdentityTokenMessage = { connectionId: ConnectionId; }; +export type QueryResolvedMessage = { + tag: 'QueryResolved'; + messageId: Uint8Array; + error?: string; + tables: OneOffTable[]; + totalHostExecutionDuration: TimeDuration; +}; + +export type QueryErrorMessage = { + tag: 'QueryError'; + messageId?: Uint8Array; + error: string; +}; + export type SubscribeAppliedMessage = { tag: 'SubscribeApplied'; queryId: number; @@ -59,6 +74,8 @@ export type Message = | TransactionUpdateMessage | TransactionUpdateLightMessage | IdentityTokenMessage + | QueryResolvedMessage + | QueryErrorMessage | SubscribeAppliedMessage | UnsubscribeAppliedMessage | SubscriptionError; diff --git a/sdks/typescript/packages/sdk/src/query_builder_impl.ts b/sdks/typescript/packages/sdk/src/query_builder_impl.ts new file mode 100644 index 00000000000..c0529a65eb9 --- /dev/null +++ b/sdks/typescript/packages/sdk/src/query_builder_impl.ts @@ -0,0 +1,138 @@ +import type { OneOffTable } from './client_api'; +import type { DbConnectionImpl } from './db_connection_impl'; +import type { + ErrorContextInterface, + QueryEventContextInterface, +} from './event_context'; +import { EventEmitter } from './event_emitter'; +import type { TimeDuration } from './time_duration'; + +export class QueryBuilderImpl< + DBView = any, + Reducers = any, + SetReducerFlags = any, +> { + #onResolved?: ( + ctx: QueryEventContextInterface, + tables: Map, + totalHostExecutionDuration: TimeDuration + ) => void = undefined; + #onError?: ( + ctx: ErrorContextInterface, + error: Error, + totalHostExecutionDuration: TimeDuration + ) => void = undefined; + constructor( + private db: DbConnectionImpl + ) {} + + onResolved( + cb: ( + ctx: QueryEventContextInterface, + tables: Map, + totalHostExecutionDuration: TimeDuration + ) => void + ): QueryBuilderImpl { + this.#onResolved = cb; + return this; + } + + onError( + cb: ( + ctx: ErrorContextInterface, + error: Error, + totalHostExecutionDuration: TimeDuration + ) => void + ): QueryBuilderImpl { + this.#onError = cb; + return this; + } + + query( + query_sql: string + ): QueryHandleImpl { + return new QueryHandleImpl( + this.db, + query_sql, + this.#onResolved, + this.#onError + ); + } +} + +export type QueryEvent = 'resolved' | 'error'; + +export class QueryManager { + queries: Map< + number, + { handle: QueryHandleImpl; emitter: EventEmitter } + > = new Map(); +} + +export class QueryHandleImpl< + DBView = any, + Reducers = any, + SetReducerFlags = any, +> { + #queryId: number; + #endedState: boolean = false; + #resolvedState: boolean = false; + #emitter: EventEmitter void> = + new EventEmitter(); + + constructor( + private db: DbConnectionImpl, + querySql: string, + onResolved?: ( + ctx: QueryEventContextInterface, + tables: Map, + totalHostExecutionDuration: TimeDuration + ) => void, + onError?: ( + ctx: ErrorContextInterface, + error: Error, + totalHostExecutionDuration: TimeDuration + ) => void + ) { + this.#emitter.on( + 'resolved', + ( + ctx: QueryEventContextInterface< + DBView, + Reducers, + SetReducerFlags + >, + tables: Map, + totalHostExecutionDuration: TimeDuration + ) => { + this.#resolvedState = true; + if (onResolved) { + onResolved(ctx, tables, totalHostExecutionDuration); + } + } + ); + this.#emitter.on( + 'error', + ( + ctx: ErrorContextInterface, + error: Error, + totalHostExecutionDuration: TimeDuration + ) => { + this.#resolvedState = false; + this.#endedState = true; + if (onError) { + onError(ctx, error, totalHostExecutionDuration); + } + } + ); + this.#queryId = this.db.registerQuery(this, this.#emitter, querySql); + } + + isEnded(): boolean { + return this.#endedState; + } + + isResolved(): boolean { + return this.#resolvedState; + } +} diff --git a/sdks/typescript/packages/sdk/src/table_cache.ts b/sdks/typescript/packages/sdk/src/table_cache.ts index 8763c2189f6..282b86a89b7 100644 --- a/sdks/typescript/packages/sdk/src/table_cache.ts +++ b/sdks/typescript/packages/sdk/src/table_cache.ts @@ -5,8 +5,11 @@ import { BinaryWriter, type EventContextInterface, } from './db_connection_impl.ts'; +import type { DbConnectionImpl } from './db_connection_impl.ts'; +import type { DbContext } from './db_context.ts'; import { stdbLogger } from './logger.ts'; import type { ComparablePrimitive } from './algebraic_type.ts'; +import { QueryBuilderImpl } from './query_builder_impl.ts'; export type Operation = { type: 'insert' | 'delete'; @@ -32,6 +35,7 @@ export type PendingCallback = { */ export class TableCache { private rows: Map; + private ctx: DbContext; private tableTypeInfo: TableRuntimeTypeInfo; private emitter: EventEmitter<'insert' | 'delete' | 'update'>; @@ -41,12 +45,20 @@ export class TableCache { * @param primaryKey column name designated as `#[primarykey]` * @param entityClass the entityClass */ - constructor(tableTypeInfo: TableRuntimeTypeInfo) { + constructor(ctx: DbContext, tableTypeInfo: TableRuntimeTypeInfo) { + this.ctx = ctx; this.tableTypeInfo = tableTypeInfo; this.rows = new Map(); this.emitter = new EventEmitter(); } + /** + * @returns name of the table + */ + name(): string { + return this.tableTypeInfo.tableName; + } + /** * @returns number of rows in the table */ @@ -57,10 +69,21 @@ export class TableCache { /** * @returns The values of the rows in the table */ - iter(): any[] { + iter(): RowType[] { return Array.from(this.rows.values()).map(([row]) => row); } + remoteQuery(filters: string): Promise { + return new Promise((resolve, reject) => { + const name = this.name(); + + new QueryBuilderImpl(this.ctx as DbConnectionImpl) + .onResolved((ctx, tables) => resolve(tables.get(name)?.iter())) + .onError((ctx, error) => reject(error)) + .query(`SELECT ${name}.* FROM ${name} ${filters}`); + }); + } + applyOperations = ( operations: Operation[], ctx: EventContextInterface