Skip to content

Commit cd9a752

Browse files
committed
Implement USB topology monitoring and WebSocket integration
- Added a WebSocket controller for real-time streaming of USB device topology. - Introduced a background worker to scan USB devices and update the topology state. - Enhanced the UI to display USB topology with dynamic updates via WebSocket. - Updated the device data structure to include additional fields for USB nodes. - Improved error handling and data fetching mechanisms in the UI for better user experience.
1 parent 077126b commit cd9a752

File tree

2 files changed

+420
-131
lines changed

2 files changed

+420
-131
lines changed

provisioner-service/src/devices.cpp

Lines changed: 271 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,78 @@
11
#include <drogon/HttpResponse.h>
22
#include <drogon/HttpAppFramework.h>
3+
#include <drogon/WebSocketController.h>
34
#include <devices.h>
45
#include <sqlite3.h>
56
#include <json/json.h>
67
#include <fstream>
78
#include <sstream>
89
#include <systemd/sd-bus.h>
10+
#include <filesystem>
11+
#include <unordered_map>
12+
#include <set>
13+
#include <thread>
14+
#include <mutex>
15+
#include <atomic>
16+
#include <chrono>
17+
#include <algorithm>
918
#include "utils.h"
1019
#include "include/audit.h"
1120

1221
using namespace drogon;
1322

