diff --git a/backend/internal/cmd/serve/serve.go b/backend/internal/cmd/serve/serve.go index b8374c73..937c959f 100644 --- a/backend/internal/cmd/serve/serve.go +++ b/backend/internal/cmd/serve/serve.go @@ -35,6 +35,7 @@ import ( "github.com/trakrf/platform/backend/internal/services/email" orgsservice "github.com/trakrf/platform/backend/internal/services/orgs" readstreamsvc "github.com/trakrf/platform/backend/internal/services/readstream" + "github.com/trakrf/platform/backend/internal/services/topicroute" "github.com/trakrf/platform/backend/internal/storage" "github.com/trakrf/platform/backend/internal/util/jwt" ) @@ -103,6 +104,15 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error { readBroadcaster := readstreamsvc.New() defer readBroadcaster.Stop() + // TRA-922: the topic registry owns the publish_topic→route map (message + // routing) and the broker subscription set. Constructed unconditionally so + // the scan-device CRUD handler can keep it current even when ingestion is off; + // the subscriber attaches as its SubscriptionManager when MQTT is enabled. + topicRegistry := topicroute.NewRegistry(store, *log) + if err := topicRegistry.Reconcile(ctx); err != nil { + log.Warn().Err(err).Msg("initial topic registry load failed; ticker will retry") + } + mqttCfg := ingest.ConfigFromEnv() var alarmDispatcher alarm.Dispatcher if mqttCfg.Enabled() { @@ -120,13 +130,33 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error { geofenceEngine.Start() defer geofenceEngine.Stop() - subscriber := ingest.NewSubscriber(mqttCfg, store, geofenceEngine, readBroadcaster, log) + subscriber := ingest.NewSubscriber(mqttCfg, store, topicRegistry, geofenceEngine, readBroadcaster, log) if err := subscriber.Start(); err != nil { log.Error().Err(err).Msg("Failed to start MQTT subscriber") return err } defer subscriber.Stop() log.Info().Msg("MQTT subscriber started") + + // TRA-922: periodic reconcile is the safety net for missed CRUD events, + // direct DB edits, and future multi-replica drift; CRUD reconciles inline + // and OnConnect bulk-subscribes, so this only catches the gaps. + reconcileStop := make(chan struct{}) + go func() { + t := time.NewTicker(5 * time.Minute) + defer t.Stop() + for { + select { + case <-reconcileStop: + return + case <-t.C: + if err := topicRegistry.Reconcile(ctx); err != nil { + log.Warn().Err(err).Msg("topic registry reconcile failed") + } + } + } + }() + defer close(reconcileStop) } else { // No broker: http-only dispatcher (nil mqtt → mqtt devices error clearly). alarmDispatcher = alarm.NewDispatcher(shellyClient, nil) @@ -145,7 +175,7 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error { locationsHandler := locationshandler.NewHandler(store) inventoryHandler := inventoryhandler.NewHandler(store) reportsHandler := reportshandler.NewHandler(store) - scanDevicesHandler := scandeviceshandler.NewHandler(store) + scanDevicesHandler := scandeviceshandler.NewHandler(store, topicRegistry) scanPointsHandler := scanpointshandler.NewHandler(store) // 2s test-fire pulse: long enough for an operator to see the strobe, short // enough not to leave the relay latched after a confidence check. diff --git a/backend/internal/cmd/serve/serve_test.go b/backend/internal/cmd/serve/serve_test.go index b4dccc91..0bbcc339 100644 --- a/backend/internal/cmd/serve/serve_test.go +++ b/backend/internal/cmd/serve/serve_test.go @@ -47,7 +47,7 @@ func setupTestRouter(t *testing.T) *chi.Mux { locationsHandler := locationshandler.NewHandler(store) inventoryHandler := inventoryhandler.NewHandler(store) reportsHandler := reportshandler.NewHandler(store) - scanDevicesHandler := scandeviceshandler.NewHandler(store) + scanDevicesHandler := scandeviceshandler.NewHandler(store, nil) scanPointsHandler := scanpointshandler.NewHandler(store) outputDevicesHandler := outputdeviceshandler.NewHandler(store, alarm.NewDispatcher(shelly.New(0), nil), 0) lookupHandler := lookuphandler.NewHandler(store) diff --git a/backend/internal/handlers/readstream/readstream_test.go b/backend/internal/handlers/readstream/readstream_test.go index 49fa08e8..c2539412 100644 --- a/backend/internal/handlers/readstream/readstream_test.go +++ b/backend/internal/handlers/readstream/readstream_test.go @@ -87,7 +87,7 @@ func TestStream_OrgFilteredFramingSurvivesWriteTimeout(t *testing.T) { t.Fatal("received another org's read — org filtering broken") } if strings.HasPrefix(line, "data:") && strings.Contains(line, "EPC-1") { - if !strings.Contains(line, `"readerKey":"dock-9"`) { + if !strings.Contains(line, `"readerKey":"trakrf.id/dock-9/reads"`) { t.Fatalf("data frame missing readerKey: %s", line) } return // success diff --git a/backend/internal/handlers/scandevices/scandevices.go b/backend/internal/handlers/scandevices/scandevices.go index e16d78ab..a6fdb080 100644 --- a/backend/internal/handlers/scandevices/scandevices.go +++ b/backend/internal/handlers/scandevices/scandevices.go @@ -5,6 +5,7 @@ package scandevices import ( + "context" "net/http" "strconv" "strings" @@ -16,6 +17,7 @@ import ( "github.com/trakrf/platform/backend/internal/models/scandevice" "github.com/trakrf/platform/backend/internal/models/scanpoint" "github.com/trakrf/platform/backend/internal/models/shared" + "github.com/trakrf/platform/backend/internal/services/topicroute" "github.com/trakrf/platform/backend/internal/storage" "github.com/trakrf/platform/backend/internal/util/httputil" ) @@ -28,11 +30,50 @@ var validate = func() *validator.Validate { }() type Handler struct { - storage *storage.Storage + storage *storage.Storage + registry *topicroute.Registry // TRA-922: reconciled after CRUD so the subscriber tracks topic changes } -func NewHandler(storage *storage.Storage) *Handler { - return &Handler{storage: storage} +func NewHandler(storage *storage.Storage, registry *topicroute.Registry) *Handler { + return &Handler{storage: storage, registry: registry} +} + +// validateTopicPrefix enforces the {org_slug}/ prefix on a publish_topic +// (TRA-922). The org slug (organizations.identifier) is globally unique, so the +// prefix makes the per-org publish_topic uniqueness effectively global and lays +// down the {org_slug}/# ACL namespace (TRA-857). An empty topic or a web_ble +// (handheld) device is exempt (no MQTT topic). Returns a user-facing message and +// false on violation. +func (h *Handler) validateTopicPrefix(ctx context.Context, orgID int, transport, topic string) (string, bool) { + if topic == "" || transport == scandevice.TransportWebBLE { + return "", true + } + org, err := h.storage.GetOrganizationByID(ctx, orgID) + if err != nil || org == nil || org.Identifier == "" { + return "organization has no identifier; cannot set a publish_topic", false + } + if !strings.HasPrefix(topic, org.Identifier+"/") { + return "publish_topic must start with \"" + org.Identifier + "/\"", false + } + return "", true +} + +// reconcile re-syncs the topic registry after a successful scan-device mutation +// (TRA-922). Best-effort: the mutation already committed and the periodic ticker +// backstops, so a reconcile error must not fail the request. +func (h *Handler) reconcile(ctx context.Context) { + if h.registry == nil { + return + } + _ = h.registry.Reconcile(ctx) +} + +// derefOr returns *p or the fallback when p is nil. +func derefOr(p *string, fallback string) string { + if p == nil { + return fallback + } + return *p } // RegisterRoutes wires the scan-device routes (and the device-nested scan-point @@ -123,11 +164,16 @@ func (h *Handler) Create(w http.ResponseWriter, r *http.Request) { httputil.RespondValidationError(w, r, err, reqID) return } + if msg, ok := h.validateTopicPrefix(r.Context(), orgID, req.Transport, derefOr(req.PublishTopic, "")); !ok { + httputil.WriteJSONError(w, r, http.StatusBadRequest, modelerrors.ErrValidation, msg, reqID) + return + } device, err := h.storage.CreateScanDevice(r.Context(), orgID, req) if err != nil { writeConflictOrInternal(w, r, err, reqID) return } + h.reconcile(r.Context()) w.Header().Set("Location", "/api/v1/scan-devices/"+strconv.Itoa(device.ID)) httputil.WriteJSON(w, http.StatusCreated, scandevice.ScanDeviceResponse{Data: *device}) } @@ -193,6 +239,15 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) { httputil.RespondValidationError(w, r, err, reqID) return } + // TRA-922: enforce the {org_slug}/ prefix only when publish_topic is being + // changed. An update that omits it leaves the stored value untouched, so + // existing (grandfathered) topics are not retroactively rejected. + if req.PublishTopic != nil { + if msg, ok := h.validateTopicPrefix(r.Context(), orgID, derefOr(req.Transport, ""), *req.PublishTopic); !ok { + httputil.WriteJSONError(w, r, http.StatusBadRequest, modelerrors.ErrValidation, msg, reqID) + return + } + } device, err := h.storage.UpdateScanDevice(r.Context(), orgID, id, req) if err != nil { writeConflictOrInternal(w, r, err, reqID) @@ -202,6 +257,7 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) { httputil.Respond404(w, r, "scan device not found", reqID) return } + h.reconcile(r.Context()) httputil.WriteJSON(w, http.StatusOK, scandevice.ScanDeviceResponse{Data: *device}) } @@ -232,6 +288,7 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) { httputil.Respond404(w, r, "scan device not found", reqID) return } + h.reconcile(r.Context()) w.WriteHeader(http.StatusNoContent) } diff --git a/backend/internal/handlers/scandevices/scandevices_integration_test.go b/backend/internal/handlers/scandevices/scandevices_integration_test.go index 7641bbd1..124dc9eb 100644 --- a/backend/internal/handlers/scandevices/scandevices_integration_test.go +++ b/backend/internal/handlers/scandevices/scandevices_integration_test.go @@ -12,11 +12,14 @@ import ( "time" "github.com/go-chi/chi/v5" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/trakrf/platform/backend/internal/handlers/scandevices" "github.com/trakrf/platform/backend/internal/handlers/scanpoints" "github.com/trakrf/platform/backend/internal/middleware" + "github.com/trakrf/platform/backend/internal/models/scandevice" + "github.com/trakrf/platform/backend/internal/services/topicroute" "github.com/trakrf/platform/backend/internal/testutil" "github.com/trakrf/platform/backend/internal/util/jwt" ) @@ -26,13 +29,19 @@ func withOrg(req *http.Request, orgID int) *http.Request { return req.WithContext(context.WithValue(req.Context(), middleware.UserClaimsKey, claims)) } +// newScanDevicesHandler builds the handler with a live topic registry (TRA-922), +// so the post-CRUD reconcile path is exercised against the test DB. +func newScanDevicesHandler(db *testutil.TestDB) *scandevices.Handler { + return scandevices.NewHandler(db.Store, topicroute.NewRegistry(db.Store, zerolog.Nop())) +} + func TestScanDevicesHandler_RoundTrip(t *testing.T) { db := testutil.SetupTestDBFull(t) orgID := testutil.CreateTestAccount(t, db.AdminPool) r := chi.NewRouter() r.Use(middleware.RequestID) - scandevices.NewHandler(db.Store).RegisterRoutes(r) + newScanDevicesHandler(db).RegisterRoutes(r) scanpoints.NewHandler(db.Store).RegisterRoutes(r) do := func(method, path string, body any) *httptest.ResponseRecorder { @@ -51,7 +60,7 @@ func TestScanDevicesHandler_RoundTrip(t *testing.T) { // Create rec := do(http.MethodPost, "/api/v1/scan-devices", map[string]any{ - "name": "Dock Reader", "type": "csl_cs463", "publish_topic": "trakrf.id/cs463-214/reads", + "name": "Dock Reader", "type": "csl_cs463", "publish_topic": "test-org/cs463-214/reads", }) require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String()) var created struct { @@ -64,7 +73,7 @@ func TestScanDevicesHandler_RoundTrip(t *testing.T) { require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &created)) require.NotZero(t, created.Data.ID) require.Equal(t, "mqtt", created.Data.Transport) - require.Equal(t, "trakrf.id/cs463-214/reads", created.Data.PublishTopic) + require.Equal(t, "test-org/cs463-214/reads", created.Data.PublishTopic) devicePath := "/api/v1/scan-devices/" + itoa(created.Data.ID) // Get @@ -123,6 +132,79 @@ func itoa(i int) string { return string(b) } +// TestScanDevicesHandler_TopicPrefix pins the TRA-922 {org_slug}/ prefix rule: +// new/edited mqtt publish_topics must start with the caller org's identifier; +// grandfathered (unchanged) topics are left alone; web_ble devices are exempt. +func TestScanDevicesHandler_TopicPrefix(t *testing.T) { + db := testutil.SetupTestDBFull(t) + orgID := testutil.CreateTestAccount(t, db.AdminPool) // identifier "test-org" + + r := chi.NewRouter() + r.Use(middleware.RequestID) + newScanDevicesHandler(db).RegisterRoutes(r) + + do := func(orgCtx int, method, path string, body any) *httptest.ResponseRecorder { + var buf bytes.Buffer + if body != nil { + require.NoError(t, json.NewEncoder(&buf).Encode(body)) + } + req := httptest.NewRequest(method, path, &buf) + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + rec := httptest.NewRecorder() + r.ServeHTTP(rec, withOrg(req, orgCtx)) + return rec + } + + // (a) Wrong prefix rejected. + require.Equal(t, http.StatusBadRequest, do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{ + "name": "Bad", "type": "csl_cs463", "publish_topic": "trakrf.id/dock-1/reads", + }).Code) + + // (b) Correct prefix accepted. + rec := do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{ + "name": "Good", "type": "csl_cs463", "publish_topic": "test-org/dock-1/reads", + }) + require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String()) + + // (c) web_ble device with no topic is exempt. + require.Equal(t, http.StatusCreated, do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{ + "name": "Handheld", "type": "csl_cs463", "transport": "web_ble", + }).Code) + + // (d) Grandfathered device (seeded directly with a non-prefixed topic): + // a metadata-only edit succeeds; changing the topic to a bad value is + // rejected; changing it to a conforming value succeeds. + legacyTopic := "trakrf.id/legacy-9/reads" + legacy, err := db.Store.CreateScanDevice(context.Background(), orgID, scandevice.CreateScanDeviceRequest{ + Name: "Legacy", Type: scandevice.DeviceTypeCS463, PublishTopic: &legacyTopic, + }) + require.NoError(t, err) + legacyPath := "/api/v1/scan-devices/" + itoa(legacy.ID) + + require.Equal(t, http.StatusOK, do(orgID, http.MethodPatch, legacyPath, map[string]any{ + "name": "Legacy Renamed", + }).Code, "metadata-only edit must not trigger the prefix check") + + require.Equal(t, http.StatusBadRequest, do(orgID, http.MethodPatch, legacyPath, map[string]any{ + "publish_topic": "trakrf.id/legacy-9/reads-v2", + }).Code, "changing the topic to a non-prefixed value must be rejected") + + require.Equal(t, http.StatusOK, do(orgID, http.MethodPatch, legacyPath, map[string]any{ + "publish_topic": "test-org/legacy-9/reads", + }).Code, "changing the topic to a conforming value must succeed") + + // (e) An org with no identifier cannot set a publish_topic. + var noIDOrg int + require.NoError(t, db.AdminPool.QueryRow(context.Background(), + `INSERT INTO trakrf.organizations (name, identifier, is_active) VALUES ('No Slug', '', true) RETURNING id`, + ).Scan(&noIDOrg)) + require.Equal(t, http.StatusBadRequest, do(noIDOrg, http.MethodPost, "/api/v1/scan-devices", map[string]any{ + "name": "x", "type": "csl_cs463", "publish_topic": "anything/dock/reads", + }).Code) +} + // TestScanPoints_UpdateLocationIDPersists pins the geofence-relevant behavior // for TRA-931: PATCH /scan-points must persist a provided location_id (set it), // and persist an explicit null (clear it). Regression guard for the handler @@ -134,7 +216,7 @@ func TestScanPoints_UpdateLocationIDPersists(t *testing.T) { r := chi.NewRouter() r.Use(middleware.RequestID) - scandevices.NewHandler(db.Store).RegisterRoutes(r) + newScanDevicesHandler(db).RegisterRoutes(r) scanpoints.NewHandler(db.Store).RegisterRoutes(r) do := func(method, path string, body any) *httptest.ResponseRecorder { @@ -153,7 +235,7 @@ func TestScanPoints_UpdateLocationIDPersists(t *testing.T) { // A single-point gateway. Device create auto-provisions scan_point 1. rec := do(http.MethodPost, "/api/v1/scan-devices", map[string]any{ - "name": "Gateway 1", "type": "gl_s10", "publish_topic": "trakrf.id/gw-1/reads", + "name": "Gateway 1", "type": "gl_s10", "publish_topic": "test-org/gw-1/reads", }) require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String()) var dev struct { diff --git a/backend/internal/ingest/config.go b/backend/internal/ingest/config.go index 5a924f3c..52461d1f 100644 --- a/backend/internal/ingest/config.go +++ b/backend/internal/ingest/config.go @@ -6,24 +6,22 @@ import "os" // (keeps local dev, tests, and pre-cutover prod inert). type Config struct { URL string // mqtts://user:pass@host:port (MQTT_URL) - Topic string // subscription filter (MQTT_TOPIC), e.g. trakrf.id/# or $share/grp/trakrf.id/# ClientID string // base client id (MQTT_CLIENT_ID); subscriber appends a per-process suffix } // Enabled reports whether the subscriber should start. func (c Config) Enabled() bool { return c.URL != "" } -// ConfigFromEnv reads the MQTT subscriber config from the environment, applying -// defaults for the topic filter and client id. +// ConfigFromEnv reads the MQTT subscriber config from the environment. +// +// TRA-922: MQTT_TOPIC is retired. The subscriber no longer uses a static +// subscription filter — it subscribes to exactly the registered publish_topics +// (data-driven, via the topicroute registry), so there is no topic to configure. func ConfigFromEnv() Config { c := Config{ URL: os.Getenv("MQTT_URL"), - Topic: os.Getenv("MQTT_TOPIC"), ClientID: os.Getenv("MQTT_CLIENT_ID"), } - if c.Topic == "" { - c.Topic = "trakrf.id/#" - } if c.ClientID == "" { c.ClientID = "trakrf-subscriber" } diff --git a/backend/internal/ingest/config_test.go b/backend/internal/ingest/config_test.go index d6639867..9a94fec5 100644 --- a/backend/internal/ingest/config_test.go +++ b/backend/internal/ingest/config_test.go @@ -13,19 +13,15 @@ func TestConfigEnabled(t *testing.T) { func TestConfigFromEnvDefaults(t *testing.T) { t.Setenv("MQTT_URL", "mqtts://u:p@host:8883") - t.Setenv("MQTT_TOPIC", "") t.Setenv("MQTT_CLIENT_ID", "") c := ConfigFromEnv() assert.Equal(t, "mqtts://u:p@host:8883", c.URL) - assert.Equal(t, "trakrf.id/#", c.Topic) assert.Equal(t, "trakrf-subscriber", c.ClientID) } func TestConfigFromEnvOverrides(t *testing.T) { t.Setenv("MQTT_URL", "mqtts://u:p@host:8883") - t.Setenv("MQTT_TOPIC", "$share/grp/trakrf.id/#") t.Setenv("MQTT_CLIENT_ID", "custom-id") c := ConfigFromEnv() - assert.Equal(t, "$share/grp/trakrf.id/#", c.Topic) assert.Equal(t, "custom-id", c.ClientID) } diff --git a/backend/internal/ingest/subscriber.go b/backend/internal/ingest/subscriber.go index 4ebceb52..814bfab2 100644 --- a/backend/internal/ingest/subscriber.go +++ b/backend/internal/ingest/subscriber.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/trakrf/platform/backend/internal/models/scanread" + "github.com/trakrf/platform/backend/internal/services/topicroute" "github.com/trakrf/platform/backend/internal/storage" ) @@ -33,18 +34,50 @@ type ReadPublisher interface { // Subscriber consumes MQTT reads and derives asset_scans (TRA-900). It is the // observable replacement for the silent process_tag_scans trigger. type Subscriber struct { - cfg Config - store *storage.Storage - eval ReadEvaluator // optional; nil disables geofence evaluation - feed ReadPublisher // optional; nil disables live-feed fan-out - log zerolog.Logger - client mqtt.Client + cfg Config + store *storage.Storage + registry *topicroute.Registry // routing map + subscription set (TRA-922) + eval ReadEvaluator // optional; nil disables geofence evaluation + feed ReadPublisher // optional; nil disables live-feed fan-out + log zerolog.Logger + client mqtt.Client } // NewSubscriber builds a subscriber. It does not connect; call Start. eval and -// feed may each be nil (no geofence evaluation / no live-feed fan-out). -func NewSubscriber(cfg Config, store *storage.Storage, eval ReadEvaluator, feed ReadPublisher, log *zerolog.Logger) *Subscriber { - return &Subscriber{cfg: cfg, store: store, eval: eval, feed: feed, log: log.With().Str("component", "ingest").Logger()} +// feed may each be nil (no geofence evaluation / no live-feed fan-out). The +// subscriber registers itself as the registry's SubscriptionManager so CRUD and +// the reconcile ticker drive its broker subscriptions (TRA-922). +func NewSubscriber(cfg Config, store *storage.Storage, registry *topicroute.Registry, eval ReadEvaluator, feed ReadPublisher, log *zerolog.Logger) *Subscriber { + s := &Subscriber{cfg: cfg, store: store, registry: registry, eval: eval, feed: feed, log: log.With().Str("component", "ingest").Logger()} + registry.SetManager(s) + return s +} + +// Subscribe adds a single topic to the live broker subscription (TRA-922). +// Called by the registry when a scan device is created/updated. While +// disconnected it is a no-op — OnConnect bulk-subscribes the full registry set +// on every (re)connect, which is the source of truth. +func (s *Subscriber) Subscribe(topic string) { + if s.client == nil || !s.client.IsConnected() { + return + } + if tok := s.client.Subscribe(topic, 1, s.handleMessage); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Str("topic", topic).Msg("subscribe failed") + return + } + s.log.Info().Str("topic", topic).Msg("subscribed") +} + +// Unsubscribe drops a single topic from the live broker subscription (TRA-922). +func (s *Subscriber) Unsubscribe(topic string) { + if s.client == nil || !s.client.IsConnected() { + return + } + if tok := s.client.Unsubscribe(topic); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Str("topic", topic).Msg("unsubscribe failed") + return + } + s.log.Info().Str("topic", topic).Msg("unsubscribed") } // Start begins connecting in the background and returns immediately — it never @@ -66,11 +99,24 @@ func (s *Subscriber) Start() error { SetConnectRetry(true). SetConnectRetryInterval(5 * time.Second). SetOnConnectHandler(func(c mqtt.Client) { - if tok := c.Subscribe(s.cfg.Topic, 1, s.handleMessage); tok.Wait() && tok.Error() != nil { - s.log.Error().Err(tok.Error()).Str("topic", s.cfg.Topic).Msg("subscribe failed") + // TRA-922: subscribe to exactly the registered publish_topics, not a + // static filter. Fires on initial connect AND every reconnect, so the + // full set is (re)subscribed for free. Add/remove between connects is + // handled incrementally by Subscribe/Unsubscribe via the registry. + topics := s.registry.Topics() + if len(topics) == 0 { + s.log.Info().Msg("connected; no registered topics to subscribe") + return + } + filters := make(map[string]byte, len(topics)) + for _, t := range topics { + filters[t] = 1 + } + if tok := c.SubscribeMultiple(filters, s.handleMessage); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Int("count", len(topics)).Msg("bulk subscribe failed") return } - s.log.Info().Str("topic", s.cfg.Topic).Msg("subscribed") + s.log.Info().Int("count", len(topics)).Msg("subscribed to registered topics") }). SetConnectionLostHandler(func(_ mqtt.Client, err error) { s.log.Warn().Err(err).Msg("mqtt connection lost; auto-reconnecting") @@ -82,7 +128,7 @@ func (s *Subscriber) Start() error { // whenever the broker is down at boot. The connection is established // asynchronously and self-heals. s.client.Connect() - s.log.Info().Str("client_id", clientID).Str("topic", s.cfg.Topic).Msg("mqtt subscriber connecting") + s.log.Info().Str("client_id", clientID).Msg("mqtt subscriber connecting") return nil } @@ -120,12 +166,19 @@ func (s *Subscriber) handleMessage(_ mqtt.Client, m mqtt.Message) { return } - // 2. Route topic -> org/device (SECURITY DEFINER; no org context yet). - route, found, err := s.store.ResolveScanTopic(ctx, topic) - if err != nil { - s.log.Error().Err(err).Str("topic", topic).Msg("topic resolution failed") - metricMessages.WithLabelValues("resolve_error").Inc() - return + // 2. Route topic -> org/device. Primary path is the in-memory registry (the + // same set we subscribe to). A miss should not normally happen — we only + // receive topics we subscribed to — so fall back to resolve_scan_topic + // (SECURITY DEFINER) defensively, covering any Subscribe/registry race. + route, found := s.registry.Lookup(topic) + if !found { + var err error + route, found, err = s.store.ResolveScanTopic(ctx, topic) + if err != nil { + s.log.Error().Err(err).Str("topic", topic).Msg("topic resolution failed") + metricMessages.WithLabelValues("resolve_error").Inc() + return + } } if !found { s.log.Debug().Str("topic", topic).Msg("unregistered topic; audit kept, no derivation") diff --git a/backend/internal/ingest/subscriber_publish_test.go b/backend/internal/ingest/subscriber_publish_test.go index f5805dee..f47e5a5e 100644 --- a/backend/internal/ingest/subscriber_publish_test.go +++ b/backend/internal/ingest/subscriber_publish_test.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog" "github.com/trakrf/platform/backend/internal/models/scanread" + "github.com/trakrf/platform/backend/internal/services/topicroute" ) // handleMessage isn't unit-tested directly: it depends on the concrete @@ -37,13 +38,14 @@ func TestNewSubscriber_AcceptsReadPublisher(t *testing.T) { var _ ReadPublisher = (*fakePublisher)(nil) log := zerolog.Nop() - sub := NewSubscriber(Config{}, nil, nil, &fakePublisher{}, &log) + reg := topicroute.NewRegistry(nil, log) + sub := NewSubscriber(Config{}, nil, reg, nil, &fakePublisher{}, &log) if sub.feed == nil { t.Fatal("expected feed publisher to be stored on the subscriber") } // nil feed must also be accepted (fan-out disabled). - subNil := NewSubscriber(Config{}, nil, nil, nil, &log) + subNil := NewSubscriber(Config{}, nil, topicroute.NewRegistry(nil, log), nil, nil, &log) if subNil.feed != nil { t.Fatal("expected nil feed when none provided") } diff --git a/backend/internal/models/organization/organization.go b/backend/internal/models/organization/organization.go index b83afd59..0a712325 100644 --- a/backend/internal/models/organization/organization.go +++ b/backend/internal/models/organization/organization.go @@ -42,7 +42,10 @@ type UserOrg struct { type UserOrgWithRole struct { ID int `json:"id"` Name string `json:"name"` - Role string `json:"role"` + // Identifier is the org's globally-unique URL-safe slug. Surfaced so the UI + // can pre-fill the required {org_slug}/ publish_topic prefix (TRA-922). + Identifier string `json:"identifier"` + Role string `json:"role"` } // SetCurrentOrgRequest for POST /users/me/current-org diff --git a/backend/internal/services/orgs/profile_integration_test.go b/backend/internal/services/orgs/profile_integration_test.go new file mode 100644 index 00000000..dbe671e4 --- /dev/null +++ b/backend/internal/services/orgs/profile_integration_test.go @@ -0,0 +1,38 @@ +//go:build integration + +package orgs_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + orgsservice "github.com/trakrf/platform/backend/internal/services/orgs" + "github.com/trakrf/platform/backend/internal/testutil" +) + +// TRA-922: /users/me must surface the current org's identifier (slug) so the UI +// can pre-fill the required {org_slug}/ publish_topic prefix. +func TestGetUserProfile_IncludesCurrentOrgIdentifier(t *testing.T) { + db := testutil.SetupTestDBFull(t) + ctx := context.Background() + orgID := testutil.CreateTestAccount(t, db.AdminPool) // identifier "test-org" + + var userID int + require.NoError(t, db.AdminPool.QueryRow(ctx, + `INSERT INTO trakrf.users (email, name, password_hash, is_superadmin) VALUES ('tra922@t.com', 'TRA922', 'x', false) RETURNING id`, + ).Scan(&userID)) + _, err := db.AdminPool.Exec(ctx, + `INSERT INTO trakrf.org_users (org_id, user_id, role, status) VALUES ($1, $2, 'admin', 'active')`, + orgID, userID) + require.NoError(t, err) + + svc := orgsservice.NewService(db.AdminPool, db.Store, nil) + profile, err := svc.GetUserProfile(ctx, userID) + require.NoError(t, err) + require.NotNil(t, profile.CurrentOrg) + assert.Equal(t, orgID, profile.CurrentOrg.ID) + assert.Equal(t, "test-org", profile.CurrentOrg.Identifier) +} diff --git a/backend/internal/services/orgs/service.go b/backend/internal/services/orgs/service.go index d5b80ecb..c90a42bf 100644 --- a/backend/internal/services/orgs/service.go +++ b/backend/internal/services/orgs/service.go @@ -136,11 +136,18 @@ func (s *Service) GetUserProfile(ctx context.Context, userID int) (*organization if err == nil { for _, org := range orgs { if org.ID == currentOrgID { - profile.CurrentOrg = &organization.UserOrgWithRole{ + cur := &organization.UserOrgWithRole{ ID: org.ID, Name: org.Name, Role: string(role), } + // TRA-922: include the org slug so the UI can pre-fill the + // required {org_slug}/ publish_topic prefix. Best-effort — a + // lookup miss leaves Identifier empty rather than failing /me. + if full, ferr := s.storage.GetOrganizationByID(ctx, currentOrgID); ferr == nil && full != nil { + cur.Identifier = full.Identifier + } + profile.CurrentOrg = cur break } } diff --git a/backend/internal/services/readstream/tracker.go b/backend/internal/services/readstream/tracker.go index 535a8f74..944e16b1 100644 --- a/backend/internal/services/readstream/tracker.go +++ b/backend/internal/services/readstream/tracker.go @@ -22,7 +22,6 @@ package readstream import ( "encoding/json" - "regexp" "sync" "time" @@ -224,7 +223,11 @@ func (t *Tracker) Publish(orgID int, topic string, reads []scanread.Read) { if len(reads) == 0 { return } - key := readerKeyFromTopic(topic) + // TRA-922: the publish_topic is the reader key, used verbatim. Routing is a + // direct topic→device match (resolve_scan_topic), so there is nothing to + // parse out of the topic — one topic is one device is one reader. The + // frontend derives the same key from device.publish_topic for ?reader=. + key := topic now := t.now() t.mu.Lock() @@ -336,15 +339,3 @@ func marshalEvent(oe orgEvent) (Event, bool) { return Event{}, false } } - -var topicRe = regexp.MustCompile(`^trakrf\.id/([^/]+)/reads$`) - -// readerKeyFromTopic extracts the reader key from a `trakrf.id/{key}/reads` -// topic, mirroring the frontend. Non-matching topics fall back to the full topic -// string so the key is never empty. -func readerKeyFromTopic(topic string) string { - if m := topicRe.FindStringSubmatch(topic); m != nil { - return m[1] - } - return topic -} diff --git a/backend/internal/services/readstream/tracker_test.go b/backend/internal/services/readstream/tracker_test.go index 37addeea..8df0ab70 100644 --- a/backend/internal/services/readstream/tracker_test.go +++ b/backend/internal/services/readstream/tracker_test.go @@ -114,7 +114,7 @@ func TestTracker_PublishDeliversUpsert(t *testing.T) { if err := json.Unmarshal(ev.Data, &ts); err != nil { t.Fatalf("bad upsert json: %v", err) } - if ts.EPC != "EPC2" || ts.ReaderKey != "dock-9" || ts.AntennaPort != 3 { + if ts.EPC != "EPC2" || ts.ReaderKey != "trakrf.id/dock-9/reads" || ts.AntennaPort != 3 { t.Fatalf("unexpected upsert tag: %+v", ts) } } @@ -185,7 +185,7 @@ func TestTracker_ScopedSubscriberOnlySeesItsReader(t *testing.T) { tr := NewTracker(idleCfg()) defer tr.Stop() - ch, cancel := tr.Subscribe(7, "dock-1") // scoped to dock-1 + ch, cancel := tr.Subscribe(7, "trakrf.id/dock-1/reads") // scoped to dock-1's topic defer cancel() recv(t, ch, time.Second) // seed @@ -200,7 +200,7 @@ func TestTracker_ScopedSubscriberOnlySeesItsReader(t *testing.T) { if err := json.Unmarshal(ev.Data, &ts); err != nil { t.Fatalf("bad upsert json: %v", err) } - if ev.Type != eventUpsert || ts.EPC != "MINE" || ts.ReaderKey != "dock-1" { + if ev.Type != eventUpsert || ts.EPC != "MINE" || ts.ReaderKey != "trakrf.id/dock-1/reads" { t.Fatalf("scoped session should see only dock-1's read, got %s %+v", ev.Type, ts) } } diff --git a/backend/internal/services/topicroute/registry.go b/backend/internal/services/topicroute/registry.go new file mode 100644 index 00000000..43f7c208 --- /dev/null +++ b/backend/internal/services/topicroute/registry.go @@ -0,0 +1,115 @@ +// Package topicroute owns the in-memory publish_topic -> ScanRoute map used to +// route incoming MQTT reads, AND the broker subscription set those topics imply. +// One structure, two jobs: the set of map keys is exactly the set of topics the +// subscriber subscribes to. Reconcile() re-derives both from the DB, so the +// subscriber subscribes to exactly the registered reads topics instead of +// vacuuming a broker firehose (TRA-922). +package topicroute + +import ( + "context" + "sync" + + "github.com/rs/zerolog" + + "github.com/trakrf/platform/backend/internal/storage" +) + +// TopicLister is the storage dependency (satisfied by *storage.Storage). Kept as +// an interface so the registry is unit-testable without a live DB. +type TopicLister interface { + ListScanTopics(ctx context.Context) (map[string]storage.ScanRoute, error) +} + +// SubscriptionManager applies subscription deltas to the live broker client. +// Implemented by *ingest.Subscriber; nil until a subscriber attaches (when MQTT +// is disabled the registry is map-only and these are never called). +type SubscriptionManager interface { + Subscribe(topic string) + Unsubscribe(topic string) +} + +// Registry is the process-wide topic->route map and subscription set. +type Registry struct { + lister TopicLister + log zerolog.Logger + mu sync.RWMutex + routes map[string]storage.ScanRoute + mgr SubscriptionManager +} + +// NewRegistry builds an empty registry. Call Reconcile to populate it. +func NewRegistry(lister TopicLister, log zerolog.Logger) *Registry { + return &Registry{ + lister: lister, + log: log.With().Str("component", "topicroute").Logger(), + routes: map[string]storage.ScanRoute{}, + } +} + +// SetManager attaches the subscription manager (the MQTT subscriber). Until set, +// Reconcile only maintains the in-memory map. +func (r *Registry) SetManager(m SubscriptionManager) { + r.mu.Lock() + r.mgr = m + r.mu.Unlock() +} + +// Lookup returns the route for a topic from the in-memory map (message path). +func (r *Registry) Lookup(topic string) (storage.ScanRoute, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + rt, ok := r.routes[topic] + return rt, ok +} + +// Topics returns a snapshot of all known topics, for OnConnect bulk-subscribe. +func (r *Registry) Topics() []string { + r.mu.RLock() + defer r.mu.RUnlock() + out := make([]string, 0, len(r.routes)) + for t := range r.routes { + out = append(out, t) + } + return out +} + +// Reconcile re-derives the map from the DB and applies the add/remove deltas to +// the subscription manager (if attached). Safe to call on boot (no manager => +// map-only), on scan-device CRUD, and on a periodic ticker — it converges the +// live subscription set to the registered topics either way. +func (r *Registry) Reconcile(ctx context.Context) error { + fresh, err := r.lister.ListScanTopics(ctx) + if err != nil { + return err + } + var toSub, toUnsub []string + r.mu.Lock() + for topic := range r.routes { + if _, ok := fresh[topic]; !ok { + delete(r.routes, topic) + toUnsub = append(toUnsub, topic) + } + } + for topic, route := range fresh { + if _, ok := r.routes[topic]; !ok { + toSub = append(toSub, topic) + } + r.routes[topic] = route // refresh route even when the topic is unchanged + } + mgr := r.mgr + r.mu.Unlock() + + if mgr != nil { + for _, t := range toSub { + mgr.Subscribe(t) + } + for _, t := range toUnsub { + mgr.Unsubscribe(t) + } + } + if len(toSub) > 0 || len(toUnsub) > 0 { + r.log.Info().Int("added", len(toSub)).Int("removed", len(toUnsub)).Msg("topic registry reconciled") + } + return nil +} diff --git a/backend/internal/services/topicroute/registry_test.go b/backend/internal/services/topicroute/registry_test.go new file mode 100644 index 00000000..8802867c --- /dev/null +++ b/backend/internal/services/topicroute/registry_test.go @@ -0,0 +1,92 @@ +package topicroute + +import ( + "context" + "io" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/trakrf/platform/backend/internal/storage" +) + +func testLogger() zerolog.Logger { return zerolog.New(io.Discard) } + +type fakeLister struct { + m map[string]storage.ScanRoute + err error +} + +func (f *fakeLister) ListScanTopics(context.Context) (map[string]storage.ScanRoute, error) { + return f.m, f.err +} + +type fakeMgr struct { + subs []string + unsubs []string +} + +func (f *fakeMgr) Subscribe(t string) { f.subs = append(f.subs, t) } +func (f *fakeMgr) Unsubscribe(t string) { f.unsubs = append(f.unsubs, t) } + +func TestReconcile_AddsAndLooksUp(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{ + "org-a/dock-1/reads": {OrgID: 1, ScanDeviceID: 10, DeviceType: "csl_cs463"}, + }} + mgr := &fakeMgr{} + r := NewRegistry(l, testLogger()) + r.SetManager(mgr) + + require.NoError(t, r.Reconcile(context.Background())) + + got, ok := r.Lookup("org-a/dock-1/reads") + assert.True(t, ok) + assert.Equal(t, 10, got.ScanDeviceID) + assert.Equal(t, 1, got.OrgID) + assert.Equal(t, []string{"org-a/dock-1/reads"}, mgr.subs) + assert.Empty(t, mgr.unsubs) +} + +func TestReconcile_RemovesGoneTopics(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"org-a/d/reads": {OrgID: 1, ScanDeviceID: 1}}} + mgr := &fakeMgr{} + r := NewRegistry(l, testLogger()) + r.SetManager(mgr) + require.NoError(t, r.Reconcile(context.Background())) + + l.m = map[string]storage.ScanRoute{} // device deleted + require.NoError(t, r.Reconcile(context.Background())) + + _, ok := r.Lookup("org-a/d/reads") + assert.False(t, ok) + assert.Equal(t, []string{"org-a/d/reads"}, mgr.unsubs) +} + +func TestReconcile_NoDeltaIsQuiet(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"o/d/reads": {ScanDeviceID: 5}}} + mgr := &fakeMgr{} + r := NewRegistry(l, testLogger()) + r.SetManager(mgr) + require.NoError(t, r.Reconcile(context.Background())) + require.NoError(t, r.Reconcile(context.Background())) // identical second pass + + assert.Equal(t, []string{"o/d/reads"}, mgr.subs) // subscribed exactly once + assert.Empty(t, mgr.unsubs) +} + +func TestReconcile_NoManagerIsMapOnly(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"o/d/reads": {ScanDeviceID: 5}}} + r := NewRegistry(l, testLogger()) // no SetManager + require.NoError(t, r.Reconcile(context.Background())) + _, ok := r.Lookup("o/d/reads") + assert.True(t, ok) +} + +func TestTopicsSnapshot(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"a/x/reads": {}, "b/y/reads": {}}} + r := NewRegistry(l, testLogger()) + require.NoError(t, r.Reconcile(context.Background())) + assert.ElementsMatch(t, []string{"a/x/reads", "b/y/reads"}, r.Topics()) +} diff --git a/backend/internal/storage/ingest.go b/backend/internal/storage/ingest.go index aabb9644..f8cce44f 100644 --- a/backend/internal/storage/ingest.go +++ b/backend/internal/storage/ingest.go @@ -35,6 +35,28 @@ func (s *Storage) ResolveScanTopic(ctx context.Context, topic string) (ScanRoute return r, true, nil } +// ListScanTopics returns every live mqtt device's publish_topic mapped to its +// route, for the subscription registry (TRA-922). SECURITY DEFINER under the +// hood, so no org context is needed — same as ResolveScanTopic. +func (s *Storage) ListScanTopics(ctx context.Context) (map[string]ScanRoute, error) { + rows, err := s.pool.Query(ctx, + `SELECT org_id, scan_device_id, device_type, publish_topic FROM trakrf.list_active_scan_topics()`) + if err != nil { + return nil, fmt.Errorf("list scan topics: %w", err) + } + defer rows.Close() + out := map[string]ScanRoute{} + for rows.Next() { + var r ScanRoute + var topic string + if err := rows.Scan(&r.OrgID, &r.ScanDeviceID, &r.DeviceType, &topic); err != nil { + return nil, fmt.Errorf("scan scan-topic row: %w", err) + } + out[topic] = r + } + return out, rows.Err() +} + // InsertRawTagScan appends the raw MQTT message to the tag_scans audit log and // returns the new row id. tag_scans has no RLS, so no org context is needed. func (s *Storage) InsertRawTagScan(ctx context.Context, topic string, payload []byte) (int64, error) { diff --git a/backend/internal/storage/ingest_integration_test.go b/backend/internal/storage/ingest_integration_test.go index 45abddcf..e89621f7 100644 --- a/backend/internal/storage/ingest_integration_test.go +++ b/backend/internal/storage/ingest_integration_test.go @@ -99,6 +99,47 @@ func TestResolveScanTopic_ByPublishTopic(t *testing.T) { assert.Equal(t, scandevice.DeviceTypeCS463, route.DeviceType) } +func TestListScanTopics(t *testing.T) { + db := testutil.SetupTestDBFull(t) + ctx := context.Background() + orgID := testutil.CreateTestAccount(t, db.AdminPool) + + devA := registerDevice(t, db, orgID, "cs463-a") + devB := registerGLS10Device(t, db, orgID, "gls10-b") + // A web_ble (handheld) device has no MQTT topic and must be excluded. + _, err := db.Store.CreateScanDevice(ctx, orgID, scandevice.CreateScanDeviceRequest{ + Name: "Handheld", Type: scandevice.DeviceTypeCS463, Transport: scandevice.TransportWebBLE, + }) + require.NoError(t, err) + + topics, err := db.Store.ListScanTopics(ctx) + require.NoError(t, err) + + rA, okA := topics["trakrf.id/cs463-a/reads"] + require.True(t, okA, "device A topic must be listed") + assert.Equal(t, orgID, rA.OrgID) + assert.Equal(t, devA.ID, rA.ScanDeviceID) + assert.Equal(t, scandevice.DeviceTypeCS463, rA.DeviceType) + + rB, okB := topics["trakrf.id/gls10-b/reads"] + require.True(t, okB, "device B topic must be listed") + assert.Equal(t, devB.ID, rB.ScanDeviceID) + assert.Equal(t, scandevice.DeviceTypeGLS10, rB.DeviceType) + + // Exactly the two mqtt topics — web_ble device excluded. + assert.Len(t, topics, 2) + + // Soft-deleting a device drops it from the active set. + ok, err := db.Store.DeleteScanDevice(ctx, orgID, devA.ID) + require.NoError(t, err) + require.True(t, ok) + topics, err = db.Store.ListScanTopics(ctx) + require.NoError(t, err) + _, stillThere := topics["trakrf.id/cs463-a/reads"] + assert.False(t, stillThere, "soft-deleted device must drop out") + assert.Len(t, topics, 1) +} + func TestResolveScanTopic_UnknownTopic(t *testing.T) { db := testutil.SetupTestDBFull(t) ctx := context.Background() diff --git a/backend/migrations/000021_list_active_scan_topics.down.sql b/backend/migrations/000021_list_active_scan_topics.down.sql new file mode 100644 index 00000000..7266bd93 --- /dev/null +++ b/backend/migrations/000021_list_active_scan_topics.down.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS trakrf.list_active_scan_topics(); diff --git a/backend/migrations/000021_list_active_scan_topics.up.sql b/backend/migrations/000021_list_active_scan_topics.up.sql new file mode 100644 index 00000000..009c7c4f --- /dev/null +++ b/backend/migrations/000021_list_active_scan_topics.up.sql @@ -0,0 +1,18 @@ +-- TRA-922: full active-topic set for the ingest subscription registry. +-- The subscriber subscribes to exactly the registered publish_topics (data-driven, +-- not a static MQTT_TOPIC filter), so it needs to enumerate every org's live mqtt +-- topics at boot/reconcile. SECURITY DEFINER so the RLS-enforced trakrf-app role +-- can list across orgs with no org context set (same pattern as resolve_scan_topic). +CREATE OR REPLACE FUNCTION trakrf.list_active_scan_topics() +RETURNS TABLE (org_id bigint, scan_device_id bigint, device_type trakrf.scan_device_type, publish_topic text) +LANGUAGE sql +STABLE +SECURITY DEFINER +SET search_path = trakrf, public +AS $$ + SELECT d.org_id, d.id, d.type, d.publish_topic + FROM trakrf.scan_devices d + WHERE d.deleted_at IS NULL + AND d.transport = 'mqtt' + AND d.publish_topic IS NOT NULL; +$$; diff --git a/deploy/edge/.env.example b/deploy/edge/.env.example index f7d75616..d60a75d4 100644 --- a/deploy/edge/.env.example +++ b/deploy/edge/.env.example @@ -4,7 +4,8 @@ POSTGRES_DB=postgres PG_URL=postgres://postgres:CHANGEME@timescaledb:5432/postgres?sslmode=disable&options=-c%20search_path%3Dtrakrf,public # MQTT (local Mosquitto, basic auth) — password must match mosquitto/passwd MQTT_URL=mqtt://trakrf-mqtt:CHANGEME@mosquitto:1883 -MQTT_TOPIC=trakrf.id/# +# MQTT_TOPIC retired (TRA-922): the subscriber subscribes to exactly the +# registered publish_topics (data-driven), so there is no static filter to set. MQTT_CLIENT_ID=trakrf-demo-box # Backend JWT_SECRET=CHANGEME diff --git a/docs/superpowers/plans/2026-06-09-tra-922-mqtt-topic-routing.md b/docs/superpowers/plans/2026-06-09-tra-922-mqtt-topic-routing.md new file mode 100644 index 00000000..4a92189f --- /dev/null +++ b/docs/superpowers/plans/2026-06-09-tra-922-mqtt-topic-routing.md @@ -0,0 +1,481 @@ +# TRA-922 MQTT Topic Routing — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Force an `{org_slug}/` prefix on new `publish_topic`s, and replace the static `MQTT_TOPIC` broker filter with data-driven per-topic subscriptions driven by a `topicroute` registry. + +**Architecture:** A process-wide `topicroute.Registry` owns both the in-memory `publish_topic → ScanRoute` map (message routing) and the broker subscription set. It reconciles against a new `list_active_scan_topics()` SECURITY DEFINER function on boot, on scan-device CRUD, on a periodic ticker, and on every MQTT (re)connect. Scan-device create/edit enforce the `{org_slug}/` prefix at the handler layer (no DB constraint; existing rows grandfathered). + +**Tech Stack:** Go (pgx, paho.mqtt.golang, chi, testify), TimescaleDB/Postgres, React/TypeScript (zustand, vitest). + +**Spec:** `docs/superpowers/specs/2026-06-09-tra-922-mqtt-topic-routing-design.md` + +--- + +### Task 1: `list_active_scan_topics()` DB function + `storage.ListScanTopics` + +**Files:** +- Create: `backend/migrations/000021_list_active_scan_topics.up.sql` +- Create: `backend/migrations/000021_list_active_scan_topics.down.sql` +- Modify: `backend/internal/storage/ingest.go` (add `ListScanTopics`) +- Test: `backend/internal/storage/ingest_integration_test.go` (add `TestListScanTopics`) + +- [ ] **Step 1: Write the migration up/down** + +`000021_list_active_scan_topics.up.sql`: +```sql +-- TRA-922: full active-topic set for the ingest subscription registry. +-- SECURITY DEFINER so the RLS-enforced trakrf-app role can list every org's +-- topics at boot/reconcile with no org context (same pattern as resolve_scan_topic). +CREATE OR REPLACE FUNCTION trakrf.list_active_scan_topics() +RETURNS TABLE (org_id bigint, scan_device_id bigint, device_type trakrf.scan_device_type, publish_topic text) +LANGUAGE sql +STABLE +SECURITY DEFINER +SET search_path = trakrf, public +AS $$ + SELECT d.org_id, d.id, d.type, d.publish_topic + FROM trakrf.scan_devices d + WHERE d.deleted_at IS NULL + AND d.transport = 'mqtt' + AND d.publish_topic IS NOT NULL; +$$; +``` +`000021_list_active_scan_topics.down.sql`: +```sql +DROP FUNCTION IF EXISTS trakrf.list_active_scan_topics(); +``` + +- [ ] **Step 2: Add `ListScanTopics` to storage** (in `ingest.go`, after `ResolveScanTopic`) + +```go +// ListScanTopics returns every live mqtt device's publish_topic mapped to its +// route, for the subscription registry (TRA-922). SECURITY DEFINER, so no org +// context is needed. +func (s *Storage) ListScanTopics(ctx context.Context) (map[string]ScanRoute, error) { + rows, err := s.pool.Query(ctx, + `SELECT org_id, scan_device_id, device_type, publish_topic FROM trakrf.list_active_scan_topics()`) + if err != nil { + return nil, fmt.Errorf("list scan topics: %w", err) + } + defer rows.Close() + out := map[string]ScanRoute{} + for rows.Next() { + var r ScanRoute + var topic string + if err := rows.Scan(&r.OrgID, &r.ScanDeviceID, &r.DeviceType, &topic); err != nil { + return nil, fmt.Errorf("scan scan topic row: %w", err) + } + out[topic] = r + } + return out, rows.Err() +} +``` + +- [ ] **Step 3: Write integration test** (`TestListScanTopics`) — register two mqtt devices (distinct topics) + one web_ble device (no topic); assert the map has exactly the two mqtt topics with correct org/device/type, and excludes the web_ble device. Soft-delete one mqtt device; assert it drops out. Follow the existing `registerDevice`/helpers used by `TestResolveScanTopic_ByPublishTopic`. + +- [ ] **Step 4: Run** `just backend test-integration` (or the integration target) for `TestListScanTopics`. Expected: PASS. + +- [ ] **Step 5: Commit** `feat(tra-922): list_active_scan_topics fn + storage.ListScanTopics` + +--- + +### Task 2: `topicroute.Registry` package + +**Files:** +- Create: `backend/internal/services/topicroute/registry.go` +- Test: `backend/internal/services/topicroute/registry_test.go` + +- [ ] **Step 1: Write failing unit tests** (`registry_test.go`) with a fake lister + fake manager: + +```go +package topicroute + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/trakrf/platform/backend/internal/storage" +) + +type fakeLister struct{ m map[string]storage.ScanRoute; err error } +func (f *fakeLister) ListScanTopics(context.Context) (map[string]storage.ScanRoute, error) { return f.m, f.err } + +type fakeMgr struct{ subs, unsubs []string } +func (f *fakeMgr) Subscribe(t string) { f.subs = append(f.subs, t) } +func (f *fakeMgr) Unsubscribe(t string) { f.unsubs = append(f.unsubs, t) } + +func TestReconcile_AddsAndLooksUp(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"org-a/dock-1/reads": {OrgID: 1, ScanDeviceID: 10, DeviceType: "csl_cs463"}}} + mgr := &fakeMgr{} + r := NewRegistry(l, testLogger()) + r.SetManager(mgr) + assert.NoError(t, r.Reconcile(context.Background())) + got, ok := r.Lookup("org-a/dock-1/reads") + assert.True(t, ok) + assert.Equal(t, 10, got.ScanDeviceID) + assert.Equal(t, []string{"org-a/dock-1/reads"}, mgr.subs) +} + +func TestReconcile_RemovesGoneTopics(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"org-a/d/reads": {OrgID: 1, ScanDeviceID: 1}}} + mgr := &fakeMgr{} + r := NewRegistry(l, testLogger()) + r.SetManager(mgr) + _ = r.Reconcile(context.Background()) + l.m = map[string]storage.ScanRoute{} // device deleted + _ = r.Reconcile(context.Background()) + _, ok := r.Lookup("org-a/d/reads") + assert.False(t, ok) + assert.Equal(t, []string{"org-a/d/reads"}, mgr.unsubs) +} + +func TestReconcile_NoManagerIsMapOnly(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"o/d/reads": {ScanDeviceID: 5}}} + r := NewRegistry(l, testLogger()) // no SetManager + assert.NoError(t, r.Reconcile(context.Background())) + _, ok := r.Lookup("o/d/reads") + assert.True(t, ok) +} + +func TestTopicsSnapshot(t *testing.T) { + l := &fakeLister{m: map[string]storage.ScanRoute{"a/x/reads": {}, "b/y/reads": {}}} + r := NewRegistry(l, testLogger()) + _ = r.Reconcile(context.Background()) + assert.ElementsMatch(t, []string{"a/x/reads", "b/y/reads"}, r.Topics()) +} +``` +(`testLogger()` returns a discard `zerolog.Logger`; define a small helper in the test file.) + +- [ ] **Step 2: Run** `go test ./internal/services/topicroute/...` Expected: FAIL (package missing). + +- [ ] **Step 3: Implement `registry.go`** + +```go +// Package topicroute owns the in-memory publish_topic -> ScanRoute map used to +// route incoming MQTT reads, AND the broker subscription set those topics imply. +// One structure, two jobs: the set of map keys is exactly the set of topics the +// subscriber subscribes to. Reconcile() re-derives both from the DB (TRA-922). +package topicroute + +import ( + "context" + "sync" + + "github.com/rs/zerolog" + "github.com/trakrf/platform/backend/internal/storage" +) + +// TopicLister is the storage dependency (satisfied by *storage.Storage). +type TopicLister interface { + ListScanTopics(ctx context.Context) (map[string]storage.ScanRoute, error) +} + +// SubscriptionManager applies subscription deltas to the live broker client. +// Implemented by *ingest.Subscriber; nil until a subscriber attaches (MQTT off). +type SubscriptionManager interface { + Subscribe(topic string) + Unsubscribe(topic string) +} + +type Registry struct { + lister TopicLister + log zerolog.Logger + mu sync.RWMutex + routes map[string]storage.ScanRoute + mgr SubscriptionManager +} + +func NewRegistry(lister TopicLister, log zerolog.Logger) *Registry { + return &Registry{lister: lister, log: log.With().Str("component", "topicroute").Logger(), routes: map[string]storage.ScanRoute{}} +} + +func (r *Registry) SetManager(m SubscriptionManager) { + r.mu.Lock() + r.mgr = m + r.mu.Unlock() +} + +// Lookup returns the route for a topic from the in-memory map (message path). +func (r *Registry) Lookup(topic string) (storage.ScanRoute, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + rt, ok := r.routes[topic] + return rt, ok +} + +// Topics returns a snapshot of all known topics (for OnConnect bulk-subscribe). +func (r *Registry) Topics() []string { + r.mu.RLock() + defer r.mu.RUnlock() + out := make([]string, 0, len(r.routes)) + for t := range r.routes { + out = append(out, t) + } + return out +} + +// Reconcile re-derives the map from the DB and applies the add/remove deltas to +// the subscription manager (if attached). Safe to call on boot (no manager => +// map-only), on CRUD, and on a ticker. +func (r *Registry) Reconcile(ctx context.Context) error { + fresh, err := r.lister.ListScanTopics(ctx) + if err != nil { + return err + } + var toSub, toUnsub []string + r.mu.Lock() + for topic := range r.routes { + if _, ok := fresh[topic]; !ok { + delete(r.routes, topic) + toUnsub = append(toUnsub, topic) + } + } + for topic, route := range fresh { + if _, ok := r.routes[topic]; !ok { + toSub = append(toSub, topic) + } + r.routes[topic] = route // refresh route even if topic unchanged + } + mgr := r.mgr + r.mu.Unlock() + if mgr != nil { + for _, t := range toSub { + mgr.Subscribe(t) + } + for _, t := range toUnsub { + mgr.Unsubscribe(t) + } + } + if len(toSub) > 0 || len(toUnsub) > 0 { + r.log.Info().Int("added", len(toSub)).Int("removed", len(toUnsub)).Msg("topic registry reconciled") + } + return nil +} +``` + +- [ ] **Step 4: Run** `go test ./internal/services/topicroute/...` Expected: PASS. +- [ ] **Step 5: Commit** `feat(tra-922): topicroute registry (routing map + subscription set)` + +--- + +### Task 3: Subscriber → data-driven subscriptions; retire `MQTT_TOPIC` + +**Files:** +- Modify: `backend/internal/ingest/subscriber.go` +- Modify: `backend/internal/ingest/config.go` +- Modify: `backend/internal/ingest/config_test.go` +- Modify: `backend/internal/cmd/serve/serve.go` (construct registry + ticker; pass registry to subscriber) +- Modify: `deploy/edge/.env.example` + +- [ ] **Step 1: Update `config.go`** — remove the `Topic` field, its env read, and the `trakrf.id/#` default. Keep `URL`, `ClientID`. Update the doc comment to note `MQTT_TOPIC` is retired (subscriptions are data-driven, TRA-922). + +- [ ] **Step 2: Update `config_test.go`** — drop `MQTT_TOPIC` assertions/overrides; keep URL + ClientID default/override tests. Run `go test ./internal/ingest/ -run TestConfig` after Step 6. + +- [ ] **Step 3: Rework `subscriber.go`:** + - `NewSubscriber(cfg Config, store *storage.Storage, registry *topicroute.Registry, eval ReadEvaluator, feed ReadPublisher, log *zerolog.Logger)` — store `registry` on the struct and call `registry.SetManager(s)`. + - Add methods implementing `SubscriptionManager`: + ```go + func (s *Subscriber) Subscribe(topic string) { + if s.client == nil || !s.client.IsConnected() { + return // OnConnect will (re)subscribe the full set from the registry + } + if tok := s.client.Subscribe(topic, 1, s.handleMessage); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Str("topic", topic).Msg("subscribe failed") + return + } + s.log.Info().Str("topic", topic).Msg("subscribed") + } + func (s *Subscriber) Unsubscribe(topic string) { + if s.client == nil || !s.client.IsConnected() { + return + } + if tok := s.client.Unsubscribe(topic); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Str("topic", topic).Msg("unsubscribe failed") + } + } + ``` + - Replace the `OnConnect` body: bulk-subscribe `s.registry.Topics()` via `SubscribeMultiple`: + ```go + SetOnConnectHandler(func(c mqtt.Client) { + topics := s.registry.Topics() + if len(topics) == 0 { + s.log.Info().Msg("connected; no registered topics to subscribe") + return + } + filters := make(map[string]byte, len(topics)) + for _, t := range topics { + filters[t] = 1 + } + if tok := c.SubscribeMultiple(filters, s.handleMessage); tok.Wait() && tok.Error() != nil { + s.log.Error().Err(tok.Error()).Int("count", len(topics)).Msg("bulk subscribe failed") + return + } + s.log.Info().Int("count", len(topics)).Msg("subscribed to registered topics") + }). + ``` + - In `handleMessage`, replace the `store.ResolveScanTopic` call with a registry lookup + fallback: + ```go + route, ok := s.registry.Lookup(topic) + if !ok { + var found bool + route, found, err = s.store.ResolveScanTopic(ctx, topic) // defensive fallback + if err != nil { /* existing error handling */ } + if !found { /* existing unregistered-topic handling (log + return) */ } + } + ``` + (Adapt to the exact existing variable names/flow at `subscriber.go:124`.) + - Remove logging of `s.cfg.Topic` in `Start()` (no longer a single topic). + +- [ ] **Step 4: Update `serve.go`** — before the `mqttCfg` block, construct the registry unconditionally and do an initial load: + ```go + topicRegistry := topicroute.NewRegistry(store, log) + if err := topicRegistry.Reconcile(ctx); err != nil { + log.Warn().Err(err).Msg("initial topic registry load failed; will retry on ticker") + } + ``` + Inside `if mqttCfg.Enabled()`: pass `topicRegistry` to `ingest.NewSubscriber`, and start a reconcile ticker: + ```go + reconcileStop := make(chan struct{}) + go func() { + t := time.NewTicker(5 * time.Minute) + defer t.Stop() + for { + select { + case <-reconcileStop: + return + case <-t.C: + if err := topicRegistry.Reconcile(ctx); err != nil { + log.Warn().Err(err).Msg("topic registry reconcile failed") + } + } + } + }() + defer close(reconcileStop) + ``` + Keep `topicRegistry` in scope to pass to the scandevices handler in Task 4. + +- [ ] **Step 5: Update `deploy/edge/.env.example`** — remove the `MQTT_TOPIC=trakrf.id/#` line; add a comment: `# MQTT_TOPIC retired (TRA-922): subscriptions are data-driven from registered publish_topics.` + +- [ ] **Step 6: Run** `just backend build` (compiles serve + ingest) and `go test ./internal/ingest/...`. Expected: build OK, config tests PASS. + +- [ ] **Step 7: Commit** `feat(tra-922): data-driven MQTT subscriptions; retire MQTT_TOPIC filter` + +--- + +### Task 4: Prefix validation + registry reconcile on scan-device CRUD + +**Files:** +- Modify: `backend/internal/handlers/scandevices/scandevices.go` (Handler gets registry; validate prefix; reconcile after mutation) +- Modify: `backend/internal/cmd/serve/serve.go` (`NewHandler(store, topicRegistry)`) +- Test: `backend/internal/handlers/scandevices/scandevices_prefix_integration_test.go` (new) + +- [ ] **Step 1: Write failing integration tests** covering: (a) create mqtt device with `publish_topic` not starting `{slug}/` → 422; (b) create with correct prefix → 201; (c) create mqtt device for an org whose `identifier` is empty → 422; (d) PATCH a grandfathered device WITHOUT changing `publish_topic` → 200 (no prefix check); (e) PATCH changing `publish_topic` to a non-prefixed value → 422; (f) create web_ble device with no topic → 201 (no prefix check). Use the existing scandevices integration test harness/org fixtures. + +- [ ] **Step 2: Run** the new test. Expected: FAIL (validation not implemented). + +- [ ] **Step 3: Implement** in `scandevices.go`: + - `Handler` struct gains `registry *topicroute.Registry`; `NewHandler(storage *storage.Storage, registry *topicroute.Registry) *Handler`. + - Add a helper: + ```go + // requireTopicPrefix enforces the {org_slug}/ prefix on a publish_topic + // (TRA-922). Empty topic => allowed (no-topic / grandfather). Returns a + // user-facing message on violation. + func (h *Handler) validateTopicPrefix(ctx context.Context, orgID int, transport, topic string) (string, bool) { + if topic == "" || transport == "web_ble" { + return "", true + } + org, err := h.storage.GetOrganizationByID(ctx, orgID) + if err != nil || org == nil || org.Identifier == "" { + return "organization has no identifier; cannot set a publish_topic", false + } + if !strings.HasPrefix(topic, org.Identifier+"/") { + return "publish_topic must start with \"" + org.Identifier + "/\"", false + } + return "", true + } + ``` + - In `Create`: after `validate.Struct`, resolve transport (`req.Transport`, "" treated as mqtt → not web_ble) and topic (`deref(req.PublishTopic)`); call `validateTopicPrefix`; on failure `httputil.WriteJSONError(w, r, http.StatusUnprocessableEntity, modelerrors.ErrValidation, msg, reqID)`. After a successful create, call `h.reconcile(r.Context())` (best-effort; see Step 4). + - In `Update`: only when `req.PublishTopic != nil` (topic is being changed), validate the new value (transport from `req.Transport` if set, else fetch the existing device's transport via `GetScanDeviceByID`). On failure → 422. After a successful update, `h.reconcile`. + - In `Delete`: after success, `h.reconcile`. + ```go + func (h *Handler) reconcile(ctx context.Context) { + if h.registry == nil { + return + } + if err := h.registry.Reconcile(ctx); err != nil { + // best-effort: the periodic ticker backstops; the mutation already succeeded + _ = err + } + } + ``` + - Confirm `modelerrors.ErrValidation` exists (else use the validation error constant already used by `RespondValidationError`). + +- [ ] **Step 4: Update `serve.go`** — `scanDevicesHandler := scandeviceshandler.NewHandler(store, topicRegistry)`. + +- [ ] **Step 5: Run** `just backend build` + the new integration test + existing `TestScanDevice_*`. Expected: PASS. + +- [ ] **Step 6: Commit** `feat(tra-922): enforce {org_slug}/ publish_topic prefix; reconnect registry on CRUD` + +--- + +### Task 5: Expose current org `identifier` on `/users/me` + +**Files:** +- Modify: `backend/internal/models/organization/organization.go` (`UserOrgWithRole.Identifier`) +- Modify: `backend/internal/services/orgs/service.go` (populate it) +- Test: `backend/internal/services/orgs/service_test.go` (assert identifier present) OR a handler test that already exercises `/users/me`. + +- [ ] **Step 1: Write/extend a failing test** asserting `profile.CurrentOrg.Identifier` equals the org's identifier. +- [ ] **Step 2: Run** → FAIL. +- [ ] **Step 3: Implement** — add `Identifier string \`json:"identifier"\`` to `UserOrgWithRole`. In `service.go` where `CurrentOrg` is built (around line 139), fetch the identifier: call `s.storage.GetOrganizationByID(ctx, currentOrgID)` and set `Identifier: org.Identifier` (the `org` in the loop is a `UserOrg` with only id/name, so use the storage fetch). Guard nil. +- [ ] **Step 4: Run** → PASS. +- [ ] **Step 5: Commit** `feat(tra-922): include org identifier in /users/me current_org` + +--- + +### Task 6: Backend `readerKeyFromTopic` — relax the fixed root + +**Files:** +- Modify: `backend/internal/services/readstream/tracker.go:340` +- Test: `backend/internal/services/readstream/tracker_test.go` + +- [ ] **Step 1: Add a failing test** asserting `readerKeyFromTopic("organized-chaos/dock-1/reads") == "dock-1"` and the grandfathered `readerKeyFromTopic("trakrf.id/dock-1/reads") == "dock-1"` still holds. (`readerKeyFromTopic` is unexported — add the test in-package.) +- [ ] **Step 2: Run** → FAIL on the new-scheme case. +- [ ] **Step 3: Change** the regex `^trakrf\.id/([^/]+)/reads$` → `^[^/]+/([^/]+)/reads$`. Update the doc comment to "extracts the reader key from a `{prefix}/{key}/reads` topic". +- [ ] **Step 4: Run** → PASS. +- [ ] **Step 5: Commit** `feat(tra-922): readerKeyFromTopic accepts any prefix segment` + +--- + +### Task 7: Frontend — prefill + relaxed topic parsing + +**Files:** +- Modify: `frontend/src/types/org/index.ts` (`UserOrgWithRole.identifier`) +- Modify: `frontend/src/lib/scandevices/deviceProfile.ts` (`TOPIC_RE`) +- Modify: `frontend/src/lib/scandevices/deviceProfile.test.ts` +- Modify: `frontend/src/components/scandevices/ScanDeviceForm.tsx` (prefill + help text) + +- [ ] **Step 1: Add `identifier: string` to `UserOrgWithRole`** in `types/org/index.ts`. +- [ ] **Step 2: Relax `TOPIC_RE`** to `/^[^/]+\/([^/]+)\/reads$/`; update the doc comment. Add/extend `deviceProfile.test.ts`: `readerKeyFromTopic('organized-chaos/dock-1/reads')` → `'dock-1'`; grandfathered `trakrf.id/dock-1/reads` → `'dock-1'`. Run `pnpm test deviceProfile`. +- [ ] **Step 3: Prefill in `ScanDeviceForm.tsx`** — read the current org slug from the store (`useOrgStore((s) => s.currentOrg?.identifier)`). On create mode (no `device`), when transport is `mqtt` and `publish_topic` is empty, initialize it to `${slug}/`. Update the placeholder to `e.g., ${slug}/dock-reader-1/reads` (fallback to the generic example if slug is unavailable) and the help text to mention the required `{org}/` prefix. Do NOT prefill in edit mode (grandfathered values must be shown verbatim). +- [ ] **Step 4: Run** `just frontend test` (deviceProfile + any ScanDeviceForm tests) and `just frontend lint`. Expected: PASS. +- [ ] **Step 5: Commit** `feat(tra-922): prefill {org_slug}/ in scan-device form; relaxed topic parse` + +--- + +### Task 8: Full validation, Linear, PR + +- [ ] **Step 1:** `just lint` (backend + frontend). Fix any findings. +- [ ] **Step 2:** `just test` (backend unit + frontend) and the integration suite. Confirm green; capture output. +- [ ] **Step 3:** If the OpenAPI spec is generated and `/users/me` schema changed, run `just backend api-spec` and commit drift. +- [ ] **Step 4:** Push the branch; open a PR (merge-commit, not squash) titled `feat(tra-922): MQTT topic routing — {org_slug}/ prefix + data-driven subscriptions`. Body: summary, the four decisions, test evidence, and an explicit **infra/ops note**: the live edge box `.env` should drop `MQTT_TOPIC` (subscriptions are now data-driven); grandfathered demo topics keep working unchanged. +- [ ] **Step 5:** Comment on TRA-922 in Linear with the PR link and the grandfather/edge-env note. **Hold for review — do not merge.** + +--- + +## Self-review notes +- **Spec coverage:** D1 prefix → Task 4 (+ frontend prefill Task 7, slug exposure Task 5); D2 immutability → no code (documented); D3 grandfather → Task 4 (validate only on change) + no data migration; D4 data-driven subs → Tasks 1–3; readerKey consequence → Tasks 6–7; performance/cache → the registry map (Task 2), no negative cache needed. +- **Type consistency:** `storage.ScanRoute{OrgID,ScanDeviceID,DeviceType}` reused throughout (ints, matches `ingest.go`). Registry depends on `TopicLister` (interface) + `SubscriptionManager` (interface, implemented by `*ingest.Subscriber`). No `topicroute → ingest` import (no cycle). +- **Build order:** signature changes (`NewSubscriber`, `NewHandler`) land with their `serve.go` updates in the same task to keep the tree compiling. diff --git a/docs/superpowers/specs/2026-06-09-tra-922-mqtt-topic-routing-design.md b/docs/superpowers/specs/2026-06-09-tra-922-mqtt-topic-routing-design.md new file mode 100644 index 00000000..f7cf04ce --- /dev/null +++ b/docs/superpowers/specs/2026-06-09-tra-922-mqtt-topic-routing-design.md @@ -0,0 +1,128 @@ +# TRA-922 — MQTT topic routing: org-slug-prefixed `publish_topic` + data-driven subscriptions + +**Date:** 2026-06-09 +**Ticket:** [TRA-922](https://linear.app/trakrf/issue/TRA-922) (High, In Progress) +**Branch:** `feat/tra-922-mqtt-topic-org-slug-prefix` +**Related:** TRA-900 (ingest subscriber + `resolve_scan_topic`), TRA-956 (match by `(device, antenna_port)`, dropped `external_key`), TRA-857 (per-topic ACLs, out of scope), TRA-907 (multi-replica `$share`, deferred) + +## Problem + +Read ingestion routes each MQTT message via `resolve_scan_topic` — a DB lookup of `scan_devices.publish_topic`. The broker-side subscription is a single static filter (`MQTT_TOPIC`, deployed as `trakrf.id/+/reads` via infra #147), a leftover from when the middle topic segment was the org slug used to route by org. That org-slug routing is gone. Two problems fall out: + +1. **The coarse filter silently constrains a "free-form" field.** Any `publish_topic` not matching `trakrf.id/{one-segment}/reads` is never delivered and is silently dropped. +2. **Cross-org collision is latent.** The unique index is `(org_id, publish_topic)` — only per-org. Two orgs could both register `trakrf.id/dock-1/reads`, making topic→device routing ambiguous. + +## Decisions (resolved with Mike, 2026-06-09) + +- **D1 — Topic scheme: slug-as-root.** New `publish_topic` values are forced to start with `{org_slug}/` where `org_slug = organizations.identifier` (globally unique, treated as immutable — no rename path exists in code). Drop the `trakrf.id/` literal root. Example: `organized-chaos/dock-1/reads`. The globally-unique slug prefix makes the existing per-org `(org_id, publish_topic)` index effectively global for conforming rows — cross-org collision becomes structurally impossible. `{org_slug}/#` is a ready-made per-tenant ACL scope for TRA-857. +- **D2 — Org-slug mutability: non-issue.** `organizations.identifier` is `UNIQUE` and has no rename path in the codebase; treated as immutable. No cascade logic needed. +- **D3 — Existing rows: grandfather.** No data migration. Prefix is enforced only on create and on edit-that-changes-the-topic. Existing demo rows (`trakrf.id/C4DEE229A176/reads`, MK107, etc.) are left untouched and keep working. +- **D4 — Broker filtering: data-driven per-topic subscriptions (not a firehose).** Instead of subscribing to `#` and re-filtering in-app, the backend subscribes to exactly the set of registered `publish_topic`s. The broker delivers only registered reads topics — no Shelly command/status noise, no firehose, no negative cache. The static `MQTT_TOPIC` filter retires entirely (data-driven, not pattern-driven), so there is **no cloud infra hand-off** for a chart `mqtt.topic` value. This keeps `publish_topic` truly free-form (any depth/shape) and aligns 1:1 with per-topic ACLs (TRA-857). + +### Approaches considered for D4 + +| Approach | Subscription | Broker delivers | Verdict | +|---|---|---|---| +| A. `#` firehose + in-app negative cache | one static `#` | all broker traffic | Rejected — re-implements broker filtering; needs negative cache to survive junk; dirtiest | +| B. One structural filter `+/+/reads` | one static filter | 3-segment `.../reads` only | Rejected — re-imposes fixed structure (no `{org}/bldg-a/dock-1/reads`); kills free-form | +| **C. Per-topic exact (chosen)** | one exact sub per device, managed dynamically | only registered reads topics | **Chosen** — broker filters precisely; free-form preserved; subscription set *is* the routing map; retires `MQTT_TOPIC`; ACL-aligned | + +## Architecture + +### New component: `internal/services/topicroute` registry + +A process-wide registry that owns (a) the in-memory `publish_topic → Route` map used to route incoming messages and (b) the broker subscription set. One structure, two jobs — there is no separate cache. + +``` +type Route struct { OrgID int; ScanDeviceID int64; DeviceType string } + +type SubscriptionManager interface { // implemented by *ingest.Subscriber + Subscribe(topic string) + Unsubscribe(topic string) +} + +type Registry struct { + store *storage.Storage + mu sync.RWMutex + routes map[string]Route // publish_topic -> route + mgr SubscriptionManager // nil until a subscriber attaches (MQTT off => nil) +} + +func NewRegistry(store *storage.Storage) *Registry +func (r *Registry) SetManager(m SubscriptionManager) // wired after subscriber built +func (r *Registry) Load(ctx context.Context) error // bulk-populate from DB +func (r *Registry) Lookup(topic string) (Route, bool) // message-path routing +func (r *Registry) Topics() []string // snapshot for OnConnect bulk-subscribe +func (r *Registry) Add(route Route, topic string) // CRUD create/update-new: map + mgr.Subscribe +func (r *Registry) Remove(topic string) // CRUD delete/update-old: map + mgr.Unsubscribe +func (r *Registry) Reconcile(ctx context.Context) error // periodic: re-Load, diff, sub/unsub deltas +``` + +- `Add`/`Remove` mutate the map under the lock and, if a manager is attached, call `mgr.Subscribe`/`Unsubscribe`. With MQTT off (no manager) they keep the map current and are subscription no-ops. +- Boundaries: `topicroute` imports only `storage`. `ingest` and `scandevices` import `topicroute`. The `SubscriptionManager` interface is defined in `topicroute` and implemented by `*ingest.Subscriber` — no import cycle (no `topicroute → ingest` edge). + +### New DB function (migration `000021`): `trakrf.list_active_scan_topics()` + +The subscriber runs as the RLS-enforced `trakrf-app` role with no org context, so it cannot list every org's devices directly. A `SECURITY DEFINER` function returns the full active set for bootstrap + reconcile, mirroring `resolve_scan_topic`: + +```sql +CREATE OR REPLACE FUNCTION trakrf.list_active_scan_topics() +RETURNS TABLE (org_id bigint, scan_device_id bigint, device_type trakrf.scan_device_type, publish_topic text) +LANGUAGE sql STABLE SECURITY DEFINER SET search_path = trakrf, public +AS $$ + SELECT d.org_id, d.id, d.type, d.publish_topic + FROM trakrf.scan_devices d + WHERE d.deleted_at IS NULL + AND d.transport = 'mqtt' + AND d.publish_topic IS NOT NULL; +$$; +``` + +`resolve_scan_topic` is **kept** as the message-path fallback (registry `Lookup` miss → `resolve_scan_topic`), a defensive backstop for any race between Subscribe and map population. `.down.sql` drops `list_active_scan_topics`. + +### Subscriber changes (`internal/ingest`) + +- `NewSubscriber` gains a `*topicroute.Registry` param; the subscriber registers itself via `registry.SetManager(s)` and implements `Subscribe`/`Unsubscribe` against the live `mqtt.Client` (best-effort: a Subscribe while disconnected just logs; `OnConnect` reconciles). +- `OnConnect` no longer subscribes a static `s.cfg.Topic`. It bulk-subscribes `registry.Topics()` via `client.SubscribeMultiple` (fires on initial connect and every reconnect — handles resubscribe for free). +- `handleMessage` routes via `registry.Lookup(topic)`; on miss, falls back to `store.ResolveScanTopic`. Everything downstream (Parse → PersistReads → geofence → feed) is unchanged. +- A periodic `Reconcile` (ticker, ~5 min) is started alongside the subscriber as a safety net for missed CRUD events / direct DB edits / future multi-replica drift. +- `MQTT_TOPIC` retired: remove `Topic` from the subscribe path in `Config`/`ConfigFromEnv`; keep `MQTT_URL`, `MQTT_CLIENT_ID`. Update `config_test.go`. Remove the `MQTT_TOPIC` line from `deploy/edge/.env.example`. + +### CRUD changes (`internal/handlers/scandevices` + `internal/storage/scan_devices`) + +- **Prefix validation (D1).** On create and on update-that-changes `publish_topic` (mqtt transport): fetch the caller's org via `storage.GetOrganizationByID(GetRequestOrgID(r)).Identifier` and require `publish_topic` to start with `{identifier}/`. Reject with **422** + a clear message otherwise. Org with NULL `identifier` → 422 (can't form a valid topic). Update with an **unchanged** `publish_topic` is left alone (grandfather, D3). +- **Registry wiring.** Handler gains a `*topicroute.Registry`. After a successful create → `registry.Add`; successful delete → `registry.Remove`; update with topic change → `registry.Remove(old)` + `registry.Add(new)`. Non-mqtt (web_ble) devices have no topic → skipped. + +### Frontend (`frontend/src/components/scandevices` + `lib/scandevices`) + +- **Prefill (D1).** On create, prefill the `publish_topic` input with `{org_slug}/` from the existing frontend org context; update placeholder/help text to the new scheme. +- **`readerKeyFromTopic` (`deviceProfile.ts`) + backend `broadcaster.go`.** Relax the hardcoded `trakrf.id/` first segment to a wildcard so `{seg}/{key}/reads` extracts the key for both grandfathered and new topics; fall back to the whole topic for non-conforming shapes. (Live Reads correlation only; org-scoped SSE already isolates cross-org key collisions.) + +### Composition (`internal/cmd/serve/serve.go`) + +- Construct `registry := topicroute.NewRegistry(store)` **unconditionally** (the handler always needs it) and call `registry.Load(ctx)` before the MQTT block. +- Inside `if mqttCfg.Enabled()`: pass `registry` to `NewSubscriber` (which calls `registry.SetManager(self)`); start the reconcile ticker; `defer` its stop. +- Pass `registry` to `scandeviceshandler.NewHandler(store, registry)`. + +## Testing (TDD) + +- `topicroute` unit tests: `Load` populates from a fake/seeded store; `Lookup` hit/miss; `Add`/`Remove` mutate the map **and** call a fake `SubscriptionManager`'s Subscribe/Unsubscribe; `Reconcile` computes correct sub/unsub deltas; no-manager (MQTT-off) path is a subscription no-op. +- Storage integration test for `list_active_scan_topics()` (returns active mqtt rows across orgs; excludes deleted / web_ble / null-topic). +- CRUD integration tests: create with conforming prefix → ok + registry.Add called; create with wrong/missing prefix → 422; create for NULL-identifier org → 422; update unchanged topic on a grandfathered row → ok (no prefix check); update to non-conforming topic → 422; delete → registry.Remove called. +- `ingest` test: `handleMessage` routes via registry hit without touching the DB; registry miss falls back to `resolve_scan_topic`. +- `config_test.go`: `MQTT_TOPIC` retired (no static topic in subscribe path). +- Frontend: `readerKeyFromTopic` extracts key for `{org_slug}/{key}/reads` and grandfathered `trakrf.id/{key}/reads`; `ScanDeviceForm` prefills `{org_slug}/` on create. + +## Out of scope + +- Per-topic ACLs / per-device broker creds → TRA-857 (this sets up the `{org_slug}/#` namespace). +- Multi-replica ingest → TRA-907 (would use `$share/grp/{topic}` shared subscriptions; the registry/reconcile model composes with it). +- Data migration / re-provisioning of existing rows (grandfathered, D3). +- GL-S10/reader command channels (separate follow-up). + +## Risks / notes + +- **Subscribe-before-map race:** mitigated by ordering (map update precedes Subscribe in `Add`) and the `resolve_scan_topic` fallback on `Lookup` miss. +- **Disconnected Subscribe:** best-effort; `OnConnect` bulk-subscribe from `registry.Topics()` is the source of truth on every (re)connect. +- **Reconcile cost:** one `list_active_scan_topics` query every ~5 min; negligible. +- **No DB-level prefix enforcement:** a CHECK can't join to `organizations` and a trigger would break grandfathered edits, so the prefix is enforced in the app layer only (consistent with D3). diff --git a/frontend/src/components/scandevices/ScanDeviceForm.test.tsx b/frontend/src/components/scandevices/ScanDeviceForm.test.tsx new file mode 100644 index 00000000..809d8b71 --- /dev/null +++ b/frontend/src/components/scandevices/ScanDeviceForm.test.tsx @@ -0,0 +1,61 @@ +import '@testing-library/jest-dom'; +import { describe, it, expect, vi, afterEach } from 'vitest'; +import { render, screen, cleanup } from '@testing-library/react'; +import { ScanDeviceForm } from './ScanDeviceForm'; +import { useOrgStore } from '@/stores/orgStore'; + +afterEach(cleanup); + +function setOrg(identifier: string | null) { + useOrgStore.setState({ + currentOrg: identifier === null ? null : { id: 1, name: 'Org', identifier, role: 'admin' }, + }); +} + +describe('ScanDeviceForm publish_topic prefill (TRA-922)', () => { + const noop = vi.fn(); + + it('pre-fills the create form with the {org_slug}/ prefix', () => { + setOrg('organized-chaos'); + render(); + const input = screen.getByLabelText(/publish topic/i) as HTMLInputElement; + expect(input.value).toBe('organized-chaos/'); + }); + + it('leaves the create form empty when no org slug is available', () => { + setOrg(null); + render(); + const input = screen.getByLabelText(/publish topic/i) as HTMLInputElement; + expect(input.value).toBe(''); + }); + + it('does not prefix in edit mode — the stored topic shows verbatim', () => { + setOrg('organized-chaos'); + render( + + ); + const input = screen.getByLabelText(/publish topic/i) as HTMLInputElement; + expect(input.value).toBe('trakrf.id/legacy/reads'); + }); +}); diff --git a/frontend/src/components/scandevices/ScanDeviceForm.tsx b/frontend/src/components/scandevices/ScanDeviceForm.tsx index 791068d5..ec6b775d 100644 --- a/frontend/src/components/scandevices/ScanDeviceForm.tsx +++ b/frontend/src/components/scandevices/ScanDeviceForm.tsx @@ -1,5 +1,6 @@ import { useState, useEffect, FormEvent } from 'react'; import { validateName } from '@/lib/location/validators'; +import { useOrgStore } from '@/stores/orgStore'; import type { ScanDevice, ScanDeviceType, @@ -67,6 +68,10 @@ export function ScanDeviceForm({ const [formData, setFormData] = useState(EMPTY_FORM); const [fieldErrors, setFieldErrors] = useState({}); + // TRA-922: every publish_topic must start with {org_slug}/. Pre-fill the + // create form with that prefix so operators only type the device-specific tail. + const orgSlug = useOrgStore((s) => s.currentOrg?.identifier ?? ''); + useEffect(() => { if (mode === 'edit' && device) { setFormData({ @@ -80,9 +85,9 @@ export function ScanDeviceForm({ is_active: device.is_active, }); } else if (mode === 'create') { - setFormData(EMPTY_FORM); + setFormData({ ...EMPTY_FORM, publish_topic: orgSlug ? `${orgSlug}/` : '' }); } - }, [mode, device]); + }, [mode, device, orgSlug]); const validateForm = (): boolean => { const errors: FieldErrors = {}; @@ -233,13 +238,18 @@ export function ScanDeviceForm({ onChange={(e) => handleChange('publish_topic', e.target.value)} disabled={loading} className={inputClass(!!fieldErrors.publish_topic)} - placeholder="e.g., trakrf.id/dock-reader-1/reads" + placeholder={orgSlug ? `e.g., ${orgSlug}/dock-reader-1/reads` : 'e.g., your-org/dock-reader-1/reads'} /> {fieldErrors.publish_topic && (

{fieldErrors.publish_topic}

)}

