Skip to content

Commit a21bdd7

Browse files
committed
Improve error handling
1 parent 6291669 commit a21bdd7

File tree

10 files changed

+299
-162
lines changed

10 files changed

+299
-162
lines changed

.changeset/huge-chefs-change.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,14 @@ Standardize SSE event handling to use EventEmitter pattern:
66
- Remove `onConnection()` helper method - use `on("connect")` and `on("disconnect")` instead
77
- Add `"all"` event type to subscribe to all SSE events at once
88
- Update documentation and examples to reflect new patterns
9+
10+
Fix unhandled promise rejections in SSE streaming:
11+
- Add error handling to `connect()` method's internal connection loop
12+
- `updateFields()` now returns a promise that resolves/rejects when the debounced update completes
13+
- Multiple calls within the debounce window share the same promise
14+
- `addField()` now properly returns the promise from `updateFields()`
15+
16+
Refactor `updateFields()` internals:
17+
- Consolidate batch state into single `_fieldUpdateBatch` object
18+
- Extract flush logic into `_flushFieldUpdate()` method
19+
- Use Deferred pattern for cleaner promise handling

.changeset/node-red-sse-events.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,15 @@ Update SSE event handling to use standard EventEmitter pattern:
66
- Replace `onData()`, `onState()`, etc. with `on("data")`, `on("state")` pattern
77
- Use new `on("all")` event for subscribing to all events
88
- Add VIN filtering in event callbacks
9-
- Emit catchable errors only on initial connection failure (not on disconnect)
9+
10+
Improve error handling:
11+
- Config node tracks errors from initial API verification (auth/subscription issues)
12+
- All nodes now show "Error" status with red ring when API verification fails
13+
- Nodes wait for API verification before attempting SSE connections
14+
- Clearer error messages in node logs
15+
16+
Refactor shared helpers:
17+
- Add `getInstance()` to get and validate config instance
18+
- Add `hasInstanceError()` for synchronous error checking (command nodes)
19+
- Add `verifyInstance()` for async verification before SSE connection
20+
- Add `getErrorMessage()` to extract useful messages from any error type (handles hey-api response objects)

