Skip to content

Commit b6e4e44

Browse files
committed
Add broader partial fetch support including "order" and "where" clauses.
1 parent 06ae6e3 commit b6e4e44

File tree

2 files changed

+153
-42
lines changed

2 files changed

+153
-42
lines changed

packages/trailbase-db-collection/e2e/trailbase.e2e.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
createJoinsTestSuite,
1717
createLiveUpdatesTestSuite,
1818
createMutationsTestSuite,
19-
// createPaginationTestSuite,
19+
createPaginationTestSuite,
2020
createPredicatesTestSuite,
2121
createProgressiveTestSuite,
2222
generateSeedData
@@ -402,7 +402,7 @@ describe(`TrailBase Collection E2E Tests`, async () => {
402402

403403
// Run all shared test suites
404404
createPredicatesTestSuite(getConfig)
405-
// createPaginationTestSuite(getConfig)
405+
createPaginationTestSuite(getConfig)
406406
createJoinsTestSuite(getConfig)
407407
createDeduplicationTestSuite(getConfig)
408408
createCollationTestSuite(getConfig)

packages/trailbase-db-collection/src/trailbase.ts

Lines changed: 151 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
ExpectedUpdateTypeError,
77
TimeoutWaitingForIdsError,
88
} from './errors'
9-
import type { Event, RecordApi } from 'trailbase'
9+
import type { CompareOp, Event, FilterOrComposite, RecordApi } from 'trailbase'
1010

1111
import type {
1212
BaseCollectionConfig,
@@ -178,45 +178,71 @@ export function trailBaseCollectionOptions<
178178
sync: (params: SyncParams) => {
179179
const { begin, write, commit, markReady } = params
180180

181-
// Initial fetch.
182-
async function initialFetch() {
183-
const limit = 256
184-
let response = await config.recordApi.list({
185-
pagination: {
186-
limit,
187-
},
188-
})
189-
let cursor = response.cursor
190-
let got = 0
181+
// NOTE: We cache cursors from prior fetches. TanStack/db expects that
182+
// cursors can be derived from a key, which is not true for TB, since
183+
// cursors are encrypted. This is leaky and therefore not ideal.
184+
const cursors = new Map<string | number, string>
191185

192-
begin()
186+
// Load (more) data.
187+
async function load(opts: LoadSubsetOptions) {
188+
const lastKey = opts.cursor?.lastKey
189+
let cursor: string | undefined = lastKey !== undefined ? cursors.get(lastKey) : undefined
190+
let offset: number | undefined = (opts.offset ?? 0) > 0 ? opts.offset : undefined
191+
192+
const order: Array<string> | undefined = buildOrder(opts)
193+
const filters: Array<FilterOrComposite> | undefined = buildFilters(opts, config)
194+
195+
let remaining: number = opts.limit ?? Number.MAX_VALUE
196+
if (remaining <= 0) {
197+
return
198+
}
193199

194200
while (true) {
201+
const limit = Math.min(remaining, 256)
202+
const response = await config.recordApi.list({
203+
pagination: {
204+
limit,
205+
offset,
206+
cursor,
207+
},
208+
order,
209+
filters,
210+
})
211+
195212
const length = response.records.length
196-
if (length === 0) break
213+
if (length === 0) {
214+
// Drained - read everything.
215+
break
216+
}
197217

198-
got = got + length
218+
begin()
199219

200-
for (const item of response.records) {
220+
for (let i = 0; i < Math.min(length, remaining); ++i) {
201221
write({
202222
type: `insert`,
203-
value: parse(item),
223+
value: parse(response.records[i]!),
204224
})
205225
}
206226

207-
if (length < limit) break
227+
commit()
208228

209-
response = await config.recordApi.list({
210-
pagination: {
211-
limit,
212-
cursor,
213-
offset: cursor === undefined ? got : undefined,
214-
},
215-
})
216-
cursor = response.cursor
217-
}
229+
remaining -= length
230+
231+
// Drained or read enough.
232+
if (length < limit || remaining <= 0) {
233+
if (response.cursor) {
234+
cursors.set(getKey(parse(response.records.at(-1)!)), response.cursor)
235+
}
236+
break
237+
}
218238

219-
commit()
239+
// Update params for next iteration.
240+
if (offset !== undefined) {
241+
offset += length
242+
} else {
243+
cursor = response.cursor
244+
}
245+
}
220246
}
221247

222248
// Afterwards subscribe.
@@ -267,7 +293,8 @@ export function trailBaseCollectionOptions<
267293
try {
268294
// Eager mode: perform initial fetch to populate everything
269295
if (internalSyncMode === `eager`) {
270-
await initialFetch()
296+
// Load everything on initial load.
297+
await load({})
271298
fullSyncCompleted = true
272299
}
273300
} catch (e) {
@@ -309,22 +336,11 @@ export function trailBaseCollectionOptions<
309336
return
310337
}
311338

312-
const loadSubset = async (opts: LoadSubsetOptions): Promise<void> => {
313-
if (opts.cursor || opts.orderBy || opts.subscription || opts.offset || opts.limit || opts.where) {
314-
console.warn(`Got unsupported subset opts: ${opts}`);
315-
}
316-
317-
// TODO: Support (some) of the above subset options to enable pagination etc.
318-
await initialFetch()
319-
fullSyncCompleted = true
320-
}
321-
322339
return {
323-
loadSubset,
340+
loadSubset: load,
324341
getSyncMetadata: () =>
325342
({
326343
syncMode: internalSyncMode,
327-
fullSyncComplete: fullSyncCompleted,
328344
}) as const,
329345
}
330346
},
@@ -402,3 +418,98 @@ export function trailBaseCollectionOptions<
402418
},
403419
}
404420
}
421+
422+
function buildOrder(opts: LoadSubsetOptions): undefined | Array<string> {
423+
return opts.orderBy?.map((o) => {
424+
switch (o.expression.type) {
425+
case "ref": {
426+
const field = o.expression.path[0]
427+
if (o.compareOptions.direction == "asc") {
428+
return `+${field}`
429+
}
430+
return `-${field}`
431+
}
432+
default: {
433+
console.warn("Skipping unsupported order clause:", JSON.stringify(o.expression))
434+
return undefined
435+
}
436+
}
437+
}).filter((f) => f !== undefined)
438+
}
439+
440+
function buildCompareOp(name: string): CompareOp | undefined {
441+
switch (name) {
442+
case "eq":
443+
return "equal"
444+
case "ne":
445+
return "notEqual"
446+
case "gt":
447+
return "greaterThan"
448+
case "gte":
449+
return "greaterThanEqual"
450+
case "lt":
451+
return "lessThan"
452+
case "lte":
453+
return "lessThanEqual"
454+
default:
455+
return undefined
456+
}
457+
}
458+
459+
460+
function buildFilters<
461+
TItem extends ShapeOf<TRecord>,
462+
TRecord extends ShapeOf<TItem> = TItem,
463+
TKey extends string | number = string | number,
464+
>(opts: LoadSubsetOptions, config: TrailBaseCollectionConfig<TItem, TRecord, TKey>): undefined | Array<FilterOrComposite> {
465+
const where = opts.where
466+
if (where === undefined) {
467+
return undefined
468+
}
469+
470+
function serializeValue<T = any>(column: string, value: T): string {
471+
const convert = (config.serialize as any)[column]
472+
if (convert) {
473+
return `${convert(value)}`
474+
}
475+
476+
if (typeof value === "boolean") {
477+
return value ? "1" : "0"
478+
}
479+
480+
return `${value}`
481+
}
482+
483+
switch (where.type) {
484+
case "func": {
485+
const field = where.args[0]
486+
const val = where.args[1]
487+
488+
const op = buildCompareOp(where.name)
489+
if (op === undefined) {
490+
break
491+
}
492+
493+
if (field?.type === "ref" && val?.type === "val") {
494+
const column = field.path.at(0)
495+
if (column) {
496+
const f = [{
497+
column: field.path.at(0) ?? "",
498+
op,
499+
value: serializeValue(column, val.value),
500+
}]
501+
502+
return f
503+
}
504+
}
505+
break
506+
}
507+
case "ref":
508+
case "val":
509+
break
510+
}
511+
512+
console.warn("where clause which is not (yet) supported", opts.where)
513+
514+
return undefined
515+
}

0 commit comments

Comments
 (0)