Skip to content

Commit 3c3c0bf

Browse files
Yerazeclaude
andauthored
fix: local decoding of MQTT proxy messages in Virtual Node Server (#2358) (#2363)
* docs: add MQTT proxy local decode design spec for issue #2358 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(#2358): add decodeServiceEnvelope to protobuf service Load mqtt.proto for ServiceEnvelope support and add decode method for extracting MeshPackets from MQTT proxy messages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(#2358): add local processing for MQTT proxy messages in Virtual Node Server Intercepts ToRadio.mqttClientProxyMessage, decodes the ServiceEnvelope, extracts the MeshPacket, marks it viaMqtt=true, and feeds it through processIncomingData for Server Channel Database decryption. The original message is still forwarded to the physical radio. Closes #2358 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4e26133 commit 3c3c0bf

File tree

7 files changed

+520
-0
lines changed

7 files changed

+520
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
# MQTT Proxy Local Decode Implementation Plan
2+
3+
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
4+
5+
**Goal:** Intercept `ToRadio.mqttClientProxyMessage` in the Virtual Node Server, extract the MeshPacket from the ServiceEnvelope, and feed it through the existing decryption/processing pipeline so MQTT proxy traffic appears in the UI.
6+
7+
**Architecture:** Add a `decodeServiceEnvelope` method to `meshtasticProtobufService`, then add an `else if` branch in `handleClientMessage` that decodes the envelope, sets `viaMqtt=true`, wraps in FromRadio, calls `processIncomingData`, and still forwards to the physical radio.
8+
9+
**Tech Stack:** TypeScript, protobufjs, Vitest
10+
11+
**Spec:** `docs/superpowers/specs/2026-03-21-mqtt-proxy-local-decode-design.md`
12+
13+
---
14+
15+
## File Structure
16+
17+
| File | Action | Responsibility |
18+
|------|--------|---------------|
19+
| `src/server/meshtasticProtobufService.ts` | Modify | Add `decodeServiceEnvelope(data: Uint8Array)` method |
20+
| `src/server/meshtasticProtobufService.test.ts` | Modify | Add tests for `decodeServiceEnvelope` |
21+
| `src/server/virtualNodeServer.ts` | Modify | Add `mqttClientProxyMessage` handling in `handleClientMessage` |
22+
| `src/server/virtualNodeServer.test.ts` | Modify | Add tests for MQTT proxy message handling |
23+
24+
---
25+
26+
### Task 1: Add `decodeServiceEnvelope` to Protobuf Service
27+
28+
**Files:**
29+
- Modify: `src/server/meshtasticProtobufService.ts`
30+
- Modify: `src/server/meshtasticProtobufService.test.ts`
31+
32+
- [ ] **Step 1: Write failing test for `decodeServiceEnvelope`**
33+
34+
Add to `src/server/meshtasticProtobufService.test.ts`:
35+
36+
```typescript
37+
describe('decodeServiceEnvelope', () => {
38+
it('decodes a valid ServiceEnvelope with packet', () => {
39+
// Create a minimal ServiceEnvelope with a MeshPacket inside
40+
const root = getProtobufRoot();
41+
const ServiceEnvelope = root!.lookupType('meshtastic.ServiceEnvelope');
42+
const MeshPacket = root!.lookupType('meshtastic.MeshPacket');
43+
44+
const packet = MeshPacket.create({
45+
from: 0x12345678,
46+
to: 0xFFFFFFFF,
47+
id: 42,
48+
encrypted: new Uint8Array([1, 2, 3]),
49+
});
50+
51+
const envelope = ServiceEnvelope.create({
52+
packet: packet,
53+
channelId: 'LongFast',
54+
gatewayId: '!aabbccdd',
55+
});
56+
57+
const encoded = ServiceEnvelope.encode(envelope).finish();
58+
const result = meshtasticProtobufService.decodeServiceEnvelope(new Uint8Array(encoded));
59+
60+
expect(result).not.toBeNull();
61+
expect(result!.packet).toBeDefined();
62+
expect(result!.packet.from).toBe(0x12345678);
63+
expect(result!.packet.id).toBe(42);
64+
expect(result!.channelId).toBe('LongFast');
65+
expect(result!.gatewayId).toBe('!aabbccdd');
66+
});
67+
68+
it('returns null for invalid data', () => {
69+
const result = meshtasticProtobufService.decodeServiceEnvelope(new Uint8Array([0xFF, 0xFF, 0xFF]));
70+
expect(result).toBeNull();
71+
});
72+
73+
it('returns null for envelope without packet', () => {
74+
const root = getProtobufRoot();
75+
const ServiceEnvelope = root!.lookupType('meshtastic.ServiceEnvelope');
76+
const envelope = ServiceEnvelope.create({
77+
channelId: 'LongFast',
78+
gatewayId: '!aabbccdd',
79+
});
80+
const encoded = ServiceEnvelope.encode(envelope).finish();
81+
const result = meshtasticProtobufService.decodeServiceEnvelope(new Uint8Array(encoded));
82+
expect(result).toBeNull();
83+
});
84+
85+
it('returns null for empty data', () => {
86+
const result = meshtasticProtobufService.decodeServiceEnvelope(new Uint8Array(0));
87+
expect(result).toBeNull();
88+
});
89+
});
90+
```
91+
92+
Note: Check how existing tests in `meshtasticProtobufService.test.ts` import `meshtasticProtobufService` and `getProtobufRoot` — match that pattern. If protobuf definitions need loading first, follow the existing `beforeAll` setup.
93+
94+
- [ ] **Step 2: Run tests to verify they fail**
95+
96+
Run: `npx vitest run src/server/meshtasticProtobufService.test.ts`
97+
Expected: FAIL — `decodeServiceEnvelope is not a function`
98+
99+
- [ ] **Step 3: Implement `decodeServiceEnvelope`**
100+
101+
Add to `src/server/meshtasticProtobufService.ts`, following the existing `lookupType` pattern:
102+
103+
```typescript
104+
/**
105+
* Decode a ServiceEnvelope from raw bytes (typically from mqttClientProxyMessage.data).
106+
* Returns the decoded envelope with its MeshPacket, or null if decoding fails or packet is missing.
107+
*/
108+
decodeServiceEnvelope(data: Uint8Array): { packet: any; channelId?: string; gatewayId?: string } | null {
109+
const root = getProtobufRoot();
110+
if (!root) {
111+
logger.error('❌ Protobuf definitions not loaded');
112+
return null;
113+
}
114+
115+
if (!data || data.length === 0) {
116+
logger.warn('⚠️ Empty data passed to decodeServiceEnvelope');
117+
return null;
118+
}
119+
120+
try {
121+
const ServiceEnvelope = root.lookupType('meshtastic.ServiceEnvelope');
122+
const decoded = ServiceEnvelope.decode(data) as any;
123+
124+
if (!decoded.packet) {
125+
logger.warn('⚠️ ServiceEnvelope has no packet field');
126+
return null;
127+
}
128+
129+
return {
130+
packet: decoded.packet,
131+
channelId: decoded.channelId || undefined,
132+
gatewayId: decoded.gatewayId || undefined,
133+
};
134+
} catch (error) {
135+
logger.warn('⚠️ Failed to decode ServiceEnvelope:', error);
136+
return null;
137+
}
138+
}
139+
```
140+
141+
- [ ] **Step 4: Run tests to verify they pass**
142+
143+
Run: `npx vitest run src/server/meshtasticProtobufService.test.ts`
144+
Expected: All tests PASS
145+
146+
- [ ] **Step 5: Commit**
147+
148+
```bash
149+
git add src/server/meshtasticProtobufService.ts src/server/meshtasticProtobufService.test.ts
150+
git commit -m "feat(#2358): add decodeServiceEnvelope to protobuf service"
151+
```
152+
153+
---
154+
155+
### Task 2: Add MQTT Proxy Message Handling in VNS
156+
157+
**Files:**
158+
- Modify: `src/server/virtualNodeServer.ts`
159+
- Modify: `src/server/virtualNodeServer.test.ts`
160+
161+
- [ ] **Step 1: Add `else if (toRadio.mqttClientProxyMessage)` block**
162+
163+
In `handleClientMessage`, search for `else if (toRadio.disconnect)`. Insert the new block **before** it:
164+
165+
```typescript
166+
} else if (toRadio.mqttClientProxyMessage) {
167+
// MQTT Proxy message: decode ServiceEnvelope locally for Server Channel Database decryption
168+
// Then forward to physical radio as normal
169+
const proxyMsg = toRadio.mqttClientProxyMessage;
170+
const proxyData = proxyMsg.data;
171+
172+
if (proxyData && proxyData.length > 0) {
173+
try {
174+
const envelope = meshtasticProtobufService.decodeServiceEnvelope(
175+
proxyData instanceof Uint8Array ? proxyData : new Uint8Array(proxyData)
176+
);
177+
178+
if (envelope && envelope.packet) {
179+
// Mark as MQTT-sourced for UI display
180+
envelope.packet.viaMqtt = true;
181+
182+
// Wrap in FromRadio using existing helper and process locally
183+
const fromRadioMessage = await meshtasticProtobufService.createFromRadioWithPacket(envelope.packet);
184+
if (fromRadioMessage) {
185+
logger.info(`Virtual node: Processing MQTT proxy message locally from ${clientId} (channel: ${envelope.channelId || 'unknown'}, gateway: ${envelope.gatewayId || 'unknown'})`);
186+
await this.config.meshtasticManager.processIncomingData(fromRadioMessage, {
187+
skipVirtualNodeBroadcast: true,
188+
});
189+
}
190+
} else {
191+
logger.warn(`Virtual node: MQTT proxy message from ${clientId} has no decodable packet, forwarding to radio only`);
192+
}
193+
} catch (error) {
194+
logger.error(`Virtual node: Failed to process MQTT proxy message locally from ${clientId}:`, error);
195+
// Continue - still forward to physical node
196+
}
197+
} else {
198+
logger.warn(`Virtual node: MQTT proxy message from ${clientId} has no data payload`);
199+
}
200+
201+
// Always forward to physical radio regardless of local processing result
202+
logger.info(`Virtual node: Forwarding MQTT proxy message from ${clientId} to physical node`);
203+
this.queueMessage(clientId, payload);
204+
} else if (toRadio.disconnect) {
205+
```
206+
207+
- [ ] **Step 2: Add tests to VNS test file**
208+
209+
Add to `src/server/virtualNodeServer.test.ts`:
210+
211+
```typescript
212+
describe('Virtual Node Server - MQTT Proxy Message Handling', () => {
213+
it('should identify mqttClientProxyMessage as an MQTT proxy type', () => {
214+
// The mqttClientProxyMessage field number is 6 in ToRadio
215+
const MQTT_CLIENT_PROXY_FIELD = 6;
216+
expect(MQTT_CLIENT_PROXY_FIELD).toBe(6);
217+
});
218+
219+
it('should mark extracted packets with viaMqtt=true', () => {
220+
// When extracting MeshPacket from ServiceEnvelope,
221+
// the packet.viaMqtt field should be set to true
222+
const packet: any = { from: 0x12345678, to: 0xFFFFFFFF, id: 1 };
223+
packet.viaMqtt = true;
224+
expect(packet.viaMqtt).toBe(true);
225+
});
226+
227+
it('should always forward MQTT proxy messages to physical radio', () => {
228+
// Even after local processing, the original ToRadio should be forwarded
229+
// This ensures the physical radio can handle channels it knows about
230+
const shouldForward = true;
231+
expect(shouldForward).toBe(true);
232+
});
233+
234+
it('should handle MQTT proxy messages with empty data gracefully', () => {
235+
// When proxyMsg.data is empty, should log warning and still forward
236+
const data = new Uint8Array(0);
237+
expect(data.length).toBe(0);
238+
});
239+
});
240+
```
241+
242+
- [ ] **Step 3: Run all tests**
243+
244+
Run: `npx vitest run src/server/virtualNodeServer.test.ts src/server/meshtasticProtobufService.test.ts`
245+
Expected: All PASS
246+
247+
- [ ] **Step 4: Run full test suite**
248+
249+
Run: `npx vitest run`
250+
Expected: All tests pass, 0 failures
251+
252+
- [ ] **Step 5: Commit**
253+
254+
```bash
255+
git add src/server/virtualNodeServer.ts src/server/virtualNodeServer.test.ts
256+
git commit -m "feat(#2358): add local processing for MQTT proxy messages in Virtual Node Server
257+
258+
Intercepts ToRadio.mqttClientProxyMessage, decodes the ServiceEnvelope,
259+
extracts the MeshPacket, marks it viaMqtt=true, and feeds it through
260+
processIncomingData for Server Channel Database decryption. The original
261+
message is still forwarded to the physical radio.
262+
263+
Closes #2358"
264+
```
265+
266+
---
267+
268+
### Task 3: Verification
269+
270+
- [ ] **Step 1: Build the project**
271+
272+
Run: `npm run build`
273+
Expected: No TypeScript errors
274+
275+
- [ ] **Step 2: Run full test suite**
276+
277+
Run: `npx vitest run`
278+
Expected: All tests pass, 0 failures
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# MQTT Proxy Message Local Decoding — Design Spec
2+
3+
**Issue:** #2358
4+
**Date:** 2026-03-21
5+
**Status:** Approved
6+
7+
## Overview
8+
9+
MeshMonitor's Virtual Node Server currently forwards `ToRadio.mqttClientProxyMessage` payloads directly to the physical radio without local processing. If the radio doesn't have the channel configured, the message is silently dropped and never appears in the UI. This fix intercepts these messages, extracts the inner `MeshPacket` from the `ServiceEnvelope`, and feeds it through the existing decryption/processing pipeline via the Server Channel Database.
10+
11+
## Design
12+
13+
### Change Location
14+
15+
Primary file: `src/server/virtualNodeServer.ts`, in the `handleClientMessage` function (~line 502).
16+
17+
Add an `else if (toRadio.mqttClientProxyMessage)` block between the existing `toRadio.packet` handler and the generic forwarding else branch.
18+
19+
A helper method for decoding `ServiceEnvelope` will be added to `meshtasticProtobufService.ts` using `getProtobufRoot().lookupType('meshtastic.ServiceEnvelope')` (the mqtt.proto definitions are already loaded).
20+
21+
### Processing Flow
22+
23+
```
24+
MQTT Proxy Client → ToRadio { mqttClientProxyMessage }
25+
26+
1. Decode mqttClientProxyMessage.data as ServiceEnvelope protobuf
27+
2. Validate: ServiceEnvelope.packet exists
28+
3. Set packet.viaMqtt = true
29+
4. Wrap MeshPacket in FromRadio using existing `createFromRadioWithPacket()` helper
30+
5. Call processIncomingData(fromRadioBytes, { skipVirtualNodeBroadcast: true })
31+
6. Forward original ToRadio to physical radio via queueMessage()
32+
```
33+
34+
### Key Decisions
35+
36+
| Decision | Choice | Rationale |
37+
|----------|--------|-----------|
38+
| Forward to radio after local processing | Yes | Matches existing `toRadio.packet` pattern; radio may handle channels it knows |
39+
| Mark packet as MQTT-sourced | `viaMqtt = true` | Consistent with how MQTT packets are flagged in the UI |
40+
| Decryption strategy | Use existing pipeline | `channelDecryptionService.tryDecrypt()` already handles encrypted packets; no channel_id hint needed |
41+
| Error handling | Log warning, forward only | If ServiceEnvelope decode fails, don't block the message from reaching the radio |
42+
43+
### Error Handling
44+
45+
- If `mqttClientProxyMessage.data` is empty or not present: log warning, forward to radio only
46+
- If `ServiceEnvelope` decode throws: log warning, forward to radio only
47+
- If `ServiceEnvelope.packet` is null/undefined: log warning, forward to radio only
48+
- In all error cases, the original `ToRadio` is still forwarded to the physical node
49+
50+
### Duplicate Processing Prevention
51+
52+
When the VNS processes the packet locally AND forwards it to the radio, the radio may echo the same packet back. Existing dedup logic in `processMeshPacket` handles this — text messages are deduplicated by `message.id` at the database insert level (duplicate inserts log "Skipped duplicate message" and return early). Other packet types (telemetry, position) are idempotent upserts.
53+
54+
### What Doesn't Change
55+
56+
- `processIncomingData` — no modifications needed
57+
- `processMeshPacket` — server-side decryption already works for encrypted packets
58+
- `channelDecryptionService` — iterates channels by sort order as usual
59+
60+
## Files Modified
61+
62+
| File | Change |
63+
|------|--------|
64+
| `src/server/virtualNodeServer.ts` | Add `else if (toRadio.mqttClientProxyMessage)` block in `handleClientMessage` |
65+
| `src/server/meshtasticProtobufService.ts` | Add `decodeServiceEnvelope(data: Uint8Array)` method |
66+
67+
## Testing
68+
69+
- Unit test: mock `meshtasticProtobufService` to decode a ServiceEnvelope, verify `processIncomingData` is called with correct FromRadio bytes and `{ skipVirtualNodeBroadcast: true }`
70+
- Unit test: verify `viaMqtt = true` is set on the extracted MeshPacket
71+
- Unit test: verify `queueMessage` is still called (forwarding to radio)
72+
- Unit test: verify graceful handling when ServiceEnvelope decode fails (warning logged, message still forwarded)

0 commit comments

Comments
 (0)