Skip to content

Commit 00c8ec5

Browse files
authored
Merge pull request #273 from rush-db/fix/nested-upsert-deduplication-issue
Fix deduplication issue for nested upsert
2 parents 150ef00 + d28281b commit 00c8ec5

File tree

4 files changed

+127
-5
lines changed

4 files changed

+127
-5
lines changed

.changeset/red-boxes-float.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
---
'@rushdb/javascript-sdk': patch
'rushdb-core': patch
'rushdb-docs': patch
'@rushdb/mcp-server': patch
'rushdb-dashboard': patch
'rushdb-website': patch
---

Fix deduplication issue for nested upsert
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import path from 'path'
2+
import dotenv from 'dotenv'
3+
4+
dotenv.config({ path: path.resolve(__dirname, '../.env') })
5+
6+
import RushDB from '../src/index.node'
7+
8+
jest.setTimeout(60_000)
9+
10+
describe('records.importJson upsert nested linking (e2e)', () => {
11+
const apiKey = process.env.RUSHDB_API_KEY
12+
const apiUrl = process.env.RUSHDB_API_URL || 'http://localhost:3000'
13+
14+
if (!apiKey) {
15+
it('skips because RUSHDB_API_KEY is not set', () => {
16+
expect(true).toBe(true)
17+
})
18+
return
19+
}
20+
21+
const db = new RushDB(apiKey, { url: apiUrl })
22+
23+
it('reuses child record and links it to new parent when parent upsert creates a new record', async () => {
24+
const tenantId = `nested-upsert-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`
25+
26+
const payloadA = {
27+
name: 'Abshire - Farrell',
28+
address: '928 Conroy Village Suite 785',
29+
foundedAt: '1949-10-06T22:07:28.709Z',
30+
rating: 1.9,
31+
tenantId,
32+
department: [
33+
{
34+
name: 'Sports',
35+
description: 'The sleek and filthy Gloves comes with sky blue LED lighting for smart functionality',
36+
tenantId
37+
}
38+
]
39+
}
40+
41+
const payloadB = {
42+
...payloadA,
43+
rating: 2 // slight change should force new top-level record when mergeBy is all keys
44+
}
45+
46+
// First import creates parent and child
47+
await db.records.importJson({ label: 'Company', data: payloadA, options: { suggestTypes: true } })
48+
49+
// Second import should create new parent record but reuse existing child and link it
50+
await db.records.importJson({
51+
label: 'Company',
52+
data: payloadB,
53+
options: { suggestTypes: true, mergeBy: [] }
54+
})
55+
56+
// Verify there are two Company records and one Department record linked to both via default relation
57+
const companies = await db.records.find({ labels: ['Company'], where: { tenantId } })
58+
expect(companies.total).toBe(2)
59+
60+
const departments = await db.records.find({ labels: ['department'], where: { tenantId } })
61+
// label normalization in service uses original key; depending on capitalization option it might be 'department'
62+
// The label comes from the original key 'department' in the payload
63+
expect(departments.total).toBe(1)
64+
65+
// Fetch relations and ensure both companies are connected to the same department
66+
const relationsResp = await db.relationships.find({ where: { tenantId } })
67+
const rels = relationsResp.data.filter(
68+
(r) =>
69+
r.type &&
70+
(r.type.includes('RUSHDB_DEFAULT_RELATION') || r.type.includes('__RUSHDB__RELATION__DEFAULT__'))
71+
)
72+
73+
const departmentId = departments.data[0].id()
74+
const companyIds = companies.data.map((c) => c.id())
75+
76+
// For each company, there must be at least one relation to the department (either direction)
77+
const relatedPairs = new Set(rels.map((r) => `${r.sourceId}->${r.targetId}`))
78+
for (const cid of companyIds) {
79+
const has = relatedPairs.has(`${cid}->${departmentId}`) || relatedPairs.has(`${departmentId}->${cid}`)
80+
expect(has).toBe(true)
81+
}
82+
83+
// Cleanup
84+
await db.records.delete({ where: { tenantId } })
85+
})
86+
})

platform/core/src/core/entity/entity-query.service.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,10 @@ export class EntityQueryService {
159159

160160
if (withResults) {
161161
queryBuilder.append(
162-
`RETURN collect(DISTINCT record {${PROPERTY_WILDCARD_PROJECTION}, ${label()}}) as data`
162+
`RETURN collect({draftId: r.id, persistedId: record.${RUSHDB_KEY_ID}}) as idmap, collect(DISTINCT record {${PROPERTY_WILDCARD_PROJECTION}, ${label()}}) as data`
163163
)
164+
} else {
165+
queryBuilder.append(`RETURN collect({draftId: r.id, persistedId: record.${RUSHDB_KEY_ID}}) as idmap`)
164166
}
165167

166168
return queryBuilder.getQuery()
@@ -220,8 +222,10 @@ export class EntityQueryService {
220222

221223
if (withResults) {
222224
queryBuilder.append(
223-
`RETURN collect(DISTINCT record {${PROPERTY_WILDCARD_PROJECTION}, ${label()}}) as data`
225+
`RETURN collect({draftId: r.id, persistedId: record.${RUSHDB_KEY_ID}}) as idmap, collect(DISTINCT record {${PROPERTY_WILDCARD_PROJECTION}, ${label()}}) as data`
224226
)
227+
} else {
228+
queryBuilder.append(`RETURN collect({draftId: r.id, persistedId: record.${RUSHDB_KEY_ID}}) as idmap`)
225229
}
226230

227231
return queryBuilder.getQuery()

platform/core/src/core/entity/import-export/import.service.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ export class ImportService {
307307

308308
// @TODO: Accumulate result only if records <= 1000. Otherwise - ignore options.returnResult
309309
let result = []
310+
// Map draft record ids (generated during serialization) to actual persisted record ids after upsert/create
311+
const draftToPersistedId = new Map<string, string>()
310312
for (let i = 0; i < records.length; i += CHUNK_SIZE) {
311313
const recordsChunk = records.slice(i, i + CHUNK_SIZE)
312314

@@ -317,13 +319,33 @@ export class ImportService {
317319
projectId
318320
})
319321

322+
// Extract id map and results (if requested)
323+
const idmap = data.records?.[0]?.get('idmap') ?? []
324+
if (Array.isArray(idmap)) {
325+
for (const item of idmap) {
326+
if (item && item.draftId && item.persistedId) {
327+
draftToPersistedId.set(item.draftId, item.persistedId)
328+
}
329+
}
330+
}
331+
320332
if (options.returnResult) {
321-
result = result.concat(data.records?.[0]?.get('data'))
333+
const chunkData = data.records?.[0]?.get('data')
334+
if (Array.isArray(chunkData)) {
335+
result = result.concat(chunkData)
336+
}
322337
}
323338
}
324339

325-
for (let i = 0; i < relations.length; i += CHUNK_SIZE) {
326-
const relationsChunk = relations.slice(i, i + CHUNK_SIZE)
340+
// Remap relations to persisted IDs in case upsert matched existing records
341+
const remappedRelations = relations.map((rel) => ({
342+
source: draftToPersistedId.get(rel.source) ?? rel.source,
343+
target: draftToPersistedId.get(rel.target) ?? rel.target,
344+
type: rel.type
345+
}))
346+
347+
for (let i = 0; i < remappedRelations.length; i += CHUNK_SIZE) {
348+
const relationsChunk = remappedRelations.slice(i, i + CHUNK_SIZE)
327349
await this.processRelationshipsChunk({
328350
relationsChunk,
329351
projectId,

0 commit comments

Comments
 (0)