Skip to content

Commit 8baf80c

Browse files
committed
convert to a subscription per alias
1 parent 1249a08 commit 8baf80c

File tree

7 files changed

+113
-98
lines changed

7 files changed

+113
-98
lines changed

packages/db/src/errors.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,11 @@ export class LimitOffsetRequireOrderByError extends QueryCompilationError {
350350
}
351351

352352
export class CollectionInputNotFoundError extends QueryCompilationError {
353-
constructor(collectionId: string) {
354-
super(`Input for collection "${collectionId}" not found in inputs map`)
353+
constructor(alias: string, collectionId?: string) {
354+
const details = collectionId
355+
? `alias "${alias}" (collection "${collectionId}")`
356+
: `collection "${alias}"`
357+
super(`Input for ${details} not found in inputs map`)
355358
}
356359
}
357360

@@ -563,3 +566,11 @@ export class CannotCombineEmptyExpressionListError extends QueryOptimizerError {
563566
super(`Cannot combine empty expression list`)
564567
}
565568
}
569+
570+
export class WhereClauseConversionError extends QueryOptimizerError {
571+
constructor(collectionId: string, alias: string) {
572+
super(
573+
`Failed to convert WHERE clause to collection filter for collection '${collectionId}' alias '${alias}'. This indicates a bug in the query optimization logic.`
574+
)
575+
}
576+
}

