Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions backend/internal/cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/trakrf/platform/backend/internal/services/email"
orgsservice "github.com/trakrf/platform/backend/internal/services/orgs"
readstreamsvc "github.com/trakrf/platform/backend/internal/services/readstream"
"github.com/trakrf/platform/backend/internal/services/topicroute"
"github.com/trakrf/platform/backend/internal/storage"
"github.com/trakrf/platform/backend/internal/util/jwt"
)
Expand Down Expand Up @@ -103,6 +104,15 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error {
readBroadcaster := readstreamsvc.New()
defer readBroadcaster.Stop()

// TRA-922: the topic registry owns the publish_topic→route map (message
// routing) and the broker subscription set. Constructed unconditionally so
// the scan-device CRUD handler can keep it current even when ingestion is off;
// the subscriber attaches as its SubscriptionManager when MQTT is enabled.
topicRegistry := topicroute.NewRegistry(store, *log)
if err := topicRegistry.Reconcile(ctx); err != nil {
log.Warn().Err(err).Msg("initial topic registry load failed; ticker will retry")
}

mqttCfg := ingest.ConfigFromEnv()
var alarmDispatcher alarm.Dispatcher
if mqttCfg.Enabled() {
Expand All @@ -120,13 +130,33 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error {
geofenceEngine.Start()
defer geofenceEngine.Stop()

subscriber := ingest.NewSubscriber(mqttCfg, store, geofenceEngine, readBroadcaster, log)
subscriber := ingest.NewSubscriber(mqttCfg, store, topicRegistry, geofenceEngine, readBroadcaster, log)
if err := subscriber.Start(); err != nil {
log.Error().Err(err).Msg("Failed to start MQTT subscriber")
return err
}
defer subscriber.Stop()
log.Info().Msg("MQTT subscriber started")

// TRA-922: periodic reconcile is the safety net for missed CRUD events,
// direct DB edits, and future multi-replica drift; CRUD reconciles inline
// and OnConnect bulk-subscribes, so this only catches the gaps.
reconcileStop := make(chan struct{})
go func() {
t := time.NewTicker(5 * time.Minute)
defer t.Stop()
for {
select {
case <-reconcileStop:
return
case <-t.C:
if err := topicRegistry.Reconcile(ctx); err != nil {
log.Warn().Err(err).Msg("topic registry reconcile failed")
}
}
}
}()
defer close(reconcileStop)
} else {
// No broker: http-only dispatcher (nil mqtt → mqtt devices error clearly).
alarmDispatcher = alarm.NewDispatcher(shellyClient, nil)
Expand All @@ -145,7 +175,7 @@ func Run(ctx context.Context, info buildinfo.Info, frontendFS fs.FS) error {
locationsHandler := locationshandler.NewHandler(store)
inventoryHandler := inventoryhandler.NewHandler(store)
reportsHandler := reportshandler.NewHandler(store)
scanDevicesHandler := scandeviceshandler.NewHandler(store)
scanDevicesHandler := scandeviceshandler.NewHandler(store, topicRegistry)
scanPointsHandler := scanpointshandler.NewHandler(store)
// 2s test-fire pulse: long enough for an operator to see the strobe, short
// enough not to leave the relay latched after a confidence check.
Expand Down
2 changes: 1 addition & 1 deletion backend/internal/cmd/serve/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func setupTestRouter(t *testing.T) *chi.Mux {
locationsHandler := locationshandler.NewHandler(store)
inventoryHandler := inventoryhandler.NewHandler(store)
reportsHandler := reportshandler.NewHandler(store)
scanDevicesHandler := scandeviceshandler.NewHandler(store)
scanDevicesHandler := scandeviceshandler.NewHandler(store, nil)
scanPointsHandler := scanpointshandler.NewHandler(store)
outputDevicesHandler := outputdeviceshandler.NewHandler(store, alarm.NewDispatcher(shelly.New(0), nil), 0)
lookupHandler := lookuphandler.NewHandler(store)
Expand Down
63 changes: 60 additions & 3 deletions backend/internal/handlers/scandevices/scandevices.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package scandevices

import (
"context"
"net/http"
"strconv"
"strings"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/trakrf/platform/backend/internal/models/scandevice"
"github.com/trakrf/platform/backend/internal/models/scanpoint"
"github.com/trakrf/platform/backend/internal/models/shared"
"github.com/trakrf/platform/backend/internal/services/topicroute"
"github.com/trakrf/platform/backend/internal/storage"
"github.com/trakrf/platform/backend/internal/util/httputil"
)
Expand All @@ -28,11 +30,50 @@ var validate = func() *validator.Validate {
}()

type Handler struct {
storage *storage.Storage
storage *storage.Storage
registry *topicroute.Registry // TRA-922: reconciled after CRUD so the subscriber tracks topic changes
}

func NewHandler(storage *storage.Storage) *Handler {
return &Handler{storage: storage}
func NewHandler(storage *storage.Storage, registry *topicroute.Registry) *Handler {
return &Handler{storage: storage, registry: registry}
}

// validateTopicPrefix enforces the {org_slug}/ prefix on a publish_topic
// (TRA-922). The org slug (organizations.identifier) is globally unique, so the
// prefix makes the per-org publish_topic uniqueness effectively global and lays
// down the {org_slug}/# ACL namespace (TRA-857). An empty topic or a web_ble
// (handheld) device is exempt (no MQTT topic). Returns a user-facing message and
// false on violation.
func (h *Handler) validateTopicPrefix(ctx context.Context, orgID int, transport, topic string) (string, bool) {
if topic == "" || transport == scandevice.TransportWebBLE {
return "", true
}
org, err := h.storage.GetOrganizationByID(ctx, orgID)
if err != nil || org == nil || org.Identifier == "" {
return "organization has no identifier; cannot set a publish_topic", false
}
if !strings.HasPrefix(topic, org.Identifier+"/") {
return "publish_topic must start with \"" + org.Identifier + "/\"", false
}
return "", true
}

// reconcile re-syncs the topic registry after a successful scan-device mutation
// (TRA-922). Best-effort: the mutation already committed and the periodic ticker
// backstops, so a reconcile error must not fail the request.
func (h *Handler) reconcile(ctx context.Context) {
if h.registry == nil {
return
}
_ = h.registry.Reconcile(ctx)
}

// derefOr returns *p or the fallback when p is nil.
func derefOr(p *string, fallback string) string {
if p == nil {
return fallback
}
return *p
}

// RegisterRoutes wires the scan-device routes (and the device-nested scan-point
Expand Down Expand Up @@ -123,11 +164,16 @@ func (h *Handler) Create(w http.ResponseWriter, r *http.Request) {
httputil.RespondValidationError(w, r, err, reqID)
return
}
if msg, ok := h.validateTopicPrefix(r.Context(), orgID, req.Transport, derefOr(req.PublishTopic, "")); !ok {
httputil.WriteJSONError(w, r, http.StatusBadRequest, modelerrors.ErrValidation, msg, reqID)
return
}
device, err := h.storage.CreateScanDevice(r.Context(), orgID, req)
if err != nil {
writeConflictOrInternal(w, r, err, reqID)
return
}
h.reconcile(r.Context())
w.Header().Set("Location", "/api/v1/scan-devices/"+strconv.Itoa(device.ID))
httputil.WriteJSON(w, http.StatusCreated, scandevice.ScanDeviceResponse{Data: *device})
}
Expand Down Expand Up @@ -193,6 +239,15 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) {
httputil.RespondValidationError(w, r, err, reqID)
return
}
// TRA-922: enforce the {org_slug}/ prefix only when publish_topic is being
// changed. An update that omits it leaves the stored value untouched, so
// existing (grandfathered) topics are not retroactively rejected.
if req.PublishTopic != nil {
if msg, ok := h.validateTopicPrefix(r.Context(), orgID, derefOr(req.Transport, ""), *req.PublishTopic); !ok {
httputil.WriteJSONError(w, r, http.StatusBadRequest, modelerrors.ErrValidation, msg, reqID)
return
}
}
device, err := h.storage.UpdateScanDevice(r.Context(), orgID, id, req)
if err != nil {
writeConflictOrInternal(w, r, err, reqID)
Expand All @@ -202,6 +257,7 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) {
httputil.Respond404(w, r, "scan device not found", reqID)
return
}
h.reconcile(r.Context())
httputil.WriteJSON(w, http.StatusOK, scandevice.ScanDeviceResponse{Data: *device})
}

Expand Down Expand Up @@ -232,6 +288,7 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
httputil.Respond404(w, r, "scan device not found", reqID)
return
}
h.reconcile(r.Context())
w.WriteHeader(http.StatusNoContent)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"time"

"github.com/go-chi/chi/v5"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/trakrf/platform/backend/internal/handlers/scandevices"
"github.com/trakrf/platform/backend/internal/handlers/scanpoints"
"github.com/trakrf/platform/backend/internal/middleware"
"github.com/trakrf/platform/backend/internal/models/scandevice"
"github.com/trakrf/platform/backend/internal/services/topicroute"
"github.com/trakrf/platform/backend/internal/testutil"
"github.com/trakrf/platform/backend/internal/util/jwt"
)
Expand All @@ -26,13 +29,19 @@ func withOrg(req *http.Request, orgID int) *http.Request {
return req.WithContext(context.WithValue(req.Context(), middleware.UserClaimsKey, claims))
}

// newScanDevicesHandler builds the handler with a live topic registry (TRA-922),
// so the post-CRUD reconcile path is exercised against the test DB.
func newScanDevicesHandler(db *testutil.TestDB) *scandevices.Handler {
return scandevices.NewHandler(db.Store, topicroute.NewRegistry(db.Store, zerolog.Nop()))
}

func TestScanDevicesHandler_RoundTrip(t *testing.T) {
db := testutil.SetupTestDBFull(t)
orgID := testutil.CreateTestAccount(t, db.AdminPool)

r := chi.NewRouter()
r.Use(middleware.RequestID)
scandevices.NewHandler(db.Store).RegisterRoutes(r)
newScanDevicesHandler(db).RegisterRoutes(r)
scanpoints.NewHandler(db.Store).RegisterRoutes(r)

do := func(method, path string, body any) *httptest.ResponseRecorder {
Expand All @@ -51,7 +60,7 @@ func TestScanDevicesHandler_RoundTrip(t *testing.T) {

// Create
rec := do(http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "Dock Reader", "type": "csl_cs463", "publish_topic": "trakrf.id/cs463-214/reads",
"name": "Dock Reader", "type": "csl_cs463", "publish_topic": "test-org/cs463-214/reads",
})
require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String())
var created struct {
Expand All @@ -64,7 +73,7 @@ func TestScanDevicesHandler_RoundTrip(t *testing.T) {
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &created))
require.NotZero(t, created.Data.ID)
require.Equal(t, "mqtt", created.Data.Transport)
require.Equal(t, "trakrf.id/cs463-214/reads", created.Data.PublishTopic)
require.Equal(t, "test-org/cs463-214/reads", created.Data.PublishTopic)
devicePath := "/api/v1/scan-devices/" + itoa(created.Data.ID)

// Get
Expand Down Expand Up @@ -123,6 +132,79 @@ func itoa(i int) string {
return string(b)
}

// TestScanDevicesHandler_TopicPrefix pins the TRA-922 {org_slug}/ prefix rule:
// new/edited mqtt publish_topics must start with the caller org's identifier;
// grandfathered (unchanged) topics are left alone; web_ble devices are exempt.
func TestScanDevicesHandler_TopicPrefix(t *testing.T) {
db := testutil.SetupTestDBFull(t)
orgID := testutil.CreateTestAccount(t, db.AdminPool) // identifier "test-org"

r := chi.NewRouter()
r.Use(middleware.RequestID)
newScanDevicesHandler(db).RegisterRoutes(r)

do := func(orgCtx int, method, path string, body any) *httptest.ResponseRecorder {
var buf bytes.Buffer
if body != nil {
require.NoError(t, json.NewEncoder(&buf).Encode(body))
}
req := httptest.NewRequest(method, path, &buf)
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
rec := httptest.NewRecorder()
r.ServeHTTP(rec, withOrg(req, orgCtx))
return rec
}

// (a) Wrong prefix rejected.
require.Equal(t, http.StatusBadRequest, do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "Bad", "type": "csl_cs463", "publish_topic": "trakrf.id/dock-1/reads",
}).Code)

// (b) Correct prefix accepted.
rec := do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "Good", "type": "csl_cs463", "publish_topic": "test-org/dock-1/reads",
})
require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String())