23+
// Forward declare internal snapshot function from this TU's anonymous namespace
24+
namespace { std::string topologySnapshotString(); }
25+
26+
// WebSocket controller for device topology streaming
27+
class DevicesWebSocketController : public drogon::WebSocketController<DevicesWebSocketController> {
28+
public:
29+
static std::vector<drogon::WebSocketConnectionPtr> subscribers;
30+
static std::mutex subscribersMutex;
31+
32+
static void broadcast(const std::string &message) {
33+
std::lock_guard<std::mutex> lock(subscribersMutex);
34+
auto it = subscribers.begin();
35+
while (it != subscribers.end()) {
36+
if ((*it)->connected()) {
37+
(*it)->send(message);
38+
++it;
39+
} else {
40+
it = subscribers.erase(it);
41+
}
42+
}
43+
}
44+
45+
void handleNewMessage(const drogon::WebSocketConnectionPtr& wsConnPtr, std::string&& message, const drogon::WebSocketMessageType& type) override {
46+
// No-op for now; could support subscription filters later
47+
(void)wsConnPtr; (void)message; (void)type;
48+
}
49+
50+
void handleNewConnection(const drogon::HttpRequestPtr& req, const drogon::WebSocketConnectionPtr& wsConnPtr) override {
51+
(void)req;
52+
{
53+
std::lock_guard<std::mutex> lock(subscribersMutex);
54+
subscribers.push_back(wsConnPtr);
55+
}
56+
// Send initial snapshot
57+
wsConnPtr->send(topologySnapshotString());
58+
}
59+
60+
void handleConnectionClosed(const drogon::WebSocketConnectionPtr& wsConnPtr) override {
61+
std::lock_guard<std::mutex> lock(subscribersMutex);
62+
subscribers.erase(std::remove(subscribers.begin(), subscribers.end(), wsConnPtr), subscribers.end());
63+
}
64+
65+
WS_PATH_LIST_BEGIN
66+
WS_PATH_ADD("/ws/devices");
67+
WS_PATH_LIST_END
68+
};
69+
70+
std::vector<drogon::WebSocketConnectionPtr> DevicesWebSocketController::subscribers;
71+
std::mutex DevicesWebSocketController::subscribersMutex;
72+
73+
// Forward declaration for snapshot helper
74+
namespace provisioner { std::string topologySnapshotString(); }
75+
1476
namespace provisioner {
1577

1678
struct Device {
@@ -25,6 +87,203 @@ namespace provisioner {
2587
std::vector<Device> devices;
2688
};
2789

90+
// Representation of a USB node (hub or device)
91+
struct UsbNode {
92+
std::string id; // unique path id (e.g. 1-1.4)
93+
std::string parentId; // parent path id or empty for root
94+
std::string vendor; // idVendor
95+
std::string product; // idProduct
96+
std::string serial; // serial (if available)
97+
bool isHub{false};
98+
// Optional provisioning info
99+
std::string state;
100+
std::string image;
101+
std::string ip;
102+
};
103+
104+
// Tracker state
105+
namespace {
106+
std::thread topologyThread;
107+
std::atomic<bool> topologyRunning{false};
108+
std::mutex topologyMutex;
109+
// Map id -> node
110+
std::unordered_map<std::string, UsbNode> currentTopology;
111+
112+
std::string readFileTrimmed(const std::filesystem::path &p) {
113+
try {
114+
std::ifstream f(p);
115+
if (!f.is_open()) return "";
116+
std::string s;
117+
std::getline(f, s);
118+
while (!s.empty() && (s.back()=='\n' || s.back()=='\r' || s.back()=='\t' || s.back()==' ')) s.pop_back();
119+
return s;
120+
} catch (...) { return ""; }
121+
}
122+
123+
std::string baseDeviceDirName(const std::string &name) {
124+
// Strip interface suffix after ':' (e.g. 1-1.2:1.0 -> 1-1.2)
125+
auto pos = name.find(':');
126+
if (pos != std::string::npos) return name.substr(0, pos);
127+
return name;
128+
}
129+
130+
std::string computeParentId(const std::string &id) {
131+
// Parent is prefix before last '.' if present, otherwise attach to server
132+
auto pos = id.rfind('.');
133+
if (pos != std::string::npos) {
134+
return id.substr(0, pos);
135+
}
136+
// If id contains '-' treat as directly connected to server
137+
return "server";
138+
}
139+
140+
std::unordered_map<std::string, UsbNode> scanUsbSysfs() {
141+
std::unordered_map<std::string, UsbNode> nodes;
142+
// Always include the server root node
143+
UsbNode server;
144+
server.id = "server";
145+
server.parentId = "";
146+
server.isHub = true;
147+
server.vendor = "Provisioner";
148+
server.product = "Server";
149+
nodes.insert({server.id, server});
150+
151+
const std::filesystem::path usbPath("/sys/bus/usb/devices");
152+
if (!std::filesystem::exists(usbPath)) {
153+
return nodes;
154+
}
155+
156+
for (const auto &entry : std::filesystem::directory_iterator(usbPath)) {
157+
if (!entry.is_directory()) continue;
158+
const std::string name = entry.path().filename().string();
159+
if (name.find('-') == std::string::npos) continue; // skip non-device entries like usb1
160+
const std::string id = baseDeviceDirName(name);
161+
UsbNode node;
162+
node.id = id;
163+
node.parentId = computeParentId(id);
164+
node.vendor = readFileTrimmed(entry.path()/"idVendor");
165+
node.product = readFileTrimmed(entry.path()/"idProduct");
166+
if (node.vendor.empty() && node.product.empty()) {
167+
// Some entries may not be real devices; skip if missing identifiers
168+
// but keep hubs if detectable
169+
}
170+
node.serial = readFileTrimmed(entry.path()/"serial");
171+
std::string bDeviceClass = readFileTrimmed(entry.path()/"bDeviceClass");
172+
// 09 (hex) indicates hub
173+
node.isHub = (bDeviceClass == "09" || bDeviceClass == "9" || bDeviceClass == "0x09");
174+
nodes.insert_or_assign(node.id, node);
175+
}
176+
177+
return nodes;
178+
}
179+
180+
void enrichWithProvisioningState(std::unordered_map<std::string, UsbNode> &nodes) {
181+
sqlite3* db;
182+
int rc = sqlite3_open("/srv/rpi-sb-provisioner/state.db", &db);
183+
if (rc) {
184+
return;
185+
}
186+
const char* sql = "SELECT serial, endpoint, state, image, ip_address FROM devices ORDER BY ts DESC";
187+
sqlite3_stmt* stmt;
188+
rc = sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr);
189+
if (rc != SQLITE_OK) {
190+
sqlite3_close(db);
191+
return;
192+
}
193+
while (sqlite3_step(stmt) == SQLITE_ROW) {
194+
const unsigned char* serial = sqlite3_column_text(stmt, 0);
195+
const unsigned char* endpoint = sqlite3_column_text(stmt, 1);
196+
const unsigned char* state = sqlite3_column_text(stmt, 2);
197+
const unsigned char* image = sqlite3_column_text(stmt, 3);
198+
const unsigned char* ip = sqlite3_column_text(stmt, 4);
199+
if (!endpoint) continue;
200+
std::string endpointStr = reinterpret_cast<const char*>(endpoint);
201+
auto it = nodes.find(endpointStr);
202+
if (it != nodes.end()) {
203+
// Only set if not already populated or to reflect latest row (desc order)
204+
it->second.serial = serial ? reinterpret_cast<const char*>(serial) : it->second.serial;
205+
it->second.state = state ? reinterpret_cast<const char*>(state) : it->second.state;
206+
it->second.image = image ? reinterpret_cast<const char*>(image) : it->second.image;
207+
it->second.ip = ip ? reinterpret_cast<const char*>(ip) : it->second.ip;
208+
}
209+
}
210+
sqlite3_finalize(stmt);
211+
sqlite3_close(db);
212+
}
213+
214+
Json::Value topologyToJson(const std::unordered_map<std::string, UsbNode> &nodes) {
215+
Json::Value root;
216+
root["type"] = "topology";
217+
Json::Value arr(Json::arrayValue);
218+
for (const auto &p : nodes) {
219+
const UsbNode &n = p.second;
220+
Json::Value j;
221+
j["id"] = n.id;
222+
if (!n.parentId.empty()) j["parentId"] = n.parentId; else j["parentId"] = Json::nullValue;
223+
j["isHub"] = n.isHub;
224+
if (!n.vendor.empty()) j["vendor"] = n.vendor;
225+
if (!n.product.empty()) j["product"] = n.product;
226+
if (!n.serial.empty()) j["serial"] = n.serial;
227+
if (!n.state.empty()) j["state"] = n.state;
228+
if (!n.image.empty()) j["image"] = n.image;
229+
if (!n.ip.empty()) j["ip"] = n.ip;
230+
arr.append(j);
231+
}
232+
root["nodes"] = arr;
233+
root["timestamp"] = static_cast<Json::UInt64>(std::chrono::duration_cast<std::chrono::milliseconds>(
234+
std::chrono::system_clock::now().time_since_epoch()).count());
235+
return root;
236+
}
237+
238+
std::string topologySnapshotString() {
239+
std::lock_guard<std::mutex> lock(topologyMutex);
240+
Json::FastWriter w; // compact ok
241+
return w.write(topologyToJson(currentTopology));
242+
}
243+
244+
void topologyWorker() {
245+
LOG_INFO << "Topology worker started";
246+
Json::Value lastJson;
247+
while (topologyRunning) {
248+
auto newMap = scanUsbSysfs();
249+
enrichWithProvisioningState(newMap);
250+
251+
bool changed = false;
252+
{
253+
std::lock_guard<std::mutex> lock(topologyMutex);
254+
// Simple change detection by size and a few key fields; fallback to full JSON compare
255+
if (newMap.size() != currentTopology.size()) {
256+
changed = true;
257+
} else {
258+
for (const auto &p : newMap) {
259+
auto it = currentTopology.find(p.first);
260+
if (it == currentTopology.end()) { changed = true; break; }
261+
const UsbNode &a = p.second;
262+
const UsbNode &b = it->second;
263+
if (a.parentId != b.parentId || a.vendor != b.vendor || a.product != b.product ||
264+
a.serial != b.serial || a.isHub != b.isHub || a.state != b.state ||
265+
a.image != b.image || a.ip != b.ip) { changed = true; break; }
266+
}
267+
}
268+
if (changed) {
269+
currentTopology = std::move(newMap);
270+
}
271+
}
272+
273+
if (changed) {
274+
std::string msg = topologySnapshotString();
275+
DevicesWebSocketController::broadcast(msg);
276+
}
277+
278+
for (int i=0; i<10 && topologyRunning; ++i) {
279+
std::this_thread::sleep_for(std::chrono::milliseconds(300));
280+
}
281+
}
282+
LOG_INFO << "Topology worker stopped";
283+
}
284+
}
285+
}
286+
28287
Devices::Devices()
29288
:
30289
systemd_bus(nullptr, sd_bus_unref)
@@ -35,9 +294,20 @@ namespace provisioner {
35294
throw std::runtime_error("Failed to connect to system bus: " + std::string(strerror(-ret)));
36295
}
37296
systemd_bus = std::unique_ptr<sd_bus, decltype(&sd_bus_unref)>(bus, sd_bus_unref);
297+
// Start topology watcher
298+
if (!topologyRunning) {
299+
topologyRunning = true;
300+
topologyThread = std::thread(topologyWorker);
301+
}
38302
}
39303

40-
Devices::~Devices() = default;
304+
Devices::~Devices() {
305+
// Stop topology watcher
306+
if (topologyRunning) {
307+
topologyRunning = false;
308+
if (topologyThread.joinable()) topologyThread.join();
309+
}
310+
}
41311

42312
void Devices::registerHandlers(drogon::HttpAppFramework &app)
43313
{

0 commit comments

Comments
 (0)