Skip to content

Implemented one-off query support for TypeScript SDK #2960

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/codegen/src/typescript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ export class {table_handle} {{
writeln!(out, "return this.tableCache.iter();");
});
writeln!(out, "}}");
writeln!(out);
writeln!(out, "remoteQuery(ctx: DbContext, filters: string): Promise<Iterable<{row_type}>> {{");
out.with_indent(|out| {
writeln!(out, "return this.tableCache.remoteQuery(ctx, filters);");
});
writeln!(out, "}}");
writeln!(out);

for (unique_field_ident, unique_field_type_use) in
iter_unique_cols(module.typespace_for_generate(), &schema, product_def)
Expand Down
202 changes: 158 additions & 44 deletions sdks/typescript/packages/sdk/src/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import type { Event } from './event.ts';
import {
type ErrorContextInterface,
type EventContextInterface,
type QueryEventContextInterface,
type ReducerEventContextInterface,
type SubscriptionEventContextInterface,
} from './event_context.ts';
Expand All @@ -39,6 +40,7 @@ import type { Identity } from './identity.ts';
import type {
IdentityTokenMessage,
Message,
QueryResolvedMessage,
SubscribeAppliedMessage,
UnsubscribeAppliedMessage,
} from './message_types.ts';
Expand All @@ -53,6 +55,12 @@ import {
import { deepEqual, toPascalCase } from './utils.ts';
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts';
import type { WebsocketTestAdapter } from './websocket_test_adapter.ts';
import {
QueryBuilderImpl,
QueryHandleImpl,
QueryManager,
type QueryEvent,
} from './query_builder_impl.ts';
import {
SubscriptionBuilderImpl,
SubscriptionHandleImpl,
Expand Down Expand Up @@ -94,6 +102,9 @@ export type {
export type ConnectionEvent = 'connect' | 'disconnect' | 'connectError';
export type CallReducerFlags = 'FullUpdate' | 'NoSuccessNotify';

type QueryEventCallback = (
ctx: QueryEventContextInterface
) => void;
type ReducerEventCallback<ReducerArgs extends any[] = any[]> = (
ctx: ReducerEventContextInterface,
...args: ReducerArgs
Expand Down Expand Up @@ -177,6 +188,7 @@ export class DbConnectionImpl<
#onApplied?: SubscriptionEventCallback;
#remoteModule: RemoteModule;
#messageQueue = Promise.resolve();
#queryManager = new QueryManager();
#subscriptionManager = new SubscriptionManager();

// These fields are not part of the public API, but in a pinch you
Expand Down Expand Up @@ -259,6 +271,29 @@ export class DbConnectionImpl<
return queryId;
};

queryBuilder = (): QueryBuilderImpl => {
return new QueryBuilderImpl(this);
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API we use in the C# SDK for running remote queries has them as methods of the TableHandle, rather than on the DbContext. This provides some amount of type safety, as the method inserts the SELECT clause into the query and also encodes the TypeScript type of the rows returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we use the phrase "remote query" to describe this operation in the user-facing API. This emphasizes that this is a query that bypasses the client cache and goes directly to the remote database.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the same time, formatting SELECTs restricts the provided functionality to querying only. What if a client needs to execute something else, e.g. INSERT, perhaps with the owner's identity?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, there I discovered (in #2968) that SpacetimeDB does currently not support anything other than SELECT even in one-shot queries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we have no interest in generalizing one-off queries to support SQL DML operations like INSERT, DELETE, UPDATE, &c.


registerQuery(
handle: QueryHandleImpl<DBView, Reducers, SetReducerFlags>,
handleEmitter: EventEmitter<QueryEvent, QueryEventCallback>,
querySql: string,
): number {
const queryId = this.#getNextQueryId();
this.#queryManager.queries.set(queryId, {
handle,
emitter: handleEmitter,
});
this.#sendMessage(
ClientMessage.OneOffQuery({
queryString: querySql,
messageId: new Uint8Array([queryId]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be a problem after 255 queries. Using a 4 byte integer would probably be safe enough, but we would need to convert it to bytes with something like:

const queryId = this.#getNextQueryId();
const buffer = new ArrayBuffer(4);
const view = new DataView(buffer);
view.setUint32(0, queryId, true); // true = little-endian

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks for spotting it! I added a single-byte counter as a placeholder and totally forgot to widen it.

})
);
return queryId;
}

// NOTE: This is very important!!! This is the actual function that
// gets called when you call `connection.subscriptionBuilder()`.
// The `subscriptionBuilder` function which is generated, just shadows
Expand Down Expand Up @@ -303,49 +338,49 @@ export class DbConnectionImpl<
);
}

