31 changes: 31 additions & 0 deletions README.md
@@ -12,6 +12,7 @@ strfry is a relay for the [nostr protocol](https://github.com/nostr-protocol/nos
* Durable writes: The relay never returns an `OK` until an event has been confirmed as committed to the DB
* Built-in support for real-time streaming (up/down/both) events from remote relays, and bulk import/export of events from/to jsonl files
* [negentropy](https://github.com/hoytech/negentropy)-based set reconciliation for efficient syncing with clients or between relays, accurate counting of events between relays, and more
* Prometheus metrics endpoint for monitoring relay activity (client/relay messages by verb, events by kind)

If you are using strfry, please [join our telegram chat](https://t.me/strfry_users). Hopefully soon we'll migrate this to nostr.

@@ -31,6 +32,8 @@ If you are using strfry, please [join our telegram chat](https://t.me/strfry_use
* [Fried Exports](#fried-exports)
* [Stream](#stream)
* [Sync](#sync)
* [Monitoring](#monitoring)
* [Prometheus Metrics](#prometheus-metrics)
* [Advanced](#advanced)
* [DB Upgrade](#db-upgrade)
* [DB Compaction](#db-compaction)
@@ -185,6 +188,34 @@ By default strfry keeps a precomputed BTree to speed up full-DB syncs. You can a



## Monitoring

### Prometheus Metrics

strfry includes built-in Prometheus metrics support for monitoring relay activity. Metrics are exposed via HTTP at the `/metrics` endpoint on the same port as the relay WebSocket server.

For example, if your relay is running on `localhost:7777`, you can access metrics at `http://localhost:7777/metrics`.

The following metrics are available:

* **`nostr_client_messages_total{verb}`** - Total number of messages received from clients, broken down by verb (EVENT, REQ, CLOSE, NEG-OPEN, NEG-MSG, NEG-CLOSE)
* **`nostr_relay_messages_total{verb}`** - Total number of messages sent to clients, broken down by verb (EVENT, OK, EOSE, NOTICE, NEG-MSG, NEG-ERR)
* **`nostr_events_total{kind}`** - Total number of events processed, broken down by event kind (0, 1, 3, 4, etc.)
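
For illustration, the `/metrics` response uses the Prometheus text exposition format. A hypothetical excerpt (the counter values and the set of labels present depend entirely on your relay's traffic):

```
# HELP nostr_client_messages_total Total number of Nostr client messages by verb
# TYPE nostr_client_messages_total counter
nostr_client_messages_total{verb="EVENT"} 1042
nostr_client_messages_total{verb="REQ"} 3517
# HELP nostr_events_total Total number of Nostr events by kind
# TYPE nostr_events_total counter
nostr_events_total{kind="1"} 987
```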

To scrape these metrics with Prometheus, add a job to your `prometheus.yml`:

```yaml
scrape_configs:
  - job_name: 'strfry'
    static_configs:
      - targets: ['localhost:7777']
    metrics_path: '/metrics'
```
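
Once Prometheus is scraping, these counters pair naturally with `rate()`. For example, the following illustrative PromQL query (not taken from the strfry docs) graphs the per-second rate of incoming client messages, broken down by verb:

```
sum by (verb) (rate(nostr_client_messages_total[5m]))
```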

See the [Prometheus metrics documentation](docs/prometheus-metrics.md) for detailed information and example Grafana queries.



## Advanced

### DB Upgrade
118 changes: 118 additions & 0 deletions src/PrometheusMetrics.h
@@ -0,0 +1,118 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <sstream>
#include <string>

// Simple thread-safe Prometheus metrics implementation.
// Supports counters with a single label dimension.

class PrometheusMetrics {
  public:
    // Counter for tracking cumulative, monotonically increasing values
    class Counter {
      private:
        std::atomic<uint64_t> value{0};

      public:
        void inc(uint64_t n = 1) {
            value.fetch_add(n, std::memory_order_relaxed);
        }

        uint64_t get() const {
            return value.load(std::memory_order_relaxed);
        }
    };

    // Labeled counter: maintains one Counter per label value
    class LabeledCounter {
      private:
        mutable std::shared_mutex mutex;
        std::map<std::string, Counter> counters;

      public:
        void inc(const std::string& label, uint64_t n = 1) {
            // Fast path: the label usually exists already, so a shared (read)
            // lock suffices -- the increment itself is atomic.
            {
                std::shared_lock<std::shared_mutex> lock(mutex);
                auto it = counters.find(label);
                if (it != counters.end()) {
                    it->second.inc(n);
                    return;
                }
            }

            // Slow path: take an exclusive lock to create the new counter.
            // operator[] also handles the case where another thread inserted
            // it between the two locks.
            std::unique_lock<std::shared_mutex> lock(mutex);
            counters[label].inc(n);
        }

        std::map<std::string, uint64_t> getAll() const {
            std::shared_lock<std::shared_mutex> lock(mutex);
            std::map<std::string, uint64_t> result;
            for (const auto& [label, counter] : counters) {
                result[label] = counter.get();
            }
            return result;
        }
    };

    // Singleton instance
    static PrometheusMetrics& getInstance() {
        static PrometheusMetrics instance;
        return instance;
    }

    // Nostr client message counters (messages FROM clients TO relay)
    LabeledCounter nostrClientMessages;

    // Nostr relay message counters (messages FROM relay TO clients)
    LabeledCounter nostrRelayMessages;

    // Nostr event counters (by kind)
    LabeledCounter nostrEventsByKind;

    // Generate Prometheus text exposition format output. Label values are
    // emitted unescaped, which is safe here because they are always protocol
    // verbs or numeric event kinds.
    std::string render() const {
        std::ostringstream out;

        // Client messages
        out << "# HELP nostr_client_messages_total Total number of Nostr client messages by verb\n";
        out << "# TYPE nostr_client_messages_total counter\n";
        auto clientMsgs = nostrClientMessages.getAll();
        for (const auto& [verb, count] : clientMsgs) {
            out << "nostr_client_messages_total{verb=\"" << verb << "\"} " << count << "\n";
        }

        // Relay messages
        out << "# HELP nostr_relay_messages_total Total number of Nostr relay messages by verb\n";
        out << "# TYPE nostr_relay_messages_total counter\n";
        auto relayMsgs = nostrRelayMessages.getAll();
        for (const auto& [verb, count] : relayMsgs) {
            out << "nostr_relay_messages_total{verb=\"" << verb << "\"} " << count << "\n";
        }

        // Events by kind
        out << "# HELP nostr_events_total Total number of Nostr events by kind\n";
        out << "# TYPE nostr_events_total counter\n";
        auto events = nostrEventsByKind.getAll();
        for (const auto& [kind, count] : events) {
            out << "nostr_events_total{kind=\"" << kind << "\"} " << count << "\n";
        }

        return out.str();
    }

  private:
    PrometheusMetrics() = default;
    PrometheusMetrics(const PrometheusMetrics&) = delete;
    PrometheusMetrics& operator=(const PrometheusMetrics&) = delete;
};

// Convenience macros for incrementing metrics
#define PROM_INC_CLIENT_MSG(verb) PrometheusMetrics::getInstance().nostrClientMessages.inc(verb)
#define PROM_INC_RELAY_MSG(verb) PrometheusMetrics::getInstance().nostrRelayMessages.inc(verb)
#define PROM_INC_EVENT_KIND(kind) PrometheusMetrics::getInstance().nostrEventsByKind.inc(kind)
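
As a minimal standalone sketch of how the pieces fit together (for illustration only, not part of this diff): the convenience macros lazily create one counter per label value, and `render()` produces the same text the relay serves at `/metrics`.

```cpp
#include <iostream>
#include "PrometheusMetrics.h"

int main() {
    // Each macro increments the counter for its label, creating it on first use.
    PROM_INC_CLIENT_MSG("REQ");
    PROM_INC_EVENT_KIND("1");
    PROM_INC_EVENT_KIND("1");

    // Prints the Prometheus text exposition format.
    std::cout << PrometheusMetrics::getInstance().render();
    return 0;
}
```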
7 changes: 7 additions & 0 deletions src/apps/relay/RelayIngester.cpp
@@ -26,6 +26,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
auto &cmd = jsonGetString(arr[0], "first element not a command like REQ");

if (cmd == "EVENT") {
PROM_INC_CLIENT_MSG("EVENT");
if (cfg().relay__logging__dumpInEvents) LI << "[" << msg->connId << "] dumpInEvent: " << msg->payload;

try {
@@ -36,6 +37,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
if (cfg().relay__logging__invalidEvents) LI << "Rejected invalid event: " << e.what();
}
} else if (cmd == "REQ") {
PROM_INC_CLIENT_MSG("REQ");
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;

try {
@@ -44,6 +46,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
sendNoticeError(msg->connId, std::string("bad req: ") + e.what());
}
} else if (cmd == "CLOSE") {
PROM_INC_CLIENT_MSG("CLOSE");
if (cfg().relay__logging__dumpInReqs) LI << "[" << msg->connId << "] dumpInReq: " << msg->payload;

try {
@@ -52,6 +55,7 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
sendNoticeError(msg->connId, std::string("bad close: ") + e.what());
}
} else if (cmd.starts_with("NEG-")) {
PROM_INC_CLIENT_MSG(std::string(cmd));
if (!cfg().relay__negentropy__enabled) throw herr("negentropy disabled");

try {
@@ -91,6 +95,9 @@ void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::str
parseAndVerifyEvent(origJson, secpCtx, true, true, packedStr, jsonStr);

PackedEventView packed(packedStr);

// Track event kind metrics
PROM_INC_EVENT_KIND(std::to_string(packed.kind()));

{
bool foundProtected = false;
4 changes: 4 additions & 0 deletions src/apps/relay/RelayNegentropy.cpp
@@ -101,6 +101,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
} catch (std::exception &e) {
LI << "[" << connId << "] Error parsing negentropy message: " << e.what();

PROM_INC_RELAY_MSG("NEG-ERR");
sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
subId.str(),
@@ -111,6 +112,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
return;
}

PROM_INC_RELAY_MSG("NEG-MSG");
sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-MSG",
subId.str(),
@@ -146,6 +148,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
if (view->levIds.size() > cfg().relay__negentropy__maxSyncEvents) {
LI << "[" << sub.connId << "] Negentropy query size exceeded " << cfg().relay__negentropy__maxSyncEvents;

PROM_INC_RELAY_MSG("NEG-ERR");
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
sub.subId.str(),
@@ -225,6 +228,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
} else if (auto msg = std::get_if<MsgNegentropy::NegMsg>(&newMsg.msg)) {
auto *userView = views.findView(msg->connId, msg->subId);
if (!userView) {
PROM_INC_RELAY_MSG("NEG-ERR");
sendToConn(msg->connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
msg->subId.str(),
1 change: 1 addition & 0 deletions src/apps/relay/RelayReqWorker.cpp
@@ -11,6 +11,7 @@ void RelayServer::runReqWorker(ThreadPool<MsgReqWorker>::Thread &thr) {
};

queries.onComplete = [&](lmdb::txn &, Subscription &sub){
PROM_INC_RELAY_MSG("EOSE");
sendToConn(sub.connId, tao::json::to_string(tao::json::value::array({ "EOSE", sub.subId.str() })));
tpReqMonitor.dispatch(sub.connId, MsgReqMonitor{MsgReqMonitor::NewSub{std::move(sub)}});
};
4 changes: 4 additions & 0 deletions src/apps/relay/RelayServer.h
@@ -18,6 +18,7 @@
#include "filters.h"
#include "jsonParseUtils.h"
#include "Decompressor.h"
#include "PrometheusMetrics.h"



@@ -197,6 +198,7 @@ struct RelayServer {
}

void sendEvent(uint64_t connId, const SubId &subId, std::string_view evJson) {
PROM_INC_RELAY_MSG("EVENT");
auto subIdSv = subId.sv();

std::string reply;
@@ -217,13 +219,15 @@ struct RelayServer {
}

void sendNoticeError(uint64_t connId, std::string &&payload) {
PROM_INC_RELAY_MSG("NOTICE");
LI << "sending error to [" << connId << "]: " << payload;
auto reply = tao::json::value::array({ "NOTICE", std::string("ERROR: ") + payload });
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
hubTrigger->send();
}

void sendOKResponse(uint64_t connId, std::string_view eventIdHex, bool written, std::string_view message) {
PROM_INC_RELAY_MSG("OK");
auto reply = tao::json::value::array({ "OK", eventIdHex, written, message });
tpWebsocket.dispatch(0, MsgWebsocket{MsgWebsocket::Send{connId, std::move(tao::json::to_string(reply))}});
hubTrigger->send();
7 changes: 6 additions & 1 deletion src/apps/relay/RelayWebsocket.cpp
@@ -175,7 +175,11 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
std::string host = req.getHeader("host").toString();
std::string url = req.getUrl().toString();

if (url == "/.well-known/nodeinfo") {
if (url == "/metrics") {
auto metrics = PrometheusMetrics::getInstance().render();
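// "text/plain; version=0.0.4" is the standard content type for the Prometheus text exposition format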
auto response = preGenerateHttpResponse("text/plain; version=0.0.4", metrics);
res->write(response.data(), response.size());
} else if (url == "/.well-known/nodeinfo") {
auto nodeInfo = getNodeInfoHttpResponse(host);
res->write(nodeInfo.data(), nodeInfo.size());
} else if (url == "/nodeinfo/2.1") {
@@ -291,6 +295,7 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
tempBuf += "]";

for (auto &item : msg->list) {
PROM_INC_RELAY_MSG("EVENT");
auto subIdSv = item.subId.sv();
auto *p = tempBuf.data() + MAX_SUBID_SIZE - subIdSv.size();
memcpy(p, "[\"EVENT\",\"", 10);