-
Notifications
You must be signed in to change notification settings - Fork 0
Bug/blocking calls #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
7ce6387
Attempt to run as unblocking across multiple devices
proffalken 65bb4f4
Move sending of telemetry to second core if it exists
proffalken 70dd207
Update include/OtelSender.h
proffalken a3ce038
Update src/OtelSender.cpp
proffalken 92beb6d
Merge branch 'main' into bug/blocking_calls
proffalken File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,74 @@ | ||
| #ifndef OTEL_SENDER_H | ||
| #define OTEL_SENDER_H | ||
|
|
||
| #pragma once | ||
| #include <Arduino.h> | ||
| #include <ArduinoJson.h> | ||
| #include <atomic> | ||
|
|
||
| #ifndef OTEL_COLLECTOR_HOST | ||
| #define OTEL_COLLECTOR_HOST "http://192.168.8.10:4318" | ||
| // Optional compile-time on/off switch for all network sends. | ||
| // You can set -DOTEL_SEND_ENABLE=0 in platformio.ini for latency tests. | ||
| #ifndef OTEL_SEND_ENABLE | ||
| #define OTEL_SEND_ENABLE 1 | ||
| #endif | ||
|
|
||
| namespace OTel { | ||
| #ifndef OTEL_WORKER_BURST | ||
| #define OTEL_WORKER_BURST 8 | ||
| #endif | ||
|
|
||
| #ifndef OTEL_WORKER_SLEEP_MS | ||
| #define OTEL_WORKER_SLEEP_MS 0 | ||
| #endif | ||
|
|
||
| #ifndef OTEL_QUEUE_CAPACITY | ||
| #define OTEL_QUEUE_CAPACITY 128 | ||
| #endif | ||
| // Base URL of your OTLP/HTTP collector (no trailing slash), e.g. "http://192.168.8.50:4318" | ||
| // You can override this via build_flags: -DOTEL_COLLECTOR_BASE_URL="\"http://…:4318\"" | ||
| #ifndef OTEL_COLLECTOR_BASE_URL | ||
| #define OTEL_COLLECTOR_BASE_URL "http://192.168.8.50:4318" | ||
| #endif | ||
|
|
||
| // Internal queue capacity for async sender on RP2040. | ||
| // Keep small to bound RAM; increase if you see drops. | ||
|
|
||
| struct OTelQueuedItem { | ||
| const char* path; // "/v1/logs", "/v1/traces", "/v1/metrics" | ||
| String payload; // serialized JSON | ||
| }; | ||
|
|
||
| class OTelSender { | ||
| public: | ||
| // Main API: called by logger/tracer/metrics to send serialized JSON to OTLP/HTTP | ||
| static void sendJson(const char* path, JsonDocument& doc); | ||
| }; | ||
|
|
||
| } // namespace OTel | ||
| // Start the RP2040 core-1 worker (no-op on non-RP2040). Call once after Wi-Fi is ready. | ||
| static void beginAsyncWorker(); | ||
|
|
||
| // Diagnostics (published via your health metrics if you like) | ||
| static uint32_t droppedCount(); // number of items dropped due to full queue | ||
| static bool queueIsHealthy(); // worker started? | ||
|
|
||
| private: | ||
| // ---------- SPSC ring buffer (core0 producer -> core1 consumer) ---------- | ||
| static constexpr size_t QCAP = OTEL_QUEUE_CAPACITY; | ||
| static OTelQueuedItem q_[QCAP]; | ||
| static std::atomic<size_t> head_; // producer writes | ||
| static std::atomic<size_t> tail_; // consumer writes | ||
| static std::atomic<uint32_t> drops_; | ||
| static std::atomic<bool> worker_started_; | ||
|
|
||
| static bool enqueue_(const char* path, String&& payload); | ||
| static bool dequeue_(OTelQueuedItem& out); | ||
|
|
||
| // ---------- Worker ---------- | ||
| static void pumpOnce_(); // send one item if present | ||
| static void workerLoop_(); // runs on core 1 (RP2040) | ||
| static void launchWorkerOnce_(); | ||
|
|
||
| // ---------- Utilities ---------- | ||
| static String fullUrl_(const char* path); // build collector URL + path | ||
|
|
||
| // inside class OTelSender (near the bottom) | ||
proffalken marked this conversation as resolved.
Show resolved
Hide resolved
proffalken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| #ifdef ARDUINO_ARCH_RP2040 | ||
| friend void otel_worker_entry(); | ||
| #endif | ||
| }; | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,83 +1,176 @@ | ||
| #include "OtelDebug.h" | ||
|
|
||
| #include "OtelSender.h" | ||
| // —————————————————————————————————————————————————————————— | ||
| // Platform-specific networking includes | ||
| // —————————————————————————————————————————————————————————— | ||
|
|
||
| // --- HTTP + WiFi includes (portable) --- | ||
| #if defined(ESP8266) | ||
| #include <ESP8266WiFi.h> | ||
| #include <ESP8266HTTPClient.h> | ||
| #elif defined(ARDUINO_ARCH_ESP32) || defined(ARDUINO_ARCH_RP2040) | ||
| #elif defined(ESP32) | ||
| #include <WiFi.h> | ||
| #include <HTTPClient.h> | ||
| #elif defined(ARDUINO_ARCH_RP2040) | ||
| #include <WiFi.h> // Earle Philhower core | ||
| #include <HTTPClient.h> // Arduino HTTPClient | ||
| #else | ||
| #error "Unsupported platform: must be ESP8266, ESP32 or RP2040" | ||
| #error "Unsupported platform: need WiFi + HTTPClient" | ||
| #endif | ||
|
|
||
| namespace OTel { | ||
|
|
||
| static String baseUrl() { | ||
| String base = String(OTEL_COLLECTOR_HOST); // may be "http://192.168.8.10" or "192.168.8.10:4318" etc. | ||
|
|
||
| // Ensure there is a scheme | ||
| if (!(base.startsWith("http://") || base.startsWith("https://"))) { | ||
| base = String("http://") + base; | ||
| } | ||
|
|
||
| // Ensure there is a port (check for ":" after the scheme) | ||
| int scheme_end = base.indexOf("://") + 3; | ||
| int colon_after_scheme = base.indexOf(':', scheme_end); | ||
| int slash_after_scheme = base.indexOf('/', scheme_end); | ||
| if (colon_after_scheme == -1 || (slash_after_scheme != -1 && colon_after_scheme > slash_after_scheme)) { | ||
| // No explicit port present; append compile-time port | ||
| base += ":" + String(OTEL_COLLECTOR_PORT); | ||
| } | ||
| #ifdef ARDUINO_ARCH_RP2040 | ||
| #include "pico/multicore.h" | ||
| #endif | ||
|
|
||
| // Strip any trailing slash before we append path | ||
| if (base.endsWith("/")) base.remove(base.length() - 1); | ||
| return base; | ||
| // ===== statics ===== | ||
| OTelQueuedItem OTelSender::q_[QCAP]; | ||
| std::atomic<size_t> OTelSender::head_{0}; | ||
| std::atomic<size_t> OTelSender::tail_{0}; | ||
| std::atomic<uint32_t> OTelSender::drops_{0}; | ||
| std::atomic<bool> OTelSender::worker_started_{false}; | ||
| // Begin HTTP on all platforms (ESP8266 requires WiFiClient) | ||
| static bool httpBeginCompat(HTTPClient& http, const String& url) { | ||
| #if defined(ESP8266) | ||
| WiFiClient client; // or WiFiClientSecure if you later do HTTPS | ||
| return http.begin(client, url); // new API on ESP8266 | ||
| #else | ||
| return http.begin(url); // ESP32 / RP2040 | ||
| #endif | ||
| } | ||
|
|
||
| static String fullUrl(const char* path) { | ||
| String p = path ? String(path) : String(); | ||
| if (!p.startsWith("/")) p = "/" + p; | ||
| return baseUrl() + p; | ||
|
|
||
| // Build "http://host:4318" + "/v1/…" | ||
| String OTelSender::fullUrl_(const char* path) { | ||
| // Avoid double slashes if a user accidentally sets a trailing slash | ||
| String base = String(OTEL_COLLECTOR_BASE_URL); | ||
| if (base.endsWith("/")) base.remove(base.length() - 1); | ||
| if (path && *path == '/') return base + String(path); | ||
| return base + "/" + String(path ? path : ""); | ||
| } | ||
|
|
||
| void OTelSender::sendJson(const char* path, JsonDocument& doc) { | ||
| if (doc.overflowed()){ | ||
| DBG_PRINTLN("Document Overflowed"); | ||
| return; | ||
| // ---------- Queue (SPSC) ---------- | ||
| // Single-producer (core0) enqueue; drop oldest on overflow | ||
| bool OTelSender::enqueue_(const char* path, String&& payload) { | ||
| size_t h = head_.load(std::memory_order_relaxed); | ||
| size_t t = tail_.load(std::memory_order_acquire); | ||
| size_t next = (h + 1) % QCAP; | ||
|
|
||
| if (next == t) { | ||
| // Full: drop oldest (advance tail) | ||
| size_t new_t = (t + 1) % QCAP; | ||
| tail_.store(new_t, std::memory_order_release); | ||
| drops_.fetch_add(1, std::memory_order_relaxed); | ||
| } | ||
|
|
||
| q_[h].path = path; | ||
| q_[h].payload = std::move(payload); | ||
| head_.store(next, std::memory_order_release); | ||
| return true; | ||
| } | ||
|
|
||
| bool OTelSender::dequeue_(OTelQueuedItem& out) { | ||
| size_t t = tail_.load(std::memory_order_relaxed); | ||
| size_t h = head_.load(std::memory_order_acquire); | ||
| if (t == h) return false; // empty | ||
|
|
||
| String payload; | ||
| serializeJson(doc, payload); | ||
| out = std::move(q_[t]); | ||
| q_[t].payload = String(); // release memory | ||
| size_t next = (t + 1) % QCAP; | ||
| tail_.store(next, std::memory_order_release); | ||
| return true; | ||
| } | ||
|
|
||
| String url = fullUrl(path); | ||
| DBG_PRINT("HTTP begin URL: >"); DBG_PRINT(url); DBG_PRINTLN("<"); | ||
| // ---------- Worker ---------- | ||
| void OTelSender::pumpOnce_() { | ||
| #if OTEL_SEND_ENABLE | ||
| OTelQueuedItem it; | ||
| if (!dequeue_(it)) return; | ||
|
|
||
| #ifdef ESP8266 | ||
| WiFiClient *clientPtr = nullptr; | ||
| WiFiClient client; // or WiFiClientSecure if using https | ||
| clientPtr = &client; | ||
| HTTPClient http; | ||
| http.begin(*clientPtr, url); | ||
| if (httpBeginCompat(http, fullUrl_(it.path))) { | ||
| http.addHeader("Content-Type", "application/json"); | ||
| // Fire the POST; the blocking happens on core 1, not in the control path. | ||
| (void)http.POST(it.payload); | ||
| http.end(); | ||
| } | ||
| #else | ||
| HTTPClient http; | ||
| http.begin(url); | ||
| // If globally disabled, just drain the queue without sending. | ||
| OTelQueuedItem sink; | ||
| (void)dequeue_(sink); | ||
proffalken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| #endif | ||
| } | ||
|
|
||
| http.addHeader("Content-Type", "application/json"); | ||
| DBG_PRINT("Sending Payload: "); | ||
| DBG_PRINTLN(payload); | ||
| int code = http.POST(payload); | ||
| DBG_PRINT("HTTP POST returned: "); DBG_PRINTLN(code); | ||
| if (code < 0) { DBG_PRINTLN(http.errorToString(code)); } | ||
| http.end(); | ||
| void OTelSender::workerLoop_() { | ||
| for (;;) { | ||
| for (int i = 0; i < OTEL_WORKER_BURST; ++i) { | ||
| OTelQueuedItem it; | ||
| if (!dequeue_(it)) break; | ||
|
|
||
| HTTPClient http; | ||
| // Keep-alive where supported; harmless otherwise | ||
| #if defined(HTTPCLIENT_1_2_COMPATIBLE) || defined(ESP8266) || defined(ESP32) | ||
proffalken marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| http.setReuse(true); | ||
| #endif | ||
| if (httpBeginCompat(http,fullUrl_(it.path))) { | ||
proffalken marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
proffalken marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| http.addHeader("Content-Type", "application/json"); | ||
| (void)http.POST(it.payload); | ||
| http.end(); | ||
| } | ||
| } | ||
| delay(OTEL_WORKER_SLEEP_MS); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| #ifdef ARDUINO_ARCH_RP2040 | ||
| void otel_worker_entry() { OTelSender::workerLoop_(); } | ||
| #endif | ||
|
|
||
|
|
||
| void OTelSender::launchWorkerOnce_() { | ||
| #ifdef ARDUINO_ARCH_RP2040 | ||
| bool expected = false; | ||
| if (worker_started_.compare_exchange_strong(expected, true)) { | ||
| multicore_launch_core1(otel_worker_entry); | ||
| } | ||
| #endif | ||
| } | ||
|
|
||
| void OTelSender::beginAsyncWorker() { | ||
| launchWorkerOnce_(); | ||
| } | ||
|
|
||
| uint32_t OTelSender::droppedCount() { | ||
| return drops_.load(std::memory_order_relaxed); | ||
| } | ||
|
|
||
| bool OTelSender::queueIsHealthy() { | ||
| return worker_started_.load(std::memory_order_relaxed); | ||
| } | ||
|
|
||
| } // namespace OTel | ||
| // ---------- Public send API ---------- | ||
| void OTelSender::sendJson(const char* path, JsonDocument& doc) { | ||
| #if !OTEL_SEND_ENABLE | ||
| // Compile-time: completely disable sends (useful for latency tests) | ||
| (void)path; (void)doc; | ||
| return; | ||
| #else | ||
| // Serialize on the caller's core (cheap), then: | ||
| // - RP2040: enqueue for core-1 worker to POST (non-blocking for control path) | ||
| // - others: POST synchronously (unchanged behaviour) | ||
| String payload; | ||
| serializeJson(doc, payload); | ||
|
|
||
| #ifdef ARDUINO_ARCH_RP2040 | ||
| // Ensure worker is launched (safe to call repeatedly) | ||
| launchWorkerOnce_(); | ||
| enqueue_(path, std::move(payload)); | ||
| #else | ||
| HTTPClient http; | ||
| if (httpBeginCompat(http, fullUrl_(path))) { | ||
| http.addHeader("Content-Type", "application/json"); | ||
| (void)http.POST(payload); | ||
| http.end(); | ||
| } | ||
| #endif | ||
| #endif | ||
| } | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.