Skip to content

Commit 4cf86c1

Browse files
clean up subscriptions
1 parent abfcba8 commit 4cf86c1

File tree

11 files changed

+367
-199
lines changed

11 files changed

+367
-199
lines changed

core/setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
# "pyyaml==6.0.2",
2727
# "rpi-lgpio==0.6"
2828
# "lgpio==0.2.2.0"
29-
# pillow==12.0.0
30-
# adafruit-circuitpython-ssd1306==2.12.22
29+
# "pillow==12.0.0"
30+
# "adafruit-circuitpython-ssd1306==2.12.22"
3131
]
3232

3333

frontend/src/Inventory.jsx

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ function WorkerCard({worker, config, leaderVersion}) {
414414

415415

416416
const [experimentAssigned, setExperimentAssigned] = React.useState(null)
417-
const {client, subscribeToTopic} = useMQTT();
417+
const {client, subscribeToTopic, unsubscribeFromTopic} = useMQTT();
418418
const selfTestDefinition = useSelfTestJobDefinition();
419419
const [state, setState] = React.useState(null)
420420
const [versions, setVersions] = React.useState({})
@@ -584,36 +584,46 @@ function WorkerCard({worker, config, leaderVersion}) {
584584

585585

586586
React.useEffect(() => {
587-
if (unit && client) {
588-
subscribeToTopic(`pioreactor/${unit}/$experiment/monitor/+`, onMonitorData, "WorkerCard");
587+
if (!unit || !client) {
588+
return undefined;
589+
}
590+
const topic = `pioreactor/${unit}/$experiment/monitor/+`;
591+
subscribeToTopic(topic, onMonitorData, "WorkerCard");
589592

590-
const fetchExperiment = async () => {
591-
try {
592-
const response = await fetch(`/api/workers/${unit}/experiment`);
593-
if (!response.ok) {
594-
throw new Error(`No experiment found.`);
595-
}
596-
const json = await response.json();
597-
setExperimentAssigned(json['experiment']);
598-
} catch (error) {
599-
return
593+
const fetchExperiment = async () => {
594+
try {
595+
const response = await fetch(`/api/workers/${unit}/experiment`);
596+
if (!response.ok) {
597+
throw new Error(`No experiment found.`);
600598
}
601-
};
599+
const json = await response.json();
600+
setExperimentAssigned(json['experiment']);
601+
} catch (error) {
602+
return
603+
}
604+
};
602605

603-
fetchExperiment();
604-
}
605-
}, [unit, client]);
606+
fetchExperiment();
607+
608+
return () => {
609+
unsubscribeFromTopic(topic, "WorkerCard");
610+
};
611+
}, [unit, client, subscribeToTopic, unsubscribeFromTopic]);
606612

607613
React.useEffect(() => {
608-
if (!client || !selfTestDefinition) {
609-
return;
614+
if (!client || !selfTestDefinition) {
615+
return undefined;
610616
}
611617
const baseTopic = `pioreactor/${unit}/${selfTestExperiment}/self_test`;
612-
subscribeToTopic(`${baseTopic}/$state`, onSelfTestData, "WorkerCard-self-test");
613-
for (const setting of selfTestDefinition.published_settings) {
614-
subscribeToTopic(`${baseTopic}/${setting.key}`, onSelfTestData, "WorkerCard-self-test");
615-
}
616-
}, [client, onSelfTestData, selfTestDefinition, subscribeToTopic, unit]);
618+
const topics = [
619+
`${baseTopic}/$state`,
620+
...selfTestDefinition.published_settings.map((setting) => `${baseTopic}/${setting.key}`),
621+
];
622+
subscribeToTopic(topics, onSelfTestData, "WorkerCard-self-test");
623+
return () => {
624+
unsubscribeFromTopic(topics, "WorkerCard-self-test");
625+
};
626+
}, [client, onSelfTestData, selfTestDefinition, subscribeToTopic, unsubscribeFromTopic, unit]);
617627