// (c) web_ble device with no topic is exempt.
require.Equal(t, http.StatusCreated, do(orgID, http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "Handheld", "type": "csl_cs463", "transport": "web_ble",
}).Code)

// (d) Grandfathered device (seeded directly with a non-prefixed topic):
// a metadata-only edit succeeds; changing the topic to a bad value is
// rejected; changing it to a conforming value succeeds.
legacyTopic := "trakrf.id/legacy-9/reads"
legacy, err := db.Store.CreateScanDevice(context.Background(), orgID, scandevice.CreateScanDeviceRequest{
Name: "Legacy", Type: scandevice.DeviceTypeCS463, PublishTopic: &legacyTopic,
})
require.NoError(t, err)
legacyPath := "/api/v1/scan-devices/" + itoa(legacy.ID)

require.Equal(t, http.StatusOK, do(orgID, http.MethodPatch, legacyPath, map[string]any{
"name": "Legacy Renamed",
}).Code, "metadata-only edit must not trigger the prefix check")

require.Equal(t, http.StatusBadRequest, do(orgID, http.MethodPatch, legacyPath, map[string]any{
"publish_topic": "trakrf.id/legacy-9/reads-v2",
}).Code, "changing the topic to a non-prefixed value must be rejected")

require.Equal(t, http.StatusOK, do(orgID, http.MethodPatch, legacyPath, map[string]any{
"publish_topic": "test-org/legacy-9/reads",
}).Code, "changing the topic to a conforming value must succeed")

