@@ -3,6 +3,7 @@ import { BucketInclusionReason, BucketPriority, DEFAULT_BUCKET_PRIORITY } from '
33import { BucketParameterQuerier , PendingQueriers } from '../BucketParameterQuerier.js' ;
44import { BucketSource , BucketSourceType , ResultSetDescription } from '../BucketSource.js' ;
55import { ColumnDefinition } from '../ExpressionType.js' ;
6+ import { CompatibilityContext } from '../quirks.js' ;
67import { SourceTableInterface } from '../SourceTableInterface.js' ;
78import { GetQuerierOptions , RequestedStream } from '../SqlSyncRules.js' ;
89import { TablePattern } from '../TablePattern.js' ;
@@ -12,8 +13,10 @@ import {
1213 EvaluationResult ,
1314 RequestParameters ,
1415 SourceSchema ,
15- SqliteRow
16+ SqliteRow ,
17+ TableRow
1618} from '../types.js' ;
19+ import { applyRowContext } from '../utils.js' ;
1720import { StreamVariant } from './variant.js' ;
1821
1922export class SyncStream implements BucketSource {
@@ -23,7 +26,11 @@ export class SyncStream implements BucketSource {
2326 variants : StreamVariant [ ] ;
2427 data : BaseSqlDataQuery ;
2528
26- constructor ( name : string , data : BaseSqlDataQuery ) {
29+ constructor (
30+ name : string ,
31+ data : BaseSqlDataQuery ,
32+ private readonly compatibility : CompatibilityContext
33+ ) {
2734 this . name = name ;
2835 this . subscribedToByDefault = false ;
2936 this . priority = DEFAULT_BUCKET_PRIORITY ;
@@ -165,13 +172,19 @@ export class SyncStream implements BucketSource {
165172 }
166173
167174 const stream = this ;
175+ const mappedRow = applyRowContext ( options . record , this . compatibility ) ;
176+ const row : TableRow = {
177+ sourceTable : options . sourceTable ,
178+ record : mappedRow
179+ } ;
180+
168181 return this . data . evaluateRowWithOptions ( {
169182 table : options . sourceTable ,
170- row : options . record ,
183+ row : applyRowContext ( options . record , this . compatibility ) ,
171184 bucketIds ( ) {
172185 const bucketIds : string [ ] = [ ] ;
173186 for ( const variant of stream . variants ) {
174- bucketIds . push ( ...variant . bucketIdsForRow ( stream . name , options ) ) ;
187+ bucketIds . push ( ...variant . bucketIdsForRow ( stream . name , row ) ) ;
175188 }
176189
177190 return bucketIds ;
0 commit comments