Skip to content

Commit eb6442d

Browse files
feat(records): add upsert and delete endpoints for mutable records (#1353)
This PR adds support for the upsert and delete endpoints for mutable records. This is the final PR in a series, enabling full feature parity of the Records and Streams API in the Javascript SDK.
1 parent 1e07129 commit eb6442d

File tree

3 files changed

+337
-2
lines changed

3 files changed

+337
-2
lines changed

packages/stable/src/__tests__/api/records.int.spec.ts

Lines changed: 260 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Copyright 2025 Cognite AS
22

3-
import { beforeAll, describe, expect, test, vi } from 'vitest';
3+
import { afterAll, beforeAll, describe, expect, test, vi } from 'vitest';
44
import type CogniteClient from '../../cogniteClient';
5-
import type { ContainerCreateDefinition } from '../../types';
5+
import type { ContainerCreateDefinition, RecordDelete } from '../../types';
66
import {
77
RECORDS_TEST_SPACE,
88
randomInt,
@@ -623,3 +623,261 @@ describe('records integration test', () => {
623623
);
624624
});
625625
});
626+
627+
describe('mutable records integration test', () => {
628+
let client: CogniteClient;
629+
630+
const mutableStreamId = 'sdk_test_mutable_stream';
631+
// Reuse the same space and container from the immutable records tests
632+
const testSpaceId = RECORDS_TEST_SPACE;
633+
const testContainerId = 'sdk_test_records_container';
634+
635+
// Track records created during tests for cleanup
636+
const recordsToCleanup: RecordDelete[] = [];
637+
638+
beforeAll(async () => {
639+
client = setupLoggedInClient();
640+
641+
// Check if mutable stream exists, create if not
642+
try {
643+
await client.streams.retrieve({ externalId: mutableStreamId });
644+
} catch {
645+
await client.streams.create({
646+
externalId: mutableStreamId,
647+
settings: {
648+
template: {
649+
name: 'BasicLiveData',
650+
},
651+
},
652+
});
653+
}
654+
}, 60_000);
655+
656+
afterAll(async () => {
657+
// Clean up any records created during tests
658+
if (recordsToCleanup.length > 0) {
659+
try {
660+
await client.records.delete(mutableStreamId, recordsToCleanup);
661+
} catch {
662+
// Ignore cleanup errors
663+
}
664+
}
665+
});
666+
667+
test('upsert creates, updates, and delete removes records', async () => {
668+
const testName = `upsert_lifecycle_${randomInt()}`;
669+
const recordId = `lifecycle_record_${randomInt()}`;
670+
const initialValue = 10.0;
671+
const updatedValue = 99.9;
672+
673+
const source = {
674+
type: 'container' as const,
675+
space: testSpaceId,
676+
externalId: testContainerId,
677+
};
678+
679+
// 1. Create a new record via upsert
680+
await client.records.upsert(mutableStreamId, [
681+
{
682+
space: testSpaceId,
683+
externalId: recordId,
684+
sources: [
685+
{
686+
source,
687+
properties: {
688+
name: testName,
689+
value: initialValue,
690+
timestamp: '2025-01-01T00:00:00.000Z',
691+
},
692+
},
693+
],
694+
},
695+
]);
696+
697+
// Verify the record was created
698+
const createdRecord = await vi.waitFor(
699+
async () => {
700+
const records = await client.records.filter(mutableStreamId, {
701+
sources: [{ source, properties: ['*'] }],
702+
filter: {
703+
equals: {
704+
property: [testSpaceId, testContainerId, 'name'],
705+
value: testName,
706+
},
707+
},
708+
});
709+
expect(records.length).toBe(1);
710+
return records[0];
711+
},
712+
{ timeout: 10_000, interval: 200 }
713+
);
714+
715+
expect(createdRecord.space).toBe(testSpaceId);
716+
expect(createdRecord.externalId).toBe(recordId);
717+
expect(createdRecord.properties[testSpaceId][testContainerId].value).toBe(
718+
initialValue
719+
);
720+
721+
// 2. Update the record via upsert
722+
await client.records.upsert(mutableStreamId, [
723+
{
724+
space: testSpaceId,
725+
externalId: recordId,
726+
sources: [
727+
{
728+
source,
729+
properties: {
730+
name: testName,
731+
value: updatedValue,
732+
timestamp: '2025-01-02T00:00:00.000Z',
733+
},
734+
},
735+
],
736+
},
737+
]);
738+
739+
// Verify the record was updated
740+
const updatedRecord = await vi.waitFor(
741+
async () => {
742+
const records = await client.records.filter(mutableStreamId, {
743+
sources: [{ source, properties: ['*'] }],
744+
filter: {
745+
equals: {
746+
property: [testSpaceId, testContainerId, 'name'],
747+
value: testName,
748+
},
749+
},
750+
});
751+
expect(records.length).toBe(1);
752+
expect(records[0].properties[testSpaceId][testContainerId].value).toBe(
753+
updatedValue
754+
);
755+
return records[0];
756+
},
757+
{ timeout: 10_000, interval: 200 }
758+
);
759+
760+
expect(updatedRecord.externalId).toBe(recordId);
761+
762+
// 3. Delete the record
763+
await client.records.delete(mutableStreamId, [
764+
{ space: testSpaceId, externalId: recordId },
765+
]);
766+
767+
// Verify the record is no longer returned by filter
768+
await vi.waitFor(
769+
async () => {
770+
const records = await client.records.filter(mutableStreamId, {
771+
sources: [{ source, properties: ['*'] }],
772+
filter: {
773+
equals: {
774+
property: [testSpaceId, testContainerId, 'name'],
775+
value: testName,
776+
},
777+
},
778+
});
779+
expect(records.length).toBe(0);
780+
},
781+
{ timeout: 10_000, interval: 200 }
782+
);
783+
}, 30_000);
784+
785+
test('delete is idempotent (does not error on non-existent records)', async () => {
786+
const nonExistentRecordId = `non_existent_${randomInt()}`;
787+
788+
// Deleting a non-existent record should not throw
789+
await expect(
790+
client.records.delete(mutableStreamId, [
791+
{ space: testSpaceId, externalId: nonExistentRecordId },
792+
])
793+
).resolves.toBeUndefined();
794+
});
795+
796+
test('sync shows deleted records with deleted status', async () => {
797+
const testName = `sync_delete_${randomInt()}`;
798+
const recordId = `sync_delete_record_${randomInt()}`;
799+
800+
const source = {
801+
type: 'container' as const,
802+
space: testSpaceId,
803+
externalId: testContainerId,
804+
};
805+
806+
// Create a record
807+
await client.records.ingest(mutableStreamId, [
808+
{
809+
space: testSpaceId,
810+
externalId: recordId,
811+
sources: [
812+
{
813+
source,
814+
properties: {
815+
name: testName,
816+
value: 50,
817+
timestamp: '2025-01-01T00:00:00.000Z',
818+
},
819+
},
820+
],
821+
},
822+
]);
823+
824+
// Wait for record to be visible via sync with 'created' status
825+
await vi.waitFor(
826+
async () => {
827+
const response = await client.records.sync(mutableStreamId, {
828+
initializeCursor: '1d-ago',
829+
sources: [{ source, properties: ['*'] }],
830+
filter: {
831+
equals: {
832+
property: [testSpaceId, testContainerId, 'name'],
833+
value: testName,
834+
},
835+
},
836+
});
837+
expect(response.items.length).toBe(1);
838+
expect(response.items[0].status).toBe('created');
839+
},
840+
{ timeout: 10_000, interval: 200 }
841+
);
842+
843+
// Delete the record
844+
await client.records.delete(mutableStreamId, [
845+
{ space: testSpaceId, externalId: recordId },
846+
]);
847+
848+
// Verify sync shows the record with 'deleted' status (tombstone)
849+
await vi.waitFor(
850+
async () => {
851+
// Use autoPagingToArray to get all sync items and find the deleted one
852+
const items = await client.records
853+
.sync(mutableStreamId, {
854+
initializeCursor: '1d-ago',
855+
filter: {
856+
and: [
857+
{
858+
equals: {
859+
property: ['space'],
860+
value: testSpaceId,
861+
},
862+
},
863+
{
864+
equals: {
865+
property: ['externalId'],
866+
value: recordId,
867+
},
868+
},
869+
],
870+
},
871+
})
872+
.autoPagingToArray({ limit: 100 });
873+
874+
// Find the deleted tombstone record
875+
const deletedRecord = items.find((r) => r.status === 'deleted');
876+
expect(deletedRecord).toBeDefined();
877+
expect(deletedRecord?.externalId).toBe(recordId);
878+
expect(deletedRecord?.space).toBe(testSpaceId);
879+
},
880+
{ timeout: 10_000, interval: 200 }
881+
);
882+
}, 30_000);
883+
});

