Skip to content

Commit d286451

Browse files
Fix unbounded session table growth in directory (#614)
## Summary - **Stringify Address objects** in the Directory MQTT client. - **Delete old sessions after change-notify completes.** The session table was append-only — every birth inserted a row but nothing ever deleted historical sessions. Over years this accumulated millions of rows (and ~3.5x that in `schema_used`). Now `on_session_notify` deletes all non-current sessions for a device once it's finished computing the schema diff. Existing installs clean up organically as devices rebirth. - **Add vitest test framework** to `acs-directory` and `js-service-client` with 15 tests covering Address interning, session cleanup queries, and `find_schemas` metric tree traversal.
2 parents e71e1e3 + 3018395 commit d286451

File tree

10 files changed

+193
-13
lines changed

10 files changed

+193
-13
lines changed

acs-directory/lib/mqttcli.js

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,12 @@ export default class MQTTCli {
321321

322322
if (notify.length)
323323
this.publish_changed(notify);
324+
325+
/* Clean up old sessions now we've finished the change-notify.
326+
* Only do this for the current session; the old session's
327+
* notify will also fire but we skip it here. */
328+
if (session.next_for_device == null)
329+
await this.model.cleanup_old_sessions(id);
324330
}
325331

326332
async on_service_notify(id) {
@@ -446,7 +452,7 @@ export default class MQTTCli {
446452

447453
async on_birth(address, payload) {
448454
this.log("device", `Registering BIRTH for ${address}`);
449-
this.online.add(address);
455+
this.online.add(address.toString());
450456

451457
let tree;
452458
if (payload.uuid === UUIDs.FactoryPlus) {
@@ -479,7 +485,7 @@ export default class MQTTCli {
479485
this.log("device", `Registering DEATH for ${address}`);
480486

481487
this.alerts.delete(address);
482-
this.online.delete(address);
488+
this.online.delete(address.toString());
483489
await this.model.death({address, time});
484490

485491
this.log("device", `Finished DEATH for ${address}`);
@@ -546,7 +552,7 @@ export default class MQTTCli {
546552
* rebirth any device we haven't seen a birth for, even if the
547553
* database says it's online. This is important because we might
548554
* have the wrong schema information. */
549-
if (this.online.has(addr) || pending[addr]) return false;
555+
if (this.online.has(addr.toString()) || pending[addr]) return false;
550556

551557
/* Mark that we're working on this device and wait 5-10s to see if
552558
* it rebirths on its own. */
@@ -555,7 +561,7 @@ export default class MQTTCli {
555561

556562
/* Clear our marker first so we retry next time */
557563
delete (pending[addr]);
558-
if (this.online.has(addr)) return false;
564+
if (this.online.has(addr.toString())) return false;
559565

560566
sent[addr] = Date.now();
561567
return true;

acs-directory/lib/queries.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,14 @@ export default class Queries {
183183
return dbr.rows.map(r => r.uuid);
184184
}
185185

186+
async cleanup_old_sessions(session_id) {
187+
await this.query(`
188+
delete from session
189+
where device = (select device from session where id = $1)
190+
and next_for_device is not null
191+
`, [session_id]);
192+
}
193+
186194
async record_schema(session, schema) {
187195
const schid = await this.find_or_create("schema", schema);
188196
if (schid == null) return;

acs-directory/package.json

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@
1616
"type": "module",
1717
"scripts": {
1818
"mqtt": "node bin/directory-mqtt",
19-
"webapi": "node bin/directory-webapi"
19+
"webapi": "node bin/directory-webapi",
20+
"test": "vitest run"
2021
},
2122
"dependencies": {
23+
"@amrc-factoryplus/pg-client": "file:../lib/js-pg-client",
2224
"@amrc-factoryplus/rx-client": "file:../lib/js-rx-client",
2325
"@amrc-factoryplus/rx-util": "file:../lib/js-rx-util",
24-
"@amrc-factoryplus/service-client": "file:../lib/js-service-client",
2526
"@amrc-factoryplus/service-api": "file:../lib/js-service-api",
26-
"@amrc-factoryplus/pg-client": "file:../lib/js-pg-client",
27+
"@amrc-factoryplus/service-client": "file:../lib/js-service-client",
2728
"async": "^3.2.4",
2829
"express": "^4.17.1",
29-
"express-openapi-validator": "^4.13.2"
30+
"express-openapi-validator": "^4.13.2",
31+
"long": "^5.3.2"
32+
},
33+
"devDependencies": {
34+
"vitest": "^3.0.4"
3035
}
3136
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { describe, it, expect } from "vitest";
2+
import { MetricBranch } from "@amrc-factoryplus/service-client";
3+
import MQTTCli from "../lib/mqttcli.js";
4+
5+
function make_cli() {
6+
return Object.create(MQTTCli.prototype);
7+
}
8+
9+
describe("find_schemas", () => {
10+
it("returns empty set for empty tree", () => {
11+
const cli = make_cli();
12+
const result = cli.find_schemas({});
13+
expect(result.size).toBe(0);
14+
});
15+
16+
it("finds top-level Schema_UUID", () => {
17+
const cli = make_cli();
18+
const tree = {
19+
Schema_UUID: { value: "schema-1" },
20+
};
21+
const result = cli.find_schemas(tree);
22+
expect([...result]).toEqual(["schema-1"]);
23+
});
24+
25+
it("finds nested Schema_UUIDs in MetricBranch", () => {
26+
const cli = make_cli();
27+
const branch = new MetricBranch();
28+
branch.Schema_UUID = { value: "schema-nested" };
29+
const tree = { sub: branch };
30+
const result = cli.find_schemas(tree);
31+
expect(result.has("schema-nested")).toBe(true);
32+
});
33+
34+
it("finds multiple schemas at different levels", () => {
35+
const cli = make_cli();
36+
const branch = new MetricBranch();
37+
branch.Schema_UUID = { value: "schema-2" };
38+
const tree = {
39+
Schema_UUID: { value: "schema-1" },
40+
sub: branch,
41+
};
42+
const result = cli.find_schemas(tree);
43+
expect(result.size).toBe(2);
44+
expect(result.has("schema-1")).toBe(true);
45+
expect(result.has("schema-2")).toBe(true);
46+
});
47+
48+
it("deduplicates repeated schema UUIDs", () => {
49+
const cli = make_cli();
50+
const branch = new MetricBranch();
51+
branch.Schema_UUID = { value: "schema-1" };
52+
const tree = {
53+
Schema_UUID: { value: "schema-1" },
54+
sub: branch,
55+
};
56+
const result = cli.find_schemas(tree);
57+
expect(result.size).toBe(1);
58+
});
59+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { describe, it, expect, vi } from "vitest";
2+
import Queries from "../lib/queries.js";
3+
4+
describe("cleanup_old_sessions", () => {
5+
it("issues a delete for non-current sessions by session id", async () => {
6+
const mockQuery = vi.fn().mockResolvedValue({ rows: [], rowCount: 0 });
7+
const q = new Queries(mockQuery);
8+
9+
await q.cleanup_old_sessions(42);
10+
11+
expect(mockQuery).toHaveBeenCalledOnce();
12+
const [sql, params] = mockQuery.mock.calls[0];
13+
expect(sql).toMatch(/delete from session/i);
14+
expect(sql).toMatch(/next_for_device is not null/i);
15+
expect(params).toEqual([42]);
16+
});
17+
});
18+
19+
describe("session_notification_info", () => {
20+
it("returns session info with prev_for_device", async () => {
21+
const row = {
22+
device: "uuid-1",
23+
group_id: "G",
24+
node_id: "N",
25+
device_id: "D",
26+
next_for_device: null,
27+
next_for_address: null,
28+
prev_for_device: 10,
29+
};
30+
const mockQuery = vi.fn().mockResolvedValue({ rows: [row] });
31+
const q = new Queries(mockQuery);
32+
33+
const result = await q.session_notification_info(42);
34+
35+
expect(result).toEqual(row);
36+
expect(result.prev_for_device).toBe(10);
37+
});
38+
});

acs-directory/vitest.config.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { defineConfig } from "vitest/config";
2+
3+
export default defineConfig({
4+
test: {
5+
include: ["test/**/*.test.js"],
6+
},
7+
});

lib/js-service-client/lib/sparkplug/util.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,10 @@ export class Address {
6767
* @arg device Device ID
6868
*/
6969
constructor (group, node, device) {
70-
this.group = group;
71-
this.node = node;
72-
7370
if (device == null || device == "")
7471
device = undefined;
72+
this.group = group;
73+
this.node = node;
7574
this.device = device;
7675
}
7776

lib/js-service-client/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"main": "lib/index.js",
66
"type": "module",
77
"scripts": {
8-
"test": "echo \"Error: no test specified\" && exit 1"
8+
"test": "vitest run"
99
},
1010
"keywords": [],
1111
"author": "",
@@ -26,6 +26,7 @@
2626
"@eslint/eslintrc": "^3.1.0",
2727
"@eslint/js": "^9.13.0",
2828
"eslint": "^9.13.0",
29-
"globals": "^15.11.0"
29+
"globals": "^15.11.0",
30+
"vitest": "^3.0.4"
3031
}
3132
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { describe, it, expect } from "vitest";
2+
import { Address } from "../lib/sparkplug/util.js";
3+
4+
describe("Address", () => {
5+
it("treats empty string device as node address", () => {
6+
const a = new Address("G", "N", "");
7+
expect(a.device).toBeUndefined();
8+
});
9+
10+
it("treats null device as node address", () => {
11+
const a = new Address("G", "N", null);
12+
expect(a.device).toBeUndefined();
13+
});
14+
15+
it("toString formats node address", () => {
16+
const a = new Address("G", "N");
17+
expect(a.toString()).toBe("G/N");
18+
});
19+
20+
it("toString formats device address", () => {
21+
const a = new Address("G", "N", "D");
22+
expect(a.toString()).toBe("G/N/D");
23+
});
24+
25+
it("equals compares by value", () => {
26+
const a = new Address("G", "N", "D");
27+
const b = new Address("G", "N", "D");
28+
expect(a.equals(b)).toBe(true);
29+
});
30+
31+
it("equals returns false for different addresses", () => {
32+
const a = new Address("G", "N", "D1");
33+
const b = new Address("G", "N", "D2");
34+
expect(a.equals(b)).toBe(false);
35+
});
36+
37+
it("parse round-trips with toString", () => {
38+
const a = new Address("G", "N", "D");
39+
const b = Address.parse(a.toString());
40+
expect(b.equals(a)).toBe(true);
41+
});
42+
43+
it("parent_node returns node address", () => {
44+
const a = new Address("G", "N", "D");
45+
const parent = a.parent_node();
46+
expect(parent.group).toBe("G");
47+
expect(parent.node).toBe("N");
48+
expect(parent.device).toBeUndefined();
49+
});
50+
});
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { defineConfig } from "vitest/config";
2+
3+
export default defineConfig({
4+
test: {
5+
include: ["test/**/*.test.js"],
6+
},
7+
});

0 commit comments

Comments
 (0)