Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/fix-self-join-bug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Fix self-join bug by implementing per-alias subscriptions in live queries
92 changes: 79 additions & 13 deletions packages/db/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,23 @@ export class LimitOffsetRequireOrderByError extends QueryCompilationError {
}
}

/**
* Error thrown when a collection input stream is not found during query compilation.
* In self-joins, each alias (e.g., 'employee', 'manager') requires its own input stream.
*/
export class CollectionInputNotFoundError extends QueryCompilationError {
constructor(collectionId: string) {
super(`Input for collection "${collectionId}" not found in inputs map`)
constructor(
alias: string,
collectionId?: string,
availableKeys?: Array<string>
) {
const details = collectionId
? `alias "${alias}" (collection "${collectionId}")`
: `collection "${alias}"`
const availableKeysMsg = availableKeys?.length
? `. Available keys: ${availableKeys.join(`, `)}`
: ``
super(`Input for ${details} not found in inputs map${availableKeysMsg}`)
}
}

Expand Down Expand Up @@ -399,32 +413,32 @@ export class UnsupportedJoinTypeError extends JoinError {
}
}

export class InvalidJoinConditionSameTableError extends JoinError {
constructor(tableAlias: string) {
export class InvalidJoinConditionSameSourceError extends JoinError {
constructor(sourceAlias: string) {
super(
`Invalid join condition: both expressions refer to the same table "${tableAlias}"`
`Invalid join condition: both expressions refer to the same source "${sourceAlias}"`
)
}
}

export class InvalidJoinConditionTableMismatchError extends JoinError {
export class InvalidJoinConditionSourceMismatchError extends JoinError {
constructor() {
super(`Invalid join condition: expressions must reference table aliases`)
super(`Invalid join condition: expressions must reference source aliases`)
}
}

export class InvalidJoinConditionLeftTableError extends JoinError {
constructor(tableAlias: string) {
export class InvalidJoinConditionLeftSourceError extends JoinError {
constructor(sourceAlias: string) {
super(
`Invalid join condition: left expression refers to an unavailable table "${tableAlias}"`
`Invalid join condition: left expression refers to an unavailable source "${sourceAlias}"`
)
}
}

export class InvalidJoinConditionRightTableError extends JoinError {
constructor(tableAlias: string) {
export class InvalidJoinConditionRightSourceError extends JoinError {
constructor(sourceAlias: string) {
super(
`Invalid join condition: right expression does not refer to the joined table "${tableAlias}"`
`Invalid join condition: right expression does not refer to the joined source "${sourceAlias}"`
)
}
}
Expand Down Expand Up @@ -563,3 +577,55 @@ export class CannotCombineEmptyExpressionListError extends QueryOptimizerError {
super(`Cannot combine empty expression list`)
}
}

/**
* Internal error when the query optimizer fails to convert a WHERE clause to a collection filter.
*/
export class WhereClauseConversionError extends QueryOptimizerError {
constructor(collectionId: string, alias: string) {
super(
`Failed to convert WHERE clause to collection filter for collection '${collectionId}' alias '${alias}'. This indicates a bug in the query optimization logic.`
)
}
}

/**
* Error when a subscription cannot be found during lazy join processing.
* For subqueries, aliases may be remapped (e.g., 'activeUser' → 'user').
*/
export class SubscriptionNotFoundError extends QueryCompilationError {
constructor(
resolvedAlias: string,
originalAlias: string,
collectionId: string,
availableAliases: Array<string>
) {
super(
`Internal error: subscription for alias '${resolvedAlias}' (remapped from '${originalAlias}', collection '${collectionId}') is missing in join pipeline. Available aliases: ${availableAliases.join(`, `)}. This indicates a bug in alias tracking.`
)
}
}

/**
* Error thrown when aggregate expressions are used outside of a GROUP BY context.
*/
export class AggregateNotSupportedError extends QueryCompilationError {
constructor() {
super(
`Aggregate expressions are not supported in this context. Use GROUP BY clause for aggregates.`
)
}
}

/**
* Internal error when the compiler returns aliases that don't have corresponding input streams.
* This should never happen since all aliases come from user declarations.
*/
export class MissingAliasInputsError extends QueryCompilationError {
constructor(missingAliases: Array<string>) {
super(
`Internal error: compiler returned aliases without inputs: ${missingAliases.join(`, `)}. ` +
`This indicates a bug in query compilation. Please report this issue.`
)
}
}
2 changes: 1 addition & 1 deletion packages/db/src/query/builder/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export type SchemaFromSource<T extends Source> = Prettify<{
* GetAliases - Extracts all table aliases available in a query context
*
* Simple utility type that returns the keys of the schema, representing
* all table/collection aliases that can be referenced in the current query.
* all table/source aliases that can be referenced in the current query.
*/
export type GetAliases<TContext extends Context> = keyof TContext[`schema`]

Expand Down
127 changes: 95 additions & 32 deletions packages/db/src/query/compiler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,51 @@ import type {
import type { QueryCache, QueryMapping } from "./types.js"

/**
* Result of query compilation including both the pipeline and collection-specific WHERE clauses
* Result of query compilation including both the pipeline and source-specific WHERE clauses
*/
export interface CompilationResult {
/** The ID of the main collection */
collectionId: string
/** The compiled query pipeline */

/** The compiled query pipeline (D2 stream) */
pipeline: ResultStream
/** Map of collection aliases to their WHERE clauses for index optimization */
collectionWhereClauses: Map<string, BasicExpression<boolean>>

/** Map of source aliases to their WHERE clauses for index optimization */
sourceWhereClauses: Map<string, BasicExpression<boolean>>

/**
* Maps each source alias to its collection ID. Enables per-alias subscriptions for self-joins.
* Example: `{ employee: 'employees-col-id', manager: 'employees-col-id' }`
*/
aliasToCollectionId: Record<string, string>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mapping of the alias used in a from or join to the collection id that is to be used for that source.


/**
* Maps outer alias to inner alias for subqueries (e.g., `{ activeUser: 'user' }`).
* Used to resolve subscriptions during lazy loading when aliases differ.
*/
aliasRemapping: Record<string, string>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this query below the alias mapping is used to track that activeUser is an alias of the user alias.

q.from(issue: issueCollection)
  .join(
    { activeUser: q.from({ user: userCollection }) },
    ({ issue }) => ....

}

/**
* Compiles a query2 IR into a D2 pipeline
* Compiles a query IR into a D2 pipeline
* @param rawQuery The query IR to compile
* @param inputs Mapping of collection names to input streams
* @param inputs Mapping of source aliases to input streams (e.g., `{ employee: input1, manager: input2 }`)
* @param collections Mapping of collection IDs to Collection instances
* @param subscriptions Mapping of source aliases to CollectionSubscription instances
* @param callbacks Mapping of source aliases to lazy loading callbacks
* @param lazySources Set of source aliases that should load data lazily
* @param optimizableOrderByCollections Map of collection IDs to order-by optimization info
* @param cache Optional cache for compiled subqueries (used internally for recursion)
* @param queryMapping Optional mapping from optimized queries to original queries
* @returns A CompilationResult with the pipeline and collection WHERE clauses
* @returns A CompilationResult with the pipeline, source WHERE clauses, and alias metadata
*/
export function compileQuery(
rawQuery: QueryIR,
inputs: Record<string, KeyedStream>,
collections: Record<string, Collection<any, any, any, any, any>>,
subscriptions: Record<string, CollectionSubscription>,
callbacks: Record<string, LazyCollectionCallbacks>,
lazyCollections: Set<string>,
lazySources: Set<string>,
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
cache: QueryCache = new WeakMap(),
queryMapping: QueryMapping = new WeakMap()
Expand All @@ -68,8 +87,7 @@ export function compileQuery(
}

// Optimize the query before compilation
const { optimizedQuery: query, collectionWhereClauses } =
optimizeQuery(rawQuery)
const { optimizedQuery: query, sourceWhereClauses } = optimizeQuery(rawQuery)

// Create mapping from optimized query to original for caching
queryMapping.set(query, rawQuery)
Expand All @@ -78,12 +96,24 @@ export function compileQuery(
// Create a copy of the inputs map to avoid modifying the original
const allInputs = { ...inputs }

// Create a map of table aliases to inputs
const tables: Record<string, KeyedStream> = {}
// Track alias to collection id relationships discovered during compilation.
// This includes all user-declared aliases plus inner aliases from subqueries.
const aliasToCollectionId: Record<string, string> = {}

// Track alias remapping for subqueries (outer alias → inner alias)
// e.g., when .join({ activeUser: subquery }) where subquery uses .from({ user: collection })
// we store: aliasRemapping['activeUser'] = 'user'
const aliasRemapping: Record<string, string> = {}

// Create a map of source aliases to input streams.
// Inputs MUST be keyed by alias (e.g., `{ employee: input1, manager: input2 }`),
// not by collection ID. This enables per-alias subscriptions where different aliases
// of the same collection (e.g., self-joins) maintain independent filtered streams.
const sources: Record<string, KeyedStream> = {}

// Process the FROM clause to get the main table
// Process the FROM clause to get the main source
const {
alias: mainTableAlias,
alias: mainSource,
input: mainInput,
collectionId: mainCollectionId,
} = processFrom(
Expand All @@ -92,18 +122,20 @@ export function compileQuery(
collections,
subscriptions,
callbacks,
lazyCollections,
lazySources,
optimizableOrderByCollections,
cache,
queryMapping
queryMapping,
aliasToCollectionId,
aliasRemapping
)
tables[mainTableAlias] = mainInput
sources[mainSource] = mainInput

// Prepare the initial pipeline with the main table wrapped in its alias
// Prepare the initial pipeline with the main source wrapped in its alias
let pipeline: NamespacedAndKeyedStream = mainInput.pipe(
map(([key, row]) => {
// Initialize the record with a nested structure
const ret = [key, { [mainTableAlias]: row }] as [
const ret = [key, { [mainSource]: row }] as [
string,
Record<string, typeof row>,
]
Expand All @@ -116,19 +148,21 @@ export function compileQuery(
pipeline = processJoins(
pipeline,
query.join,
tables,
sources,
mainCollectionId,
mainTableAlias,
mainSource,
allInputs,
cache,
queryMapping,
collections,
subscriptions,
callbacks,
lazyCollections,
lazySources,
optimizableOrderByCollections,
rawQuery,
compileQuery
compileQuery,
aliasToCollectionId,
aliasRemapping
)
}

Expand Down Expand Up @@ -185,7 +219,7 @@ export function compileQuery(
map(([key, namespacedRow]) => {
const selectResults =
!query.join && !query.groupBy
? namespacedRow[mainTableAlias]
? namespacedRow[mainSource]
: namespacedRow

return [
Expand Down Expand Up @@ -286,7 +320,9 @@ export function compileQuery(
const compilationResult = {
collectionId: mainCollectionId,
pipeline: result,
collectionWhereClauses,
sourceWhereClauses,
aliasToCollectionId,
aliasRemapping,
}
cache.set(rawQuery, compilationResult)

Expand Down Expand Up @@ -314,33 +350,43 @@ export function compileQuery(
const compilationResult = {
collectionId: mainCollectionId,
pipeline: result,
collectionWhereClauses,
sourceWhereClauses,
aliasToCollectionId,
aliasRemapping,
}
cache.set(rawQuery, compilationResult)

return compilationResult
}

/**
* Processes the FROM clause to extract the main table alias and input stream
* Processes the FROM clause, handling direct collection references and subqueries.
* Populates `aliasToCollectionId` and `aliasRemapping` for per-alias subscription tracking.
*/
function processFrom(
from: CollectionRef | QueryRef,
allInputs: Record<string, KeyedStream>,
collections: Record<string, Collection>,
subscriptions: Record<string, CollectionSubscription>,
callbacks: Record<string, LazyCollectionCallbacks>,
lazyCollections: Set<string>,
lazySources: Set<string>,
optimizableOrderByCollections: Record<string, OrderByOptimizationInfo>,
cache: QueryCache,
queryMapping: QueryMapping
queryMapping: QueryMapping,
aliasToCollectionId: Record<string, string>,
aliasRemapping: Record<string, string>
): { alias: string; input: KeyedStream; collectionId: string } {
switch (from.type) {
case `collectionRef`: {
const input = allInputs[from.collection.id]
const input = allInputs[from.alias]
if (!input) {
throw new CollectionInputNotFoundError(from.collection.id)
throw new CollectionInputNotFoundError(
from.alias,
from.collection.id,
Object.keys(allInputs)
)
}
aliasToCollectionId[from.alias] = from.collection.id
return { alias: from.alias, input, collectionId: from.collection.id }
}
case `queryRef`: {
Expand All @@ -354,12 +400,29 @@ function processFrom(
collections,
subscriptions,
callbacks,
lazyCollections,
lazySources,
optimizableOrderByCollections,
cache,
queryMapping
)

// Pull up inner alias mappings from subquery compilation
Object.assign(aliasToCollectionId, subQueryResult.aliasToCollectionId)
Object.assign(aliasRemapping, subQueryResult.aliasRemapping)

// Create remapping when outer alias differs from inner alias.
// Example: .join({ activeUser: subquery }) where subquery uses .from({ user: ... })
// Creates: aliasRemapping['activeUser'] = 'user'
// Needed for subscription resolution during lazy loading.
const innerAlias = Object.keys(subQueryResult.aliasToCollectionId).find(
(alias) =>
subQueryResult.aliasToCollectionId[alias] ===
subQueryResult.collectionId
)
if (innerAlias && innerAlias !== from.alias) {
aliasRemapping[from.alias] = innerAlias
}

// Extract the pipeline from the compilation result
const subQueryInput = subQueryResult.pipeline

Expand Down
Loading
Loading