Skip to content

Commit 6c7c573

Browse files
lauckhartclaudemergify[bot]
authored
More client decomposition (#3414)
* Decouple DatasourceCache from ClientEndpointStore for reuse by remote nodes DatasourceCache now takes an options object with writer and optional persist/erase callbacks instead of a direct ClientEndpointStore reference. RemoteWriteParticipant takes a generic RemoteWriter function and uses it as both the writer and the transaction dedup key. ClientEndpointStore.participantFor() is removed as its logic moves into DatasourceCache.set(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Normalize ClientBehaviorBacking change keys from attribute IDs to property names ClientBehaviorBacking uses primaryKey "id", causing its Datasource to report changed properties as attribute ID strings (e.g. "0" for onOff). Downstream consumers (ProtocolService, StateStream) expect property names. Add a reverse id-to-name map on RootSupervisor and use it in ClientBehaviorBacking.broadcastChanges to normalize before broadcast. Also fix StateStream dirty tracking to deepCopy individual changed properties rather than the full state object. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 8d3eb2b commit 6c7c573

File tree

9 files changed

+124
-37
lines changed

9 files changed

+124
-37
lines changed

packages/node/src/behavior/internal/ClientBehaviorBacking.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ export class ClientBehaviorBacking extends BehaviorBacking {
5656
return options;
5757
}
5858

59+
/**
60+
* Map attribute ID keys back to property names before broadcasting.
61+
*
62+
* The Datasource reports changed keys using the backing's primaryKey format. Since this backing uses
63+
* `primaryKey: "id"`, props are attribute ID strings (e.g. "0" for onOff). Downstream consumers
64+
* ({@link ChangeNotificationService}, {@link ProtocolService}) expect property names.
65+
*/
66+
protected override broadcastChanges(props: string[]) {
67+
const idToName = this.type.supervisor.propertyIdsAndNames;
68+
super.broadcastChanges(props.map(id => idToName.get(id) ?? id));
69+
}
70+
5971
override close(): MaybePromise {
6072
// Prepare the store for reuse in the case of reset
6173
(this.store as DatasourceCache).reclaimValues?.();

packages/node/src/behavior/supervision/RootSupervisor.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export class RootSupervisor implements ValueSupervisor {
6161
#root: ValueSupervisor;
6262
#memberNames?: Set<string>;
6363
#propertyNamesAndIds?: Map<string, AttributeId | undefined>;
64+
#propertyIdsAndNames?: Map<string, string>;
6465

6566
/**
6667
* Create a new supervisor.
@@ -190,6 +191,24 @@ export class RootSupervisor implements ValueSupervisor {
190191
return names;
191192
}
192193

194+
/**
195+
* Reverse of {@link propertyNamesAndIds}: maps attribute ID strings to property names.
196+
*/
197+
get propertyIdsAndNames() {
198+
let ids = this.#propertyIdsAndNames;
199+
if (!ids) {
200+
ids = new Map();
201+
for (const member of this.#members) {
202+
const id = member.id;
203+
if (id !== undefined) {
204+
ids.set(id.toString(), camelize(member.name));
205+
}
206+
}
207+
this.#propertyIdsAndNames = ids;
208+
}
209+
return ids;
210+
}
211+
193212
/**
194213
* Retrieve members for schema in {@link scope}.
195214
*

packages/node/src/node/integration/StateStream.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { Node } from "#node/Node.js";
1010
import { ServerNode } from "#node/ServerNode.js";
1111
import { Abort, deepCopy, Duration, Gate, Millis, Timer } from "@matter/general";
1212
import { DatatypeModel, FieldElement } from "@matter/model";
13-
import { Val } from "@matter/protocol";
1413
import { EndpointNumber } from "@matter/types";
1514
import { ChangeNotificationService } from "./ChangeNotificationService.js";
1615

@@ -103,14 +102,17 @@ export function StateStream(
103102
// Property update
104103
const state = stateOfBehavior(node.id, endpoint.number, behavior.id);
105104
state.queueEntry = undefined;
106-
let changes = endpoint.stateOf(behavior);
105+
106+
let changes: Record<string, unknown>;
107107
if (state.dirty) {
108-
changes = Object.fromEntries(
109-
[...state.dirty].map(name => [name, (changes as Val.Struct)[name]]),
110-
);
108+
const allState = endpoint.stateOf(behavior) as Record<string, unknown>;
109+
changes = {};
110+
for (const name of state.dirty) {
111+
changes[name] = deepCopy(allState[name]);
112+
}
111113
state.dirty = undefined;
112114
} else {
113-
changes = deepCopy(changes);
115+
changes = deepCopy(endpoint.stateOf(behavior));
114116
}
115117
yield {
116118
kind: "update",

packages/node/src/storage/client/ClientEndpointStore.ts

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@
66

77
import { EndpointStore } from "#storage/EndpointStore.js";
88
import { DatasourceStore } from "#storage/server/DatasourceStore.js";
9-
import { StorageContext, Transaction } from "@matter/general";
9+
import { StorageContext } from "@matter/general";
1010
import { PeerAddress } from "@matter/protocol";
1111
import type { EndpointNumber } from "@matter/types";
1212
import type { ClientNodeStore } from "./ClientNodeStore.js";
1313
import { DatasourceCache } from "./DatasourceCache.js";
14-
import { RemoteWriteParticipant } from "./RemoteWriteParticipant.js";
1514

1615
export class ClientEndpointStore extends EndpointStore {
1716
#owner: ClientNodeStore;
@@ -42,21 +41,18 @@ export class ClientEndpointStore extends EndpointStore {
4241
return this.initialValues.get("commissioning")?.["peerAddress"];
4342
}
4443

45-
participantFor(transaction: Transaction) {
46-
let participant = transaction.getParticipant(this.#owner);
47-
if (participant === undefined) {
48-
participant = new RemoteWriteParticipant(this.#owner);
49-
transaction.addParticipants(participant);
50-
}
51-
return participant;
52-
}
53-
5444
/**
5545
* Create a {@link Datasource.ExternallyMutableStore} for a behavior.
5646
*/
5747
createStoreForBehavior(behaviorId: string) {
5848
const initialValues = this.consumeInitialValues(behaviorId);
59-
return DatasourceCache(this, behaviorId, initialValues);
49+
return DatasourceCache({
50+
writer: this.#owner.write,
51+
endpointNumber: this.#number,
52+
behaviorId,
53+
initialValues,
54+
localWriter: this.#owner.localWriter,
55+
});
6056
}
6157

6258
/**

packages/node/src/storage/client/ClientNodeStore.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { InternalError, StorageContext, StorageContextFactory } from "@matter/ge
1010
import { EndpointNumber } from "@matter/types";
1111
import { NodeStore } from "../NodeStore.js";
1212
import { ClientEndpointStore } from "./ClientEndpointStore.js";
13+
import { LocalWriter } from "./LocalWriter.js";
1314
import type { RemoteWriter } from "./RemoteWriter.js";
1415

1516
/**
@@ -20,6 +21,7 @@ export class ClientNodeStore extends NodeStore {
2021
#storage?: StorageContext;
2122
#stores = new Map<EndpointNumber, ClientEndpointStore>();
2223
#write?: RemoteWriter;
24+
#localWriter?: LocalWriter;
2325
#isPreexisting: boolean;
2426
#onErase?: () => void;
2527

@@ -54,10 +56,25 @@ export class ClientNodeStore extends NodeStore {
5456
this.#write = write;
5557
}
5658

59+
get localWriter() {
60+
if (this.#localWriter === undefined) {
61+
this.#localWriter = new LocalWriter(this);
62+
}
63+
return this.#localWriter;
64+
}
65+
5766
get endpointStores() {
5867
return this.#stores.values();
5968
}
6069

70+
storeForEndpointNumber(endpointNumber: EndpointNumber) {
71+
const store = this.#stores.get(endpointNumber);
72+
if (store === undefined) {
73+
throw new InternalError(`No endpoint store for endpoint ${endpointNumber}`);
74+
}
75+
return store;
76+
}
77+
6178
override async erase() {
6279
this.#stores = new Map();
6380
this.#onErase?.();

packages/node/src/storage/client/DatasourceCache.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import { Datasource } from "#behavior/state/managed/Datasource.js";
88
import { InternalError, MaybePromise, Transaction } from "@matter/general";
99
import { Val } from "@matter/protocol";
10-
import type { ClientEndpointStore } from "./ClientEndpointStore.js";
11-
import type { RemoteWriteParticipant } from "./RemoteWriteParticipant.js";
10+
import { EndpointNumber } from "@matter/types";
11+
import type { LocalWriter } from "./LocalWriter.js";
12+
import { RemoteWriteParticipant } from "./RemoteWriteParticipant.js";
13+
import type { RemoteWriter } from "./RemoteWriter.js";
1214

1315
/**
1416
* The default implementation of {@link Datasource.ExternallyMutableStore}.
@@ -27,11 +29,9 @@ export interface DatasourceCache extends Datasource.ExternallyMutableStore {
2729
erase(): MaybePromise<void>;
2830
}
2931

30-
export function DatasourceCache(
31-
store: ClientEndpointStore,
32-
behaviorId: string,
33-
initialValues: Val.Struct | undefined,
34-
): DatasourceCache {
32+
export function DatasourceCache(options: DatasourceCache.Options): DatasourceCache {
33+
const { writer, endpointNumber, behaviorId, initialValues } = options;
34+
3535
let version = initialValues?.[DatasourceCache.VERSION_KEY] as number;
3636
if (typeof version !== "number") {
3737
version = Datasource.UNKNOWN_VERSION;
@@ -41,8 +41,12 @@ export function DatasourceCache(
4141
initialValues,
4242

4343
async set(transaction: Transaction, values: Val.Struct) {
44-
const participant = store.participantFor(transaction) as RemoteWriteParticipant;
45-
participant.set(store.number, behaviorId, values);
44+
let participant = transaction.getParticipant(writer);
45+
if (participant === undefined) {
46+
participant = new RemoteWriteParticipant(writer);
47+
transaction.addParticipants(participant);
48+
}
49+
(participant as RemoteWriteParticipant).set(endpointNumber, behaviorId, values);
4650
},
4751

4852
async externalSet(values: Val.StructMap) {
@@ -52,7 +56,7 @@ export function DatasourceCache(
5256
}
5357

5458
const valuesStruct = Object.fromEntries(values) as Val.Struct;
55-
await store.set({ [behaviorId]: valuesStruct });
59+
await options.localWriter?.persist(endpointNumber, behaviorId, valuesStruct);
5660

5761
if (this.externalChangeListener) {
5862
await this.externalChangeListener(values);
@@ -83,7 +87,7 @@ export function DatasourceCache(
8387
},
8488

8589
async erase() {
86-
await store.eraseStoreForBehavior(behaviorId);
90+
await options.localWriter?.erase(endpointNumber, behaviorId);
8791
},
8892
};
8993
}
@@ -95,4 +99,12 @@ export namespace DatasourceCache {
9599
* This conveys the version to the {@link Datasource}.
96100
*/
97101
export const VERSION_KEY = "__version__";
102+
103+
export interface Options {
104+
writer: RemoteWriter;
105+
endpointNumber: EndpointNumber;
106+
behaviorId: string;
107+
initialValues?: Val.Struct;
108+
localWriter?: LocalWriter;
109+
}
98110
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* @license
3+
* Copyright 2022-2026 Matter.js Authors
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
import { MaybePromise } from "@matter/general";
8+
import { Val } from "@matter/protocol";
9+
import { EndpointNumber } from "@matter/types";
10+
import type { ClientNodeStore } from "./ClientNodeStore.js";
11+
12+
/**
13+
* Handles local storage persistence for client datasource caches.
14+
*/
15+
export class LocalWriter {
16+
#nodeStore: ClientNodeStore;
17+
18+
constructor(nodeStore: ClientNodeStore) {
19+
this.#nodeStore = nodeStore;
20+
}
21+
22+
async persist(endpointNumber: EndpointNumber, behaviorId: string, values: Val.Struct) {
23+
await this.#nodeStore.storeForEndpointNumber(endpointNumber).set({ [behaviorId]: values });
24+
}
25+
26+
erase(endpointNumber: EndpointNumber, behaviorId: string): MaybePromise<void> {
27+
return this.#nodeStore.storeForEndpointNumber(endpointNumber).eraseStoreForBehavior(behaviorId);
28+
}
29+
}

packages/node/src/storage/client/RemoteWriteParticipant.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import { Transaction } from "@matter/general";
88
import { Val } from "@matter/protocol";
99
import { EndpointNumber } from "@matter/types";
10-
import type { ClientNodeStore } from "./ClientNodeStore.js";
1110
import type { RemoteWriter } from "./RemoteWriter.js";
1211

1312
/**
@@ -17,13 +16,13 @@ import type { RemoteWriter } from "./RemoteWriter.js";
1716
*/
1817
export class RemoteWriteParticipant implements Transaction.Participant {
1918
#request: RemoteWriter.Request = [];
20-
#store: ClientNodeStore;
19+
#writer: RemoteWriter;
2120

2221
/**
23-
* There is one participant for each transaction/client node pair. We therefore use the store as the role.
22+
* There is one participant for each transaction/writer pair. We use the writer function itself as the dedup key.
2423
*/
2524
get role() {
26-
return this.#store;
25+
return this.#writer;
2726
}
2827

2928
/**
@@ -45,18 +44,18 @@ export class RemoteWriteParticipant implements Transaction.Participant {
4544
const request = this.#request;
4645
this.#request = [];
4746

48-
await this.#store.write(request);
47+
await this.#writer(request);
4948
}
5049

5150
rollback() {
5251
this.#request = [];
5352
}
5453

5554
toString() {
56-
return `writer#${this.#store.id}`;
55+
return `remote-writer`;
5756
}
5857

59-
constructor(store: ClientNodeStore) {
60-
this.#store = store;
58+
constructor(writer: RemoteWriter) {
59+
this.#writer = writer;
6160
}
6261
}

packages/node/src/storage/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
export * from "./client/ClientNodeStore.js";
88
export * from "./client/ClientNodeStores.js";
99
export * from "./client/DatasourceCache.js";
10+
export * from "./client/LocalWriter.js";
1011
export * from "./NodeStore.js";
1112
export * from "./server/DatasourceStore.js";
1213
export * from "./server/ServerEndpointStore.js";

0 commit comments

Comments
 (0)