// (e) An org with no identifier cannot set a publish_topic.
var noIDOrg int
require.NoError(t, db.AdminPool.QueryRow(context.Background(),
`INSERT INTO trakrf.organizations (name, identifier, is_active) VALUES ('No Slug', '', true) RETURNING id`,
).Scan(&noIDOrg))
require.Equal(t, http.StatusBadRequest, do(noIDOrg, http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "x", "type": "csl_cs463", "publish_topic": "anything/dock/reads",
}).Code)
}

// TestScanPoints_UpdateLocationIDPersists pins the geofence-relevant behavior
// for TRA-931: PATCH /scan-points must persist a provided location_id (set it),
// and persist an explicit null (clear it). Regression guard for the handler
Expand All @@ -134,7 +216,7 @@ func TestScanPoints_UpdateLocationIDPersists(t *testing.T) {

r := chi.NewRouter()
r.Use(middleware.RequestID)
scandevices.NewHandler(db.Store).RegisterRoutes(r)
newScanDevicesHandler(db).RegisterRoutes(r)
scanpoints.NewHandler(db.Store).RegisterRoutes(r)

do := func(method, path string, body any) *httptest.ResponseRecorder {
Expand All @@ -153,7 +235,7 @@ func TestScanPoints_UpdateLocationIDPersists(t *testing.T) {

// A single-point gateway. Device create auto-provisions scan_point 1.
rec := do(http.MethodPost, "/api/v1/scan-devices", map[string]any{
"name": "Gateway 1", "type": "gl_s10", "publish_topic": "trakrf.id/gw-1/reads",
"name": "Gateway 1", "type": "gl_s10", "publish_topic": "test-org/gw-1/reads",
})
require.Equal(t, http.StatusCreated, rec.Code, rec.Body.String())
var dev struct {
Expand Down
12 changes: 5 additions & 7 deletions backend/internal/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,22 @@ import "os"
// (keeps local dev, tests, and pre-cutover prod inert).
type Config struct {
URL string // mqtts://user:pass@host:port (MQTT_URL)
Topic string // subscription filter (MQTT_TOPIC), e.g. trakrf.id/# or $share/grp/trakrf.id/#
ClientID string // base client id (MQTT_CLIENT_ID); subscriber appends a per-process suffix
}

// Enabled reports whether the subscriber should start.
func (c Config) Enabled() bool { return c.URL != "" }

// ConfigFromEnv reads the MQTT subscriber config from the environment, applying
// defaults for the topic filter and client id.
// ConfigFromEnv reads the MQTT subscriber config from the environment.
//
// TRA-922: MQTT_TOPIC is retired. The subscriber no longer uses a static
// subscription filter — it subscribes to exactly the registered publish_topics
// (data-driven, via the topicroute registry), so there is no topic to configure.
func ConfigFromEnv() Config {
c := Config{
URL: os.Getenv("MQTT_URL"),
Topic: os.Getenv("MQTT_TOPIC"),
ClientID: os.Getenv("MQTT_CLIENT_ID"),
}
if c.Topic == "" {
c.Topic = "trakrf.id/#"
}
if c.ClientID == "" {
c.ClientID = "trakrf-subscriber"
}
Expand Down
4 changes: 0 additions & 4 deletions backend/internal/ingest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,15 @@ func TestConfigEnabled(t *testing.T) {

func TestConfigFromEnvDefaults(t *testing.T) {
t.Setenv("MQTT_URL", "mqtts://u:p@host:8883")
t.Setenv("MQTT_TOPIC", "")
t.Setenv("MQTT_CLIENT_ID", "")
c := ConfigFromEnv()
assert.Equal(t, "mqtts://u:p@host:8883", c.URL)
assert.Equal(t, "trakrf.id/#", c.Topic)
assert.Equal(t, "trakrf-subscriber", c.ClientID)
}

func TestConfigFromEnvOverrides(t *testing.T) {
t.Setenv("MQTT_URL", "mqtts://u:p@host:8883")
t.Setenv("MQTT_TOPIC", "$share/grp/trakrf.id/#")
t.Setenv("MQTT_CLIENT_ID", "custom-id")
c := ConfigFromEnv()
assert.Equal(t, "$share/grp/trakrf.id/#", c.Topic)
assert.Equal(t, "custom-id", c.ClientID)
}
Loading
Loading