diff --git a/docker-compose.yaml b/docker-compose.yaml index db634b88..4470b121 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -41,21 +41,3 @@ services: timescaledb: condition: service_healthy restart: unless-stopped - - ingester: - image: docker.redpanda.com/redpandadata/connect - container_name: ingester - command: ["/redpanda-connect", "run", "/connect.yaml"] - volumes: - - ./ingester/connect.yaml:/connect.yaml - ports: - - "4195:4195" - environment: - MQTT_TOPIC: ${MQTT_TOPIC} - MQTT_URL: ${MQTT_URL} - MQTT_CLIENT_ID: ${MQTT_CLIENT_ID:-trakrf-platform-ingester} - PG_URL: ${PG_URL} - depends_on: - timescaledb: - condition: service_healthy - restart: unless-stopped diff --git a/ingester/acceptance/capture-cs463.sh b/ingester/acceptance/capture-cs463.sh deleted file mode 100755 index ba0c1230..00000000 --- a/ingester/acceptance/capture-cs463.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env bash -# TRA-834 — Capture a fresh CS463 MQTT corpus from the live EMQX broker. -# -# Subscribes to trakrf.id/+/reads on EMQX Cloud and writes raw topic+payload -# pairs (tab-separated) to ingester/acceptance/corpus/cs463.tsv. The output -# format is what replay-cs463.sh consumes. -# -# This is a one-shot capture, time-sensitive: EMQX Cloud is being torn down -# after the GKE broker (TRA-828) cutover. The checked-in corpus is the -# durable artifact; re-run this only if you need fresher / more diverse data -# while EMQX is still live. -# -# Env (required, from .env.local): -# MQTT_HOST MQTT_PORT MQTT_USER MQTT_PASS -# -# Env (optional): -# CAPTURE_WINDOW_SECONDS default 300 (5 min) -# OUT default ingester/acceptance/corpus/cs463.tsv -# -set -euo pipefail - -: "${MQTT_HOST:?required}" -: "${MQTT_PORT:?required}" -: "${MQTT_USER:?required}" -: "${MQTT_PASS:?required}" -CAPTURE_WINDOW_SECONDS="${CAPTURE_WINDOW_SECONDS:-300}" - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -OUT="${OUT:-$SCRIPT_DIR/corpus/cs463.tsv}" -mkdir -p "$(dirname "$OUT")" - -echo "subscribing to trakrf.id/+/reads on $MQTT_HOST:$MQTT_PORT for ${CAPTURE_WINDOW_SECONDS}s..." -# -W exits after N seconds of inactivity; the outer timeout is a hard ceiling. -timeout $((CAPTURE_WINDOW_SECONDS + 30)) mosquitto_sub \ - -h "$MQTT_HOST" -p "$MQTT_PORT" \ - -u "$MQTT_USER" -P "$MQTT_PASS" \ - --tls-version tlsv1.2 --capath /etc/ssl/certs \ - -i "trakrf-tra834-capture-$$" \ - -t 'trakrf.id/+/reads' \ - -F '%t %p' \ - -W "$CAPTURE_WINDOW_SECONDS" \ - >"$OUT" || true # mosquitto_sub exits non-zero on -W timeout, that's the success case - -echo "captured $(wc -l <"$OUT") messages to $OUT" -echo "by topic:" -cut -f1 "$OUT" | sort | uniq -c | sort -rn | sed 's/^/ /' diff --git a/ingester/connect.yaml b/ingester/connect.yaml deleted file mode 100644 index 5ce370d6..00000000 --- a/ingester/connect.yaml +++ /dev/null @@ -1,24 +0,0 @@ -input: - mqtt: - urls: - - ${MQTT_URL} - client_id: ${MQTT_CLIENT_ID:-trakrf-local} - connect_timeout: 30s - topics: - - ${MQTT_TOPIC} - -pipeline: - processors: [] -# - bloblang: -# mapping: 'root = "um..."' - -output: - sql_raw: - driver: postgres - dsn: ${PG_URL} - query: "INSERT INTO trakrf.tag_scans (message_topic, message_data) VALUES ($1, $2)" - args_mapping: 'root = [ meta("mqtt_topic"), this.string() ]' - -#output: -# stdout: -# codec: lines diff --git a/ingester/justfile b/ingester/justfile deleted file mode 100644 index 53312ab4..00000000 --- a/ingester/justfile +++ /dev/null @@ -1,37 +0,0 @@ -# Ingester Task Runner (MQTT → Database) -set dotenv-load := true -set fallback := true - -# List all available commands -default: - @just --list - -# Start ingester (MQTT → identifier_scans table) -up: - @echo "🔄 Starting ingester (MQTT → database)..." - docker compose up -d ingester - @echo "✅ Ingester started - streaming MQTT data to identifier_scans table" - -# Stop ingester -down: - @echo "🛑 Stopping ingester..." - docker compose stop ingester - -# Restart ingester -restart: - @just down - @just up - -# Show ingester logs -logs: - docker compose logs -f ingester - -# Rebuild and restart ingester -rebuild: - @echo "🔨 Rebuilding ingester..." - docker compose build ingester - @just restart - -# Show ingester status -status: - @docker compose ps ingester diff --git a/justfile b/justfile index 4290920e..9b326488 100644 --- a/justfile +++ b/justfile @@ -21,9 +21,6 @@ backend *args: database *args: cd database && just {{args}} -ingester *args: - cd ingester && just {{args}} - # ============================================================================ # Lazy Dev Aliases # ============================================================================ @@ -31,7 +28,6 @@ ingester *args: alias db := database alias fe := frontend alias be := backend -alias ing := ingester # ============================================================================ # Combined Validation Commands diff --git a/ingester/acceptance/README.md b/test/acceptance/cs463-replay/README.md similarity index 77% rename from ingester/acceptance/README.md rename to test/acceptance/cs463-replay/README.md index 4e4e9090..2955c006 100644 --- a/ingester/acceptance/README.md +++ b/test/acceptance/cs463-replay/README.md @@ -2,8 +2,8 @@ TRA-834. Pipeline gate for the GKE Mosquitto broker (TRA-828): publish a corpus of real CS463 MQTT messages at `mqtt.{env}.gke.trakrf.id` and assert -the rows land in `trakrf.tag_scans`, exercising the Redpanda Connect transform -and DB write end to end. +the rows land in `trakrf.tag_scans`, exercising the in-backend Go subscriber +(TRA-900) and DB write end to end. Companion to TRA-835 (broker-liveness pub/sub ping, infra side). This deck deliberately doesn't test broker-liveness on its own — that's TRA-835's job. @@ -11,9 +11,8 @@ deliberately doesn't test broker-liveness on its own — that's TRA-835's job. ## Layout ``` -ingester/acceptance/ +test/acceptance/cs463-replay/ ├── README.md -├── capture-cs463.sh # one-shot corpus capture from live EMQX ├── replay-cs463.sh # replay + DB assertion against the GKE broker └── corpus/ └── cs463.tsv # 2521 messages, 4 capture points, 71 EPCs @@ -21,7 +20,8 @@ ingester/acceptance/ `cs463.tsv` is tab-separated `topicpayload-json`, captured 2026-05-25 from EMQX Cloud after the cs463-214 capture-point rename. Frozen as a -fixture — EMQX Cloud is on the teardown list. +fixture — EMQX Cloud has since been decommissioned (TRA-828), so this corpus +is permanent and not recapturable. ## Run @@ -35,7 +35,7 @@ Required env (load from `.env.local`): ```sh set -a; source .env.local; set +a MQTT_GKE_HOST=mqtt.preview.gke.trakrf.id \ - ingester/acceptance/replay-cs463.sh + test/acceptance/cs463-replay/replay-cs463.sh ``` Assertion runs via `kubectl exec` into the per-env CNPG primary (TRA-823 + @@ -62,8 +62,8 @@ defaulting to `CURRENT_TIMESTAMP` (microsecond resolution). Real device traffic at ~1 msg/s/topic is fine. This script replays the 2521-message corpus in ~30s (~85 msg/s sustained), which clusters multiple same-topic messages into the same microsecond — the second one violates the PK and is -dropped, with a corresponding broker backpressure event on the ingester's -loopback subscriber. Expect ~5-10% PARTIAL on a clean run. +dropped, with a corresponding broker backpressure event on the Go +subscriber. Expect ~5-10% PARTIAL on a clean run. The actual schema fix lives in a separate platform ticket. Until then, `PARTIAL` is the steady-state expected outcome and is treated as PASS. @@ -71,19 +71,15 @@ The actual schema fix lives in a separate platform ticket. Until then, ## Dependencies - TRA-828 broker deployed (`mqtt.{env}.gke.trakrf.id:8883` reachable, TLS 1.2) -- Ingester subscribed to the broker, writing to `trakrf_preview` / `trakrf_prod` +- In-backend Go subscriber (TRA-900) running against the broker, writing to `trakrf_preview` / `trakrf_prod` - `kubectl` configured for the target cluster (default access path) - `mosquitto_pub`, `jq`, `awk` on `PATH` -## Re-capture (only while EMQX is still live) +## Re-capture -```sh -set -a; source .env.local; set +a -ingester/acceptance/capture-cs463.sh -``` - -Replaces `corpus/cs463.tsv`. Don't re-run unless you have a reason — the -checked-in corpus is the artifact this ticket exists to preserve. +Not possible — EMQX Cloud was decommissioned (TRA-828) and the +`capture-cs463.sh` script was retired with it (TRA-963). The checked-in +`corpus/cs463.tsv` is the permanent artifact this deck exists to preserve. ## Note on table name diff --git a/ingester/acceptance/corpus/cs463.tsv b/test/acceptance/cs463-replay/corpus/cs463.tsv similarity index 100% rename from ingester/acceptance/corpus/cs463.tsv rename to test/acceptance/cs463-replay/corpus/cs463.tsv diff --git a/ingester/acceptance/replay-cs463.sh b/test/acceptance/cs463-replay/replay-cs463.sh similarity index 92% rename from ingester/acceptance/replay-cs463.sh rename to test/acceptance/cs463-replay/replay-cs463.sh index 06171ad2..adcfd879 100755 --- a/ingester/acceptance/replay-cs463.sh +++ b/test/acceptance/cs463-replay/replay-cs463.sh @@ -4,7 +4,7 @@ # # Pipeline under test: # mosquitto_pub -> mqtt.{env}.gke.trakrf.id:8883 (Mosquitto, TLS 1.2) -# -> ingester (Redpanda Connect, MQTT input -> sql_raw output) +# -> in-backend Go MQTT subscriber (TRA-900, raw insert) # -> trakrf.tag_scans (TimescaleDB hypertable in the per-env CNPG cluster) # # Each replayed payload gets a unique top-level tra834_replay_id field; the @@ -19,12 +19,12 @@ # # Env (required): # MQTT_GKE_HOST broker hostname, e.g. mqtt.preview.gke.trakrf.id -# MOSQUITTO_USER broker username (matches helm/trakrf-ingester auth secret) +# MOSQUITTO_USER broker username (broker auth secret, TRA-828) # MOSQUITTO_PASSWORD broker password # # Env (optional): # MQTT_GKE_PORT default 8883 -# CORPUS default ingester/acceptance/corpus/cs463.tsv +# CORPUS default test/acceptance/cs463-replay/corpus/cs463.tsv # INGEST_WAIT_SECONDS default 15 # ASSERT_PSQL_CMD SQL-from-stdin invocation; defaults to: # kubectl exec -i -n trakrf-system trakrf-db-1 -c postgres \ @@ -77,7 +77,7 @@ done < <(cut -f1 "$CORPUS" | sort -u) echo "published: $published" -echo "waiting ${INGEST_WAIT_SECONDS}s for ingester to drain..." +echo "waiting ${INGEST_WAIT_SECONDS}s for the Go subscriber to drain..." sleep "$INGEST_WAIT_SECONDS" LANDED=$(printf "SELECT count(*) FROM trakrf.tag_scans WHERE created_at >= '%s' AND message_data ->> 'tra834_replay_id' = '%s';\n" "$T_START" "$MARKER" | assert_psql) @@ -85,7 +85,7 @@ TOPICS=$(printf "SELECT string_agg(message_topic || ':' || c, ' ' ORDER BY messa echo "landed: $LANDED / $published" echo "by topic: $TOPICS" -# Pipeline acceptance: landed > 0 proves broker -> ingester -> DB works end +# Pipeline acceptance: landed > 0 proves broker -> Go subscriber -> DB works end # to end. A gap between landed and published is a known schema behaviour # under burst replay rate (tag_scans PK = (created_at, message_topic) with # microsecond-resolution CURRENT_TIMESTAMP — same-topic messages within one