@@ -58,7 +58,7 @@ export function compileQuery(
58
58
collections : Record < string , Collection < any , any , any , any , any > > ,
59
59
subscriptions : Record < string , CollectionSubscription > ,
60
60
callbacks : Record < string , LazyCollectionCallbacks > ,
61
- lazyCollections : Set < string > ,
61
+ lazySources : Set < string > ,
62
62
optimizableOrderByCollections : Record < string , OrderByOptimizationInfo > ,
63
63
cache : QueryCache = new WeakMap ( ) ,
64
64
queryMapping : QueryMapping = new WeakMap ( )
@@ -83,13 +83,16 @@ export function compileQuery(
83
83
// the live layer can subscribe to every alias the optimizer introduces.
84
84
const aliasToCollectionId : Record < string , string > = { }
85
85
86
- // Create a map of table aliases to inputs
87
- // Note: alias keys take precedence over collection keys for input resolution
88
- const tables : Record < string , KeyedStream > = { }
86
+ // Create a map of source aliases to input streams.
87
+ // Note: During input resolution, alias keys take precedence over collection ID keys.
88
+ // This enables per-alias subscriptions: when looking up an input stream, we first check
89
+ // for `inputs[alias]` before falling back to `inputs[collectionId]`. This allows different
90
+ // aliases of the same collection (e.g., self-joins) to have independent filtered streams.
91
+ const sources : Record < string , KeyedStream > = { }
89
92
90
- // Process the FROM clause to get the main table
93
+ // Process the FROM clause to get the main source
91
94
const {
92
- alias : mainTableAlias ,
95
+ alias : mainSource ,
93
96
input : mainInput ,
94
97
collectionId : mainCollectionId ,
95
98
} = processFrom (
@@ -98,19 +101,19 @@ export function compileQuery(
98
101
collections ,
99
102
subscriptions ,
100
103
callbacks ,
101
- lazyCollections ,
104
+ lazySources ,
102
105
optimizableOrderByCollections ,
103
106
cache ,
104
107
queryMapping ,
105
108
aliasToCollectionId
106
109
)
107
- tables [ mainTableAlias ] = mainInput
110
+ sources [ mainSource ] = mainInput
108
111
109
- // Prepare the initial pipeline with the main table wrapped in its alias
112
+ // Prepare the initial pipeline with the main source wrapped in its alias
110
113
let pipeline : NamespacedAndKeyedStream = mainInput . pipe (
111
114
map ( ( [ key , row ] ) => {
112
115
// Initialize the record with a nested structure
113
- const ret = [ key , { [ mainTableAlias ] : row } ] as [
116
+ const ret = [ key , { [ mainSource ] : row } ] as [
114
117
string ,
115
118
Record < string , typeof row > ,
116
119
]
@@ -123,16 +126,16 @@ export function compileQuery(
123
126
pipeline = processJoins (
124
127
pipeline ,
125
128
query . join ,
126
- tables ,
129
+ sources ,
127
130
mainCollectionId ,
128
- mainTableAlias ,
131
+ mainSource ,
129
132
allInputs ,
130
133
cache ,
131
134
queryMapping ,
132
135
collections ,
133
136
subscriptions ,
134
137
callbacks ,
135
- lazyCollections ,
138
+ lazySources ,
136
139
optimizableOrderByCollections ,
137
140
rawQuery ,
138
141
compileQuery ,
@@ -193,7 +196,7 @@ export function compileQuery(
193
196
map ( ( [ key , namespacedRow ] ) => {
194
197
const selectResults =
195
198
! query . join && ! query . groupBy
196
- ? namespacedRow [ mainTableAlias ]
199
+ ? namespacedRow [ mainSource ]
197
200
: namespacedRow
198
201
199
202
return [
@@ -340,7 +343,7 @@ function processFrom(
340
343
collections : Record < string , Collection > ,
341
344
subscriptions : Record < string , CollectionSubscription > ,
342
345
callbacks : Record < string , LazyCollectionCallbacks > ,
343
- lazyCollections : Set < string > ,
346
+ lazySources : Set < string > ,
344
347
optimizableOrderByCollections : Record < string , OrderByOptimizationInfo > ,
345
348
cache : QueryCache ,
346
349
queryMapping : QueryMapping ,
@@ -370,7 +373,7 @@ function processFrom(
370
373
collections ,
371
374
subscriptions ,
372
375
callbacks ,
373
- lazyCollections ,
376
+ lazySources ,
374
377
optimizableOrderByCollections ,
375
378
cache ,
376
379
queryMapping
0 commit comments