The MQTT topic this device publishes reads on — the routing key that ties wire traffic to this reader. + {orgSlug && ( + <> + {' '}Must start with {orgSlug}/. + + )}

diff --git a/frontend/src/components/scandevices/ScanDeviceFormModal.test.tsx b/frontend/src/components/scandevices/ScanDeviceFormModal.test.tsx index 869826ee..924e2e63 100644 --- a/frontend/src/components/scandevices/ScanDeviceFormModal.test.tsx +++ b/frontend/src/components/scandevices/ScanDeviceFormModal.test.tsx @@ -162,8 +162,8 @@ describe('ScanDeviceFormModal', () => { ); - // publish_topic is trakrf.id/dock_reader_1/reads → key dock_reader_1. - expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:dock_reader_1'); + // publish_topic is the reader key verbatim (TRA-922: no parsing). + expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:trakrf.id/dock_reader_1/reads'); }); it('does not show the commissioning sections in create mode', () => { @@ -191,7 +191,7 @@ describe('ScanDeviceFormModal', () => { ).toBeInTheDocument(); // …along with the scoped commissioning sections… expect(screen.getByTestId('reader-points')).toHaveTextContent('points:csl_cs463'); - expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:dock_reader_1'); + expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:trakrf.id/dock_reader_1/reads'); // …but the modal chrome (backdrop close button + title bar) is gone. expect(screen.queryByLabelText('Close modal')).not.toBeInTheDocument(); expect( @@ -223,7 +223,7 @@ describe('ScanDeviceFormModal', () => { expect( screen.getByRole('button', { name: /Update Scan Device/i }) ).toBeInTheDocument(); - expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:dock_reader_1'); + expect(screen.getByTestId('scoped-feed')).toHaveTextContent('feed:trakrf.id/dock_reader_1/reads'); expect(screen.queryByLabelText('Close modal')).not.toBeInTheDocument(); }); }); diff --git a/frontend/src/lib/scandevices/deviceProfile.test.ts b/frontend/src/lib/scandevices/deviceProfile.test.ts index 25f37344..fb0958cd 100644 --- a/frontend/src/lib/scandevices/deviceProfile.test.ts +++ b/frontend/src/lib/scandevices/deviceProfile.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { deviceProfile, readerKeyFromTopic, readerKeyForDevice } from './deviceProfile'; +import { deviceProfile, readerKeyForDevice } from './deviceProfile'; import type { ScanDevice } from '@/types/scandevices'; function device(overrides: Partial): ScanDevice { @@ -46,21 +46,13 @@ describe('deviceProfile', () => { }); }); -describe('readerKeyFromTopic', () => { - it('extracts the {key} segment from a standard reads topic', () => { - expect(readerKeyFromTopic('trakrf.id/dock-7/reads')).toBe('dock-7'); - }); - - it('falls back to the full topic for non-matching strings', () => { - expect(readerKeyFromTopic('weird/topic')).toBe('weird/topic'); - }); -}); - describe('readerKeyForDevice', () => { - it('derives the key from publish_topic when present', () => { + // TRA-922: the reader key is the publish_topic verbatim (direct topic→device + // match; nothing is parsed out of it), matching how the backend keys reads. + it('uses the publish_topic verbatim as the key', () => { expect( - readerKeyForDevice(device({ publish_topic: 'trakrf.id/custom-key/reads' })) - ).toBe('custom-key'); + readerKeyForDevice(device({ publish_topic: 'organized-chaos/custom-key/reads' })) + ).toBe('organized-chaos/custom-key/reads'); }); // TRA-956: external_key is gone — a device with no publish_topic has no diff --git a/frontend/src/lib/scandevices/deviceProfile.ts b/frontend/src/lib/scandevices/deviceProfile.ts index f876ec3b..40cfa03b 100644 --- a/frontend/src/lib/scandevices/deviceProfile.ts +++ b/frontend/src/lib/scandevices/deviceProfile.ts @@ -29,26 +29,14 @@ export function deviceProfile(device: Pick): D return 'single_point'; } -const TOPIC_RE = /^trakrf\.id\/([^/]+)\/reads$/; - -/** - * Extract the reader key from a `trakrf.id/{key}/reads` topic, mirroring the - * backend's readerKeyFromTopic (broadcaster.go). Non-matching topics fall back - * to the full string so the key is never empty. - */ -export function readerKeyFromTopic(topic: string): string { - const m = TOPIC_RE.exec(topic); - return m ? m[1] : topic; -} - /** - * The reader key the live feed tags this device's reads with. The backend - * derives readerKey from the topic a device publishes on, so we derive the same - * key from publish_topic. A device with no publish_topic has no live-feed key. + * The reader key the live feed tags this device's reads with. TRA-922: routing + * is a direct topic→device match (no parsing), so the reader key is simply the + * device's publish_topic used verbatim — the same string the backend keys reads + * by. A device with no publish_topic has no live-feed key. */ export function readerKeyForDevice( device: Pick ): string { - const topic = device.publish_topic?.trim(); - return topic ? readerKeyFromTopic(topic) : ''; + return device.publish_topic?.trim() ?? ''; } diff --git a/frontend/src/types/org/index.ts b/frontend/src/types/org/index.ts index f10274ea..6a5bcf21 100644 --- a/frontend/src/types/org/index.ts +++ b/frontend/src/types/org/index.ts @@ -20,6 +20,8 @@ export interface UserOrg { export interface UserOrgWithRole { id: number; name: string; + /** Globally-unique URL-safe slug; used to pre-fill the {org_slug}/ publish_topic prefix (TRA-922). */ + identifier: string; role: OrgRole; }