packages/db/src/query/compiler/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ function processFrom(
337337
): { alias: string; input: KeyedStream; collectionId: string } {
338338
switch (from.type) {
339339
case `collectionRef`: {
340-
const input = allInputs[from.collection.id]
340+
const input = allInputs[from.alias] ?? allInputs[from.collection.id]
341341
if (!input) {
342-
throw new CollectionInputNotFoundError(from.collection.id)
342+
throw new CollectionInputNotFoundError(from.alias, from.collection.id)
343343
}
344344
return { alias: from.alias, input, collectionId: from.collection.id }
345345
}

packages/db/src/query/compiler/joins.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,16 @@ function processJoin(
253253
[key: unknown, [originalKey: string, namespacedRow: NamespacedRow]]
254254
> = activePipeline.pipe(
255255
tap((data) => {
256-
const lazyCollectionSubscription = subscriptions[lazyCollection.id]
256+
const lazyAliasCandidate =
257+
activeCollection === `main` ? joinedTableAlias : mainTableAlias
258+
const lazyCollectionSubscription =
259+
subscriptions[lazyAliasCandidate] ??
260+
subscriptions[`__collection:${lazyCollection.id}`]
257261

258262
if (!lazyCollectionSubscription) {
259-
throw new Error(
260-
`Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.`
261-
)
263+
// The alias was not subscribed (e.g. belongs to a nested subquery),
264+
// so we skip the lazy loading optimization for this join.
265+
return
262266
}
263267

264268
if (lazyCollectionSubscription.hasLoadedInitialState()) {
@@ -401,9 +405,9 @@ function processJoinSource(
401405
): { alias: string; input: KeyedStream; collectionId: string } {
402406
switch (from.type) {
403407
case `collectionRef`: {
404-
const input = allInputs[from.collection.id]
408+
const input = allInputs[from.alias] ?? allInputs[from.collection.id]
405409
if (!input) {
406-
throw new CollectionInputNotFoundError(from.collection.id)
410+
throw new CollectionInputNotFoundError(from.alias, from.collection.id)
407411
}
408412
return { alias: from.alias, input, collectionId: from.collection.id }
409413
}

packages/db/src/query/compiler/order-by.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type { BaseIndex } from "../../indexes/base-index.js"
1313
import type { Collection } from "../../collection/index.js"
1414

1515
export type OrderByOptimizationInfo = {
16+
alias: string
1617
offset: number
1718
limit: number
1819
comparator: (
@@ -157,7 +158,13 @@ export function processOrderBy(
157158

158159
if (index && index.supports(`gt`)) {
159160
// We found an index that we can use to lazily load ordered data
161+
const orderByAlias =
162+
orderByExpression.path.length > 1
163+
? String(orderByExpression.path[0])
164+
: rawQuery.from.alias
165+
160166
const orderByOptimizationInfo = {
167+
alias: orderByAlias,
161168
offset: offset ?? 0,
162169
limit,
163170
comparator,

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ export class CollectionConfigBuilder<
3232
private readonly id: string
3333
readonly query: QueryIR
3434
private readonly collections: Record<string, Collection<any, any, any>>
35-
private readonly collectionAliasesById: Map<string, Set<string>>
35+
private readonly aliasToCollectionId: Record<string, string>
36+
private readonly collectionByAlias: Record<string, Collection<any, any, any>>
3637

3738
// WeakMap to store the keys of the results
3839
// so that we can retrieve them in the getKey function
@@ -52,7 +53,7 @@ export class CollectionConfigBuilder<
5253
| Map<string, BasicExpression<boolean>>
5354
| undefined
5455

55-
// Map of collection ID to subscription
56+
// Map of collection alias to subscription
5657
readonly subscriptions: Record<string, CollectionSubscription> = {}
5758
// Map of collection IDs to functions that load keys for that lazy collection
5859
lazyCollectionsCallbacks: Record<string, LazyCollectionCallbacks> = {}
@@ -69,7 +70,18 @@ export class CollectionConfigBuilder<
6970

7071
this.query = buildQueryFromConfig(config)
7172
this.collections = extractCollectionsFromQuery(this.query)
72-
this.collectionAliasesById = extractCollectionAliases(this.query)
73+
const collectionAliasesById = extractCollectionAliases(this.query)
74+
75+
this.aliasToCollectionId = {}
76+
this.collectionByAlias = {}
77+
for (const [collectionId, aliases] of collectionAliasesById.entries()) {
78+
const collection = this.collections[collectionId]
79+
if (!collection) continue
80+
for (const alias of aliases) {
81+
this.aliasToCollectionId[alias] = collectionId
82+
this.collectionByAlias[alias] = collection
83+
}
84+
}
7385

7486
// Create compare function for ordering if the query has orderBy
7587
if (this.query.orderBy && this.query.orderBy.length > 0) {
@@ -98,9 +110,12 @@ export class CollectionConfigBuilder<
98110
}
99111
}
100112

101-
getCollectionAliases(collectionId: string): Array<string> {
102-
const aliases = this.collectionAliasesById.get(collectionId)
103-
return aliases ? Array.from(aliases) : []
113+
getCollectionIdForAlias(alias: string): string {
114+
const collectionId = this.aliasToCollectionId[alias]
115+
if (!collectionId) {
116+
throw new Error(`Unknown collection alias "${alias}"`)
117+
}
118+
return collectionId
104119
}
105120

106121
// The callback function is called after the graph has run.
@@ -203,8 +218,8 @@ export class CollectionConfigBuilder<
203218
private compileBasePipeline() {
204219
this.graphCache = new D2()
205220
this.inputsCache = Object.fromEntries(
206-
Object.entries(this.collections).map(([key]) => [
207-
key,
221+
Object.keys(this.collectionByAlias).map((alias) => [
222+
alias,
208223
this.graphCache!.newInput<any>(),
209224
])
210225
)
@@ -340,9 +355,11 @@ export class CollectionConfigBuilder<
340355
config: Parameters<SyncConfig<TResult>[`sync`]>[0],
341356
syncState: FullSyncState
342357
) {
343-
const loaders = Object.entries(this.collections).map(
344-
([collectionId, collection]) => {
358+
const loaders = Object.entries(this.collectionByAlias).map(
359+
([alias, collection]) => {
360+
const collectionId = this.aliasToCollectionId[alias]!
345361
const collectionSubscriber = new CollectionSubscriber(
362+
alias,
346363
collectionId,
347364
collection,
348365
config,
@@ -351,7 +368,9 @@ export class CollectionConfigBuilder<
351368
)
352369

353370
const subscription = collectionSubscriber.subscribe()
354-
this.subscriptions[collectionId] = subscription
371+
this.subscriptions[alias] = subscription
372+
const collectionKey = `__collection:${collectionId}`
373+
this.subscriptions[collectionKey] = subscription
355374

356375
const loadMore = collectionSubscriber.loadMoreIfNeeded.bind(
357376
collectionSubscriber,
@@ -470,7 +489,7 @@ function extractCollectionAliases(query: QueryIR): Map<string, Set<string>> {
470489
}
471490
}
472491

473-
function traverse(q: QueryIR) {
492+
function traverse(q?: QueryIR) {
474493
if (!q) return
475494

476495
recordAlias(q.from)

0 commit comments

Comments
 (0)