Skip to content

Commit 42f81bb

Browse files
committed
change to subscription per source alias, rather than per colleciton
1 parent 8baf80c commit 42f81bb

File tree

4 files changed

+144
-55
lines changed

4 files changed

+144
-55
lines changed

packages/db/src/query/compiler/index.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export interface CompilationResult {
4040
pipeline: ResultStream
4141
/** Map of collection aliases to their WHERE clauses for index optimization */
4242
collectionWhereClauses: Map<string, BasicExpression<boolean>>
43+
/** Map of alias to underlying collection id used during compilation */
44+
aliasToCollectionId: Record<string, string>
4345
}
4446

4547
/**
@@ -78,6 +80,10 @@ export function compileQuery(
7880
// Create a copy of the inputs map to avoid modifying the original
7981
const allInputs = { ...inputs }
8082

83+
// Track alias to collection id relationships discovered during compilation so
84+
// the live layer can subscribe to every alias the optimiser introduces.
85+
const aliasToCollectionId: Record<string, string> = {}
86+
8187
// Create a map of table aliases to inputs
8288
const tables: Record<string, KeyedStream> = {}
8389

@@ -95,7 +101,8 @@ export function compileQuery(
95101
lazyCollections,
96102
optimizableOrderByCollections,
97103
cache,
98-
queryMapping
104+
queryMapping,
105+
aliasToCollectionId
99106
)
100107
tables[mainTableAlias] = mainInput
101108

@@ -128,7 +135,8 @@ export function compileQuery(
128135
lazyCollections,
129136
optimizableOrderByCollections,
130137
rawQuery,
131-
compileQuery
138+
compileQuery,
139+
aliasToCollectionId
132140
)
133141
}
134142

@@ -287,6 +295,7 @@ export function compileQuery(
287295
collectionId: mainCollectionId,
288296
pipeline: result,
289297
collectionWhereClauses,
298+
aliasToCollectionId,
290299
}
291300
cache.set(rawQuery, compilationResult)
292301

@@ -315,6 +324,7 @@ export function compileQuery(
315324
collectionId: mainCollectionId,
316325
pipeline: result,
317326
collectionWhereClauses,
327+
aliasToCollectionId,
318328
}
319329
cache.set(rawQuery, compilationResult)
320330

@@ -333,14 +343,16 @@ function processFrom(
333343
lazyCollections: Set<string>,
334344
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
335345
cache: QueryCache,
336-
queryMapping: QueryMapping
346+
queryMapping: QueryMapping,
347+
aliasToCollectionId: Record<string, string>
337348
): { alias: string; input: KeyedStream; collectionId: string } {
338349
switch (from.type) {
339350
case `collectionRef`: {
340351
const input = allInputs[from.alias] ?? allInputs[from.collection.id]
341352
if (!input) {
342353
throw new CollectionInputNotFoundError(from.alias, from.collection.id)
343354
}
355+
aliasToCollectionId[from.alias] = from.collection.id
344356
return { alias: from.alias, input, collectionId: from.collection.id }
345357
}
346358
case `queryRef`: {
@@ -360,6 +372,8 @@ function processFrom(
360372
queryMapping
361373
)
362374

375+
Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId)
376+
363377
// Extract the pipeline from the compilation result
364378
const subQueryInput = subQueryResult.pipeline
365379

packages/db/src/query/compiler/joins.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ export function processJoins(
6363
lazyCollections: Set<string>,
6464
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
6565
rawQuery: QueryIR,
66-
onCompileSubquery: CompileQueryFn
66+
onCompileSubquery: CompileQueryFn,
67+
aliasToCollectionId: Record<string, string>
6768
): NamespacedAndKeyedStream {
6869
let resultPipeline = pipeline
6970

@@ -83,7 +84,8 @@ export function processJoins(
8384
lazyCollections,
8485
optimizableOrderByCollections,
8586
rawQuery,
86-
onCompileSubquery
87+
onCompileSubquery,
88+
aliasToCollectionId
8789
)
8890
}
8991

@@ -108,8 +110,11 @@ function processJoin(
108110
lazyCollections: Set<string>,
109111
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
110112
rawQuery: QueryIR,
111-
onCompileSubquery: CompileQueryFn
113+
onCompileSubquery: CompileQueryFn,
114+
aliasToCollectionId: Record<string, string>
112115
): NamespacedAndKeyedStream {
116+
const isCollectionRef = joinClause.from.type === `collectionRef`
117+
113118
// Get the joined table alias and input stream
114119
const {
115120
alias: joinedTableAlias,
@@ -125,11 +130,17 @@ function processJoin(
125130
optimizableOrderByCollections,
126131
cache,
127132
queryMapping,
128-
onCompileSubquery
133+
onCompileSubquery,
134+
aliasToCollectionId
129135
)
130136

131137
// Add the joined table to the tables map
132138
tables[joinedTableAlias] = joinedInput
139+
if (isCollectionRef) {
140+
// Only direct collection references form new alias bindings. Subquery
141+
// aliases reuse the mapping returned from the recursive compilation above.
142+
aliasToCollectionId[joinedTableAlias] = joinedCollectionId
143+
}
133144

134145
const mainCollection = collections[mainTableId]
135146
const joinedCollection = collections[joinedCollectionId]
@@ -260,9 +271,9 @@ function processJoin(
260271
subscriptions[`__collection:${lazyCollection.id}`]
261272

262273
if (!lazyCollectionSubscription) {
263-
// The alias was not subscribed (e.g. belongs to a nested subquery),
264-
// so we skip the lazy loading optimization for this join.
265-
return
274+
throw new Error(
275+
`Internal error: subscription for collection is missing in join pipeline. Make sure the live query collection sets the subscription before running the pipeline.`
276+
)
266277
}
267278

268279
if (lazyCollectionSubscription.hasLoadedInitialState()) {
@@ -401,14 +412,16 @@ function processJoinSource(
401412
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
402413
cache: QueryCache,
403414
queryMapping: QueryMapping,
404-
onCompileSubquery: CompileQueryFn
415+
onCompileSubquery: CompileQueryFn,
416+
aliasToCollectionId: Record<string, string>
405417
): { alias: string; input: KeyedStream; collectionId: string } {
406418
switch (from.type) {
407419
case `collectionRef`: {
408420
const input = allInputs[from.alias] ?? allInputs[from.collection.id]
409421
if (!input) {
410422
throw new CollectionInputNotFoundError(from.alias, from.collection.id)
411423
}
424+
aliasToCollectionId[from.alias] = from.collection.id
412425
return { alias: from.alias, input, collectionId: from.collection.id }
413426
}
414427
case `queryRef`: {
@@ -428,6 +441,10 @@ function processJoinSource(
428441
queryMapping
429442
)
430443

444+
// Pull the nested alias map up so the caller can subscribe to those aliases
445+
// and keep the current alias pointing at the subquery's collection.
446+
Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId)
447+
431448
// Extract the pipeline from the compilation result
432449
const subQueryInput = subQueryResult.pipeline
433450

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ export class CollectionConfigBuilder<
3232
private readonly id: string
3333
readonly query: QueryIR
3434
private readonly collections: Record<string, Collection<any, any, any>>
35-
private readonly aliasToCollectionId: Record<string, string>
3635
private readonly collectionByAlias: Record<string, Collection<any, any, any>>
36+
// Populated during compilation to include optimizer-generated aliases
37+
private compiledAliasToCollectionId: Record<string, string> = {}
3738

3839
// WeakMap to store the keys of the results
3940
// so that we can retrieve them in the getKey function
@@ -72,13 +73,11 @@ export class CollectionConfigBuilder<
7273
this.collections = extractCollectionsFromQuery(this.query)
7374
const collectionAliasesById = extractCollectionAliases(this.query)
7475

75-
this.aliasToCollectionId = {}
7676
this.collectionByAlias = {}
7777
for (const [collectionId, aliases] of collectionAliasesById.entries()) {
7878
const collection = this.collections[collectionId]
7979
if (!collection) continue
8080
for (const alias of aliases) {
81-
this.aliasToCollectionId[alias] = collectionId
8281
this.collectionByAlias[alias] = collection
8382
}
8483
}
@@ -111,11 +110,15 @@ export class CollectionConfigBuilder<
111110
}
112111

113112
getCollectionIdForAlias(alias: string): string {
114-
const collectionId = this.aliasToCollectionId[alias]
115-
if (!collectionId) {
116-
throw new Error(`Unknown collection alias "${alias}"`)
113+
const compiled = this.compiledAliasToCollectionId[alias]
114+
if (compiled) {
115+
return compiled
117116
}
118-
return collectionId
117+
const collection = this.collectionByAlias[alias]
118+
if (collection) {
119+
return collection.id
120+
}
121+
throw new Error(`Unknown collection alias "${alias}"`)
119122
}
120123

121124
// The callback function is called after the graph has run.
@@ -224,11 +227,8 @@ export class CollectionConfigBuilder<
224227
])
225228
)
226229

227-
// Compile the query and get both pipeline and collection WHERE clauses
228-
const {
229-
pipeline: pipelineCache,
230-
collectionWhereClauses: collectionWhereClausesCache,
231-
} = compileQuery(
230+
// Compile the query and capture alias metadata produced during optimisation
231+
let compilation = compileQuery(
232232
this.query,
233233
this.inputsCache as Record<string, KeyedStream>,
234234
this.collections,
@@ -238,8 +238,37 @@ export class CollectionConfigBuilder<
238238
this.optimizableOrderByCollections
239239
)
240240

241-
this.pipelineCache = pipelineCache
242-
this.collectionWhereClausesCache = collectionWhereClausesCache
241+
this.pipelineCache = compilation.pipeline
242+
this.collectionWhereClausesCache = compilation.collectionWhereClauses
243+
this.compiledAliasToCollectionId = compilation.aliasToCollectionId
244+
// Optimized queries can introduce aliases beyond those declared on the
245+
// builder. If that happens, provision inputs for the missing aliases and
246+
// recompile so the pipeline is fully wired before execution.
247+
const missingAliases = Object.keys(this.compiledAliasToCollectionId).filter(
248+
(alias) => !Object.hasOwn(this.inputsCache!, alias)
249+
)
250+
251+
if (missingAliases.length > 0) {
252+
for (const alias of missingAliases) {
253+
this.inputsCache[alias] = this.graphCache.newInput<any>()
254+
}
255+
256+
compilation = compileQuery(
257+
this.query,
258+
this.inputsCache as Record<string, KeyedStream>,
259+
this.collections,
260+
this.subscriptions,
261+
this.lazyCollectionsCallbacks,
262+
this.lazyCollections,
263+
this.optimizableOrderByCollections,
264+
new WeakMap(),
265+
new WeakMap()
266+
)
267+
268+
this.pipelineCache = compilation.pipeline
269+
this.collectionWhereClausesCache = compilation.collectionWhereClauses
270+
this.compiledAliasToCollectionId = compilation.aliasToCollectionId
271+
}
243272
}
244273

245274
private maybeCompileBasePipeline() {
@@ -355,31 +384,41 @@ export class CollectionConfigBuilder<
355384
config: Parameters<SyncConfig<TResult>[`sync`]>[0],
356385
syncState: FullSyncState
357386
) {
358-
const loaders = Object.entries(this.collectionByAlias).map(
359-
([alias, collection]) => {
360-
const collectionId = this.aliasToCollectionId[alias]!
361-
const collectionSubscriber = new CollectionSubscriber(
362-
alias,
363-
collectionId,
364-
collection,
365-
config,
366-
syncState,
367-
this
368-
)
369-
370-
const subscription = collectionSubscriber.subscribe()
371-
this.subscriptions[alias] = subscription
372-
const collectionKey = `__collection:${collectionId}`
373-
this.subscriptions[collectionKey] = subscription
374-
375-
const loadMore = collectionSubscriber.loadMoreIfNeeded.bind(
376-
collectionSubscriber,
377-
subscription
378-
)
379-
380-
return loadMore
381-
}
382-
)
387+
const compiledAliases = Object.entries(this.compiledAliasToCollectionId)
388+
if (compiledAliases.length === 0) {
389+
throw new Error(
390+
`Compiler returned no alias metadata for query '${this.id}'. This should not happen; please report.`
391+
)
392+
}
393+
394+
// Subscribe to each alias the compiler reported.
395+
const aliasEntries = compiledAliases
396+
397+
const loaders = aliasEntries.map(([alias, collectionId]) => {
398+
const collection =
399+
this.collectionByAlias[alias] ?? this.collections[collectionId]!
400+
401+
const collectionSubscriber = new CollectionSubscriber(
402+
alias,
403+
collectionId,
404+
collection,
405+
config,
406+
syncState,
407+
this
408+
)
409+
410+
const subscription = collectionSubscriber.subscribe()
411+
this.subscriptions[alias] = subscription
412+
const collectionKey = `__collection:${collectionId}`
413+
this.subscriptions[collectionKey] = subscription
414+
415+
const loadMore = collectionSubscriber.loadMoreIfNeeded.bind(
416+
collectionSubscriber,
417+
subscription
418+
)
419+
420+
return loadMore
421+
})
383422

384423
const loadMoreDataCallback = () => {
385424
loaders.map((loader) => loader())

0 commit comments

Comments
 (0)