618628
const handleStatusChange = (event) => {
619629

frontend/src/Leader.jsx

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ function DirectoryNavigatorCard({leaderHostname}) {
417417
function LeaderCard({leaderHostname}) {
418418

419419
const unit = leaderHostname
420-
const {client, subscribeToTopic} = useMQTT();
420+
const {client, subscribeToTopic, unsubscribeFromTopic} = useMQTT();
421421
const [state, setState] = React.useState(null)
422422
const [versions, setVersions] = React.useState({})
423423
const [ipv4, setIpv4] = React.useState(null)
@@ -426,10 +426,15 @@ function LeaderCard({leaderHostname}) {
426426

427427

428428
React.useEffect(() => {
429-
if (client) {
430-
subscribeToTopic(`pioreactor/${unit}/$experiment/monitor/+`, onMonitorData, "WorkerCard");
429+
if (!client) {
430+
return undefined;
431431
}
432-
}, [client]);
432+
const topic = `pioreactor/${unit}/$experiment/monitor/+`;
433+
subscribeToTopic(topic, onMonitorData, "WorkerCard");
434+
return () => {
435+
unsubscribeFromTopic(topic, "WorkerCard");
436+
};
437+
}, [client, subscribeToTopic, unsubscribeFromTopic, unit]);
433438

434439

435440
const onMonitorData = (topic, message) => {

frontend/src/Pioreactor.jsx

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,7 +1675,7 @@ function FlashLEDButton(props){
16751675
function PioreactorCard({ unit, modelDetails, isUnitActive, experiment, config, label: initialLabel }){
16761676
const [jobFetchComplete, setJobFetchComplete] = useState(false)
16771677
const [label, setLabel] = useState(initialLabel || "")
1678-
const {client, subscribeToTopic } = useMQTT();
1678+
const {client, subscribeToTopic, unsubscribeFromTopic } = useMQTT();
16791679
const isXrModel = Boolean(modelDetails.model_name?.toLowerCase().includes("xr"));
16801680

16811681
const [jobs, setJobs] = useState({
@@ -1749,37 +1749,40 @@ function PioreactorCard({ unit, modelDetails, isUnitActive, experiment, config,
17491749
}
17501750

17511751
useEffect(() => {
1752-
1753-
1754-
if (!jobFetchComplete){
1755-
return
1752+
if (!jobFetchComplete) {
1753+
return undefined;
17561754
}
17571755

1758-
if (!experiment){
1759-
return
1756+
if (!experiment) {
1757+
return undefined;
17601758
}
17611759

1762-
if (!client){
1763-
return
1760+
if (!client) {
1761+
return undefined;
17641762
}
17651763

1766-
subscribeToTopic(`pioreactor/${unit}/$experiment/monitor/$state`, onMessage, "PioreactorCard");
1764+
const topics = [`pioreactor/${unit}/$experiment/monitor/$state`];
17671765
for (const job of Object.keys(jobs)) {
1768-
1769-
subscribeToTopic(`pioreactor/${unit}/${experiment}/${job}/$state`, onMessage, "PioreactorCard");
1770-
for (const setting of Object.keys(jobs[job].publishedSettings)){
1771-
var topic = [
1766+
topics.push(`pioreactor/${unit}/${experiment}/${job}/$state`);
1767+
for (const setting of Object.keys(jobs[job].publishedSettings)) {
1768+
topics.push(
1769+
[
17721770
"pioreactor",
17731771
unit,
17741772
(job === "monitor" ? "$experiment" : experiment),
17751773
job,
1776-
setting
1774+
setting,
17771775
].join("/")
1778-
subscribeToTopic(topic, onMessage, "PioreactorCard");
1776+
);
17791777
}
17801778
}
17811779

1782-
},[experiment, jobFetchComplete, client])
1780+
subscribeToTopic(topics, onMessage, "PioreactorCard");
1781+
1782+
return () => {
1783+
unsubscribeFromTopic(topics, "PioreactorCard");
1784+
};
1785+
}, [experiment, jobFetchComplete, client, subscribeToTopic, unsubscribeFromTopic, unit])
17831786

17841787
const onMessage = (topic, message, packet) => {
17851788
if (!message || !topic) return;
@@ -2172,7 +2175,7 @@ function Pioreactor({title}) {
21722175
)}
21732176
else {
21742177
return (
2175-
<MQTTProvider name={unit} config={config} experiment={experimentMetadata.experiment}>
2178+
<MQTTProvider name={unit} config={config}>
21762179
<Grid container rowSpacing={1} columnSpacing={2} justifyContent="space-between">
21772180
<Grid
21782181
size={{

frontend/src/Pioreactors.jsx

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2488,7 +2488,7 @@ function FlashLEDButton({ unit, disabled }){
24882488
function PioreactorCard({unit, isUnitActive, experiment, config, originalLabel, modelDetails = {}}){
24892489
const [jobFetchComplete, setJobFetchComplete] = useState(false)
24902490
const [label, setLabel] = useState("")
2491-
const {client, subscribeToTopic } = useMQTT();
2491+
const {client, subscribeToTopic, unsubscribeFromTopic } = useMQTT();
24922492
const contribJobsList = useContribJobsList();
24932493
const isXrModel = Boolean(modelDetails.model_name?.toLowerCase().includes("xr"));
24942494

@@ -2570,40 +2570,44 @@ function PioreactorCard({unit, isUnitActive, experiment, config, originalLabel,
25702570
}
25712571

25722572
useEffect(() => {
2573-
2574-
if (!isUnitActive){
2575-
return
2573+
if (!isUnitActive) {
2574+
return undefined;
25762575
}
25772576

2578-
if (!jobFetchComplete){
2579-
return
2577+
if (!jobFetchComplete) {
2578+
return undefined;
25802579
}
25812580

2582-
if (!experiment){
2583-
return
2581+
if (!experiment) {
2582+
return undefined;
25842583
}
25852584

2586-
if (!client){
2587-
return
2585+
if (!client) {
2586+
return undefined;
25882587
}
25892588

2590-
subscribeToTopic(`pioreactor/${unit}/$experiment/monitor/$state`, onMessage, "PioreactorCard");
2589+
const topics = [`pioreactor/${unit}/$experiment/monitor/$state`];
25912590
for (const job of Object.keys(jobs)) {
2592-
2593-
subscribeToTopic(`pioreactor/${unit}/${experiment}/${job}/$state`, onMessage, "PioreactorCard");
2594-
for (const setting of Object.keys(jobs[job].publishedSettings)){
2595-
var topic = [
2591+
topics.push(`pioreactor/${unit}/${experiment}/${job}/$state`);
2592+
for (const setting of Object.keys(jobs[job].publishedSettings)) {
2593+
topics.push(
2594+
[
25962595
"pioreactor",
25972596
unit,
25982597
(job === "monitor" ? "$experiment" : experiment),
25992598
job,
2600-
setting
2599+
setting,
26012600
].join("/")
2602-
subscribeToTopic(topic, onMessage, "PioreactorCard");
2601+
);
26032602
}
26042603
}
26052604

2606-
},[experiment, jobFetchComplete, isUnitActive, client])
2605+
subscribeToTopic(topics, onMessage, "PioreactorCard");
2606+
2607+
return () => {
2608+
unsubscribeFromTopic(topics, "PioreactorCard");
2609+
};
2610+
}, [experiment, jobFetchComplete, isUnitActive, client, subscribeToTopic, unsubscribeFromTopic, unit])
26072611

26082612
const onMessage = (topic, message, packet) => {
26092613
if (!message || !topic) return;
@@ -2938,7 +2942,7 @@ function Pioreactors({title}) {
29382942
)
29392943

29402944
return (
2941-
<MQTTProvider name="pioreactor" config={config} experiment={experimentMetadata.experiment}>
2945+
<MQTTProvider name="pioreactor" config={config}>
29422946
{modelCheckKey > 0 && <MissingWorkerModelModal triggerCheckKey={modelCheckKey} />}
29432947
<Grid container spacing={2} >
29442948
<Grid
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import React, { act, useEffect } from "react";
2+
import { render } from "@testing-library/react";
3+
import { MQTTProvider, useMQTT } from "../providers/MQTTContext";
4+
import mqtt from "mqtt";
5+
6+
const createMockClient = () => {
7+
const handlers = {};
8+
return {
9+
once: jest.fn((event, cb) => {
10+
handlers[event] = cb;
11+
}),
12+
on: jest.fn((event, cb) => {
13+
handlers[event] = cb;
14+
}),
15+
end: jest.fn(),
16+
subscribe: jest.fn(),
17+
unsubscribe: jest.fn(),
18+
emit: (event, ...args) => {
19+
if (handlers[event]) {
20+
const handler = handlers[event];
21+
delete handlers[event];
22+
handler(...args);
23+
}
24+
},
25+
};
26+
};
27+
28+
jest.mock("mqtt", () => {
29+
const connect = jest.fn();
30+
return {
31+
__esModule: true,
32+
default: { connect },
33+
connect,
34+
};
35+
});
36+
37+
const TOPICS_FOR = (experiment) => [
38+
`pioreactor/+/${experiment}/logs/+/error`,
39+
`pioreactor/+/${experiment}/logs/+/warning`,
40+
];
41+
42+
const Subscriber = ({ experiment }) => {
43+
const { client, subscribeToTopic, unsubscribeFromTopic } = useMQTT();
44+
45+
useEffect(() => {
46+
if (!client || !experiment) {
47+
return undefined;
48+
}
49+
const topics = TOPICS_FOR(experiment);
50+
subscribeToTopic(topics, () => {}, "Subscriber");
51+
return () => {
52+
unsubscribeFromTopic(topics, "Subscriber");
53+
};
54+
}, [client, experiment, subscribeToTopic, unsubscribeFromTopic]);
55+
56+
return null;
57+
};
58+
59+
const baseConfig = {
60+
mqtt: {
61+
broker_address: "localhost",
62+
ws_protocol: "ws",
63+
broker_ws_port: 9001,
64+
},
65+
};
66+
67+
const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0));
68+
69+
describe("MQTTContext", () => {
70+
beforeEach(() => {
71+
mqtt.connect.mockReset();
72+
});
73+
74+
test("unsubscribes old topics when experiment changes", async () => {
75+
const mockClient = createMockClient();
76+
mqtt.connect.mockImplementation(() => mockClient);
77+
78+
let rerender;
79+
await act(async () => {
80+
({ rerender } = render(
81+
<MQTTProvider name="test" config={baseConfig}>
82+
<Subscriber experiment="exp_a" />
83+
</MQTTProvider>
84+
));
85+
});
86+
87+
await act(async () => {
88+
await flushPromises();
89+
mockClient.emit("connect");
90+
await flushPromises();
91+
});
92+
93+
expect(mockClient.subscribe).toHaveBeenCalledWith(TOPICS_FOR("exp_a"), { qos: 0 });
94+
95+
await act(async () => {
96+
rerender(
97+
<MQTTProvider name="test" config={baseConfig}>
98+
<Subscriber experiment="exp_b" />
99+
</MQTTProvider>
100+
);
101+
await flushPromises();
102+
});
103+
104+
expect(mockClient.unsubscribe).toHaveBeenCalledWith(TOPICS_FOR("exp_a"));
105+
expect(mockClient.subscribe).toHaveBeenCalledWith(TOPICS_FOR("exp_b"), { qos: 0 });
106+
});
107+
});

0 commit comments

Comments
 (0)