Caution: Review failed. The pull request is closed.

📝 Walkthrough

Adds NATS-driven observation processing and a LibTapir message generator, refactors the app from TCP to NATS, introduces new NATS/LibTapir packages and common types, updates build/config (Go, Ko), adds CI workflows, and converts the README sample config from JSON to TOML.
Sequence Diagram

```mermaid
sequenceDiagram
    participant KV as NATS KV Store
    participant App as Observation Encoder
    participant LibTapir as LibTapir
    participant SB as Southbound Subject
    KV->>App: WatchObservations() stream NatsMsg
    Note over App: receive NatsMsg (Headers, Subject, Data)
    App->>App: RemovePrefix & parse domain
    App->>KV: GetObservations(domain)
    KV-->>App: aggregated flags (uint32)
    App->>LibTapir: GenerateObservationMsg(domain, flags)
    LibTapir-->>App: JSON observation
    App->>KV: SendSouthboundObservation(json)
    KV->>SB: deliver published message
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 10
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
cmd/observation-encoder/main.go (2)
89-91: ⚠️ Potential issue | 🔴 Critical

Config decode error is silently discarded — app will run with zero-value config.

`confDecoder.Decode(&mainConf)` returns an error that is never checked. If the TOML file is malformed or has type mismatches, the application proceeds with default zero-values, leading to confusing downstream failures.

Proposed fix:

```diff
 confDecoder.DisallowUnknownFields()
-confDecoder.Decode(&mainConf)
-file.Close() // TODO okay to close here while also using defer above?
+if err := confDecoder.Decode(&mainConf); err != nil {
+	log.Error("Error decoding config file '%s': %s", configFile, err)
+	os.Exit(-1)
+}
+file.Close()
```
285-289: ⚠️ Potential issue | 🟠 Major

Closing `exitCh` while goroutines may still write to it — potential panic.

After the timeout, `close(exitCh)` is called on line 287, but the `appHandle.Run`, `apiHandle.Run`, and `certHandle.Run` goroutines may still be alive and attempting to send on `exitCh`, causing a panic on send to a closed channel. Either remove the `close(exitCh)` (let the channel be GC'd) or ensure all goroutines have returned before closing.

internal/api/api.go (1)
111-112: ⚠️ Potential issue | 🟡 Minor

Context cancel function discarded — leaks timer resources.

`context.WithTimeout` returns a cancel func that should be deferred to release the timer immediately after the shutdown completes, rather than waiting for the timeout to expire.

Proposed fix:

```diff
-shutdownCtx, _ := context.WithTimeout(context.Background(), time.Second*2)
+shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second*2)
+defer shutdownCancel()
```
🤖 Fix all issues with AI agents
In `@.github/workflows/container.yaml`:
- Around line 18-19: The job-level if on the "container" job currently only
checks github.event.workflow_run.conclusion and therefore skips tag push
triggers; update the job's if expression on the container job to allow both
successful workflow_run triggers and push-tag triggers by combining checks for
github.event_name == 'workflow_run' && github.event.workflow_run.conclusion ==
'success' OR github.event_name == 'push' && startsWith(github.ref,
'refs/tags/'); modify the existing if that references
github.event.workflow_run.conclusion so it covers both cases.
- Around line 31-32: The workflow currently runs "ko build --bare
./cmd/observation-encoder" without registry config or auth so images are only
built locally; add a registry login step (e.g., use docker/login-action@v3)
before the ko steps, set the KO_DOCKER_REPO environment variable to your GHCR
registry (for example ghcr.io/${{ github.repository }}) in the job or step env,
and invoke ko build with the push option (or add --push) to ensure the image is
pushed to GHCR rather than discarded; update the existing uses:
ko-build/setup-ko@v0.7 and the ko build invocation accordingly.
In `@go.mod`:
- Line 3: go.mod declares "go 1.25.6" but .github/workflows/container.yaml sets
GO_VERSION to "1.25.5", causing a mismatch; update the workflow to use
GO_VERSION: "1.25.6" (or alternatively change the go directive in go.mod to
1.25.5) so the container build toolchain meets the module's minimum version;
locate the GO_VERSION variable in .github/workflows/container.yaml and make the
value match the go directive in go.mod.
In `@internal/app/app.go`:
- Around line 108-121: The main loop is spinning when natsInCh is closed because
the select's receive doesn't check for closure; change the natsInCh receive in
the MAIN_APP_LOOP to use the two-value receive (msg, ok := <-natsInCh) and if
ok==false handle graceful shutdown of that input (e.g., log and break
MAIN_APP_LOOP or stop sending jobs) so you don't push zero-value job structs
onto jobChan; update references to job creation (job{msg: msg}) and the
a.pm.natsInCount.Add increment to only occur when ok is true.
- Around line 138-151: The domain extraction in handleJob relies on RemovePrefix
leaving a leading delimiter and drops two elements via
domainSplit[:len(domainSplit)-2], which will break if RemovePrefix is changed;
update handleJob to explicitly remove only the observation-type component rather
than assuming an empty label: after calling a.natsHandle.RemovePrefix and
splitting into domainSplit, filter out any empty strings or explicitly pop the
single trailing observation-type element (use domainSplit[:len(domainSplit)-1]
after verifying non-empty), or locate the observation-type by index/name and
remove it, then join the remaining labels into domain; ensure this logic
references RemovePrefix, domainSplit, slices.Reverse and c_NATS_DELIM so it
stays correct if RemovePrefix is fixed.
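A minimal sketch of the two points above — a closure-aware two-value channel receive, and domain extraction that explicitly drops only the trailing observation-type segment while tolerating empty labels. The subject layout and helper names here are assumptions for illustration, not the project's actual code:

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

const natsDelim = "."

// extractDomain pops only the trailing observation-type element from a
// KV subject (hypothetical layout "<label>...<label>.<obs-type>"), then
// reverses the remaining labels into a domain name.
func extractDomain(subject string) (string, error) {
	parts := strings.Split(subject, natsDelim)
	// Drop empty labels defensively, in case an upstream prefix strip
	// leaves a leading delimiter behind.
	parts = slices.DeleteFunc(parts, func(s string) bool { return s == "" })
	if len(parts) < 2 {
		return "", fmt.Errorf("subject %q has no domain labels", subject)
	}
	labels := parts[:len(parts)-1] // remove only the observation type
	slices.Reverse(labels)
	return strings.Join(labels, natsDelim), nil
}

func main() {
	// Two-value receive: ok == false signals the input channel closed,
	// so we shut down instead of spinning on zero-value messages.
	in := make(chan string, 1)
	in <- "com.example.globally_new"
	close(in)
	for {
		msg, ok := <-in
		if !ok {
			fmt.Println("input closed, shutting down")
			return
		}
		if domain, err := extractDomain(msg); err == nil {
			fmt.Println(domain)
		}
	}
}
```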
In `@internal/common/observations.go`:
- Around line 6-9: OBS_MAP currently hardcodes numeric values for the
"globally_new" and "looptest" entries which can drift if the corresponding
constants change; update OBS_MAP to use the declared observation constants
instead of the literals (replace 1 and 1024 with the existing named constants
for the "globally_new" and "looptest" observation values), importing or
referencing those constant names in this file so the map stays in sync with the
canonical definitions.
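The constants-backed map can be sketched like this; the constant names and bit values are hypothetical stand-ins for the project's canonical definitions:

```go
package main

import "fmt"

// Hypothetical observation flag constants; names and values are
// illustrative, not the project's actual definitions.
const (
	ObsGloballyNew uint32 = 1 << 0  // 1
	ObsLooptest    uint32 = 1 << 10 // 1024
)

// Building the lookup map from the constants keeps it in sync with the
// canonical definitions instead of duplicating magic numbers.
var obsMap = map[string]uint32{
	"globally_new": ObsGloballyNew,
	"looptest":     ObsLooptest,
}

func main() {
	fmt.Println(obsMap["globally_new"], obsMap["looptest"])
}
```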
In `@internal/nats/nats.go`:
- Around line 196-199: The Shutdown method on natsClient currently does nothing
and leaks connections; update natsClient.Shutdown to check if nc.conn is non-nil
and then gracefully close the connection (prefer nc.conn.Drain() if you want to
flush pending messages, falling back to nc.conn.Close()), return any error from
Drain/Close, and set nc.conn = nil (and optionally nc.opts/state) to avoid
double-close; ensure you reference the natsClient type and its nc.conn field and
propagate the error from Drain/Close instead of always returning nil.
- Around line 82-89: RemovePrefix currently leaves a leading delimiter when
nc.subjectPrefix is removed (e.g., "obs.foo" -> ".foo"), which causes downstream
splitting in handleJob to produce an empty first element; update
natsClient.RemovePrefix to, after calling strings.CutPrefix(subject,
nc.subjectPrefix), also strip a leading delimiter (e.g., '.' or the configured
delimiter) from subjectCut before returning it, preserving the existing warning
log when the prefix is missing so downstream code in handleJob gets a clean
subject string for domain extraction.
- Around line 139-143: Protect against an index panic by checking the length of
kSplit before accessing kSplit[1] (in the section that assigns flag) and skip or
handle entries that don't contain the delimiter; then include the flag variable
in the warning log call (the nc.log.Warning call that currently uses
"Unrecognized flag '%s', ignoring..." should be passed the flag value) so the
format string has its argument. Specifically, update the logic around kSplit,
flag, and the lookup into common.OBS_MAP to first verify len(kSplit) > 1 (and
continue or set a safe default if not), then perform the map lookup using flag,
and call nc.log.Warning with both the format string and flag.
- Around line 163-169: Replace the use of LimitMarkerTTL with TTL in the
jetstream.KeyValueConfig passed to js.CreateKeyValue so the actual KV entries
expire after nc.ttl; update the KeyValueConfig for the CreateKeyValue call
(referenced in js.CreateKeyValue and jetstream.KeyValueConfig, using nc.bucket
and nc.ttl) to set TTL to nc.ttl and remove LimitMarkerTTL unless you
intentionally want to control tombstone retention separately.
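The RemovePrefix fix described above can be sketched as a standalone helper, assuming `.` is the delimiter; the function shape is hypothetical and stands in for the real `natsClient` method:

```go
package main

import (
	"fmt"
	"log"
	"strings"
)

// removePrefix strips a configured subject prefix plus any leading
// delimiter left behind, so "obs.foo.bar" with prefix "obs" becomes
// "foo.bar" rather than ".foo.bar". When the prefix is missing it
// logs a warning and returns the subject unchanged, mirroring the
// existing behavior.
func removePrefix(subject, prefix string) string {
	cut, found := strings.CutPrefix(subject, prefix)
	if !found {
		log.Printf("subject %q missing expected prefix %q", subject, prefix)
		return subject
	}
	return strings.TrimPrefix(cut, ".")
}

func main() {
	fmt.Println(removePrefix("obs.foo.bar", "obs"))
}
```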
🧹 Nitpick comments (11)
internal/common/observations.go (1)
3-4: Non-idiomatic Go naming: `OBS_GLOBALLY_NEW`, `OBS_LOOPTEST`, `OBS_MAP`.

Go convention uses MixedCaps for exported identifiers (e.g., `ObsGloballyNew`, `ObsLooptest`, `ObsMap`). Screaming snake case is unconventional in Go.

internal/cert/cert.go (1)

20-20: `Debug` field is declared but never used within this package.

The `Debug` field is added to `Conf` on line 20 but is never extracted in `Create()` or used by `certHandle`. While this field is consistently present across multiple subsystem configs (libtapir, nats, api, app), it's not wired into the cert subsystem's logging. If the intent is to control debug-level logging for certificates, the field should be passed to `certHandle` and used to configure logging behavior; otherwise, it's dead configuration specific to this package.

internal/libtapir/libtapir.go (3)

12-15: `Debug` field in `Conf` is unused.

`conf.Debug` is declared and deserialized from TOML, but never read by `Create` or stored on the `libtapir` struct. Either remove it or wire it into the struct for conditional debug logging.

32-59: Hardcoded metadata and dual `time.Now()` calls.

Two concerns:

- `time.Now()` is called separately at lines 35 and 49, producing slightly different timestamps for `TimeAdded` vs `TimeStamp`. Capture a single `now` and reuse it for consistency.
- `TTL`, `SrcName`, `Creator`, `MsgType`, and `ListType` are all hardcoded. Consider making these configurable via `Conf` or at least defining them as named constants so they're discoverable and easy to change.

Proposed fix for consistent timestamps:

```diff
 func (lt *libtapir) GenerateObservationMsg(domainStr string, flags uint32) (string, error) {
+	now := time.Now()
 	domain := tapir.Domain{
 		Name:         domainStr,
-		TimeAdded:    time.Now(),
+		TimeAdded:    now,
 		TTL:          3600,
 		TagMask:      tapir.TagMask(flags),
 		ExtendedTags: []string{},
 	}
 	tapirMsg := tapir.TapirMsg{
 		SrcName:   "dns-tapir",
 		Creator:   "tapir-analyse-new-qname",
 		MsgType:   "observation",
 		ListType:  "doubtlist",
 		Added:     []tapir.Domain{domain},
 		Removed:   []tapir.Domain{},
 		Msg:       "",
-		TimeStamp: time.Now(),
+		TimeStamp: now,
 		TimeStr:   "",
 	}
```
21-30: Exported function returns unexported type `*libtapir`.

`Create` returns `*libtapir`, which is unexported. While this works because callers use the `libtapir` interface defined in `internal/app/app.go`, it means callers cannot declare variables of the concrete type. This is an acceptable pattern within internal packages, but be aware it prevents direct type assertions outside this package.

internal/nats/nats.go (2)
151-180: NATS connection has no options — no timeouts, no reconnect policy, no auth.

`nats.Connect(nc.url)` uses bare defaults. Consider adding connect timeout, reconnect settings, and error/disconnect handlers for production resilience. Also, `context.Background()` on line 163 means KV creation has no timeout or cancellation — consider threading a context through from `Create`.

100-116: Unbuffered channel may block the KV watcher goroutine.

`outCh` is unbuffered (line 100). If downstream consumers (the `MAIN_APP_LOOP` in app.go) are busy or slow to drain, the goroutine will block on `outCh <- natsMsg`, stalling further KV updates. Consider adding a buffer to match or exceed `c_N_HANDLERS`.

cmd/observation-encoder/main.go (1)

100-106: Logger creation failures don't halt execution — leads to confusing errors downstream.

On lines 104-106, if `logger.Create` fails, the error is only logged but execution continues. `natslog` will be `nil`, causing `nats.Create` to fail with a generic "nil logger" error rather than the real root cause. The same pattern repeats for the libtapir (lines 124-126), app (lines 144-146), cert (lines 166-168), and API (lines 186-188) loggers. Consider failing fast on logger creation errors, consistent with how subsystem creation errors are handled.

Example fix for the nats logger block:

```diff
 natslog, err := logger.Create(
 	logger.Conf{
 		Debug: debugFlag || mainConf.Nats.Debug,
 	})
 if err != nil {
 	log.Error("Error creating nats log: %s", err)
+	os.Exit(-1)
 }
```

internal/app/app_test.go (1)
7-9: Placeholder test provides no coverage.
`TestAppBasic` does nothing beyond logging. With the new NATS and libtapir interfaces, you could inject mock implementations to test `Create` validation, `handleJob` domain extraction, and the observation aggregation flow without a real NATS server.

Would you like me to open an issue to track adding unit tests with mock NATS/libtapir implementations?
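The mock-injection idea can be sketched as below. The narrowed interface, struct names, and aggregation stand-in are hypothetical; the real interfaces live in `internal/app/app.go`:

```go
package main

import "fmt"

// observationStore is a hypothetical narrow interface mirroring what
// the app needs from NATS during a job.
type observationStore interface {
	GetObservations(domain string) (uint32, error)
}

// mockStore satisfies the interface from an in-memory map, so tests
// need no running NATS server.
type mockStore struct{ flags map[string]uint32 }

func (m *mockStore) GetObservations(domain string) (uint32, error) {
	return m.flags[domain], nil
}

// aggregate is a stand-in for the job-handling logic under test: it
// simply asks the injected store for the accumulated flags.
func aggregate(s observationStore, domain string) (uint32, error) {
	return s.GetObservations(domain)
}

func main() {
	s := &mockStore{flags: map[string]uint32{"example.com": 1 | 1024}}
	got, _ := aggregate(s, "example.com")
	fmt.Println(got)
}
```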
internal/api/api.go (1)
120-127: Manual JSON construction is fragile.
`fmt.Sprintf("{\"nats_in\": %d}", n)` works for an `int64` but doesn't scale and bypasses proper encoding. Consider using `json.NewEncoder(rw).Encode(...)` or `json.Marshal` with a small struct/map for consistency and safety.
123-126: Pending jobs are discarded without processing on shutdown.

The drain loop on lines 123-125 reads and discards buffered jobs. Meanwhile, `close(jobChan)` signals workers to exit. Any in-flight observations are silently lost. If at-least-once processing matters, consider draining by letting workers finish rather than discarding.
Actionable comments posted: 2
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/app/app.go (1)
88-98: ⚠️ Potential issue | 🟡 Minor

NATS watch failure sends exit but doesn't close the channel or clean up.

If `WatchObservations` fails, `Run` returns immediately after signaling on `exitCh`, but `jobChan` is never closed. The handler goroutines spawned at lines 100–105 will leak, blocked on `range jobChan` forever.

🛡️ Proposed fix — close jobChan before early return:

```diff
 natsInCh, err := a.natsHandle.WatchObservations(ctx)
 if err != nil {
 	a.log.Error("Error connecting to NATS: %s", err)
+	close(jobChan)
 	a.exitCh <- common.Exit{ID: a.id, Err: err}
 	return
 }
```

Note: move the goroutine spawning after the watch call to avoid this entirely.
🤖 Fix all issues with AI agents
In `@internal/nats/nats.go`:
- Around line 155-185: initNats opens a connection with nats.Connect but doesn't
close it on subsequent failures (jetstream.New or js.CreateKeyValue), leaking
resources; modify initNats to ensure the conn is closed on any error after
nats.Connect succeeds (either by deferring conn.Close immediately after
successful nats.Connect and clearing the defer when storing nc.conn, or by
explicitly calling conn.Close() in each error branch after nats.Connect),
referencing the symbols nats.Connect, conn, jetstream.New, js.CreateKeyValue,
nc.conn and ensure successful path preserves the open connection by assigning
nc.conn before cancelling the close.
- Line 140: The log call uses the wrong receiver variable name: replace the
undefined `a.log` with the `natsClient` receiver `nc.log` in the method where
the warning is emitted (the natsClient method that currently contains the line
"Badly formatted key '%s'. Skipping..."); update the call to use
`nc.log.Warning(...)` so it references the correct receiver and resolves the
undefined identifier.
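The close-on-error requirement from the first point above can be sketched with a deferred cleanup that is disarmed only on the success path. The `conn` type and `initResources` function here are stand-ins, not the real `nats.Conn` or `initNats`:

```go
package main

import (
	"errors"
	"fmt"
)

// conn stands in for a NATS connection; only Close matters here.
type conn struct{ closed bool }

func (c *conn) Close() { c.closed = true }

// initResources "connects" and then ensures the connection is closed
// on any later setup failure, using a deferred cleanup that fires
// unless the success flag is set at the end.
func initResources(failSetup bool) (*conn, error) {
	c := &conn{}
	success := false
	defer func() {
		if !success {
			c.Close() // avoid leaking the connection on error paths
		}
	}()
	if failSetup {
		// Stands in for jetstream.New / js.CreateKeyValue failing
		// after nats.Connect already succeeded.
		return nil, errors.New("setup failed after connect")
	}
	success = true
	return c, nil
}

func main() {
	if _, err := initResources(true); err != nil {
		fmt.Println("cleaned up after:", err)
	}
}
```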
🧹 Nitpick comments (2)
internal/nats/nats.go (1)
100-100: Unbuffered channel may block the KV watcher goroutine.
`outCh` is unbuffered, so if the consumer (main loop → job channel → handlers) can't keep up, the watcher goroutine blocks and NATS KV updates stall. Consider adding a small buffer to decouple the watcher from downstream processing.

♻️ Suggested change:

```diff
-outCh := make(chan common.NatsMsg)
+outCh := make(chan common.NatsMsg, 64)
```

internal/app/app.go (1)
127-130: Drain loop discards pending jobs on shutdown.

The `for len(jobChan) > 0` loop races with the still-running handler goroutines and silently drops buffered jobs before closing the channel. If this is intentional (fast shutdown, discard in-flight work), a brief comment would clarify intent. If pending jobs should be processed, close the channel first and let the handlers drain via their `range` loop.
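The close-first alternative can be sketched as follows; the worker count and integer job payload are illustrative, not the app's actual job type:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAndDrain enqueues n jobs, closes the channel, and lets three
// workers drain it via range before returning the processed count.
// Closing first (instead of discarding buffered jobs) guarantees
// every queued job is handled during shutdown.
func runAndDrain(n int) int64 {
	jobs := make(chan int, n)
	var processed atomic.Int64
	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs { // exits when jobs is closed and empty
				processed.Add(1)
			}
		}()
	}
	for i := 0; i < n; i++ {
		jobs <- i
	}
	close(jobs) // signal shutdown; workers finish remaining jobs
	wg.Wait()
	return processed.Load()
}

func main() {
	fmt.Println("processed:", runAndDrain(8))
}
```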
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@internal/nats/nats.go`:
- Around line 64-66: The current check only rejects zero TTL but allows negative
values via conf.Ttl; change the validation around conf.Ttl to reject any
non-positive value (<= 0) and return a clearer error that includes the provided
TTL (use conf.Ttl in the message) so callers see the invalid value; update the
error returned from the block that currently returns errors.New("zero ttl") to
something like a formatted error stating the TTL is invalid/non-positive, and
ensure any downstream conversion to time.Duration remains safe.
- Around line 136-150: The loop that extracts flags from subscription keys
assumes the flag is always at kSplit[1], which breaks when the subject prefix
contains dots; update the extraction in the for loop over ls.Keys() to compute
the flag index relative to the prefix length instead of using a hardcoded 1:
split the key by c_NATS_DELIM, determine the number of segments in the
configured subjectPrefix (or count prefix segments from ls/lookup), then use
that offset to read the flag segment (before looking it up in common.OBS_MAP),
preserving current validation and accumulation into obs; adjust logging messages
in nc.log.Warning accordingly when key is malformed or flag is unrecognized.
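The prefix-relative offset from the second point above can be sketched as below, assuming a hypothetical key layout of `<prefix>.<flag>.<reversed-domain-labels>`:

```go
package main

import (
	"fmt"
	"strings"
)

const natsDelim = "."

// flagSegment reads the observation-flag segment of a KV key at an
// offset computed from the configured prefix, so a prefix containing
// dots (e.g. "tapir.obs") doesn't break the lookup the way a
// hardcoded kSplit[1] would.
func flagSegment(key, prefix string) (string, bool) {
	kSplit := strings.Split(key, natsDelim)
	offset := len(strings.Split(prefix, natsDelim))
	if len(kSplit) <= offset {
		return "", false // malformed key: no flag segment present
	}
	return kSplit[offset], true
}

func main() {
	f, ok := flagSegment("tapir.obs.globally_new.com.example", "tapir.obs")
	fmt.Println(f, ok)
}
```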
🧹 Nitpick comments (3)
internal/nats/nats.go (3)
20-40: Unused fields: `Conf.Debug` and `natsClient.queue`.

`Debug` in `Conf` is declared and tagged but never read anywhere in this file. Similarly, `queue` in `natsClient` is never assigned or referenced. Consider removing them to avoid confusion, or wire them up if they're intended for future use.

```bash
#!/bin/bash
# Verify whether Debug or queue are used elsewhere in the codebase
echo "=== Searching for 'Debug' usage ==="
rg -n --type=go '\bDebug\b' -g '!vendor/**'
echo ""
echo "=== Searching for '.queue' usage ==="
rg -n --type=go '\.queue\b' -g '!vendor/**'
```
91-121: Watch goroutine lacks `ctx.Done()` select and the channel is unbuffered.

If `w.Updates()` blocks indefinitely (e.g., on a NATS reconnect or edge case where the context cancellation doesn't propagate to the watcher channel), the goroutine will leak. Adding a `select` on `ctx.Done()` inside the loop would make this more robust. The unbuffered `outCh` also means a slow consumer stalls the watch loop — a small buffer could help decouple them.

Proposed sketch:

```diff
 go func() {
+	defer close(outCh)
 	nc.log.Info("Starting NATS listener loop")
-	for val := range w.Updates() {
-		if val == nil {
-			continue
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case val, ok := <-w.Updates():
+			if !ok {
+				return
+			}
+			if val == nil {
+				continue
+			}
+			nc.log.Debug("Incoming NATS KV update on '%s'!", val.Key())
+			natsMsg := common.NatsMsg{
+				Headers: nil,
+				Data:    val.Value(),
+				Subject: val.Key(),
+			}
+			outCh <- natsMsg
 		}
-		nc.log.Debug("Incoming NATS KV update on '%s'!", val.Key())
-		natsMsg := common.NatsMsg{
-			Headers: nil,
-			Data:    val.Value(),
-			Subject: val.Key(),
-		}
-		outCh <- natsMsg
 	}
-	close(outCh)
 }()
```
187-199: Non-idiomatic `else` after early return.

In Go, when the `if` block ends with a `return`, the `else` is unnecessary. Removing it reduces nesting.

Proposed fix:

```diff
 err := nc.conn.Publish(nc.subjectSouthbound, outMsg)
 if err != nil {
 	nc.log.Error("Couldn't publish %d bytes msg on %s", len(outMsg), nc.subjectSouthbound)
 	return err
-} else {
-	nc.log.Debug("Successful publish on '%s'", nc.subjectSouthbound)
 }
+nc.log.Debug("Successful publish on '%s'", nc.subjectSouthbound)
 return nil
```
Summary by CodeRabbit

- New Features
- Documentation
- Chores
- Tests