packages/stable/src/api/records/recordsApi.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
RecordAggregateRequest,
77
RecordAggregateResponse,
88
RecordAggregateResults,
9+
RecordDelete,
910
RecordFilterRequest,
1011
RecordFilterResponse,
1112
RecordItem,
@@ -53,6 +54,68 @@ export class RecordsAPI extends BaseResourceAPI<RecordItem> {
5354
});
5455
};
5556

57+
/**
58+
* [Upsert records into a stream](https://developer.cognite.com/api#tag/Records/operation/upsertRecords)
59+
*
60+
* Create or update records in a mutable stream. If a record with the same
61+
* space + externalId already exists, it will be fully replaced (no partial updates).
62+
*
63+
* **Note:** This endpoint is only available for mutable streams.
64+
*
65+
* ```js
66+
* await client.records.upsert('my_mutable_stream', [
67+
* {
68+
* space: 'mySpace',
69+
* externalId: 'record1',
70+
* sources: [
71+
* {
72+
* source: { type: 'container', space: 'mySpace', externalId: 'myContainer' },
73+
* properties: { temperature: 30.0, timestamp: '2025-01-01T00:00:00Z' }
74+
* }
75+
* ]
76+
* }
77+
* ]);
78+
* ```
79+
*/
80+
public upsert = async (
81+
streamExternalId: string,
82+
items: RecordWrite[]
83+
): Promise<void> => {
84+
const path = this.url(
85+
`${encodeURIComponent(streamExternalId)}/records/upsert`
86+
);
87+
await this.post<object>(path, {
88+
data: { items },
89+
});
90+
};
91+
92+
/**
93+
* [Delete records from a stream](https://developer.cognite.com/api#tag/Records/operation/deleteRecords)
94+
*
95+
* Delete records from a mutable stream. The operation is idempotent - deleting
96+
* non-existent records will not cause an error.
97+
*
98+
* **Note:** This endpoint is only available for mutable streams.
99+
*
100+
* ```js
101+
* await client.records.delete('my_mutable_stream', [
102+
* { space: 'mySpace', externalId: 'record1' },
103+
* { space: 'mySpace', externalId: 'record2' }
104+
* ]);
105+
* ```
106+
*/
107+
public delete = async (
108+
streamExternalId: string,
109+
items: RecordDelete[]
110+
): Promise<void> => {
111+
const path = this.url(
112+
`${encodeURIComponent(streamExternalId)}/records/delete`
113+
);
114+
await this.post<object>(path, {
115+
data: { items },
116+
});
117+
};
118+
56119
/**
57120
* [Filter records from a stream](https://developer.cognite.com/api#tag/Records/operation/filterRecords)
58121
*

packages/stable/src/api/records/types.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ export interface RecordWrite {
8282
sources: RecordData[];
8383
}
8484

85+
/**
86+
* Record identifier for deletion (only space + externalId needed)
87+
*/
88+
export interface RecordDelete {
89+
/**
90+
* The space that the record belongs to
91+
*/
92+
space: RecordSpaceId;
93+
/**
94+
* External ID of the record to delete
95+
*/
96+
externalId: RecordExternalId;
97+
}
98+
8599
/**
86100
* Source selector for specifying which container properties to return
87101
*/

0 commit comments

Comments
 (0)