diff --git a/bench/README.md b/bench/README.md new file mode 100644 index 00000000..c161ef8e --- /dev/null +++ b/bench/README.md @@ -0,0 +1,134 @@ +Strfry Benchmark Suite — Plan and Structure + +Purpose +- Measure strfry performance across DB sizes and workloads with NIP-50 disabled and enabled. +- Produce comparable Markdown reports that include sanitized system info (no PII). + +Outcomes +- Repeatable, automated runs that generate: + - Per-scenario Markdown report with metrics and system profile + - Aggregated summary Markdown table across scenarios +- Clear separation of preparation (DB build) vs execution (load + measure) + +High-Level Flow +1) Prepare: build test DB for each scenario + - Generate cryptographically valid nostr events via nak + - Ingest into a fresh strfry DB (separate directory per scenario) + - Optionally pre-compress dictionaries and warm caches +2) Run: execute workload against the prepared DB + - Start strfry with scenario config (NIP-50 on/off) + - Run benchmark client to drive REQ/EVENT traffic + - Capture server logs, resource stats, and client timings +3) Report: aggregate results + - Parse logs and client output + - Emit per-scenario report and a combined summary table + +Repository Layout (bench/) +- bench/ + - README.md — this plan and usage + - SCENARIOS.md — curated scenarios list and guidance + - scenarios/ + - small.yml — ~100k events + - medium.yml — ~1M events + - large.yml — ~10M events (example) +- scripts/ + - prepare.sh — build DB(s) for scenarios + - run.sh — run benchmark(s) and gather metrics + - sysinfo.sh — collect sanitized system profile + - report.py — generate per-scenario + summary Markdown + - client/ — load generator (future; optional) + - results/ + - raw/ — JSON and logs per run + - summary.md — aggregated Markdown table + - work/ — ephemeral DBs and run artifacts + +External Dependencies +- nak: Event generator that produces valid signed nostr events + - Provide binary path via env `NAK_BIN` or place on PATH 
+- Tools: `bash`, `jq`, `awk`, `sed`, `python3` (for report.py) +- Optional: `mpstat`/`pidstat` (sysstat), `lsblk`, `lscpu` for system profiling + +Scenario Format (YAML) +- name: human-readable name +- db: + - events: integer (total events to generate) + - kinds: pattern (e.g., "1,30023") + - avg_event_size: bytes (approx) + - keyword_inject_rate: float 0..1 (probability to inject keywords into generated event content) + - keywords: list of { term, weight } to control frequency distribution and enable realistic search results + - distribution: optional mix (e.g., replies/hashtags) +- server: + - search_enabled: true|false + - search_backend: lmdb|noop + - config_overrides: map of strfry config keys/values +- workload: + - duration_s: 120 + - warmup_s: 15 + - connections: 100 + - subscriptions_per_conn: 3 + - writers_per_sec: 200 # events/sec sent to relay + - req_mix: + - type: read + filter: { kinds: [1], limit: 200 } + weight: 3 + - type: search + # if the client runner supports it, the search term will be sampled from db.keywords biased by weight + filter: { kinds: [1], search: "best nostr apps", limit: 100 } + weight: 1 + +Metrics +- Throughput: events/s sent; events/s delivered +- Latency: p50/p95/p99 for + - Initial REQ scan to EOSE + - EVENT -> OK roundtrip + - EVENT observe-to-deliver (ingest to delivery to live subscribers) +- Resource: + - strfry RSS/CPU (sampled), total system CPU/mem + - DB size on disk +- Search (when enabled): + - Search query latency p50/p95/p99 + - Index catch-up state at run start/end + - Results cardinality across term classes (common vs rare), using weighted keywords + +System Profile (sanitized) +- OS: kernel version, distro +- CPU: model, sockets, cores, MHz +- Memory: total +- Storage: device type (NVMe/SATA), rotational flag, filesystem +- Notes: + - Do not record hostname, users, IP addresses, or MACs + +Execution +- Prepare one or more scenarios: + - `bench/scripts/prepare.sh -s scenarios/small.yml [--workers N] [--nak 
/path/to/nak]` + - `--workers N` controls parallel generators (defaults to min(4, nproc) or env GEN_PAR) + - Output DB at `bench/work/small/db/` +- Run benchmark(s): + - `bench/scripts/run.sh -s scenarios/small.yml --out bench/results/raw/small-$(date +%s)` + - Produces: client.json, server.log, sysinfo.json +- Report: + - `bench/scripts/report.py bench/results/raw/* > bench/results/summary.md` + +Output: Markdown Table (example) + +| Scenario | DB events | NIP-50 | Conns | Subs/Conn | Writers/s | EOSE p50/p95/p99 (ms) | Search p50/p95/p99 (ms) | OK p50/p95/p99 (ms) | Delivered/s | RSS max (MB) | CPU avg (%) | +|---------:|----------:|:------:|------:|----------:|----------:|-----------------------:|-------------------------:|--------------------:|-----------:|-------------:|------------:| +| small | 100k | off | 100 | 3 | 200 | 8 / 19 / 42 | — | 5 / 12 / 29 | 12,300 | 620 | 240 | +| small | 100k | on | 100 | 3 | 200 | 9 / 21 / 47 | 14 / 31 / 66 | 5 / 12 / 30 | 12,100 | 640 | 252 | + +Methodology Notes +- Warm-up period excluded from measurements +- Each scenario run twice and best-of-two reported (helps mitigate jitter) +- REQ scan latency measured from REQ send to EOSE receive per sub +- Search latency measured from REQ send to first EVENT, and to EOSE +- OK roundtrip measured per EVENT +- Logs parsed for dbScan perf lines if `relay__logging__dbScanPerf = true` + +PII and Safety +- Scripts must not include hostname, users, IP/MAC addresses +- Sanitize all system data before writing to artifacts + +Future Extensions +- k6-like scenario runner for WebSocket; distributed load generation +- Flame graphs and CPU profiling (perf/pprof) under opt-in +- Additional NIP scenarios (negentropy sync under load) diff --git a/bench/SCENARIOS.md b/bench/SCENARIOS.md new file mode 100644 index 00000000..26bc653e --- /dev/null +++ b/bench/SCENARIOS.md @@ -0,0 +1,32 @@ +Benchmark Scenarios + +This doc describes the standard scenarios and how to create new ones. 
+ +Standard Scenarios +- small.yml + - ~100k events + - Mixed kinds: "1, 30023" + - NIP-50 on and off runs + - Connections: 100, subs/conn: 3, writers/s: 200 +- medium.yml + - ~1M events + - Same workload profile as small +- large.yml (example) + - ~10M events (requires ample disk/RAM) + - Lower writers/s initially to avoid IO bottlenecks + +Creating a Scenario +- Copy an existing YAML file under bench/scenarios/ and edit: + - db.events, db.kinds, db.avg_event_size + - server.search_enabled and server.search_backend + - workload parameters (duration, warmup, connections, writers) + +Tips +- For NIP-50 enabled runs, ensure catch-up indexer is caught up before measuring +- For very large DBs, consider increasing warmup and run duration +- Keep maxCandidateDocs and overfetchFactor balanced to avoid excessive scoring costs +- Choose search terms strategically: + - Common terms (e.g., "nostr", "bitcoin") to stress high-DF scoring paths + - Rare terms (e.g., project-specific tokens) to test low-DF paths and index lookups + - Multi-term phrases (e.g., "best nostr apps") to test multi-token scoring + - Inject keywords into generated content via db.keywords and db.keyword_inject_rate so searches return realistic results diff --git a/bench/scenarios/medium.yml b/bench/scenarios/medium.yml new file mode 100644 index 00000000..389a18c3 --- /dev/null +++ b/bench/scenarios/medium.yml @@ -0,0 +1,42 @@ +name: medium +db: + events: 1000000 + kinds: "1, 30023" + avg_event_size: 600 + keyword_inject_rate: 0.7 + keywords: + - term: nostr + weight: 10 + - term: bitcoin + weight: 7 + - term: lightning + weight: 4 + - term: federation + weight: 2 + - term: "nostr developers" + weight: 2 + - term: "best nostr apps" + weight: 1 +server: + search_enabled: true + search_backend: lmdb + config_overrides: + relay__logging__dbScanPerf: true +workload: + duration_s: 180 + warmup_s: 20 + connections: 200 + subscriptions_per_conn: 3 + writers_per_sec: 400 + req_mix: + - type: read + filter: + 
kinds: [1] + limit: 200 + weight: 2 + - type: search + filter: + kinds: [1] + search: "nostr developers" + limit: 100 + weight: 1 diff --git a/bench/scenarios/small.yml b/bench/scenarios/small.yml new file mode 100644 index 00000000..2152a011 --- /dev/null +++ b/bench/scenarios/small.yml @@ -0,0 +1,40 @@ +name: small +db: + events: 100000 + kinds: "1, 30023" + avg_event_size: 600 + keyword_inject_rate: 0.6 + keywords: + - term: nostr + weight: 8 + - term: bitcoin + weight: 5 + - term: lightning + weight: 3 + - term: federation + weight: 2 + - term: "best nostr apps" + weight: 1 +server: + search_enabled: false + search_backend: lmdb + config_overrides: + relay__logging__dbScanPerf: true +workload: + duration_s: 120 + warmup_s: 15 + connections: 100 + subscriptions_per_conn: 3 + writers_per_sec: 200 + req_mix: + - type: read + filter: + kinds: [1] + limit: 200 + weight: 3 + - type: search + filter: + kinds: [1] + search: "best nostr apps" + limit: 100 + weight: 1 diff --git a/bench/scripts/prepare.sh b/bench/scripts/prepare.sh new file mode 100755 index 00000000..0b414318 --- /dev/null +++ b/bench/scripts/prepare.sh @@ -0,0 +1,203 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Prepare benchmark databases for a scenario +# - Generates events with nak +# - Ingests into a fresh strfry DB under bench/work//db + +usage() { + echo "Usage: $0 -s [--nak ] [--workers ]" >&2 + exit 1 +} + +SCENARIO="" +NAK_BIN="${NAK_BIN:-nak}" +CLI_WORKERS="" + +while [[ $# -gt 0 ]]; do + case "$1" in + -s|--scenario) SCENARIO="$2"; shift 2;; + --nak) NAK_BIN="$2"; shift 2;; + -w|--workers) CLI_WORKERS="$2"; shift 2;; + *) usage;; + esac +done + +[[ -z "$SCENARIO" ]] && usage + +root_dir="$(cd "$(dirname "$0")/../.." 
&& pwd)" +bench_dir="$root_dir/bench" +work_dir="$bench_dir/work" + +command -v jq >/dev/null || { echo "jq required" >&2; exit 2; } +command -v "${NAK_BIN}" >/dev/null || { echo "nak required on PATH or set NAK_BIN" >&2; exit 2; } + +# Render scenario YAML (read from stdin) as JSON. The program is passed with -c so stdin stays free for the YAML; with "python3 - <<PY" the heredoc itself is stdin and the YAML is never read. +y2j() { python3 -c ' +import sys, yaml, json +print(json.dumps(yaml.safe_load(sys.stdin.read()))) +' +} + +SC_FILE="$bench_dir/$SCENARIO" +SCJ=$(y2j <"$SC_FILE" 2>/dev/null || true) +NAME=$(echo "$SCJ" | jq -r '.name // empty') +EVENTS=$(echo "$SCJ" | jq -r '.db.events // empty') +KINDS=$(echo "$SCJ" | jq -r '.db.kinds // empty') +INJECT_RATE=$(echo "$SCJ" | jq -r '.db.keyword_inject_rate // empty') +KEYWORDS_JSON=$(echo "$SCJ" | jq -c '.db.keywords // empty') + +# Fallback parsing if PyYAML isn't available (keywords cannot be recovered this way, so the default pool is used) +if [[ -z "$NAME" || -z "$EVENTS" || -z "$KINDS" ]]; then + NAME=$(grep -E '^name:' "$SC_FILE" | head -1 | sed 's/^name:[[:space:]]*//') + EVENTS=$(grep -E '^[[:space:]]*events:' "$SC_FILE" | head -1 | sed 's/.*events:[[:space:]]*//') + KINDS=$(grep -E '^[[:space:]]*kinds:' "$SC_FILE" | head -1 | sed 's/.*kinds:[[:space:]]*//' | sed 's/^"//; s/"$//') + INJECT_RATE=$(grep -E '^[[:space:]]*keyword_inject_rate:' "$SC_FILE" | head -1 | sed 's/.*keyword_inject_rate:[[:space:]]*//') + KEYWORDS_JSON="" +fi + +# Defaults if still unset +[[ -z "$INJECT_RATE" ]] && INJECT_RATE="0.6" +if [[ -z "$KEYWORDS_JSON" ]]; then KEYWORDS_JSON="[]"; fi + +# Prompt for target DB installation directory +echo "[prepare] WARNING: This will populate a strfry LMDB database at the chosen path." +echo "[prepare] Existing contents may be overwritten." 
+read -r -p "[prepare] Enter database directory to install (blank = default: $work_dir/$NAME/db): " DB_PROMPT || true +if [[ -z "${DB_PROMPT}" ]]; then + db_dir="$work_dir/$NAME/db" + echo "[prepare] Using default directory: $db_dir" +else + db_dir="${DB_PROMPT}" + echo "[prepare] Using provided directory: $db_dir" +fi + +# Create directory if needed and warn if non-empty (no answer, e.g. stdin at EOF, counts as "no") +mkdir -p "$db_dir" +if [[ -n "$(ls -A "$db_dir" 2>/dev/null)" ]]; then + read -r -p "[prepare] Directory '$db_dir' is not empty. Continue? [y/N]: " CONFIRM || true + if [[ ! "$CONFIRM" =~ ^[Yy]$ ]]; then + echo "[prepare] Aborting at user request." >&2 + exit 4 + fi +fi + +mkdir -p "$work_dir/$NAME" +conf_tmp="$work_dir/$NAME/strfry-import.conf" + +echo "[prepare] scenario=$NAME events=$EVENTS kinds=$KINDS" + +# Prepare scenario-specific config: point DB at the chosen directory (written as a quoted string) and disable search for faster import +cp "$root_dir/strfry.conf" "$conf_tmp" +sed -i "s#^db\s*=.*#db = \"$db_dir\"#" "$conf_tmp" +sed -i "s#^relay.search.enabled\s*=.*#relay.search.enabled = false#" "$conf_tmp" || true + +# Simple parser for kinds pattern: supports comma-separated ints and A-B ranges; '*' => kind 1 +parse_kinds() { + local s="$1"; s="${s// /}"; IFS=',' read -r -a toks <<< "$s" + local out=() + for tok in "${toks[@]}"; do + [[ -z "$tok" ]] && continue + if [[ "$tok" == "*" ]]; then + out+=("*") + elif [[ "$tok" =~ ^[0-9]+-[0-9]+$ ]]; then + out+=("$tok") + elif [[ "$tok" =~ ^[0-9]+$ ]]; then + out+=("$tok") + fi + done + printf '%s\n' "${out[@]}" +} + +pick_kind() { + local -a arr=(); while IFS= read -r line; do arr+=("$line"); done < <(parse_kinds "$1") + local tok="${arr[$RANDOM % ${#arr[@]}]}" + if [[ "$tok" == "*" || -z "$tok" ]]; then echo 1; return; fi + if [[ "$tok" =~ ^([0-9]+)-([0-9]+)$ ]]; then + local a=${BASH_REMATCH[1]} b=${BASH_REMATCH[2]} + echo $(( a + (RANDOM % (b - a + 1)) )) + else + echo "$tok" + fi +} + +# Generate events with nak into NDJSON parts in parallel, then import once +parts_dir="$work_dir/$NAME/gen" +rm -rf "$parts_dir" && 
mkdir -p "$parts_dir" + +# Build weighted keyword pool for random sampling across workers +kw_pool_file="$work_dir/$NAME/kw_pool.txt" +rm -f "$kw_pool_file" +if [[ $(echo "$KEYWORDS_JSON" | jq 'length') -gt 0 ]]; then + echo "$KEYWORDS_JSON" | jq -r '.[] | .term as $t | ((.weight // 1) | tonumber) as $w | [range($w)] | .[] | $t' > "$kw_pool_file" +else + # Defaults if not provided by scenario + printf '%s\n' nostr nostr nostr bitcoin bitcoin lightning federation 'best nostr apps' > "$kw_pool_file" +fi +KW_COUNT=$(wc -l < "$kw_pool_file" | tr -d ' ') +if [[ "$KW_COUNT" -le 0 ]]; then echo "[prepare] keyword pool is empty" >&2; exit 3; fi + +CORES=$( (command -v nproc >/dev/null && nproc) || echo 2 ) +# Choose workers from CLI if provided, else env GEN_PAR, else min(4, CORES) +if [[ -n "$CLI_WORKERS" ]]; then + if ! [[ "$CLI_WORKERS" =~ ^[0-9]+$ ]] || [[ "$CLI_WORKERS" -le 0 ]]; then + echo "[prepare] invalid --workers value: $CLI_WORKERS" >&2; exit 2 + fi + GEN_PAR="$CLI_WORKERS" +else + GEN_PAR=${GEN_PAR:-$(( CORES > 4 ? 
4 : CORES ))} +fi +PER_PART=$(( (EVENTS + GEN_PAR - 1) / GEN_PAR )) + +echo "[prepare] generating $EVENTS events using $GEN_PAR parallel workers" + +gen_one_part() { + local idx="$1"; local count="$2"; local outfile="$3" + : > "$outfile" + # Load keyword pool into an array for this worker + mapfile -t KWPOOL < "$kw_pool_file" + # Scale inject rate to 0..1000 integer space for $RANDOM comparisons + local INJ_THOUS + INJ_THOUS=$(awk -v r="$INJECT_RATE" 'BEGIN{printf("%d", (r<0?0:(r>1?1:r))*1000)}') + for ((i=1;i<=count;i++)); do + local k=$(pick_kind "$KINDS") + local content="bench-$NAME-$idx-$i-$(date +%s%N)" + # maybe inject one or two keywords into content + if (( (RANDOM % 1000) < INJ_THOUS )); then + local k1_index=$(( RANDOM % ${#KWPOOL[@]} )) + local k1=${KWPOOL[$k1_index]} + content+=" $k1" + # 30% chance to add a second distinct keyword + if (( (RANDOM % 100) < 30 )) && [[ ${#KWPOOL[@]} -gt 1 ]]; then + local k2_index=$(( RANDOM % ${#KWPOOL[@]} )) + if [[ $k2_index -eq $k1_index ]]; then k2_index=$(((k2_index+1)%${#KWPOOL[@]})); fi + local k2=${KWPOOL[$k2_index]} + content+=" $k2" + fi + fi + # NAK_BIN is quoted here exactly as in the earlier "command -v" check, so a path with spaces works + if ! "${NAK_BIN}" event -k "$k" -c "$content" >> "$outfile"; then + echo "[prepare] nak failed on part $idx at i=$i" >&2; exit 3 + fi + done +} + +pids=(); remaining=$EVENTS +for ((w=1; w<=GEN_PAR; w++)); do + n=$(( remaining < PER_PART ? remaining : PER_PART )) # never hand out more than is left, so the parts sum to exactly EVENTS + [[ $n -le 0 ]] && break + remaining=$(( remaining - n )) + gen_one_part "$w" "$n" "$parts_dir/part_$w.ndjson" & + pids+=("$!") +done + +fail=0 +for pid in "${pids[@]}"; do wait "$pid" || fail=1; done +[[ $fail -ne 0 ]] && { echo "[prepare] generation failed" >&2; exit 3; } + +echo "[prepare] importing events into strfry DB: $db_dir" +cat "$parts_dir"/*.ndjson | "$root_dir/build/strfry" import --config "$conf_tmp" --no-verify + +echo "[prepare] import complete. cleaning up generated parts" +rm -rf "$parts_dir" + +echo "[prepare] Done." 
diff --git a/bench/scripts/report.py b/bench/scripts/report.py new file mode 100644 index 00000000..37cc1a95 --- /dev/null +++ b/bench/scripts/report.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +import sys, json, os, glob + +def load_json(path, default=None): + try: + with open(path) as f: + return json.load(f) + except Exception: + return default + +def summarize_run(run_dir): + sysinfo = load_json(os.path.join(run_dir, 'sysinfo.json'), {}) + client = load_json(os.path.join(run_dir, 'client.json'), {}) + # TODO: parse server.log for scan metrics and latencies when available + return { + 'dir': os.path.basename(run_dir.rstrip('/')), + 'sys': sysinfo, + 'client': client, + } + +def main(): + if len(sys.argv) < 2: + print("Usage: report.py ", file=sys.stderr) + sys.exit(1) + + runs = [] + for arg in sys.argv[1:]: + paths = glob.glob(arg) + for p in paths: + if os.path.isdir(p): + runs.append(summarize_run(p)) + + # Emit a simple placeholder Markdown + print("| Run | Kernel | CPU | Mem (KB) | Notes |") + print("|:----|:-------|:----|--------:|:------|") + for r in runs: + sysinfo = r.get('sys', {}) + cpu_model = sysinfo.get('cpu', {}).get('Model name', '') + print(f"| {r['dir']} | {sysinfo.get('kernel','')} | {cpu_model} | {sysinfo.get('memory_kb',0)} | - |") + +if __name__ == '__main__': + main() + + diff --git a/bench/scripts/run.sh b/bench/scripts/run.sh new file mode 100644 index 00000000..30f4f367 --- /dev/null +++ b/bench/scripts/run.sh @@ -0,0 +1,89 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Run a benchmark for a scenario +# - Starts strfry with scenario config +# - Collects sysinfo and server logs +# - Invokes client load generator (placeholder) + +usage() { + echo "Usage: $0 -s --out [--port <7777>]" >&2 + exit 1 +} + +SCENARIO="" +OUTDIR="" +PORT=7777 + +while [[ $# -gt 0 ]]; do + case "$1" in + -s|--scenario) SCENARIO="$2"; shift 2;; + --out) OUTDIR="$2"; shift 2;; + --port) PORT="$2"; shift 2;; + *) usage;; + esac +done + +[[ -z "$SCENARIO" || -z 
"$OUTDIR" ]] && usage + +root_dir="$(cd "$(dirname "$0")/../.." && pwd)" +bench_dir="$root_dir/bench" +work_dir="$bench_dir/work" +out_dir="$OUTDIR" +mkdir -p "$out_dir" + +command -v jq >/dev/null || { echo "jq required" >&2; exit 2; } + +y2j() { python3 -c ' +import sys, yaml, json +print(json.dumps(yaml.safe_load(sys.stdin.read()))) +' +} + +SCJ=$(y2j <"$bench_dir/$SCENARIO") +NAME=$(echo "$SCJ" | jq -r .name) +SEARCH_ENABLED=$(echo "$SCJ" | jq -r .server.search_enabled) +SEARCH_BACKEND=$(echo "$SCJ" | jq -r .server.search_backend) +DURATION=$(echo "$SCJ" | jq -r .workload.duration_s) +WARMUP=$(echo "$SCJ" | jq -r .workload.warmup_s) + +db_dir="$work_dir/$NAME/db" +log_file="$out_dir/server.log" +sysinfo_json="$out_dir/sysinfo.json" +client_json="$out_dir/client.json" + +echo "[run] scenario=$NAME search=$SEARCH_ENABLED backend=$SEARCH_BACKEND duration=${DURATION}s warmup=${WARMUP}s" + +# Collect sanitized system info (run through bash: sysinfo.sh is added without the executable bit) +bash "$bench_dir/scripts/sysinfo.sh" > "$sysinfo_json" + +# Render scenario-specific strfry.conf (override search settings and db path; the db value is written as a quoted string) +conf_tmp="$out_dir/strfry-bench.conf" +cp "$root_dir/strfry.conf" "$conf_tmp" +sed -i "s#^db\s*=.*#db = \"$db_dir\"#" "$conf_tmp" +sed -i "s#^relay.search.enabled\s*=.*#relay.search.enabled = $SEARCH_ENABLED#" "$conf_tmp" || true +sed -i "s#^relay.search.backend\s*=.*#relay.search.backend = $SEARCH_BACKEND#" "$conf_tmp" || true +sed -i "s#^relay.port\s*=.*#relay.port = $PORT#" "$conf_tmp" || true + +# Start strfry +echo "[run] starting strfry on port $PORT" | tee -a "$log_file" +"$root_dir/build/strfry" relay --config "$conf_tmp" >> "$log_file" 2>&1 & +SRV_PID=$! 
+cleanup() { + if kill -0 "$SRV_PID" 2>/dev/null; then + kill "$SRV_PID" || true + wait "$SRV_PID" || true + fi +} +trap cleanup EXIT # installed before the startup wait so an early exit still stops the server + +sleep 2 + +# Placeholder: invoke load generator to populate $client_json +echo '{"note":"client metrics placeholder"}' > "$client_json" + +echo "[run] sleeping for duration=${DURATION}s (including warmup=${WARMUP}s)" +sleep "$DURATION" + +echo "[run] complete; shutting down server" | tee -a "$log_file" + diff --git a/bench/scripts/sysinfo.sh b/bench/scripts/sysinfo.sh new file mode 100644 index 00000000..e702afe0 --- /dev/null +++ b/bench/scripts/sysinfo.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Print sanitized system info as JSON to stdout + +to_json_kv() { + local k="$1"; shift + local v="$*" + printf ' "%s": %s' "$k" "$v" +} + +json_escape() { + python3 -c 'import json,sys; print(json.dumps(sys.stdin.read().strip()))' +} + +# Selected lscpu fields as "key": "value" members; only the last line's trailing comma is dropped so the members stay comma-separated +cpu_json=$(lscpu | awk -F: '{gsub(/^ +| +$/,"",$2); gsub(/^ +| +$/,"",$1); if($1!~/^Model name|^CPU\(s\)|^Thread|^Core|^Socket|^CPU MHz|^Architecture|^Vendor ID/){next}; printf("\"%s\": \"%s\",\n", $1, $2)}' | sed '$s/,$//') || cpu_json='' +mem_total_kb=$(awk '/MemTotal:/ {print $2}' /proc/meminfo 2>/dev/null || echo 0) +# Backslash-escape double quotes in the kernel string (the replacement needs \\ to emit a literal backslash) +kernel=$(uname -r | sed 's/"/\\"/g') +os_pretty=$( (grep PRETTY_NAME= /etc/os-release || echo PRETTY_NAME=unknown) | cut -d= -f2- | tr -d '"') +fs=$(df -h . 
| awk 'NR==2{print $1" "$2" "$6}') +storage=$(lsblk -o NAME,ROTA,TYPE,MOUNTPOINT | awk 'NR==1 || /\/$/ {print}' | sed '1d') + +cat <sub.filterGroup.filters) { + // Skip search filters: they are one-shot queries, not live monitors + if (f.hasSearch()) continue; + if (f.ids) { for (size_t i = 0; i < f.ids->size(); i++) { auto res = allIds.try_emplace(Bytes32(f.ids->at(i))); @@ -197,6 +200,9 @@ struct ActiveMonitors : NonCopyable { void uninstallLookups(Monitor *m) { for (auto &f : m->sub.filterGroup.filters) { + // Skip search filters: they were never installed + if (f.hasSearch()) continue; + if (f.ids) { for (size_t i = 0; i < f.ids->size(); i++) { Bytes32 id(f.ids->at(i)); diff --git a/src/DBQuery.h b/src/DBQuery.h index 7e91f4bd..6b7eb10f 100644 --- a/src/DBQuery.h +++ b/src/DBQuery.h @@ -5,6 +5,8 @@ #include "Subscription.h" #include "filters.h" #include "events.h" +#include "search/SearchProvider.h" +#include "search/SearchRunner.h" struct DBScan : NonCopyable { @@ -284,6 +286,8 @@ struct DBQuery : NonCopyable { Subscription sub; std::unique_ptr scanner; + std::unique_ptr searchRunner; + ISearchProvider *searchProvider = nullptr; size_t filterGroupIndex = 0; bool dead = false; // external flag flat_hash_set sentEventsFull; @@ -295,22 +299,27 @@ struct DBQuery : NonCopyable { uint64_t totalTime = 0; uint64_t totalWork = 0; - DBQuery(Subscription &sub) : sub(std::move(sub)) {} + DBQuery(Subscription &sub, ISearchProvider *searchProvider_ = nullptr) : sub(std::move(sub)), searchProvider(searchProvider_) {} DBQuery(const tao::json::value &filter, uint64_t maxLimit = MAX_U64) : sub(Subscription(1, ".", NostrFilterGroup::unwrapped(filter, maxLimit))) {} - // If scan is complete, returns true bool process(lmdb::txn &txn, const std::function &cb, uint64_t timeBudgetMicroseconds = MAX_U64, bool logMetrics = false) { while (filterGroupIndex < sub.filterGroup.size()) { const auto &f = sub.filterGroup.filters[filterGroupIndex]; - if (!scanner) scanner = 
std::make_unique(f); + // Determine execution strategy: search vs traditional index scan + bool useSearch = f.hasSearch() && searchProvider && cfg().relay__search__enabled; + + if (useSearch && !searchRunner) { + searchRunner = std::make_unique(f, searchProvider); + } else if (!useSearch && !scanner) { + scanner = std::make_unique(f); + } uint64_t startTime = hoytech::curr_time_us(); - bool complete = scanner->scan(txn, [&](uint64_t levId){ + auto handleEvent = [&](uint64_t levId){ if (f.limit == 0) return true; - // If this event came in after our query began, don't send it. It will be sent after the EOSE. if (levId > sub.latestEventId) return false; if (sentEventsFull.find(levId) == sentEventsFull.end()) { @@ -320,13 +329,22 @@ struct DBQuery : NonCopyable { sentEventsCurr.insert(levId); return sentEventsCurr.size() >= f.limit; - }, [&](uint64_t approxWork){ + }; + + auto doPause = [&](uint64_t approxWork){ if (approxWork > lastWorkChecked + 2'000) { lastWorkChecked = approxWork; return hoytech::curr_time_us() - startTime > timeBudgetMicroseconds; } return false; - }); + }; + + bool complete; + if (useSearch) { + complete = searchRunner->scan(txn, handleEvent, doPause); + } else { + complete = scanner->scan(txn, handleEvent, doPause); + } currScanTime += hoytech::curr_time_us() - startTime; @@ -335,21 +353,23 @@ struct DBQuery : NonCopyable { return false; } + uint64_t approxWork = useSearch ? searchRunner->approxWork : scanner->approxWork; totalTime += currScanTime; - totalWork += scanner->approxWork; + totalWork += approxWork; if (logMetrics) { LI << "[" << sub.connId << "] REQ='" << sub.subId.sv() << "'" - << " scan=" << scanner->desc - << " indexOnly=" << scanner->indexOnly + << " scan=" << (useSearch ? "Search" : scanner->desc) + << " indexOnly=" << (useSearch ? 
false : scanner->indexOnly) << " time=" << currScanTime << "us" << " saveRestores=" << currScanSaveRestores << " recsFound=" << sentEventsCurr.size() - << " work=" << scanner->approxWork; + << " work=" << approxWork; ; } scanner.reset(); + searchRunner.reset(); filterGroupIndex++; sentEventsCurr.clear(); diff --git a/src/QueryScheduler.h b/src/QueryScheduler.h index 72722c82..f2c28a88 100644 --- a/src/QueryScheduler.h +++ b/src/QueryScheduler.h @@ -12,6 +12,9 @@ struct QueryScheduler : NonCopyable { // If false, then onEvent's eventPayload will always be "" bool ensureExists = true; + // Search provider for NIP-50 queries + ISearchProvider *searchProvider = nullptr; + using ConnQueries = flat_hash_map; flat_hash_map conns; // connId -> subId -> DBQuery* std::deque running; @@ -32,7 +35,7 @@ struct QueryScheduler : NonCopyable { return false; } - DBQuery *q = new DBQuery(sub); + DBQuery *q = new DBQuery(sub, searchProvider); connQueries.try_emplace(q->sub.subId, q); running.push_front(q); diff --git a/src/SearchProvider.cpp b/src/SearchProvider.cpp new file mode 100644 index 00000000..27cd1057 --- /dev/null +++ b/src/SearchProvider.cpp @@ -0,0 +1,29 @@ +#include "search/SearchProvider.h" +#include "search/NoopSearchProvider.h" +#include "search/LmdbSearchProvider.h" +#include "golpe.h" + + +std::unique_ptr makeSearchProvider() { + // Check if search is enabled + if (!cfg().relay__search__enabled) { + return std::make_unique(); + } + + // Check backend configuration + std::string backend = cfg().relay__search__backend; + + // Explicit noop request + if (backend == "noop") { + return std::make_unique(); + } + + // Default to LMDB (natural choice for strfry) + if (backend == "lmdb" || backend.empty()) { + return std::make_unique(); + } + + // Unknown backend - log warning and fall back to lmdb + LW << "Unknown search backend: " << backend << ", falling back to lmdb"; + return std::make_unique(); +} diff --git a/src/apps/dbutils/cmd_delete.cpp 
b/src/apps/dbutils/cmd_delete.cpp index cdbdcf70..4d1fded9 100644 --- a/src/apps/dbutils/cmd_delete.cpp +++ b/src/apps/dbutils/cmd_delete.cpp @@ -5,6 +5,7 @@ #include "DBQuery.h" #include "events.h" +#include "search/SearchProvider.h" static const char USAGE[] = @@ -74,4 +75,22 @@ void cmd_delete(const std::vector &subArgs) { txn.commit(); } + + // Remove from search index if search is enabled + if (cfg().relay__search__enabled) { + auto searchProvider = makeSearchProvider(); + if (searchProvider && searchProvider->healthy()) { + LI << "Removing " << levIds.size() << " events from search index"; + uint64_t removed = 0; + for (uint64_t levId : levIds) { + try { + searchProvider->deleteEvent(levId); + removed++; + } catch (std::exception &e) { + LE << "Failed to remove event from search index levId=" << levId << ": " << e.what(); + } + } + LI << "Removed " << removed << " events from search index"; + } + } } diff --git a/src/apps/dbutils/cmd_search_index_stats.cpp b/src/apps/dbutils/cmd_search_index_stats.cpp new file mode 100644 index 00000000..b28a9e20 --- /dev/null +++ b/src/apps/dbutils/cmd_search_index_stats.cpp @@ -0,0 +1,95 @@ +#include +#include +#include + +#include +#include "golpe.h" +#include "search/LmdbSearchProvider.h" + + +static const char USAGE[] = +R"( + Display LMDB stats for the search index tables. + + Usage: + search_index_stats +)"; + +static std::string humanSize(uint64_t bytes) { + const char *suffixes[] = {"B", "KiB", "MiB", "GiB", "TiB"}; + size_t idx = 0; + double count = static_cast(bytes); + + while (count >= 1024.0 && idx + 1 < std::size(suffixes)) { + count /= 1024.0; + idx++; + } + + std::ostringstream oss; + oss << std::fixed << std::setprecision(idx == 0 ? 
0 : 2) << count << ' ' << suffixes[idx]; + return oss.str(); +} + +static void printStat(const std::string &name, const MDB_stat &st) { + uint64_t totalPages = st.ms_branch_pages + st.ms_leaf_pages + st.ms_overflow_pages; + uint64_t approxBytes = totalPages * st.ms_psize; + + std::cout << name << ":\n" + << " entries : " << st.ms_entries << "\n" + << " depth : " << st.ms_depth << "\n" + << " branch pages : " << st.ms_branch_pages << "\n" + << " leaf pages : " << st.ms_leaf_pages << "\n" + << " overflow pages : " << st.ms_overflow_pages << "\n" + << " page size : " << st.ms_psize << " bytes\n" + << " approx size : " << approxBytes << " bytes (" << humanSize(approxBytes) << ")\n"; +} + +void cmd_search_index_stats(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + (void)args; + + if (!cfg().relay__search__enabled) { + std::cerr << "Error: Search is not enabled (relay.search.enabled = false)\n"; + return; + } + + if (cfg().relay__search__backend != "lmdb") { + std::cerr << "Error: LMDB search backend not configured (relay.search.backend != \"lmdb\")\n"; + return; + } + + auto txn = env.txn_ro(); + + MDB_stat indexStat{}; + MDB_stat docMetaStat{}; + bool statsOk = true; + + try { + indexStat = env.dbi_SearchIndex.stat(txn); + } catch (const std::exception &e) { + std::cerr << "Error: Unable to open SearchIndex table: " << e.what() << "\n"; + statsOk = false; + } + + try { + docMetaStat = env.dbi_SearchDocMeta.stat(txn); + } catch (const std::exception &e) { + std::cerr << "Error: Unable to open SearchDocMeta table: " << e.what() << "\n"; + statsOk = false; + } + + if (!statsOk) return; + + std::cout << "Search index LMDB statistics:\n"; + printStat(" SearchIndex", indexStat); + printStat(" SearchDocMeta", docMetaStat); + + auto stateView = env.lookup_SearchState(txn, 1); + if (stateView) { + std::cout << "SearchState:\n" + << " lastIndexedLevId : " << stateView->lastIndexedLevId() << "\n" + << " indexVersion : " << 
stateView->indexVersion() << "\n"; + } else { + std::cout << "SearchState: not initialized\n"; + } +} diff --git a/src/apps/dbutils/cmd_search_reindex.cpp b/src/apps/dbutils/cmd_search_reindex.cpp new file mode 100644 index 00000000..3662f96d --- /dev/null +++ b/src/apps/dbutils/cmd_search_reindex.cpp @@ -0,0 +1,194 @@ +#include + +#include +#include "golpe.h" +#include "events.h" +#include "Decompressor.h" +#include "search/LmdbSearchProvider.h" +#include + + +static const char USAGE[] = +R"( + Rebuild search index from scratch. + Drops existing search index and re-indexes all events from EventPayload. + + Usage: + search_reindex [--batch-size=] [--restart] + + Options: + --batch-size= Number of events to index per batch [default: 1000] + --restart Discard any in-progress rebuild and start from levId 1 +)"; + + +void cmd_search_reindex(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + // batchSize is used below as the modulus of the progress report, so zero (division by zero) and negative values (wrap to a huge unsigned) are rejected up front + long batchSizeArg = args["--batch-size"].asLong(); + if (batchSizeArg <= 0) { + std::cerr << "Error: --batch-size must be a positive integer\n"; + return; + } + uint64_t batchSize = static_cast<uint64_t>(batchSizeArg); + + std::cout << "Rebuilding search index...\n"; + + // Check if search is enabled + if (!cfg().relay__search__enabled) { + std::cerr << "Error: Search is not enabled (relay.search.enabled = false)\n"; + return; + } + + if (cfg().relay__search__backend != "lmdb") { + std::cerr << "Error: LMDB search backend not configured (relay.search.backend != \"lmdb\")\n"; + return; + } + + // Create LMDB search provider + LmdbSearchProvider provider; + + // Upsert the single SearchState record (id 1): insert it when missing, otherwise update lastIndexedLevId and, only if it differs, indexVersion + auto persistSearchState = [](lmdb::txn &txn, uint64_t lastIndexedLevId, uint64_t indexVersion) { + auto stateView = env.lookup_SearchState(txn, 1); + if (!stateView) { + env.insert_SearchState(txn, lastIndexedLevId, indexVersion); + return; + } + + defaultDb::environment::Updates_SearchState upd; + upd.lastIndexedLevId = lastIndexedLevId; + if (stateView->indexVersion() != indexVersion) { + upd.indexVersion = indexVersion; + } + env.update_SearchState(txn, *stateView, upd); + }; + + bool restart = args["--restart"].asBool(); + uint64_t resumeFrom = 1; 
+ bool resuming = false; + + { + auto txn = env.txn_ro(); + auto stateView = env.lookup_SearchState(txn, 1); + if (stateView && !restart && stateView->indexVersion() == 0) { + resuming = true; + uint64_t last = stateView->lastIndexedLevId(); + resumeFrom = last < std::numeric_limits::max() ? last + 1 : last; + if (resumeFrom == 0) resumeFrom = 1; + } + } + + if (!resuming) { + std::cout << "Clearing existing search index...\n"; + auto txn = lmdb::txn::begin(env.lmdb_env); + + // Clear SearchIndex + auto indexCursor = lmdb::cursor::open(txn, env.dbi_SearchIndex); + std::string_view key, val; + if (indexCursor.get(key, val, MDB_FIRST)) { + do { + indexCursor.del(); + } while (indexCursor.get(key, val, MDB_NEXT_NODUP)); + } + + // Clear SearchDocMeta + auto docMetaCursor = lmdb::cursor::open(txn, env.dbi_SearchDocMeta); + if (docMetaCursor.get(key, val, MDB_FIRST)) { + do { + docMetaCursor.del(); + } while (docMetaCursor.get(key, val, MDB_NEXT)); + } + + // Mark rebuild as in-progress (indexVersion = 0) + persistSearchState(txn, 0, 0); + + txn.commit(); + std::cout << "Existing index cleared.\n"; + } else { + std::cout << "Resuming search index rebuild from levId " << resumeFrom << "...\n"; + } + + // Get total number of events to index + uint64_t totalEvents = 0; + { + auto txn = env.txn_ro(); + totalEvents = env.get_largest_integer_key_or_zero(txn, env.dbi_EventPayload); + std::cout << "Total events to scan: " << totalEvents << "\n"; + } + + if (resumeFrom > totalEvents) { + auto txn = lmdb::txn::begin(env.lmdb_env); + persistSearchState(txn, totalEvents, LmdbSearchProvider::kIndexVersion); + txn.commit(); + std::cout << "Index already up to date.\n"; + return; + } + + // Re-index events from EventPayload + uint64_t indexed = 0; + uint64_t skipped = 0; + Decompressor decomp; + + auto persistStandalone = [&](uint64_t levId) { + auto txn = lmdb::txn::begin(env.lmdb_env); + persistSearchState(txn, levId, 0); + txn.commit(); + }; + + for (uint64_t levId = resumeFrom; 
levId <= totalEvents; levId++) { + try { + auto txn = env.txn_ro(); + std::string_view eventPayload; + + if (!env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), eventPayload)) { + skipped++; + persistStandalone(levId); + continue; // Event doesn't exist (sparse levId space) + } + + // Decode event payload + std::string_view json = decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr); + + // Parse event to get kind and created_at + auto eventJson = tao::json::from_string(json); + uint64_t kind = eventJson.at("kind").get_unsigned(); + uint64_t created_at = eventJson.at("created_at").get_unsigned(); + + // Index the event and persist progress within the same transaction + bool wrote = provider.indexEventWithTxnHook(levId, json, kind, created_at, [&, levId](lmdb::txn &wtxn) { + persistSearchState(wtxn, levId, 0); + }); + + if (wrote) { + indexed++; + } else { + skipped++; + persistStandalone(levId); + } + + } catch (std::exception &e) { + std::cerr << "Warning: Failed to index event levId=" << levId << ": " << e.what() << "\n"; + skipped++; + persistStandalone(levId); + } + + if (((levId - resumeFrom + 1) % batchSize) == 0) { + std::cout << "Progress: " << levId << "/" << totalEvents + << " events scanned, " << indexed << " indexed, " + << skipped << " skipped\n"; + } + } + + // Update SearchState to mark index as fully caught up + { + auto txn = lmdb::txn::begin(env.lmdb_env); + persistSearchState(txn, totalEvents, LmdbSearchProvider::kIndexVersion); + txn.commit(); + } + + std::cout << "\nSearch index rebuild complete!\n"; + std::cout << "Total events scanned: " << totalEvents << "\n"; + std::cout << "Events indexed: " << indexed << "\n"; + std::cout << "Events skipped: " << skipped << "\n"; +} diff --git a/src/apps/dbutils/cmd_search_set_state.cpp b/src/apps/dbutils/cmd_search_set_state.cpp new file mode 100644 index 00000000..da93a6d5 --- /dev/null +++ b/src/apps/dbutils/cmd_search_set_state.cpp @@ -0,0 +1,109 @@ +#include + +#include +#include "golpe.h" 
+#include "search/LmdbSearchProvider.h" + + +static const char USAGE[] = +R"( + Manually set the search index progress record. + + Usage: + search_set_state --lev-id= [--index-version=] [--allow-lower] [--in-progress] + + Options: + --lev-id= LevId to record as last indexed (required) + --index-version= Index version flag to store (defaults to provider version) + --allow-lower Allow decreasing the stored levId + --in-progress Shortcut for --index-version=0 (resume mode) +)"; + + +void cmd_search_set_state(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + if (!cfg().relay__search__enabled) { + std::cerr << "Error: Search is not enabled (relay.search.enabled = false)\n"; + return; + } + + if (cfg().relay__search__backend != "lmdb") { + std::cerr << "Error: LMDB search backend not configured (relay.search.backend != \"lmdb\")\n"; + return; + } + + try { + auto txn = env.txn_ro(); + auto cursorIndex = lmdb::cursor::open(txn, env.dbi_SearchIndex); + auto cursorDocMeta = lmdb::cursor::open(txn, env.dbi_SearchDocMeta); + (void)cursorIndex; + (void)cursorDocMeta; + } catch (const std::exception &e) { + std::cerr << "Error: Search tables are not initialized in this database: " << e.what() << "\n"; + return; + } + + if (!args["--lev-id"]) { + throw herr("missing required --lev-id argument"); + } + + uint64_t levId = 0; + try { + levId = std::stoull(args["--lev-id"].asString()); + } catch (...) { + throw herr("invalid --lev-id value"); + } + + bool markInProgress = args["--in-progress"].asBool(); + uint64_t indexVersion = LmdbSearchProvider::kIndexVersion; + + if (args["--index-version"]) { + try { + indexVersion = std::stoull(args["--index-version"].asString()); + } catch (...) 
{ + throw herr("invalid --index-version value"); + } + } + + if (markInProgress) { + if (args["--index-version"] && indexVersion != 0) { + throw herr("--in-progress conflicts with explicit --index-version"); + } + indexVersion = 0; + } + + if (levId == 0) { + throw herr("--lev-id must be >= 1"); + } + + bool allowLower = args["--allow-lower"].asBool(); + + auto txn = lmdb::txn::begin(env.lmdb_env); + + uint64_t oldLevId = 0; + uint64_t oldVersion = 0; + + auto stateView = env.lookup_SearchState(txn, 1); + if (stateView) { + oldLevId = stateView->lastIndexedLevId(); + oldVersion = stateView->indexVersion(); + + if (!allowLower && levId < oldLevId) { + throw herr("refusing to decrease lastIndexedLevId (have ", oldLevId, ", requested ", levId, "). Use --allow-lower to override."); + } + + defaultDb::environment::Updates_SearchState upd; + upd.lastIndexedLevId = levId; + upd.indexVersion = indexVersion; + env.update_SearchState(txn, *stateView, upd); + } else { + env.insert_SearchState(txn, levId, indexVersion); + } + + txn.commit(); + + std::cout << "SearchState updated.\n" + << " previous: levId=" << oldLevId << " indexVersion=" << oldVersion << "\n" + << " current: levId=" << levId << " indexVersion=" << indexVersion << "\n"; +} diff --git a/src/apps/relay/RelayCron.cpp b/src/apps/relay/RelayCron.cpp index 64e9ea96..85d4d11e 100644 --- a/src/apps/relay/RelayCron.cpp +++ b/src/apps/relay/RelayCron.cpp @@ -56,6 +56,17 @@ void RelayServer::runCron() { txn.commit(); + // Remove from search index + if (searchProvider && searchProvider->healthy()) { + for (uint64_t levId : expiredLevIds) { + try { + searchProvider->deleteEvent(levId); + } catch (std::exception &e) { + LE << "Failed to remove expired event from search index levId=" << levId << ": " << e.what(); + } + } + } + if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")"; } }); diff --git a/src/apps/relay/RelayReqWorker.cpp 
b/src/apps/relay/RelayReqWorker.cpp index 5478c522..5aec59a4 100644 --- a/src/apps/relay/RelayReqWorker.cpp +++ b/src/apps/relay/RelayReqWorker.cpp @@ -6,6 +6,9 @@ void RelayServer::runReqWorker(ThreadPool::Thread &thr) { Decompressor decomp; QueryScheduler queries; + // Wire up search provider for NIP-50 support + queries.searchProvider = searchProvider.get(); + queries.onEvent = [&](lmdb::txn &txn, const auto &sub, uint64_t levId, std::string_view eventPayload){ if (sub.countOnly) return; sendEvent(sub.connId, sub.subId, decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr)); diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index 387aa926..a846c267 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -18,11 +18,9 @@ #include "filters.h" #include "jsonParseUtils.h" #include "Decompressor.h" +#include "search/SearchProvider.h" #include "PrometheusMetrics.h" - - - struct MsgWebsocket : NonCopyable { struct Send { uint64_t connId; @@ -152,6 +150,9 @@ struct MsgNegentropy : NonCopyable { struct RelayServer { uS::Async *hubTrigger = nullptr; + // Search Provider + std::unique_ptr searchProvider; + // Thread Pools ThreadPool tpWebsocket; @@ -162,6 +163,8 @@ struct RelayServer { ThreadPool tpNegentropy; std::thread cronThread; std::thread signalHandlerThread; + std::thread searchIndexerThread; + std::atomic searchIndexerRunning{false}; void run(); diff --git a/src/apps/relay/RelayWebsocket.cpp b/src/apps/relay/RelayWebsocket.cpp index 30e97759..e85e4602 100644 --- a/src/apps/relay/RelayWebsocket.cpp +++ b/src/apps/relay/RelayWebsocket.cpp @@ -55,9 +55,9 @@ void RelayServer::runWebsocket(ThreadPool::Thread &thr) { if (cfg().relay__maxFilterLimitCount > 0) output.push_back(45); if (cfg().relay__negentropy__enabled) output.push_back(77); + if (searchProvider && searchProvider->healthy()) output.push_back(50); std::sort(output.get_array().begin(), output.get_array().end()); - if (cfg().relay__info__nips.size() == 
0) return output; try { diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index 3ed1e876..0bd41f18 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -62,7 +62,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { try { auto txn = env.txn_rw(); - writeEvents(txn, neFilterCache, newEvents); + writeEvents(txn, neFilterCache, newEvents, 1, searchProvider.get()); txn.commit(); } catch (std::exception &e) { LE << "Error writing " << newEvents.size() << " events: " << e.what(); @@ -81,6 +81,23 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { continue; } + // Index events for search (NIP-50) + // Always index and run search when provider exists, regardless of healthy() status + // healthy() only gates NIP-11 advertisement + if (searchProvider) { + for (auto &newEvent : newEvents) { + if (newEvent.status == EventWriteStatus::Written) { + PackedEventView packed(newEvent.packedStr); + try { + searchProvider->indexEvent(newEvent.levId, newEvent.jsonStr, packed.kind(), packed.created_at()); + } catch (std::exception &e) { + // Don't fail writes if indexing fails, just log + LE << "Search indexing failed for levId=" << newEvent.levId << ": " << e.what(); + } + } + } + } + // Log for (auto &newEvent : newEvents) { diff --git a/src/apps/relay/cmd_relay.cpp b/src/apps/relay/cmd_relay.cpp index 725cba4f..4e05699a 100644 --- a/src/apps/relay/cmd_relay.cpp +++ b/src/apps/relay/cmd_relay.cpp @@ -2,6 +2,7 @@ #include #include "RelayServer.h" +#include "search/LmdbSearchProvider.h" @@ -35,6 +36,12 @@ void RelayServer::run() { if (s != 0) throw herr("Unable to set sigmask: ", strerror(errno)); } + // Initialize search provider (NIP-50) + searchProvider = makeSearchProvider(); + LI << "Search provider initialized: backend=" << cfg().relay__search__backend + << " enabled=" << cfg().relay__search__enabled + << " healthy=" << searchProvider->healthy(); + tpWebsocket.init("Websocket", 1, [this](auto &thr){ 
runWebsocket(thr); }); @@ -67,6 +74,17 @@ void RelayServer::run() { runSignalHandler(); }); + // Start search catch-up indexer if LMDB backend is enabled + if (cfg().relay__search__enabled && cfg().relay__search__backend == "lmdb") { + auto *lmdbProvider = dynamic_cast(searchProvider.get()); + if (lmdbProvider) { + searchIndexerRunning = true; + searchIndexerThread = std::thread([this, lmdbProvider]{ + lmdbProvider->runCatchupIndexer(searchIndexerRunning); + }); + } + } + // Monitor for config file reloads checkConfig(); @@ -86,4 +104,11 @@ void RelayServer::run() { tpWebsocket.join(); + + // Shutdown search indexer thread if running + if (searchIndexerThread.joinable()) { + searchIndexerRunning = false; + searchIndexerThread.join(); + LI << "Search indexer thread shutdown complete"; + } } diff --git a/src/apps/relay/golpe.yaml b/src/apps/relay/golpe.yaml index ad93988e..c77e281a 100644 --- a/src/apps/relay/golpe.yaml +++ b/src/apps/relay/golpe.yaml @@ -43,7 +43,7 @@ config: desc: "NIP-11: Terms of service URL" default: "" - name: relay__info__nips - desc: "List of supported lists as JSON array, or empty string to use default. Example: \"[1,2]\"" + desc: "NIP-11: Supported NIPs as JSON array, or empty string for auto-detection. WARNING: Setting this OVERRIDES all auto-detection including NIP-50 (search) gating. Example: \"[1,2,50]\"" default: "" - name: relay__maxWebsocketPayloadSize @@ -129,6 +129,45 @@ config: desc: "Maximum records that sync will process before returning an error" default: 1000000 + - name: relay__search__enabled + desc: "Enable NIP-50 search capability (requires search backend)" + default: false + - name: relay__search__backend + desc: "Search backend to use: lmdb, noop (or external in future)" + default: "lmdb" + - name: relay__search__maxQueryTerms + desc: "Maximum number of search terms allowed in a query" + default: 16 + - name: relay__search__indexedKinds + desc: "Comma-separated kinds/ranges to index. 
Supports: single (1), ranges (1000-1999), wildcard (*), exclusions (-5000-5999)" + default: "1, 30023" + - name: relay__search__maxPostingsPerToken + desc: "Maximum number of postings (documents) per search token" + default: 100000 + - name: relay__search__maxCandidateDocs + desc: "Maximum candidate documents to fetch during search (multiple of limit)" + default: 1000 + - name: relay__search__recencyBoostPercent + desc: "Recency tie-breaker percent (0–100); 1 = 1% boost" + default: 0 + - name: relay__search__overfetchFactor + desc: "Over-fetch multiplier to compensate for post-filtering (candidates = limit × factor, bounded by maxCandidateDocs)" + default: 5 + - name: relay__search__candidateRanking + desc: "Candidate ranking order before scoring: terms-tf-recency | terms-recency-tf | tf-terms-recency | tf-recency-terms | recency-terms-tf | recency-tf-terms" + default: "terms-tf-recency" + - name: relay__search__candidateRankMode + desc: "Candidate ranking mode: order | weighted" + default: "order" + - name: relay__search__rankWeightTerms + desc: "Weighted mode: weight for matched terms component" + default: 100 + - name: relay__search__rankWeightTf + desc: "Weighted mode: weight for aggregate term frequency component" + default: 50 + - name: relay__search__rankWeightRecency + desc: "Weighted mode: weight for recency component" + default: 10 - name: relay__filterValidation__enabled desc: "Enable strict filter validation for REQ messages" default: false diff --git a/src/events.cpp b/src/events.cpp index aadb6186..bc80e500 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -3,6 +3,7 @@ #include "events.h" #include "jsonParseUtils.h" +#include "search/SearchProvider.h" std::string nostrJsonToPackedEvent(const tao::json::value &v) { @@ -245,7 +246,7 @@ bool deleteEventBasic(lmdb::txn &txn, uint64_t levId) { -void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel) { +void writeEvents(lmdb::txn &txn, 
NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel, ISearchProvider *searchProvider) { std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { auto aC = a.createdAt(); auto bC = b.createdAt(); @@ -343,6 +344,16 @@ void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vect if (!evToDel) continue; // already deleted updateNegentropy(PackedEventView(evToDel->buf), false); deleteEventBasic(txn, levId); + + // Remove from search index + if (searchProvider && searchProvider->healthy()) { + try { + searchProvider->deleteEvent(levId); + } catch (std::exception &e) { + // Don't fail deletions if search removal fails, just log + LE << "Search delete failed for levId=" << levId << ": " << e.what(); + } + } } levIdsToDelete.clear(); diff --git a/src/events.h b/src/events.h index 62da51ba..f3a39ccc 100644 --- a/src/events.h +++ b/src/events.h @@ -111,7 +111,7 @@ struct EventToWrite { }; -void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel = 1); +void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel = 1, class ISearchProvider *searchProvider = nullptr); bool deleteEventBasic(lmdb::txn &txn, uint64_t levId); template diff --git a/src/filters.h b/src/filters.h index 066adaac..30d5e4e7 100644 --- a/src/filters.h +++ b/src/filters.h @@ -113,6 +113,7 @@ struct NostrFilter { std::optional authors; std::optional kinds; flat_hash_map tags; + std::optional search; uint64_t since = 0; uint64_t until = MAX_U64; @@ -198,6 +199,10 @@ struct NostrFilter { until = jsonGetUnsigned(v, "error parsing until"); } else if (k == "limit") { limit = jsonGetUnsigned(v, "error parsing limit"); + } else if (k == "search") { + if (!v.is_string()) throw herr("search must be a string"); + search.emplace(v.get_string()); + // Note: search is not counted in numMajorFields to preserve indexOnlyScans heuristics } else { throw herr("unrecognised filter item: ", 
k); } @@ -245,6 +250,10 @@ struct NostrFilter { bool isFullDbQuery() { return !ids && !authors && !kinds && tags.size() == 0; } + + bool hasSearch() const { + return search.has_value() && !search->empty(); + } }; struct NostrFilterGroup { diff --git a/src/search/KindMatcher.h b/src/search/KindMatcher.h new file mode 100644 index 00000000..9b0de311 --- /dev/null +++ b/src/search/KindMatcher.h @@ -0,0 +1,196 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "golpe.h" + + +class KindMatcher { +private: + std::vector> inclusionRanges; + std::vector> exclusionRanges; + bool hasWildcard = false; + bool parseError = false; + std::string errorMessage; + + static void mergeRanges(std::vector> &ranges) { + if (ranges.empty()) return; + + std::sort(ranges.begin(), ranges.end()); + + std::vector> merged; + merged.push_back(ranges[0]); + + for (size_t i = 1; i < ranges.size(); i++) { + auto &last = merged.back(); + auto &curr = ranges[i]; + + if (curr.first <= last.second + 1) { + last.second = std::max(last.second, curr.second); + } else { + merged.push_back(curr); + } + } + + ranges = std::move(merged); + } + + static bool parseToken(const std::string &token, bool &isWildcard, bool &isExclusion, + uint64_t &start, uint64_t &end, std::string &error) { + std::string trimmed; + for (char c : token) { + if (!std::isspace(c)) trimmed += c; + } + + if (trimmed.empty()) return false; + + if (trimmed == "*") { + isWildcard = true; + return true; + } + + isExclusion = false; + size_t pos = 0; + if (trimmed[0] == '-') { + isExclusion = true; + pos = 1; + } + + size_t dashPos = trimmed.find('-', pos); + + if (dashPos == std::string::npos) { + try { + start = end = std::stoull(trimmed.substr(pos)); + return true; + } catch (...) 
{ + error = "Invalid kind number: " + trimmed; + return false; + } + } else { + try { + start = std::stoull(trimmed.substr(pos, dashPos - pos)); + end = std::stoull(trimmed.substr(dashPos + 1)); + + if (start > end) { + error = "Invalid range (start > end): " + trimmed; + return false; + } + + return true; + } catch (...) { + error = "Invalid range format: " + trimmed; + return false; + } + } + } + +public: + KindMatcher() = default; + + static KindMatcher parse(const std::string &config) { + KindMatcher matcher; + + if (config.empty()) { + matcher.inclusionRanges.push_back({1, 1}); + matcher.inclusionRanges.push_back({30023, 30023}); + return matcher; + } + + std::vector tokens; + std::stringstream ss(config); + std::string token; + while (std::getline(ss, token, ',')) { + tokens.push_back(token); + } + + for (const auto &token : tokens) { + bool isWildcard = false; + bool isExclusion = false; + uint64_t start, end; + std::string error; + + if (!parseToken(token, isWildcard, isExclusion, start, end, error)) { + if (!error.empty()) { + matcher.parseError = true; + matcher.errorMessage = error; + LE << "KindMatcher parse error: " << error; + return matcher; + } + continue; + } + + if (isWildcard) { + matcher.hasWildcard = true; + } else if (isExclusion) { + matcher.exclusionRanges.push_back({start, end}); + } else { + matcher.inclusionRanges.push_back({start, end}); + } + } + + mergeRanges(matcher.inclusionRanges); + mergeRanges(matcher.exclusionRanges); + + return matcher; + } + + bool matches(uint64_t kind) const { + if (parseError) return false; + + bool included = false; + + if (hasWildcard) { + included = true; + } else { + for (const auto &[start, end] : inclusionRanges) { + if (kind >= start && kind <= end) { + included = true; + break; + } + } + } + + if (!included) return false; + + for (const auto &[start, end] : exclusionRanges) { + if (kind >= start && kind <= end) { + return false; + } + } + + return true; + } + + bool hasError() const { return 
parseError; } + const std::string& getError() const { return errorMessage; } + + std::string toString() const { + std::ostringstream oss; + if (parseError) { + oss << "ERROR: " << errorMessage; + return oss.str(); + } + + if (hasWildcard) { + oss << "* "; + } + for (const auto &[start, end] : inclusionRanges) { + if (start == end) { + oss << start << " "; + } else { + oss << start << "-" << end << " "; + } + } + for (const auto &[start, end] : exclusionRanges) { + if (start == end) { + oss << "-" << start << " "; + } else { + oss << "-" << start << "-" << end << " "; + } + } + return oss.str(); + } +}; diff --git a/src/search/LmdbSearchProvider.h b/src/search/LmdbSearchProvider.h new file mode 100644 index 00000000..ddc0a64d --- /dev/null +++ b/src/search/LmdbSearchProvider.h @@ -0,0 +1,638 @@ +#pragma once + +#include "SearchProvider.h" +#include "Tokenizer.h" +#include "KindMatcher.h" +#include "Decompressor.h" +#include "events.h" +#include "golpe.h" +#include +#include +#include +#include +#include +#include + + +// Pack posting: [levId:48][tf:16] as host-endian uint64 +inline uint64_t packPosting(uint64_t levId, uint16_t tf) { + return (levId << 16) | tf; +} + +inline void unpackPosting(uint64_t packed, uint64_t &levId, uint16_t &tf) { + levId = packed >> 16; + tf = static_cast(packed & 0xFFFF); +} + +// Pack doc metadata: [docLen:16][kind:16][reserved:32] as host-endian uint64 +inline uint64_t packDocMeta(uint16_t docLen, uint16_t kind) { + return (static_cast(docLen) << 48) | (static_cast(kind) << 32); +} + +inline void unpackDocMeta(uint64_t packed, uint16_t &docLen, uint16_t &kind) { + docLen = static_cast(packed >> 48); + kind = static_cast((packed >> 32) & 0xFFFF); +} + + +// LMDB-backed search provider with BM25 scoring +class LmdbSearchProvider : public ISearchProvider { +private: + static constexpr float k1 = 1.2f; + static constexpr float b = 0.75f; + + mutable KindMatcher kindMatcher; + mutable bool kindMatcherInitialized = false; + + const 
KindMatcher& getKindMatcher() const { + if (!kindMatcherInitialized) { + kindMatcher = KindMatcher::parse(cfg().relay__search__indexedKinds); + kindMatcherInitialized = true; + + if (kindMatcher.hasError()) { + LE << "Search indexedKinds config error: " << kindMatcher.getError(); + LE << "Search indexing disabled due to invalid configuration"; + } else { + LI << "Search indexedKinds: " << kindMatcher.toString(); + } + } + return kindMatcher; + } + + bool shouldIndexKind(uint64_t kind) const { + return getKindMatcher().matches(kind); + } + + + uint64_t getTotalDocs(lmdb::txn &txn) const { + uint64_t count = 0; + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchDocMeta); + + std::string_view key, val; + if (cursor.get(key, val, MDB_FIRST)) { + do { + count++; + } while (cursor.get(key, val, MDB_NEXT)); + } + + return count; + } + + uint64_t getDocFreq(lmdb::txn &txn, const std::string &token) const { + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchIndex); + + std::string_view key(token.data(), token.size()); + std::string_view val; + + uint64_t count = 0; + if (cursor.get(key, val, MDB_SET)) { + do { + count++; + } while (cursor.get(key, val, MDB_NEXT_DUP)); + } + + return count; + } + + float getAvgDocLen(lmdb::txn &txn) const { + uint64_t totalLen = 0; + uint64_t count = 0; + + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchDocMeta); + + std::string_view key, val; + if (cursor.get(key, val, MDB_FIRST)) { + do { + uint64_t packed = *reinterpret_cast(val.data()); + uint16_t docLen, kind; + unpackDocMeta(packed, docLen, kind); + totalLen += docLen; + count++; + } while (cursor.get(key, val, MDB_NEXT)); + } + + return count > 0 ? 
static_cast(totalLen) / count : 0.0f; + } + + float computeIDF(uint64_t N, uint64_t df) const { + if (df == 0) return 0.0f; + return std::log((N - df + 0.5f) / (df + 0.5f) + 1.0f); + } + + float computeBM25(uint64_t docLen, float avgDocLen, const std::vector> &termFreqs, + const std::vector &idfs) const { + float score = 0.0f; + + for (size_t i = 0; i < termFreqs.size(); i++) { + uint16_t tf = termFreqs[i].second; + float idf = idfs[i]; + + float numerator = tf * (k1 + 1.0f); + float denominator = tf + k1 * (1.0f - b + b * docLen / avgDocLen); + + score += idf * (numerator / denominator); + } + + return score; + } + + void persistSearchState(lmdb::txn &txn, uint64_t lastIndexedLevId, uint64_t indexVersion) const { + auto stateView = env.lookup_SearchState(txn, 1); + if (!stateView) { + env.insert_SearchState(txn, lastIndexedLevId, indexVersion); + return; + } + + defaultDb::environment::Updates_SearchState upd; + upd.lastIndexedLevId = lastIndexedLevId; + if (stateView->indexVersion() != indexVersion) { + upd.indexVersion = indexVersion; + } + env.update_SearchState(txn, *stateView, upd); + } + +public: + static constexpr uint64_t kIndexVersion = 1; + + bool healthy() const override { + if (!cfg().relay__search__enabled) return false; + if (cfg().relay__search__backend != "lmdb") return false; + + // Tolerant readiness: + // - Treat empty DBs as healthy + // - If doc meta exists and head is small, consider healthy + // - If SearchState exists, ensure it's close to head (within 1000 events), + // using saturating arithmetic to avoid underflow + try { + auto txn = env.txn_ro(); + + uint64_t head = env.get_largest_integer_key_or_zero(txn, env.dbi_EventPayload); + if (head == 0) return true; // empty DB + + uint64_t docCount = getTotalDocs(txn); + if (docCount > 0 && head <= 1000) return true; + + auto stateView = env.lookup_SearchState(txn, 1); + if (!stateView) { + // No explicit state, but index exists + return docCount > 0; + } + + uint64_t last = 
stateView->lastIndexedLevId(); + if (last > head) last = head; // saturate + return (head - last) < 1000; + } catch (...) { + return false; // conservative on errors + } + } + + bool indexEventWithTxnHook(uint64_t levId, std::string_view json, uint64_t kind, uint64_t created_at, + const std::function &txnHook = {}) { + if (!shouldIndexKind(kind)) return false; + + auto eventJson = tao::json::from_string(json); + + std::string text = Tokenizer::extractText(eventJson); + if (text.empty()) return false; + + auto tokens = Tokenizer::tokenize(text); + if (tokens.empty()) return false; + + uint16_t docLen = 0; + for (const auto &token : tokens) { + docLen += token.tf; + } + if (docLen > 65535) docLen = 65535; // Clamp to uint16 max + + auto txn = lmdb::txn::begin(env.lmdb_env); + + // Skip if already indexed (on-write path may have indexed while catch-up is behind) + std::string_view existingMeta; + if (env.dbi_SearchDocMeta.get(txn, lmdb::to_sv(levId), existingMeta)) { + if (txnHook) txnHook(txn); + txn.commit(); + return true; + } + + uint64_t docMetaPacked = packDocMeta(docLen, static_cast(kind)); + env.dbi_SearchDocMeta.put(txn, lmdb::to_sv(levId), + std::string_view(reinterpret_cast(&docMetaPacked), sizeof(docMetaPacked))); + + uint64_t maxPostings = cfg().relay__search__maxPostingsPerToken; + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchIndex); + + for (const auto &token : tokens) { + uint64_t posting = packPosting(levId, token.tf); + std::string_view postingView(reinterpret_cast(&posting), sizeof(posting)); + + env.dbi_SearchIndex.put(txn, token.text, postingView, MDB_APPENDDUP); + + uint64_t count = 0; + std::string_view key = token.text; + std::string_view val; + + if (cursor.get(key, val, MDB_SET)) { + do { + count++; + } while (cursor.get(key, val, MDB_NEXT_DUP)); + } + + // If over limit, delete (count - maxPostings) oldest entries + if (count > maxPostings) { + uint64_t toDelete = count - maxPostings; + + // Re-position cursor to first duplicate and 
delete oldest + key = token.text; // Reset key for MDB_SET + if (cursor.get(key, val, MDB_SET)) { + for (uint64_t i = 0; i < toDelete; i++) { + cursor.del(); // Delete current (oldest by levId due to APPENDDUP ordering) + if (i + 1 < toDelete && !cursor.get(key, val, MDB_NEXT_DUP)) break; + } + } + } + } + + if (txnHook) txnHook(txn); + + txn.commit(); + return true; + } + + void indexEvent(uint64_t levId, std::string_view json, uint64_t kind, uint64_t created_at) override { + indexEventWithTxnHook(levId, json, kind, created_at, nullptr); + } + + void deleteEvent(uint64_t levId) override { + try { + auto txn = lmdb::txn::begin(env.lmdb_env); + + try { + env.dbi_SearchDocMeta.del(txn, lmdb::to_sv(levId)); + } catch (...) { + // Entry may not exist, that's ok + } + { + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchIndex); + + std::string_view key, val; + if (cursor.get(key, val, MDB_FIRST)) { + do { + // Check all duplicates for this key + std::string_view dupKey = key; + std::string_view dupVal; + + if (cursor.get(dupKey, dupVal, MDB_FIRST_DUP)) { + do { + uint64_t posting = *reinterpret_cast(dupVal.data()); + uint64_t postingLevId; + uint16_t tf; + unpackPosting(posting, postingLevId, tf); + + if (postingLevId == levId) { + cursor.del(); + } + } while (cursor.get(dupKey, dupVal, MDB_NEXT_DUP)); + } + } while (cursor.get(key, val, MDB_NEXT_NODUP)); + } + } + + txn.commit(); + + } catch (std::exception &e) { + LE << "Failed to delete event from search index levId=" << levId << ": " << e.what(); + } + } + + std::vector query(const SearchQuery& q, lmdb::txn &txn) override { + std::vector results; + + try { + // Parse query into tokens + auto queryTokens = Tokenizer::parseQuery(q.q); + if (queryTokens.empty()) return results; + + // Enforce max query terms limit + if (queryTokens.size() > cfg().relay__search__maxQueryTerms) { + queryTokens.resize(cfg().relay__search__maxQueryTerms); + } + + // Get corpus statistics + uint64_t N = getTotalDocs(txn); + if (N == 0) 
return results; + + float avgDocLen = getAvgDocLen(txn); + + // Compute IDF for each query term + std::vector idfs; + std::vector dfs; + for (const auto &token : queryTokens) { + uint64_t df = getDocFreq(txn, token); + dfs.push_back(df); + idfs.push_back(computeIDF(N, df)); + } + + // Collect candidate documents + // Strategy: Fetch postings for each query term, union all docs, compute scores + flat_hash_map>> docTerms; // levId -> [(token, tf)] + + for (size_t i = 0; i < queryTokens.size(); i++) { + const auto &token = queryTokens[i]; + + auto cursor = lmdb::cursor::open(txn, env.dbi_SearchIndex); + std::string_view key(token.data(), token.size()); + std::string_view val; + + // Use MDB_SET to find the key, then MDB_FIRST_DUP to ensure we start from the first duplicate + if (cursor.get(key, val, MDB_SET)) { + // Position to first duplicate for this key + if (cursor.get(key, val, MDB_FIRST_DUP)) { + uint64_t fetchedPostings = 0; + uint64_t maxFetch = cfg().relay__search__maxPostingsPerToken; + + do { + if (fetchedPostings >= maxFetch) break; + + uint64_t posting = *reinterpret_cast(val.data()); + uint64_t levId; + uint16_t tf; + unpackPosting(posting, levId, tf); + + docTerms[levId].push_back({token, tf}); + fetchedPostings++; + } while (cursor.get(key, val, MDB_NEXT_DUP)); + } + } + } + + // Limit candidate documents + if (docTerms.size() > cfg().relay__search__maxCandidateDocs) { + // Two modes: order-based or weighted ranking + std::string mode = cfg().relay__search__candidateRankMode; + if (mode == "weighted") { + // Weighted ranking: score = w_terms * norm(terms) + w_tf * norm(tfSum) + w_recency * norm(recency) + // Normalization: + // - terms: matchedTerms / queryTokens.size() + // - tfSum: tfSum / maxTfSum over candidates + // - recency: levId / head + uint64_t head = env.get_largest_integer_key_or_zero(txn, env.dbi_EventPayload); + uint32_t maxTfSum = 0; + for (const auto &kv : docTerms) { + uint32_t s = 0; for (const auto &p : kv.second) s += p.second; if 
(s > maxTfSum) maxTfSum = s; + } + float denomTerms = queryTokens.size() > 0 ? static_cast(queryTokens.size()) : 1.0f; + float denomTf = maxTfSum > 0 ? static_cast(maxTfSum) : 1.0f; + float denomRec = head > 0 ? static_cast(head) : 1.0f; + + uint64_t wTerms = cfg().relay__search__rankWeightTerms; + uint64_t wTf = cfg().relay__search__rankWeightTf; + uint64_t wRec = cfg().relay__search__rankWeightRecency; + + struct Scored { uint64_t levId; float score; }; + std::vector scored; + scored.reserve(docTerms.size()); + for (const auto &[levId, terms] : docTerms) { + uint32_t tfSum = 0; for (const auto &p : terms) tfSum += p.second; + float tNorm = static_cast(terms.size()) / denomTerms; + float tfNorm = static_cast(tfSum) / denomTf; + float rNorm = static_cast(levId) / denomRec; + float s = wTerms * tNorm + wTf * tfNorm + wRec * rNorm; + scored.push_back(Scored{ levId, s }); + } + std::sort(scored.begin(), scored.end(), [](const Scored &a, const Scored &b){ return a.score > b.score; }); + + flat_hash_map>> limited; + uint64_t maxKeep = cfg().relay__search__maxCandidateDocs; + size_t kept = 0; + for (const auto &s : scored) { + if (kept++ >= maxKeep) break; + auto it = docTerms.find(s.levId); + if (it != docTerms.end()) limited.emplace(s.levId, std::move(it->second)); + } + docTerms = std::move(limited); + } else { + // Order-based ranking by configured key order + struct CandInfo { uint64_t levId; uint32_t matchedTerms; uint32_t tfSum; }; + std::vector ranked; ranked.reserve(docTerms.size()); + for (const auto &[levId, terms] : docTerms) { + uint32_t tfSum = 0; for (const auto &kv : terms) tfSum += kv.second; + ranked.push_back(CandInfo{ levId, static_cast(terms.size()), tfSum }); + } + + auto strategy = cfg().relay__search__candidateRanking; + auto cmp_terms_tf_recency = [](const CandInfo &a, const CandInfo &b){ + if (a.matchedTerms != b.matchedTerms) return a.matchedTerms > b.matchedTerms; + if (a.tfSum != b.tfSum) return a.tfSum > b.tfSum; + return a.levId > b.levId; 
+ }; + auto cmp_terms_recency_tf = [](const CandInfo &a, const CandInfo &b){ + if (a.matchedTerms != b.matchedTerms) return a.matchedTerms > b.matchedTerms; + if (a.levId != b.levId) return a.levId > b.levId; + return a.tfSum > b.tfSum; + }; + auto cmp_tf_terms_recency = [](const CandInfo &a, const CandInfo &b){ + if (a.tfSum != b.tfSum) return a.tfSum > b.tfSum; + if (a.matchedTerms != b.matchedTerms) return a.matchedTerms > b.matchedTerms; + return a.levId > b.levId; + }; + auto cmp_tf_recency_terms = [](const CandInfo &a, const CandInfo &b){ + if (a.tfSum != b.tfSum) return a.tfSum > b.tfSum; + if (a.levId != b.levId) return a.levId > b.levId; + return a.matchedTerms > b.matchedTerms; + }; + auto cmp_recency_terms_tf = [](const CandInfo &a, const CandInfo &b){ + if (a.levId != b.levId) return a.levId > b.levId; + if (a.matchedTerms != b.matchedTerms) return a.matchedTerms > b.matchedTerms; + return a.tfSum > b.tfSum; + }; + auto cmp_recency_tf_terms = [](const CandInfo &a, const CandInfo &b){ + if (a.levId != b.levId) return a.levId > b.levId; + if (a.tfSum != b.tfSum) return a.tfSum > b.tfSum; + return a.matchedTerms > b.matchedTerms; + }; + + if (strategy == "terms-recency-tf") std::sort(ranked.begin(), ranked.end(), cmp_terms_recency_tf); + else if (strategy == "tf-terms-recency") std::sort(ranked.begin(), ranked.end(), cmp_tf_terms_recency); + else if (strategy == "tf-recency-terms") std::sort(ranked.begin(), ranked.end(), cmp_tf_recency_terms); + else if (strategy == "recency-terms-tf") std::sort(ranked.begin(), ranked.end(), cmp_recency_terms_tf); + else if (strategy == "recency-tf-terms") std::sort(ranked.begin(), ranked.end(), cmp_recency_tf_terms); + else std::sort(ranked.begin(), ranked.end(), cmp_terms_tf_recency); + + flat_hash_map>> limited; + uint64_t maxKeep = cfg().relay__search__maxCandidateDocs; + size_t kept = 0; + for (const auto &ci : ranked) { + if (kept++ >= maxKeep) break; + auto it = docTerms.find(ci.levId); + if (it != docTerms.end()) 
limited.emplace(ci.levId, std::move(it->second)); + } + docTerms = std::move(limited); + } + } + + // Fetch doc metadata and compute BM25 scores + for (const auto &[levId, termFreqs] : docTerms) { + std::string_view val; + + if (!env.dbi_SearchDocMeta.get(txn, lmdb::to_sv(levId), val)) continue; // Doc deleted + + uint64_t packed = *reinterpret_cast(val.data()); + uint16_t docLen, kind; + unpackDocMeta(packed, docLen, kind); + + // Apply kind filter if specified + if (q.kinds.has_value()) { + bool kindMatches = false; + for (uint64_t k : *q.kinds) { + if (k == kind) { + kindMatches = true; + break; + } + } + if (!kindMatches) continue; + } + + // Compute BM25 score with optional recency tie-breaker + // levId is monotonically increasing (newer events have higher levId) + // If recencyBoostPercent (> 0), add a small recency boost normalized by percent/100 + float bm25Score = computeBM25(docLen, avgDocLen, termFreqs, idfs); + float score = bm25Score; + + uint64_t recencyBoostPercent = cfg().relay__search__recencyBoostPercent; + // Clamp to [0, 100] to avoid unexpected scaling + if (recencyBoostPercent > 100) recencyBoostPercent = 100; + if (recencyBoostPercent > 0 && N > 0) { + float factor = static_cast(recencyBoostPercent) / 100.0f; // 1 = 1% + float recencyBoost = (static_cast(levId) / static_cast(N)) * factor; + score += recencyBoost; + } + + results.push_back({levId, score}); + } + + // Sort by score descending (BM25 + optional recency tie-breaker if configured) + std::sort(results.begin(), results.end(), [](const SearchHit &a, const SearchHit &b) { + return a.score > b.score; + }); + + // Over-fetch to compensate for SearchRunner's post-filtering + // SearchRunner will apply full filter matching (tags, etc.) 
which may drop results + // Fetch (limit × overfetchFactor) candidates, bounded by maxCandidateDocs + uint64_t overfetchFactor = cfg().relay__search__overfetchFactor; + uint64_t overFetchLimit = std::min(q.limit * overfetchFactor, uint64_t(cfg().relay__search__maxCandidateDocs)); + if (results.size() > overFetchLimit) { + results.resize(overFetchLimit); + } + + } catch (std::exception &e) { + LE << "Search query failed: " << e.what(); + return {}; + } + + return results; + } + + // Background catch-up indexer - indexes events that were written before search was enabled + // or while indexing was offline. Should be run in a background thread. + void runCatchupIndexer(std::atomic &running) { + try { + LI << "Search catch-up indexer started"; + + Decompressor decomp; // For decompressing event payloads + + while (running) { + auto txn = lmdb::txn::begin(env.lmdb_env, nullptr, MDB_RDONLY); + + // Get last indexed levId from SearchState + uint64_t lastIndexedLevId = 0; + auto stateView = env.lookup_SearchState(txn, 1); // ID 1 is the single state record + if (stateView) { + lastIndexedLevId = stateView->lastIndexedLevId(); + } + + // Get current head (most recent levId) + uint64_t mostRecentLevId = env.get_largest_integer_key_or_zero(txn, env.dbi_EventPayload); + + if (lastIndexedLevId >= mostRecentLevId) { + // Index is caught up + txn.commit(); + std::this_thread::sleep_for(std::chrono::seconds(10)); + continue; + } + + uint64_t startLevId = lastIndexedLevId + 1; + uint64_t endLevId = std::min(lastIndexedLevId + 1000, mostRecentLevId); // Batch size: 1000 events + + LI << "Search indexer catching up: " << startLevId << " to " << endLevId << " (head: " << mostRecentLevId << ")"; + + txn.commit(); + + // Index batch + uint64_t indexed = 0; + uint64_t skipped = 0; + uint64_t lastProcessedLevId = lastIndexedLevId; + + for (uint64_t levId = startLevId; levId <= endLevId && running; levId++) { + lastProcessedLevId = levId; // always track progress + try { + // Read and 
decode event from EventPayload + auto rtxn = lmdb::txn::begin(env.lmdb_env, nullptr, MDB_RDONLY); + std::string_view eventPayload; + if (!env.dbi_EventPayload.get(rtxn, lmdb::to_sv(levId), eventPayload)) { + rtxn.commit(); + skipped++; + continue; // Event doesn't exist — lastProcessedLevId already advanced + } + + // Decode event payload (handles compression) + std::string_view json = decodeEventPayload(rtxn, decomp, eventPayload, nullptr, nullptr); + rtxn.commit(); + + // Parse event to get kind and created_at + auto eventJson = tao::json::from_string(json); + uint64_t kind = eventJson.at("kind").get_unsigned(); + uint64_t created_at = eventJson.at("created_at").get_unsigned(); + + // Index the event and persist progress inside the same transaction when data was written + bool wrote = indexEventWithTxnHook(levId, json, kind, created_at, [this, levId](lmdb::txn &txn) { + persistSearchState(txn, levId, kIndexVersion); + }); + + if (wrote) { + indexed++; + } else { + skipped++; + } + } catch (std::exception &e) { + LE << "Failed to index event during catch-up levId=" << levId << ": " << e.what(); + skipped++; + } + } + + // Single batch-end persist covers all skipped/missing trailing events + if (lastProcessedLevId > lastIndexedLevId) { + auto wtxn = lmdb::txn::begin(env.lmdb_env); + persistSearchState(wtxn, lastProcessedLevId, kIndexVersion); + wtxn.commit(); + } + + // Always log progress so it doesn't appear stuck + LI << "Search indexer: indexed=" << indexed << " skipped=" << skipped + << " range=[" << startLevId << ".." 
<< lastProcessedLevId << "]" + << " head=" << mostRecentLevId; + + // Small sleep to avoid busy loop + if (endLevId >= mostRecentLevId) { + std::this_thread::sleep_for(std::chrono::seconds(10)); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + LI << "Search catch-up indexer stopped"; + + } catch (std::exception &e) { + LE << "Search catch-up indexer failed: " << e.what(); + } + } +}; diff --git a/src/search/NoopSearchProvider.h b/src/search/NoopSearchProvider.h new file mode 100644 index 00000000..86836154 --- /dev/null +++ b/src/search/NoopSearchProvider.h @@ -0,0 +1,35 @@ +#pragma once + +#include "SearchProvider.h" + +// No-op search provider (default when search is disabled) +// Accepts all operations but performs no indexing and returns empty results +class NoopSearchProvider : public ISearchProvider { +public: + NoopSearchProvider() = default; + ~NoopSearchProvider() override = default; + + bool healthy() const override { + // Always report as healthy (no backend to fail) + return true; + } + + void indexEvent(uint64_t levId, std::string_view json, uint64_t kind, uint64_t created_at) override { + // No-op: ignore indexing requests + (void)levId; + (void)json; + (void)kind; + (void)created_at; + } + + void deleteEvent(uint64_t levId) override { + // No-op: ignore deletion requests + (void)levId; + } + + std::vector query(const SearchQuery& q, lmdb::txn &) override { + // Return empty results + (void)q; + return {}; + } +}; diff --git a/src/search/SearchProvider.h b/src/search/SearchProvider.h new file mode 100644 index 00000000..fb47de4e --- /dev/null +++ b/src/search/SearchProvider.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "golpe.h" + + +// Search query structure +struct SearchQuery { + std::string q; // Raw query string + std::optional> kinds; // Filter by kinds + std::optional> authors; // Filter by authors (hex pubkeys) + std::optional since; // Timestamp 
lower bound + std::optional until; // Timestamp upper bound + uint64_t limit = 100; // Maximum results to return +}; + +// Search result hit with relevance score +struct SearchHit { + uint64_t levId; // Event's internal LMDB ID + float score; // Relevance score (higher = more relevant) +}; + +// Abstract search provider interface +class ISearchProvider { +public: + virtual ~ISearchProvider() = default; + + // Health check - returns true if provider is operational + virtual bool healthy() const = 0; + + // Index an event (called after successful write) + // json: Full event JSON + // kind: Event kind + // created_at: Event timestamp + virtual void indexEvent(uint64_t levId, std::string_view json, uint64_t kind, uint64_t created_at) = 0; + + // Remove an event from the index (called on delete/replace/expiration) + virtual void deleteEvent(uint64_t levId) = 0; + + // Execute a search query using the provided read-only LMDB transaction + // Fills hits sorted by score descending + virtual std::vector query(const SearchQuery& q, lmdb::txn &txn) = 0; +}; + + +// Factory function to create search provider based on config +// Declared here, implemented in SearchProvider.cpp +std::unique_ptr makeSearchProvider(); diff --git a/src/search/SearchRunner.h b/src/search/SearchRunner.h new file mode 100644 index 00000000..180d86f8 --- /dev/null +++ b/src/search/SearchRunner.h @@ -0,0 +1,110 @@ +#pragma once + +#include "SearchProvider.h" +#include "filters.h" +#include "golpe.h" +#include "Tokenizer.h" +#include "Decompressor.h" +#include "events.h" +#include +#include + + +struct SearchRunner : NonCopyable { + const NostrFilter &f; + ISearchProvider *provider; + std::deque hits; + size_t nextHitIndex = 0; + uint64_t approxWork = 0; + bool initialized = false; + Decompressor decomp; + + SearchRunner(const NostrFilter &f, ISearchProvider *provider) + : f(f), provider(provider) {} + + void initialize(lmdb::txn &txn) { + if (initialized) return; + initialized = true; + + if 
(!f.hasSearch()) return; + if (!provider) return; + + SearchQuery q; + q.q = *f.search; + q.limit = std::min(f.limit, uint64_t(cfg().relay__maxFilterLimit)); + + if (f.kinds) { + q.kinds.emplace(); + for (size_t i = 0; i < f.kinds->size(); i++) { + q.kinds->push_back(f.kinds->at(i)); + } + } + + if (f.authors) { + q.authors.emplace(); + for (size_t i = 0; i < f.authors->size(); i++) { + q.authors->push_back(to_hex(f.authors->at(i))); + } + } + + if (f.since > 0) q.since = f.since; + if (f.until < MAX_U64) q.until = f.until; + + auto searchResults = provider->query(q, txn); + hits = std::deque(searchResults.begin(), searchResults.end()); + approxWork = hits.size(); + } + + bool scan(lmdb::txn &txn, const std::function &handleEvent, const std::function &doPause) { + if (!initialized) initialize(txn); + + while (nextHitIndex < hits.size()) { + approxWork++; + if (doPause(approxWork)) return false; + + const auto &hit = hits[nextHitIndex++]; + uint64_t levId = hit.levId; + + approxWork += 10; + auto view = env.lookup_Event(txn, levId); + if (!view) continue; + + if (f.hasSearch()) { + try { + std::string_view eventPayload; + if (!env.dbi_EventPayload.get(txn, lmdb::to_sv(levId), eventPayload)) continue; + std::string_view eventJson = decodeEventPayload(txn, decomp, eventPayload, nullptr, nullptr); + + auto eventJsonObj = tao::json::from_string(eventJson); + std::string searchableText = Tokenizer::extractText(eventJsonObj); + + auto queryTokens = Tokenizer::parseQuery(*f.search); + auto eventTokens = Tokenizer::tokenize(searchableText); + + flat_hash_set eventTokenSet; + for (const auto &token : eventTokens) { + eventTokenSet.insert(token.text); + } + + bool allTokensMatch = true; + for (const auto &queryToken : queryTokens) { + if (eventTokenSet.find(queryToken) == eventTokenSet.end()) { + allTokensMatch = false; + break; + } + } + + if (!allTokensMatch) continue; + } catch (...) 
{ + continue; + } + } + + if (!f.doesMatch(PackedEventView(view->buf))) continue; + + if (handleEvent(levId)) return true; + } + + return true; + } +}; diff --git a/src/search/Tokenizer.h b/src/search/Tokenizer.h new file mode 100644 index 00000000..3c1b7364 --- /dev/null +++ b/src/search/Tokenizer.h @@ -0,0 +1,108 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "golpe.h" +#include + + +struct Token { + std::string text; + uint16_t tf = 1; +}; + +class Tokenizer { +public: + static std::vector tokenize(std::string_view text) { + std::vector tokens; + flat_hash_map termFreq; + + std::string currentToken; + currentToken.reserve(64); + + for (char c : text) { + if (std::isalnum(static_cast(c))) { + currentToken += std::tolower(static_cast(c)); + } else if (!currentToken.empty()) { + if (currentToken.size() >= 2 && currentToken.size() <= 48) { + auto &count = termFreq[currentToken]; + if (count < 65535) count++; + } + currentToken.clear(); + } + } + + if (!currentToken.empty() && currentToken.size() >= 2 && currentToken.size() <= 48) { + auto &count = termFreq[currentToken]; + if (count < 65535) count++; + } + + tokens.reserve(termFreq.size()); + for (const auto &[text, tf] : termFreq) { + tokens.push_back(Token{text, tf}); + } + + return tokens; + } + + static std::string extractText(const tao::json::value &eventJson) { + std::string result; + + if (eventJson.find("content") != nullptr) { + const auto &content = eventJson.at("content"); + if (content.is_string()) { + result = content.get_string(); + } + } + + if (eventJson.find("tags") != nullptr) { + const auto &tags = eventJson.at("tags"); + if (tags.is_array()) { + for (const auto &tag : tags.get_array()) { + if (tag.is_array() && tag.get_array().size() >= 2) { + const auto &tagName = tag.get_array()[0]; + const auto &tagValue = tag.get_array()[1]; + + if (tagName.is_string() && tagName.get_string() == "subject" && + tagValue.is_string()) { + if (!result.empty()) result += " "; + 
// Splits a raw query string into unique, lower-cased search terms.
//
// A term is a maximal run of characters for which std::isalnum() is true; any
// other character acts as a separator. Terms shorter than 2 or longer than 48
// characters are discarded. Duplicates are dropped, keeping the position of the
// first occurrence, so the result preserves query order.
static std::vector<std::string> parseQuery(std::string_view query) {
    std::vector<std::string> terms;
    std::string pending;
    pending.reserve(64);

    // Emits the pending term if its length is acceptable and it has not been
    // seen before, then resets the accumulator for the next term.
    auto flush = [&terms, &pending]() {
        const bool lengthOk = pending.size() >= 2 && pending.size() <= 48;
        if (lengthOk && std::find(terms.begin(), terms.end(), pending) == terms.end()) {
            terms.push_back(pending);
        }
        pending.clear();
    };

    for (const char ch : query) {
        // <cctype> functions require a value representable as unsigned char
        const auto byte = static_cast<unsigned char>(ch);
        if (std::isalnum(byte)) {
            pending += static_cast<char>(std::tolower(byte));
        } else {
            flush();
        }
    }
    flush(); // the query may end in the middle of a term

    return terms;
}
Supports: single (1), ranges (1000-1999), wildcard (*), exclusions (-5000-5999) + indexedKinds = "*" + + # Maximum number of postings (documents) per search token + maxPostingsPerToken = 100000 + + # Maximum candidate documents to fetch during search (absolute cap, not a multiple of limit) + maxCandidateDocs = 1000 + + # Recency tie-breaker percent (0–100); 1 = 1% boost for newest events + recencyBoostPercent = 1 + + # Over-fetch multiplier to compensate for post-filtering (candidates = limit × factor, bounded by maxCandidateDocs) + overfetchFactor = 5 + + # Candidate ranking order before scoring (only used when candidateRankMode = "order"): terms-tf-recency | terms-recency-tf | tf-terms-recency | tf-recency-terms | recency-terms-tf | recency-tf-terms + candidateRanking = "terms-tf-recency" + + # Candidate ranking mode: order | weighted + candidateRankMode = "weighted" + + # Weighted ranking weights (only used when candidateRankMode = "weighted") + rankWeightTerms = 100 + rankWeightTf = 50 + rankWeightRecency = 10 + + bm25 { + k1 = 1.2 + b = 0.75 + } + } + filterValidation { # Enable strict filter validation for REQ messages enabled = false