Merged
18 changes: 0 additions & 18 deletions docker-compose.yaml
@@ -41,21 +41,3 @@ services:
       timescaledb:
         condition: service_healthy
     restart: unless-stopped
-
-  ingester:
-    image: docker.redpanda.com/redpandadata/connect
-    container_name: ingester
-    command: ["/redpanda-connect", "run", "/connect.yaml"]
-    volumes:
-      - ./ingester/connect.yaml:/connect.yaml
-    ports:
-      - "4195:4195"
-    environment:
-      MQTT_TOPIC: ${MQTT_TOPIC}
-      MQTT_URL: ${MQTT_URL}
-      MQTT_CLIENT_ID: ${MQTT_CLIENT_ID:-trakrf-platform-ingester}
-      PG_URL: ${PG_URL}
-    depends_on:
-      timescaledb:
-        condition: service_healthy
-    restart: unless-stopped
46 changes: 0 additions & 46 deletions ingester/acceptance/capture-cs463.sh

This file was deleted.

24 changes: 0 additions & 24 deletions ingester/connect.yaml

This file was deleted.

37 changes: 0 additions & 37 deletions ingester/justfile

This file was deleted.

4 changes: 0 additions & 4 deletions justfile
@@ -21,17 +21,13 @@ backend *args:
 database *args:
     cd database && just {{args}}
 
-ingester *args:
-    cd ingester && just {{args}}
-
 # ============================================================================
 # Lazy Dev Aliases
 # ============================================================================
 
 alias db := database
 alias fe := frontend
 alias be := backend
-alias ing := ingester
 
 # ============================================================================
 # Combined Validation Commands
@@ -2,26 +2,26 @@

TRA-834. Pipeline gate for the GKE Mosquitto broker (TRA-828): publish a
corpus of real CS463 MQTT messages at `mqtt.{env}.gke.trakrf.id` and assert
-the rows land in `trakrf.tag_scans`, exercising the Redpanda Connect transform
-and DB write end to end.
+the rows land in `trakrf.tag_scans`, exercising the in-backend Go subscriber
+(TRA-900) and DB write end to end.

Companion to TRA-835 (broker-liveness pub/sub ping, infra side). This deck
deliberately doesn't test broker-liveness on its own — that's TRA-835's job.

## Layout

```
-ingester/acceptance/
+test/acceptance/cs463-replay/
├── README.md
-├── capture-cs463.sh # one-shot corpus capture from live EMQX
├── replay-cs463.sh # replay + DB assertion against the GKE broker
└── corpus/
└── cs463.tsv # 2521 messages, 4 capture points, 71 EPCs
```

`cs463.tsv` is tab-separated `topic<TAB>payload-json`, captured 2026-05-25
from EMQX Cloud after the cs463-214 capture-point rename. Frozen as a
-fixture — EMQX Cloud is on the teardown list.
+fixture — EMQX Cloud has since been decommissioned (TRA-828), so this corpus
+is permanent and not recapturable.
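
To sanity-check the fixture before a replay, the `topic<TAB>payload-json` layout can be summarized with a few lines of shell. This is a minimal sketch and not part of the repository: `corpus_stats` is a hypothetical helper name, and it assumes one message per line with the topic as the first tab-separated field.

```sh
# Hypothetical helper (not in the repo): summarize a topic<TAB>payload corpus.
# Prints "messages=<line count> topics=<distinct first fields>".
corpus_stats() {
  awk -F'\t' '
    { if (!($1 in seen)) { seen[$1] = 1; topics++ } }
    END { printf "messages=%d topics=%d\n", NR, topics }
  ' "$1"
}
```

Run as `corpus_stats test/acceptance/cs463-replay/corpus/cs463.tsv`. If the one-message-per-line assumption holds, the message count should match the 2521 quoted in the layout above.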

## Run

@@ -35,7 +35,7 @@ Required env (load from `.env.local`):
```sh
set -a; source .env.local; set +a
MQTT_GKE_HOST=mqtt.preview.gke.trakrf.id \
-ingester/acceptance/replay-cs463.sh
+test/acceptance/cs463-replay/replay-cs463.sh
```

Assertion runs via `kubectl exec` into the per-env CNPG primary (TRA-823 +
@@ -62,28 +62,24 @@ defaulting to `CURRENT_TIMESTAMP` (microsecond resolution). Real device
traffic at ~1 msg/s/topic is fine. This script replays the 2521-message
corpus in ~30s (~85 msg/s sustained), which clusters multiple same-topic
messages into the same microsecond — the second one violates the PK and is
-dropped, with a corresponding broker backpressure event on the ingester's
-loopback subscriber. Expect ~5-10% PARTIAL on a clean run.
+dropped, with a corresponding broker backpressure event on the Go
+subscriber. Expect ~5-10% PARTIAL on a clean run.

The actual schema fix lives in a separate platform ticket. Until then,
`PARTIAL` is the steady-state expected outcome and is treated as PASS.
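
The outcome rule described here can be sketched as a small shell function. This is an illustration under stated assumptions, not the script's actual exit logic: `verdict` is a hypothetical name, and treating zero landed rows as `FAIL` is inferred from the script's own comment that `landed > 0` proves the pipeline works.

```sh
# Hypothetical verdict helper; the real script's exit logic may differ.
# Usage: verdict <landed> <published>
verdict() {
  landed=$1
  published=$2
  if [ "$landed" -le 0 ]; then
    echo FAIL      # nothing landed: the pipeline is broken somewhere
  elif [ "$landed" -lt "$published" ]; then
    echo PARTIAL   # known PK-collision drops under burst replay; treated as PASS
  else
    echo PASS      # every published message landed
  fi
}
```

For example, `verdict 2300 2521` prints `PARTIAL`, which a caller would accept until the schema fix lands.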

## Dependencies

- TRA-828 broker deployed (`mqtt.{env}.gke.trakrf.id:8883` reachable, TLS 1.2)
-- Ingester subscribed to the broker, writing to `trakrf_preview` / `trakrf_prod`
+- In-backend Go subscriber (TRA-900) running against the broker, writing to `trakrf_preview` / `trakrf_prod`
- `kubectl` configured for the target cluster (default access path)
- `mosquitto_pub`, `jq`, `awk` on `PATH`
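
The tool requirements above can be checked up front so a missing binary fails fast instead of mid-replay. A minimal sketch, assuming a POSIX shell; `missing_tools` is a hypothetical helper that is not in the repository.

```sh
# Hypothetical preflight (not in the repo): print each required tool that is
# not on PATH, one per line. Prints nothing when all are present.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || echo "$tool"
  done
}
```

A caller might run `missing_tools kubectl mosquitto_pub jq awk` and abort if the output is non-empty.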

-## Re-capture (only while EMQX is still live)
+## Re-capture

-```sh
-set -a; source .env.local; set +a
-ingester/acceptance/capture-cs463.sh
-```
-
-Replaces `corpus/cs463.tsv`. Don't re-run unless you have a reason — the
-checked-in corpus is the artifact this ticket exists to preserve.
+Not possible — EMQX Cloud was decommissioned (TRA-828) and the
+`capture-cs463.sh` script was retired with it (TRA-963). The checked-in
+`corpus/cs463.tsv` is the permanent artifact this deck exists to preserve.

## Note on table name

@@ -4,7 +4,7 @@
#
# Pipeline under test:
# mosquitto_pub -> mqtt.{env}.gke.trakrf.id:8883 (Mosquitto, TLS 1.2)
-# -> ingester (Redpanda Connect, MQTT input -> sql_raw output)
+# -> in-backend Go MQTT subscriber (TRA-900, raw insert)
# -> trakrf.tag_scans (TimescaleDB hypertable in the per-env CNPG cluster)
#
# Each replayed payload gets a unique top-level tra834_replay_id field; the
@@ -19,12 +19,12 @@
#
# Env (required):
# MQTT_GKE_HOST broker hostname, e.g. mqtt.preview.gke.trakrf.id
-# MOSQUITTO_USER broker username (matches helm/trakrf-ingester auth secret)
+# MOSQUITTO_USER broker username (broker auth secret, TRA-828)
# MOSQUITTO_PASSWORD broker password
#
# Env (optional):
# MQTT_GKE_PORT default 8883
-# CORPUS default ingester/acceptance/corpus/cs463.tsv
+# CORPUS default test/acceptance/cs463-replay/corpus/cs463.tsv
# INGEST_WAIT_SECONDS default 15
# ASSERT_PSQL_CMD SQL-from-stdin invocation; defaults to:
# kubectl exec -i -n trakrf-system trakrf-db-1 -c postgres \
@@ -77,15 +77,15 @@ done < <(cut -f1 "$CORPUS" | sort -u)

echo "published: $published"

-echo "waiting ${INGEST_WAIT_SECONDS}s for ingester to drain..."
+echo "waiting ${INGEST_WAIT_SECONDS}s for the Go subscriber to drain..."
sleep "$INGEST_WAIT_SECONDS"

LANDED=$(printf "SELECT count(*) FROM trakrf.tag_scans WHERE created_at >= '%s' AND message_data ->> 'tra834_replay_id' = '%s';\n" "$T_START" "$MARKER" | assert_psql)
TOPICS=$(printf "SELECT string_agg(message_topic || ':' || c, ' ' ORDER BY message_topic) FROM (SELECT message_topic, count(*) AS c FROM trakrf.tag_scans WHERE created_at >= '%s' AND message_data ->> 'tra834_replay_id' = '%s' GROUP BY message_topic) t;\n" "$T_START" "$MARKER" | assert_psql)
echo "landed: $LANDED / $published"
echo "by topic: $TOPICS"

-# Pipeline acceptance: landed > 0 proves broker -> ingester -> DB works end
+# Pipeline acceptance: landed > 0 proves broker -> Go subscriber -> DB works end
# to end. A gap between landed and published is a known schema behaviour
# under burst replay rate (tag_scans PK = (created_at, message_topic) with
# microsecond-resolution CURRENT_TIMESTAMP — same-topic messages within one