#parseRowList(
type: 'insert' | 'delete',
tableName: string,
rowList: BsatnRowList
): Operation[] {
const buffer = rowList.rowsData;
const reader = new BinaryReader(buffer);
const rows: Operation[] = [];
const rowType = this.#remoteModule.tables[tableName]!.rowType;
const primaryKeyInfo =
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
while (reader.offset < buffer.length + buffer.byteOffset) {
const initialOffset = reader.offset;
const row = rowType.deserialize(reader);
let rowId: ComparablePrimitive | undefined = undefined;
if (primaryKeyInfo !== undefined) {
rowId = primaryKeyInfo.colType.intoMapKey(
row[primaryKeyInfo.colName]
);
} else {
// Get a view of the bytes for this row.
const rowBytes = buffer.subarray(
initialOffset - buffer.byteOffset,
reader.offset - buffer.byteOffset
);
// Convert it to a base64 string, so we can use it as a map key.
const asBase64 = fromByteArray(rowBytes);
rowId = asBase64;
}

rows.push({
type,
rowId,
row,
});
}
return rows;
}

// This function is async because we decompress the message async
async #processParsedMessage(
message: ServerMessage
): Promise<Message | undefined> {
const parseRowList = (
type: 'insert' | 'delete',
tableName: string,
rowList: BsatnRowList
): Operation[] => {
const buffer = rowList.rowsData;
const reader = new BinaryReader(buffer);
const rows: Operation[] = [];
const rowType = this.#remoteModule.tables[tableName]!.rowType;
const primaryKeyInfo =
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
while (reader.offset < buffer.length + buffer.byteOffset) {
const initialOffset = reader.offset;
const row = rowType.deserialize(reader);
let rowId: ComparablePrimitive | undefined = undefined;
if (primaryKeyInfo !== undefined) {
rowId = primaryKeyInfo.colType.intoMapKey(
row[primaryKeyInfo.colName]
);
} else {
// Get a view of the bytes for this row.
const rowBytes = buffer.subarray(
initialOffset - buffer.byteOffset,
reader.offset - buffer.byteOffset
);
// Convert it to a base64 string, so we can use it as a map key.
const asBase64 = fromByteArray(rowBytes);
rowId = asBase64;
}

rows.push({
type,
rowId,
row,
});
}
return rows;
};

const parseTableUpdate = async (
rawTableUpdate: RawTableUpdate
): Promise<CacheTableUpdate> => {
Expand All @@ -366,10 +401,10 @@ export class DbConnectionImpl<
decompressed = update.value;
}
operations = operations.concat(
parseRowList('insert', tableName, decompressed.inserts)
this.#parseRowList('insert', tableName, decompressed.inserts)
);
operations = operations.concat(
parseRowList('delete', tableName, decompressed.deletes)
this.#parseRowList('delete', tableName, decompressed.deletes)
);
}
return {
Expand Down Expand Up @@ -480,9 +515,14 @@ export class DbConnectionImpl<
}

case 'OneOffQueryResponse': {
throw new Error(
`TypeScript SDK never sends one-off queries, but got OneOffQueryResponse ${message}`
);
const queryResolvedMessage: QueryResolvedMessage = {
tag: 'QueryResolved',
messageId: message.value.messageId,
error: message.value.error,
tables: message.value.tables,
totalHostExecutionDuration: message.value.totalHostExecutionDuration,
};
return queryResolvedMessage;
}

case 'SubscribeMultiApplied': {
Expand Down Expand Up @@ -537,6 +577,25 @@ export class DbConnectionImpl<
this.isActive = true;
}

#applyTableState(
tableStates: clientApi.OneOffTable[],
eventContext: EventContextInterface
): Map<TableCache, PendingCallback[]> {
let tables: Map<TableCache, PendingCallback[]> = new Map();
for (let tableState of tableStates) {
// Get table information for the table being updated
const tableName = tableState.tableName;
const tableTypeInfo = this.#remoteModule.tables[tableName]!;
const table = new TableCache(tableTypeInfo);
const newCallbacks = table.applyOperations(
this.#parseRowList('insert', tableState.tableName, tableState.rows),
eventContext,
);
tables.set(table, newCallbacks);
}
return tables;
}