packages/api/src/TeslemetryVehicleStream.ts

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@ import type {
2020
import { Logger } from "./logger.js";
2121
import { VehicleCache } from "./TeslemetryStream.js";
2222

23+
/** Simple deferred promise pattern */
24+
class Deferred<T> {
25+
public promise: Promise<T>;
26+
public resolve!: (value: T) => void;
27+
public reject!: (error: Error) => void;
28+
29+
constructor() {
30+
this.promise = new Promise<T>((resolve, reject) => {
31+
this.resolve = resolve;
32+
this.reject = reject;
33+
});
34+
}
35+
}
36+
2337
type TeslemetryStreamEventMap = {
2438
state: SseState;
2539
data: SseData;
@@ -77,8 +91,11 @@ export class TeslemetryVehicleStream extends EventEmitter {
7791
private root: Teslemetry;
7892
public vin: string;
7993
public fields: FieldsRequest = {}; // Allow updates from both requests, and responses
80-
private _pendingFields: FieldsRequest = {}; // Used for accumulating config changes before patching
81-
private _debounceTimeout: NodeJS.Timeout | null = null; // Debounce timeout for patchConfig
94+
private _fieldUpdateBatch: {
95+
fields: FieldsRequest;
96+
deferred: Deferred<void>;
97+
timeout: NodeJS.Timeout;
98+
} | null = null;
8299
public logger: Logger;
83100
public data: TeslemetryVehicleStreamData;
84101

@@ -128,33 +145,56 @@ export class TeslemetryVehicleStream extends EventEmitter {
128145
}
129146
}
130147

131-
/** Safely add field configuration to the vehicle */
132-
public async updateFields(fields: FieldsRequest) {
133-
this._pendingFields = { ...this._pendingFields, ...fields };
134-
135-
// Clear existing timeout if it exists
136-
if (this._debounceTimeout) {
137-
clearTimeout(this._debounceTimeout);
148+
/** Safely add field configuration to the vehicle
149+
* Returns a promise that resolves when the debounced update completes.
150+
* Multiple calls within the debounce window share the same promise.
151+
*/
152+
public updateFields(fields: FieldsRequest): Promise<void> {
153+
if (!this._fieldUpdateBatch) {
154+
this._fieldUpdateBatch = {
155+
fields: {},
156+
deferred: new Deferred<void>(),
157+
timeout: null!,
158+
};
138159
}
139160

140-
// Set new timeout to debounce patchConfig calls
141-
this._debounceTimeout = setTimeout(async () => {
142-
const data = await this.patchConfig(this._pendingFields);
161+
this._fieldUpdateBatch.fields = {
162+
...this._fieldUpdateBatch.fields,
163+
...fields,
164+
};
165+
166+
clearTimeout(this._fieldUpdateBatch.timeout);
167+
this._fieldUpdateBatch.timeout = setTimeout(
168+
() => this._flushFieldUpdate(),
169+
1000,
170+
);
171+
172+
return this._fieldUpdateBatch.deferred.promise;
173+
}
174+
175+
/** Flush pending field updates to the API */
176+
private async _flushFieldUpdate(): Promise<void> {
177+
const batch = this._fieldUpdateBatch!;
178+
this._fieldUpdateBatch = null;
179+
180+
try {
181+
const data = await this.patchConfig(batch.fields);
143182
if (data?.updated_vehicles) {
144183
this.logger.info(
145-
`Updated ${Object.keys(this._pendingFields).length} streaming fields for ${this.vin}`,
184+
`Updated ${Object.keys(batch.fields).length} streaming fields for ${this.vin}`,
146185
);
147-
// null means default, not delete, so its okay
148-
this.fields = { ...this.fields, ...this._pendingFields };
149-
this._pendingFields = {};
186+
this.fields = { ...this.fields, ...batch.fields };
187+
batch.deferred.resolve();
150188
} else {
151-
this.logger.error(
189+
const error = new Error(
152190
`Error updating streaming config for ${this.vin}`,
153-
data,
154191
);
192+
this.logger.error(error.message, data);
193+
batch.deferred.reject(error);
155194
}
156-
this._debounceTimeout = null;
157-
}, 1000);
195+
} catch (error: any) {
196+
batch.deferred.reject(error);
197+
}
158198
}
159199

160200
/** Modify the field configuration of the vehicle */
@@ -181,9 +221,9 @@ export class TeslemetryVehicleStream extends EventEmitter {
181221
* Add a field to the vehicles streaming configuration
182222
* @param field Vehicle Signal
183223
* @param interval
184-
* @returns
224+
* @returns Promise that resolves when the field is added
185225
*/
186-
public async addField(field: Signals, interval?: number): Promise<void> {
226+
public addField(field: Signals, interval?: number): Promise<void> {
187227
if (
188228
this.fields &&
189229
this.fields[field] &&
@@ -193,12 +233,12 @@ export class TeslemetryVehicleStream extends EventEmitter {
193233
this.logger.debug(
194234
`Streaming field ${field} already enabled @ ${this.fields[field]?.interval_seconds || "default"}s`,
195235
);
196-
return;
236+
return Promise.resolve();
197237
}
198238

199239
const value =
200240
interval !== undefined ? { interval_seconds: interval } : null;
201-
this.updateFields({ [field]: value } as FieldsRequest);
241+
return this.updateFields({ [field]: value } as FieldsRequest);
202242
}
203243

204244
/**
@@ -211,8 +251,8 @@ export class TeslemetryVehicleStream extends EventEmitter {
211251
field: T,
212252
callback: (value: Exclude<SseData["data"][T], undefined>) => void,
213253
): () => void {
214-
this.addField(field).catch((error) => {
215-
this.logger.error(`Failed to add field ${field}:`, error);
254+
this.addField(field).catch(() => {
255+
this.logger.error(`Failed to add field ${field}`);
216256
});
217257
const data = this.root.sse.cache?.[this.vin]?.data?.[field];
218258
if (data !== undefined) {
Lines changed: 85 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Node, NodeAPI, NodeDef } from "node-red";
22
import { Teslemetry } from "@teslemetry/api";
3-
import { instances } from "../shared";
3+
import { instances, getErrorMessage } from "../shared";
44

55
export interface TeslemetryConfigNodeDef extends NodeDef {
66
token: string;
@@ -22,11 +22,24 @@ export default function (RED: NodeAPI) {
2222
logger: RED.log,
2323
stream: { cache: false },
2424
});
25-
instances.set(this.id, {
25+
26+
// Create instance first so it can be referenced in catch handler
27+
const instance = {
2628
teslemetry,
27-
products: teslemetry.createProducts(),
28-
});
29-
console.log("Created", this.id);
29+
products: Promise.resolve({ vehicles: {}, energySites: {} }),
30+
error: undefined as string | undefined,
31+
};
32+
instances.set(this.id, instance);
33+
34+
// Fetch products and track errors
35+
instance.products = teslemetry
36+
.createProducts()
37+
.catch((error: unknown) => {
38+
const message = getErrorMessage(error);
39+
RED.log.error(`Teslemetry error: ${message}`);
40+
instance.error = message;
41+
return { vehicles: {}, energySites: {} };
42+
});
3043
}
3144

3245
this.on("close", (done: () => void) => {
@@ -41,79 +54,91 @@ export default function (RED: NodeAPI) {
4154
});
4255

4356
RED.httpAdmin.get("/teslemetry/vehicles", async (req: any, res: any) => {
44-
const config = req.query.config;
57+
try {
58+
const config = req.query.config;
4559

46-
if (!config) {
47-
res.status(400).send("Missing config ID");
48-
return;
49-
}
60+
if (!config) {
61+
res.status(400).send("Missing config ID");
62+
return;
63+
}
5064

51-
// Try to get the active node first
52-
const instance = instances.get(req.query.config);
65+
const instance = instances.get(req.query.config);
5366

54-
if (!instance) {
55-
res.status(400).send("Missing config instance");
56-
return;
57-
}
67+
if (!instance) {
68+
res.status(400).send("Missing config instance");
69+
return;
70+
}
5871

59-
const { vehicles } = await instance.products;
72+
const { vehicles } = await instance.products;
6073

61-
const options = Object.entries(vehicles).map(([id, { name }]) => [
62-
id,
63-
name,
64-
]);
74+
const options = Object.entries(vehicles).map(([id, { name }]) => [
75+
id,
76+
name,
77+
]);
6578

66-
res.json(options);
79+
res.json(options);
80+
} catch (error: unknown) {
81+
RED.log.error(`Failed to fetch vehicles: ${getErrorMessage(error)}`);
82+
res.status(500).send("Failed to fetch vehicles");
83+
}
6784
});
6885

6986
RED.httpAdmin.get("/teslemetry/energy_sites", async (req: any, res: any) => {
70-
const config = req.query.config;
87+
try {
88+
const config = req.query.config;
7189

72-
if (!config) {
73-
res.status(400).send("Missing config ID");
74-
return;
75-
}
90+
if (!config) {
91+
res.status(400).send("Missing config ID");
92+
return;
93+
}
7694

77-
// Try to get the active node first
78-
const instance = instances.get(req.query.config);
95+
const instance = instances.get(req.query.config);
7996

80-
if (!instance) {
81-
res.status(400).send("Missing config instance");
82-
return;
83-
}
97+
if (!instance) {
98+
res.status(400).send("Missing config instance");
99+
return;
100+
}
84101

85-
const { energySites } = await instance.products;
102+
const { energySites } = await instance.products;
86103

87-
const options = Object.entries(energySites).map(([id, { name }]) => [
88-
id,
89-
name,
90-
]);
104+
const options = Object.entries(energySites).map(([id, { name }]) => [
105+
id,
106+
name,
107+
]);
91108

92-
res.json(options);
109+
res.json(options);
110+
} catch (error: unknown) {
111+
RED.log.error(`Failed to fetch energy sites: ${getErrorMessage(error)}`);
112+
res.status(500).send("Failed to fetch energy sites");
113+
}
93114
});
94115

95116
RED.httpAdmin.get("/teslemetry/fields", async (req: any, res: any) => {
96-
const { config, model } = req.query;
97-
98-
if (!config) {
99-
res.status(400).send("Missing config ID");
100-
return;
101-
}
102-
103-
// Try to get the active node first
104-
const instance = instances.get(req.query.config);
105-
106-
if (!instance) {
107-
res.status(400).send("Missing config instance");
108-
return;
117+
try {
118+
const { config, model } = req.query;
119+
120+
if (!config) {
121+
res.status(400).send("Missing config ID");
122+
return;
123+
}
124+
125+
const instance = instances.get(req.query.config);
126+
127+
if (!instance) {
128+
res.status(400).send("Missing config instance");
129+
return;
130+
}
131+
132+
const fields = await instance.teslemetry.api.getFields();
133+
const options = Object.entries(fields)
134+
.filter(([_, { models }]) => {
135+
return model && models ? models.includes(model) : true;
136+
})
137+
.map(([signal]) => signal);
138+
res.json(options);
139+
} catch (error: unknown) {
140+
RED.log.error(`Failed to fetch fields: ${getErrorMessage(error)}`);
141+
res.status(500).send("Failed to fetch fields");
109142
}
110-
111-
const fields = await instance.teslemetry.api.fields();
112-
const options = Object.entries(fields)
113-
.filter(([_, { models }]) => {
114-
return model && models ? models.includes(model) : true;
115-
})
116-
.map(([signal]) => signal);
117-
res.json(options);
118143
});
119144
}

packages/node-red-contrib-teslemetry/src/nodes/teslemetry-energy-command.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Node, NodeAPI, NodeDef } from "node-red";
22
import { Teslemetry } from "@teslemetry/api";
3-
import { instances } from "../shared";
3+
import { getInstance, hasInstanceError } from "../shared";
44
import { validateParameters } from "../validation";
55
import { Msg } from "../types";
66

@@ -24,15 +24,14 @@ export default function (RED: NodeAPI) {
2424
RED.nodes.createNode(this, config);
2525
const node = this;
2626

27-
node.teslemetry = instances.get(config.teslemetryConfig)?.teslemetry;
27+
const instance = getInstance(config.teslemetryConfig, node);
28+
if (!instance) return;
29+
if (hasInstanceError(instance, node)) return;
30+
31+
node.teslemetry = instance.teslemetry;
2832
node.siteId = config.siteId;
2933
node.command = config.command;
30-
31-
if (!node.teslemetry) {
32-
node.status({ fill: "red", shape: "ring", text: "Config missing" });
33-
node.error("No Teslemetry configuration found");
34-
return;
35-
} else node.status({});
34+
node.status({});
3635

3736
node.on("input", async function (msg: Msg, send, done) {
3837
const siteId: string = node.siteId || (msg.siteId as string) || "";

0 commit comments

Comments
 (0)