diff --git a/README.md b/README.md index 28831a0..faa0845 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,12 @@ All examples assume use of the Arduino framework under PlatformIO, and that you The example code shows how to do this with the `time` library and NTP. +### Concurrency and performance + +Where supported (RP2040 and *some* ESP32 boards), the code to process and send the data is moved to the second core of the device. + +This removes any blocking code and ensures that the HTTP POST call does not interfere with the main loop. + --- ## 🚀 Installation with PlatformIO @@ -173,15 +179,18 @@ Override defaults in `OtelDefaults.h` or via `-D` flags: | ------------------------ | ------------------ | ----------------------------------------------- | | `WIFI_SSID` | `"default"` | Wi‑Fi SSID | | `WIFI_PASS` | `"default"` | Wi‑Fi password | -| `OTEL_COLLECTOR_HOST` | `"http://…:4318"` | OTLP HTTP endpoint | -| `OTEL_COLLECTOR_PORT` | `4318` | OTLP HTTP port | +| `OTEL_COLLECTOR_BASE_URL`| `Null` | The base URL (http://192.168.8.10:4318) of the otel collector | | `OTEL_SERVICE_NAME` | `"demo_service"` | Name of your service | | `OTEL_SERVICE_NAMESPACE` | `"demo_namespace"` | Service namespace | | `OTEL_SERVICE_VERSION` | `"v1.0.0"` | Semantic version | | `OTEL_SERVICE_INSTANCE` | `"instance-1"` | Unique instance ID | | `OTEL_DEPLOY_ENV` | `"dev"` | Deployment environment (e.g. `prod`, `staging`) | +| `OTEL_WORKER_BURST` | `16` | The number of telemetry messages to process at a time | +| `OTEL_WORKER_SLEEP_MS` | `0` | How long to sleep between processing messages (0 is instant) | +| `OTEL_QUEUE_CAPACITY` | `128` | The maximum number of telemetry messages we can store before we start to drop data | | `DEBUG` | `Null` | Print verbose messages including OTEL Payload to the serial port | + --- ## 🤝 Contributing diff --git a/include/OtelSender.h b/include/OtelSender.h index a0761bf..40a13bd 100644 --- a/include/OtelSender.h +++ b/include/OtelSender.h @@ -1,20 +1,77 @@ -#ifndef OTEL_SENDER_H -#define OTEL_SENDER_H - +#pragma once +#include #include +#include + +// Optional compile-time on/off switch for all network sends. +// You can set -DOTEL_SEND_ENABLE=0 in platformio.ini for latency tests. +#ifndef OTEL_SEND_ENABLE +#define OTEL_SEND_ENABLE 1 +#endif + +#ifndef OTEL_WORKER_BURST +#define OTEL_WORKER_BURST 8 +#endif + +#ifndef OTEL_WORKER_SLEEP_MS +#define OTEL_WORKER_SLEEP_MS 0 +#endif + +#ifndef OTEL_QUEUE_CAPACITY +#define OTEL_QUEUE_CAPACITY 128 +#endif +// Base URL of your OTLP/HTTP collector (no trailing slash), e.g. "http://192.168.8.50:4318" +// You can override this via build_flags: -DOTEL_COLLECTOR_BASE_URL="\"http://…:4318\"" +#ifndef OTEL_COLLECTOR_BASE_URL +#define OTEL_COLLECTOR_BASE_URL "http://192.168.8.50:4318" +#endif -#ifndef OTEL_COLLECTOR_HOST -#define OTEL_COLLECTOR_HOST "http://192.168.8.10:4318" +// Internal queue capacity for async sender on RP2040. +// Keep small to bound RAM; increase if you see drops. +#ifndef OTEL_QUEUE_CAPACITY +#define OTEL_QUEUE_CAPACITY 16 #endif -namespace OTel { +struct OTelQueuedItem { + const char* path; // "/v1/logs", "/v1/traces", "/v1/metrics" + String payload; // serialized JSON +}; class OTelSender { public: + // Main API: called by logger/tracer/metrics to send serialized JSON to OTLP/HTTP static void sendJson(const char* path, JsonDocument& doc); -}; -} // namespace OTel + // Start the RP2040 core-1 worker (no-op on non-RP2040). Call once after Wi-Fi is ready. + static void beginAsyncWorker(); + + // Diagnostics (published via your health metrics if you like) + static uint32_t droppedCount(); // number of items dropped due to full queue + static bool queueIsHealthy(); // worker started? + +private: + // ---------- SPSC ring buffer (core0 producer -> core1 consumer) ---------- + static constexpr size_t QCAP = OTEL_QUEUE_CAPACITY; + static OTelQueuedItem q_[QCAP]; + static std::atomic head_; // producer writes + static std::atomic tail_; // consumer writes + static std::atomic drops_; + static std::atomic worker_started_; + static bool enqueue_(const char* path, String&& payload); + static bool dequeue_(OTelQueuedItem& out); + + // ---------- Worker ---------- + static void pumpOnce_(); // send one item if present + static void workerLoop_(); // runs on core 1 (RP2040) + static void launchWorkerOnce_(); + + // ---------- Utilities ---------- + static String fullUrl_(const char* path); // build collector URL + path + + // inside class OTelSender (near the bottom) +#ifdef ARDUINO_ARCH_RP2040 + friend void otel_worker_entry(); #endif +}; diff --git a/library.json b/library.json index fc7ec7f..ae904c7 100644 --- a/library.json +++ b/library.json @@ -1,6 +1,6 @@ { "name": "otel-embedded-cpp", - "version": "1.0.1", + "version": "1.0.2", "description": "OpenTelemetry logging, tracing, and metrics for embedded C++ devices (ESP32, RP2040 Pico W, ESP8266).", "keywords": [ "OpenTelemetry", @@ -16,7 +16,7 @@ ], "authors": [ { - "name": "Your Name", + "name": "Matthew Macdonald-Wallace", "maintainer": true } ], diff --git a/platformio.ini b/platformio.ini index 30af1dd..fa98f7f 100644 --- a/platformio.ini +++ b/platformio.ini @@ -15,8 +15,7 @@ lib_deps = build_flags = -DWIFI_SSID=\"${sysenv.WIFI_SSID}\" -DWIFI_PASS=\"${sysenv.WIFI_PASS}\" - -DOTEL_COLLECTOR_HOST=\"${sysenv.OTEL_COLLECTOR_HOST}\" - -DOTEL_COLLECTOR_PORT=${sysenv.OTEL_COLLECTOR_PORT} + -DOTEL_COLLECTOR_BASE_URL="\"http://192.168.8.10:4318\"" -DOTEL_SERVICE_NAME=\"${sysenv.OTEL_SERVICE_NAME}\" -DOTEL_SERVICE_NAMESPACE=\"${sysenv.OTEL_SERVICE_NAMESPACE}\" -DOTEL_SERVICE_VERSION=\"${sysenv.OTEL_SERVICE_VERSION}\" @@ -37,8 +36,7 @@ lib_deps = build_flags = -DWIFI_SSID=\"${sysenv.WIFI_SSID}\" -DWIFI_PASS=\"${sysenv.WIFI_PASS}\" - -DOTEL_COLLECTOR_HOST=\"${sysenv.OTEL_COLLECTOR_HOST}\" - -DOTEL_COLLECTOR_PORT=${sysenv.OTEL_COLLECTOR_PORT} + -DOTEL_COLLECTOR_BASE_URL="\"http://192.168.8.10:4318\"" -DOTEL_SERVICE_NAME=\"${sysenv.OTEL_SERVICE_NAME}\" -DOTEL_SERVICE_NAMESPACE=\"${sysenv.OTEL_SERVICE_NAMESPACE}\" -DOTEL_SERVICE_VERSION=\"${sysenv.OTEL_SERVICE_VERSION}\" @@ -59,10 +57,9 @@ lib_deps = build_flags = -DWIFI_SSID=\"${sysenv.WIFI_SSID}\" -DWIFI_PASS=\"${sysenv.WIFI_PASS}\" - -DOTEL_COLLECTOR_HOST=\"${sysenv.OTEL_COLLECTOR_HOST}\" - -DOTEL_COLLECTOR_PORT=${sysenv.OTEL_COLLECTOR_PORT} + -DOTEL_COLLECTOR_BASE_URL="\"http://192.168.8.10:4318\"" -DOTEL_SERVICE_NAME=\"${sysenv.OTEL_SERVICE_NAME}\" -DOTEL_SERVICE_NAMESPACE=\"${sysenv.OTEL_SERVICE_NAMESPACE}\" -DOTEL_SERVICE_VERSION=\"${sysenv.OTEL_SERVICE_VERSION}\" - -DOTEL_SERVICE_INSTANCE=\"${sysenv.OTEL_SERVICE_INSTANCE}\" - -DOTEL_DEPLOY_ENV=\"${sysenv.OTEL_DEPLOY_ENV}\" + -DOTEL_SERVICE_INSTANCE=\"esp8266\" + -DOTEL_DEPLOY_ENV=\"esp8266\" diff --git a/src/OtelSender.cpp b/src/OtelSender.cpp index 0a9ddd5..bfb5e27 100644 --- a/src/OtelSender.cpp +++ b/src/OtelSender.cpp @@ -1,83 +1,176 @@ -#include "OtelDebug.h" - #include "OtelSender.h" -// —————————————————————————————————————————————————————————— -// Platform-specific networking includes -// —————————————————————————————————————————————————————————— + +// --- HTTP + WiFi includes (portable) --- #if defined(ESP8266) #include #include -#elif defined(ARDUINO_ARCH_ESP32) || defined(ARDUINO_ARCH_RP2040) +#elif defined(ESP32) #include #include +#elif defined(ARDUINO_ARCH_RP2040) + #include // Earle Philhower core + #include // Arduino HTTPClient #else - #error "Unsupported platform: must be ESP8266, ESP32 or RP2040" + #error "Unsupported platform: need WiFi + HTTPClient" #endif -namespace OTel { - -static String baseUrl() { - String base = String(OTEL_COLLECTOR_HOST); // may be "http://192.168.8.10" or "192.168.8.10:4318" etc. - // Ensure there is a scheme - if (!(base.startsWith("http://") || base.startsWith("https://"))) { - base = String("http://") + base; - } - // Ensure there is a port (check for ":" after the scheme) - int scheme_end = base.indexOf("://") + 3; - int colon_after_scheme = base.indexOf(':', scheme_end); - int slash_after_scheme = base.indexOf('/', scheme_end); - if (colon_after_scheme == -1 || (slash_after_scheme != -1 && colon_after_scheme > slash_after_scheme)) { - // No explicit port present; append compile-time port - base += ":" + String(OTEL_COLLECTOR_PORT); - } +#ifdef ARDUINO_ARCH_RP2040 + #include "pico/multicore.h" +#endif - // Strip any trailing slash before we append path - if (base.endsWith("/")) base.remove(base.length() - 1); - return base; +// ===== statics ===== +OTelQueuedItem OTelSender::q_[QCAP]; +std::atomic OTelSender::head_{0}; +std::atomic OTelSender::tail_{0}; +std::atomic OTelSender::drops_{0}; +std::atomic OTelSender::worker_started_{false}; +// Begin HTTP on all platforms (ESP8266 requires WiFiClient) +static bool httpBeginCompat(HTTPClient& http, const String& url) { +#if defined(ESP8266) + WiFiClient client; // or WiFiClientSecure if you later do HTTPS + return http.begin(client, url); // new API on ESP8266 +#else + return http.begin(url); // ESP32 / RP2040 +#endif } -static String fullUrl(const char* path) { - String p = path ? String(path) : String(); - if (!p.startsWith("/")) p = "/" + p; - return baseUrl() + p; + +// Build "http://host:4318" + "/v1/…" +String OTelSender::fullUrl_(const char* path) { + // Avoid double slashes if a user accidentally sets a trailing slash + String base = String(OTEL_COLLECTOR_BASE_URL); + if (base.endsWith("/")) base.remove(base.length() - 1); + if (path && *path == '/') return base + String(path); + return base + "/" + String(path ? path : ""); } -void OTelSender::sendJson(const char* path, JsonDocument& doc) { - if (doc.overflowed()){ - DBG_PRINTLN("Document Overflowed"); - return; +// ---------- Queue (SPSC) ---------- +// Single-producer (core0) enqueue; drop oldest on overflow +bool OTelSender::enqueue_(const char* path, String&& payload) { + size_t h = head_.load(std::memory_order_relaxed); + size_t t = tail_.load(std::memory_order_acquire); + size_t next = (h + 1) % QCAP; + + if (next == t) { + // Full: drop oldest (advance tail) + size_t new_t = (t + 1) % QCAP; + tail_.store(new_t, std::memory_order_release); + drops_.fetch_add(1, std::memory_order_relaxed); } + q_[h].path = path; + q_[h].payload = std::move(payload); + head_.store(next, std::memory_order_release); + return true; +} +bool OTelSender::dequeue_(OTelQueuedItem& out) { + size_t t = tail_.load(std::memory_order_relaxed); + size_t h = head_.load(std::memory_order_acquire); + if (t == h) return false; // empty - String payload; - serializeJson(doc, payload); + out = std::move(q_[t]); + q_[t].payload = String(); // release memory + size_t next = (t + 1) % QCAP; + tail_.store(next, std::memory_order_release); + return true; +} - String url = fullUrl(path); -DBG_PRINT("HTTP begin URL: >"); DBG_PRINT(url); DBG_PRINTLN("<"); +// ---------- Worker ---------- +void OTelSender::pumpOnce_() { +#if OTEL_SEND_ENABLE + OTelQueuedItem it; + if (!dequeue_(it)) return; -#ifdef ESP8266 - WiFiClient *clientPtr = nullptr; - WiFiClient client; // or WiFiClientSecure if using https - clientPtr = &client; HTTPClient http; - http.begin(*clientPtr, url); + if (httpBeginCompat(http, fullUrl_(it.path))) { + http.addHeader("Content-Type", "application/json"); + // Fire the POST; the blocking happens on core 1, not in the control path. + (void)http.POST(it.payload); + http.end(); + } #else - HTTPClient http; - http.begin(url); + // If globally disabled, just drain the queue without sending. + OTelQueuedItem sink; + (void)dequeue_(sink); #endif +} -http.addHeader("Content-Type", "application/json"); -DBG_PRINT("Sending Payload: "); -DBG_PRINTLN(payload); -int code = http.POST(payload); -DBG_PRINT("HTTP POST returned: "); DBG_PRINTLN(code); -if (code < 0) { DBG_PRINTLN(http.errorToString(code)); } -http.end(); +void OTelSender::workerLoop_() { + for (;;) { + for (int i = 0; i < OTEL_WORKER_BURST; ++i) { + OTelQueuedItem it; + if (!dequeue_(it)) break; + + HTTPClient http; + // Keep-alive where supported; harmless otherwise + #if defined(HTTPCLIENT_1_2_COMPATIBLE) || defined(ESP8266) || defined(ESP32) + http.setReuse(true); + #endif + if (httpBeginCompat(http,fullUrl_(it.path))) { + http.addHeader("Content-Type", "application/json"); + (void)http.POST(it.payload); + http.end(); + } + } + delay(OTEL_WORKER_SLEEP_MS); + } +} + + +#ifdef ARDUINO_ARCH_RP2040 +void otel_worker_entry() { OTelSender::workerLoop_(); } +#endif + + +void OTelSender::launchWorkerOnce_() { +#ifdef ARDUINO_ARCH_RP2040 + bool expected = false; + if (worker_started_.compare_exchange_strong(expected, true)) { + multicore_launch_core1(otel_worker_entry); + } +#endif +} + +void OTelSender::beginAsyncWorker() { + launchWorkerOnce_(); +} + +uint32_t OTelSender::droppedCount() { + return drops_.load(std::memory_order_relaxed); +} +bool OTelSender::queueIsHealthy() { + return worker_started_.load(std::memory_order_relaxed); } -} // namespace OTel +// ---------- Public send API ---------- +void OTelSender::sendJson(const char* path, JsonDocument& doc) { +#if !OTEL_SEND_ENABLE + // Compile-time: completely disable sends (useful for latency tests) + (void)path; (void)doc; + return; +#else + // Serialize on the caller's core (cheap), then: + // - RP2040: enqueue for core-1 worker to POST (non-blocking for control path) + // - others: POST synchronously (unchanged behaviour) + String payload; + serializeJson(doc, payload); + + #ifdef ARDUINO_ARCH_RP2040 + // Ensure worker is launched (safe to call repeatedly) + launchWorkerOnce_(); + enqueue_(path, std::move(payload)); + #else + HTTPClient http; + if (httpBeginCompat(http, fullUrl_(path))) { + http.addHeader("Content-Type", "application/json"); + (void)http.POST(payload); + http.end(); + } + #endif +#endif +}