#applyTableUpdates(
tableUpdates: CacheTableUpdate[],
eventContext: EventContextInterface
Expand Down Expand Up @@ -700,6 +759,60 @@ export class DbConnectionImpl<
this.#emitter.emit('connect', this, this.identity, this.token);
break;
}
case 'QueryResolved': {
const query = this.#queryManager.queries.get(message.messageId[0]);
if (query === undefined) {
stdbLogger(
'error',
`Received QueryResolved for unknown messageId ${message.messageId}.`
);
break;
}
if (message.error !== undefined) {
const error = Error(message.error);
const event: Event<never> = { tag: 'Error', value: error };
const eventContext = this.#remoteModule.eventContextConstructor(
this,
event
);
const errorContext = {
...eventContext,
event: error,
};
if (message.messageId === undefined) {
console.error('Received an error message without a messageId: ', error);
break;
}
this.#queryManager.queries
.get(message.messageId[0])
?.emitter.emit(
'error',
errorContext,
error,
message.totalHostExecutionDuration
);
} else {
const event: Event<never> = { tag: 'QueryResolved' };
const eventContext = this.#remoteModule.eventContextConstructor(
this,
event
);
const { event: _, ...queryEventContext } = eventContext;
const tables = this.#applyTableState(message.tables, eventContext);
query?.emitter.emit(
'resolved',
queryEventContext,
tables.keys().reduce((map, table) =>
map.set(table.name(), table), new Map()),
message.totalHostExecutionDuration
);
for (const callbacks of tables.values())
for (const callback of callbacks)
callback.cb();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to call the row-level callbacks for one-off queries, since any rows that are inserted here will never have corresponding delete callbacks.

Since we aren't calling those, we don't need to have the #applyTableState function. We can just get a list of rows for each table.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we can remove the callbacks. But applyTableState() processes the insertion events from the response, isn't it necessary to compute the rows?

}
this.#queryManager.queries.delete(message.messageId[0]);
break;
}
case 'SubscribeApplied': {
const subscription = this.#subscriptionManager.subscriptions.get(
message.queryId
Expand Down Expand Up @@ -784,6 +897,7 @@ export class DbConnectionImpl<
emitter.emit('error', errorContext, error);
});
}
break;
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions sdks/typescript/packages/sdk/src/db_context.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { QueryBuilderImpl } from './query_builder_impl';
import type { SubscriptionBuilderImpl } from './subscription_builder_impl';

/**
Expand Down Expand Up @@ -28,6 +29,17 @@ export interface DbContext<
SetReducerFlags
>;

/**
* Creates a new one-off query builder.
*
* @returns The one-off query builder.
*/
queryBuilder(): QueryBuilderImpl<
DBView,
Reducers,
SetReducerFlags
>;

/**
* Disconnects from the database.
*/
Expand Down
1 change: 1 addition & 0 deletions sdks/typescript/packages/sdk/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ReducerEvent, ReducerInfoType } from './reducer_event';

export type Event<Reducer extends ReducerInfoType> =
| { tag: 'Reducer'; value: ReducerEvent<Reducer> }
| { tag: 'QueryResolved' }
| { tag: 'SubscribeApplied' }
| { tag: 'UnsubscribeApplied' }
| { tag: 'Error'; value: Error }
Expand Down
8 changes: 8 additions & 0 deletions sdks/typescript/packages/sdk/src/event_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ export interface EventContextInterface<
event: Event<Reducer>;
}

export interface QueryEventContextInterface<
DBView = any,
Reducers = any,
SetReducerFlags = any,
> extends DbContext<DBView, Reducers, SetReducerFlags> {
/** No event is provided **/
}

export interface ReducerEventContextInterface<
DBView = any,
Reducers = any,
Expand Down
25 changes: 21 additions & 4 deletions sdks/typescript/packages/sdk/src/message_types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { ConnectionId } from './connection_id';
import type { UpdateStatus } from './client_api/index.ts';
import { Identity } from './identity.ts';
import type { ConnectionId } from './connection_id';
import type { OneOffTable, UpdateStatus } from './client_api/index.ts';
import type { Identity } from './identity.ts';
import type { TableUpdate } from './table_cache.ts';
import { Timestamp } from './timestamp.ts';
import type { TimeDuration } from './time_duration.ts';
import type { Timestamp } from './timestamp.ts';

export type InitialSubscriptionMessage = {
tag: 'InitialSubscription';
Expand Down Expand Up @@ -36,6 +37,20 @@ export type IdentityTokenMessage = {
connectionId: ConnectionId;
};

export type QueryResolvedMessage = {
tag: 'QueryResolved';
messageId: Uint8Array;
error?: string;
tables: OneOffTable[];
totalHostExecutionDuration: TimeDuration;
};

export type QueryErrorMessage = {
tag: 'QueryError';
messageId?: Uint8Array;
error: string;
};

export type SubscribeAppliedMessage = {
tag: 'SubscribeApplied';
queryId: number;
Expand All @@ -59,6 +74,8 @@ export type Message =
| TransactionUpdateMessage
| TransactionUpdateLightMessage
| IdentityTokenMessage
| QueryResolvedMessage
| QueryErrorMessage
| SubscribeAppliedMessage
| UnsubscribeAppliedMessage
| SubscriptionError;
Loading