diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..cd88554 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "gomod" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/base-pipeline.yaml b/.github/workflows/base-pipeline.yaml new file mode 100644 index 0000000..474dd1b --- /dev/null +++ b/.github/workflows/base-pipeline.yaml @@ -0,0 +1,53 @@ +name: Base Pipeline + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + GO_VERSION: "1.25.6" +jobs: + format: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5.5.0 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Check formatting + run: | + files=$(gofmt -l ./) + + if [ -n "$files" ]; then + while IFS= read -r file; do + echo "::warning file=$file::File is not gofmt formatted" + done <<< "$files" + exit 1 + else + echo "✅ All files are properly formatted." + fi + + test: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Go + uses: actions/setup-go@v5.5.0 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Build + run: | + make build + + - name: Test + run: | + make test diff --git a/.github/workflows/container.yaml b/.github/workflows/container.yaml new file mode 100644 index 0000000..d2556e8 --- /dev/null +++ b/.github/workflows/container.yaml @@ -0,0 +1,32 @@ +name: Build container + +on: + workflow_run: + workflows: + - "Base Pipeline" + branches: + - "main" + types: + - completed + push: + tags: + - "v*.*.*" + +env: + GO_VERSION: "1.25.6" +jobs: + container: + if: ${{ github.event.workflow_run.conclusion == 'success' }} + name: Build and push container + runs-on: ubuntu-latest + permissions: + actions: write + contents: read + packages: write + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + - uses: ko-build/setup-ko@v0.7 + - run: ko build --bare ./cmd/observation-encoder diff --git a/.ko.yaml b/.ko.yaml new file mode 100644 index 0000000..b707475 --- /dev/null +++ b/.ko.yaml @@ -0,0 +1,5 @@ +defaultPlatforms: +- linux/amd64 +- linux/arm64 +defaultLdflags: + - -X main.commit={{.Git.FullCommit}} diff --git a/README.md b/README.md index 6c57d24..79c2121 100644 --- a/README.md +++ b/README.md @@ -18,20 +18,25 @@ observation-encoder -config /path/to/config/file # Sample config file -```json -{ - "address": "127.0.0.1", - "port": "10000", - "cert": - { - "interval": 100, - "cert_dir": "/path/to/certificates/dir" - }, - "api": - { - "active": true, - "address": "127.0.0.1", - "port": "10001" - } -} +```toml +address = "127.0.0.1" +port = "10000" + +[cert] +interval = 100 +cert_dir = "/path/to/certs/dir" + +[api] +active = true +address = "127.0.0.1" +port = "10001" + +[nats] +url = "nats://127.0.0.1:4222" +subject_prefix = "observations" +subject_southbound = "test.subject" +ttl = 3600 + +[libtapir] +debug = true ``` diff --git a/cmd/observation-encoder/main.go b/cmd/observation-encoder/main.go index 21e4b51..28a4b87 100644 --- a/cmd/observation-encoder/main.go +++ b/cmd/observation-encoder/main.go @@ -10,23 +10,26 @@ import ( "syscall" "time" - //"github.com/nats-io/nats.go" "github.com/pelletier/go-toml/v2" "github.com/dnstapir/observation-encoder/internal/api" "github.com/dnstapir/observation-encoder/internal/app" "github.com/dnstapir/observation-encoder/internal/cert" "github.com/dnstapir/observation-encoder/internal/common" + "github.com/dnstapir/observation-encoder/internal/libtapir" "github.com/dnstapir/observation-encoder/internal/logger" + "github.com/dnstapir/observation-encoder/internal/nats" ) var commit = "BAD-BUILD" type conf struct { app.Conf - Debug bool `toml:"debug"` - Api api.Conf `toml:"api"` - Cert cert.Conf `toml:"cert"` + Debug bool `toml:"debug"` + Api api.Conf `toml:"api"` + Cert cert.Conf `toml:"cert"` + Nats nats.Conf `toml:"nats"` + Libtapir libtapir.Conf `toml:"libtapir"` } func main() { @@ -87,32 +90,103 @@ func main() { confDecoder.Decode(&mainConf) file.Close() // TODO okay to close here while also using defer above? - // TODO create different loggers with different debug settings + debugFlag = debugFlag || mainConf.Debug + + /* + ****************************************************************** + ********************** SET UP NATS ******************************* + ****************************************************************** + */ + natslog, err := logger.Create( + logger.Conf{ + Debug: debugFlag || mainConf.Nats.Debug, + }) + if err != nil { + log.Error("Error creating nats log: %s", err) + } + + mainConf.Nats.Log = natslog + natsHandle, err := nats.Create(mainConf.Nats) + if err != nil { + log.Error("Could not create NATS handle: %s", err) + os.Exit(-1) + } + + /* + ****************************************************************** + ********************** SET UP LIBTAPIR *************************** + ****************************************************************** + */ + libtapirlog, err := logger.Create( + logger.Conf{ + Debug: debugFlag || mainConf.Libtapir.Debug, + }) + if err != nil { + log.Error("Error creating libtapir log: %s", err) + } + + mainConf.Libtapir.Log = libtapirlog + libtapirHandle, err := libtapir.Create(mainConf.Libtapir) + if err != nil { + log.Error("Could not create libtapir handle: %s", err) + os.Exit(-1) + } + + /* + ****************************************************************** + ********************** SET UP MAIN APP *************************** + ****************************************************************** + */ applog, err := logger.Create( logger.Conf{ Debug: debugFlag || mainConf.Debug, }) if err != nil { log.Error("Error creating app log: %s", err) - } else { - applog.Debug("Debug logging enabled") } mainConf.Log = applog + mainConf.NatsHandle = natsHandle + mainConf.LibtapirHandle = libtapirHandle appHandle, err := app.Create(mainConf.Conf) if err != nil { log.Error("Error creating application: '%s'", err) os.Exit(-1) } - mainConf.Cert.Log = applog + /* + ****************************************************************** + ********************** SET UP CERT HANDLER *********************** + ****************************************************************** + */ + certlog, err := logger.Create( + logger.Conf{ + Debug: debugFlag || mainConf.Cert.Debug, + }) + if err != nil { + log.Error("Error creating cert log: %s", err) + } + + mainConf.Cert.Log = certlog certHandle, err := cert.Create(mainConf.Cert) if err != nil { log.Error("Error creating cert manager: '%s'", err) os.Exit(-1) } - mainConf.Api.Log = applog + /* + ****************************************************************** + ********************** SET UP API ******************************** + ****************************************************************** + */ + apilog, err := logger.Create( + logger.Conf{ + Debug: debugFlag || mainConf.Api.Debug, + }) + if err != nil { + log.Error("Error creating API log: %s", err) + } + mainConf.Api.Log = apilog mainConf.Api.App = appHandle mainConf.Api.Certs = certHandle apiHandle, err := api.Create(mainConf.Api) @@ -121,6 +195,11 @@ func main() { os.Exit(-1) } + /* + ****************************************************************** + ********************** START RUNNING STUFF *********************** + ****************************************************************** + */ sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) defer close(sigChan) diff --git a/go.mod b/go.mod index 1aeabcb..5df9869 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,50 @@ module github.com/dnstapir/observation-encoder -go 1.24.6 +go 1.25.6 -require github.com/pelletier/go-toml/v2 v2.2.4 +require ( + github.com/dnstapir/tapir v0.0.0-20260115113810-71a904f35f68 + github.com/nats-io/nats.go v1.48.0 + github.com/pelletier/go-toml/v2 v2.2.4 +) + +require ( + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/eclipse/paho.golang v0.23.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.5.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/gookit/goutil v0.7.3 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/klauspost/compress v1.18.4 // indirect + github.com/lestrrat-go/blackmagic v1.0.4 // indirect + github.com/lestrrat-go/httpcc v1.0.1 // indirect + github.com/lestrrat-go/httprc v1.0.6 // indirect + github.com/lestrrat-go/iter v1.0.2 // indirect + github.com/lestrrat-go/jwx/v2 v2.1.6 // indirect + github.com/lestrrat-go/option v1.0.1 // indirect + github.com/miekg/dns v1.1.72 // indirect + github.com/nats-io/nkeys v0.4.15 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/ryanuber/columnize v2.1.2+incompatible // indirect + github.com/sagikazarmark/locafero v0.12.0 // indirect + github.com/segmentio/asm v1.2.1 // indirect + github.com/smhanov/dawg v0.0.0-20220118194912-66057bdbf2e3 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/spf13/viper v1.21.0 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/exp v0.0.0-20260209203927-2842357ff358 // indirect + golang.org/x/mod v0.33.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/term v0.40.0 // indirect + golang.org/x/text v0.34.0 // indirect + golang.org/x/tools v0.42.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect +) diff --git a/go.sum b/go.sum index 3cf50e1..15df39e 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,131 @@ +dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= +github.com/dnstapir/tapir v0.0.0-20260115113810-71a904f35f68 h1:b7kJ/u56TM/8NJ++D6kajNd1wD1p33JuEd1xf3Sv2Sw= +github.com/dnstapir/tapir v0.0.0-20260115113810-71a904f35f68/go.mod h1:pjosl5PkU8ijFSPDISiASRgRvYgQdt/p7/CyHNB00pk= +github.com/eclipse/paho.golang v0.23.0 h1:KHgl2wz6EJo7cMBmkuhpt7C576vP+kpPv7jjvSyR6Mk= +github.com/eclipse/paho.golang v0.23.0/go.mod h1:nQRhTkoZv8EAiNs5UU0/WdQIx2NrnWUpL9nsGJTQN04= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= +github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/gookit/goutil v0.7.3 h1:nXDd/AB17nEjqVCNDGioDhVL/gVqdlqRMfFergKDjHE= +github.com/gookit/goutil v0.7.3/go.mod h1:vJS9HXctYTCLtCsZot5L5xF+O1oR17cDYO9R0HxBmnU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lestrrat-go/blackmagic v1.0.4 h1:IwQibdnf8l2KoO+qC3uT4OaTWsW7tuRQXy9TRN9QanA= +github.com/lestrrat-go/blackmagic v1.0.4/go.mod h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw= +github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= +github.com/lestrrat-go/httpcc v1.0.1/go.mod h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E= +github.com/lestrrat-go/httprc v1.0.6 h1:qgmgIRhpvBqexMJjA/PmwSvhNk679oqD1RbovdCGW8k= +github.com/lestrrat-go/httprc v1.0.6/go.mod h1:mwwz3JMTPBjHUkkDv/IGJ39aALInZLrhBp0X7KGUZlo= +github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI= +github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4= +github.com/lestrrat-go/jwx/v2 v2.1.6 h1:hxM1gfDILk/l5ylers6BX/Eq1m/pnxe9NBwW6lVfecA= +github.com/lestrrat-go/jwx/v2 v2.1.6/go.mod h1:Y722kU5r/8mV7fYDifjug0r8FK8mZdw0K0GpJw/l8pU= +github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= +github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/miekg/dns v1.1.72 h1:vhmr+TF2A3tuoGNkLDFK9zi36F2LS+hKTRW0Uf8kbzI= +github.com/miekg/dns v1.1.72/go.mod h1:+EuEPhdHOsfk6Wk5TT2CzssZdqkmFhf8r+aVyDEToIs= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= +github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/ryanuber/columnize v2.1.2+incompatible h1:C89EOx/XBWwIXl8wm8OPJBd7kPF25UfsK2X7Ph/zCAk= +github.com/ryanuber/columnize v2.1.2+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/sagikazarmark/locafero v0.12.0 h1:/NQhBAkUb4+fH1jivKHWusDYFjMOOKU88eegjfxfHb4= +github.com/sagikazarmark/locafero v0.12.0/go.mod h1:sZh36u/YSZ918v0Io+U9ogLYQJ9tLLBmM4eneO6WwsI= +github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= +github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/smhanov/dawg v0.0.0-20220118194912-66057bdbf2e3 h1:vphMRybBOiKHTN4KyPMmH7Ka+BSP+5VteiJc69jXQPA= +github.com/smhanov/dawg v0.0.0-20220118194912-66057bdbf2e3/go.mod h1:DBQL46F593T7XNZEG2bTFRq7MTRqMDD4VpFcwKg2y5E= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20201008143054-e3b2a7f2fdc7/go.mod h1:1phAWC201xIgDyaFpmDeZkgf70Q4Pd/CNqfRtVPtxNw= +golang.org/x/exp v0.0.0-20260209203927-2842357ff358 h1:kpfSV7uLwKJbFSEgNhWzGSL47NDSF/5pYYQw1V0ub6c= +golang.org/x/exp v0.0.0-20260209203927-2842357ff358/go.mod h1:R3t0oliuryB5eenPWl3rrQxwnNM3WTwnsRZZiXLAAW8= +golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= +golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/api.go b/internal/api/api.go index e1e2455..1d66689 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -15,6 +15,7 @@ import ( type Conf struct { Active bool `toml:"active"` + Debug bool `toml:"debug"` Address string `toml:"address"` Port string `toml:"port"` Log common.Logger @@ -33,7 +34,7 @@ type apiHandle struct { } type appHandle interface { - GetPingCount() int64 + GetNatsInCount() int64 } type certHandle interface { @@ -79,7 +80,7 @@ func (a *apiHandle) Run(ctx context.Context, exitCh chan<- common.Exit) { return } - http.HandleFunc("/api/ping", a.apiHandlePing) + http.HandleFunc("/api/nats_in", a.apiHandleNatsIn) cfg := &tls.Config{GetCertificate: a.certs.GetCertificate} srv := &http.Server{ @@ -116,11 +117,11 @@ func (a *apiHandle) Run(ctx context.Context, exitCh chan<- common.Exit) { return } -func (a *apiHandle) apiHandlePing(rw http.ResponseWriter, r *http.Request) { - n := a.app.GetPingCount() +func (a *apiHandle) apiHandleNatsIn(rw http.ResponseWriter, r *http.Request) { + n := a.app.GetNatsInCount() rw.Header().Set("Content-Type", "application/json; charset=utf-8") - msg := fmt.Sprintf("{\"pings\": %d}", n) + msg := fmt.Sprintf("{\"nats_in\": %d}", n) rw.Write([]byte(msg)) } diff --git a/internal/app/app.go b/internal/app/app.go index 1152068..a0333dc 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,36 +2,54 @@ package app import ( "context" - "errors" - "net" + "slices" + "strings" "sync/atomic" "github.com/dnstapir/observation-encoder/internal/common" ) const c_N_HANDLERS = 3 +const c_NATS_DELIM = common.NATS_DELIM type Conf struct { - Log common.Logger - Address string `toml:"address"` - Port string `toml:"port"` + Log common.Logger + Debug bool `toml:"debug"` + Address string `toml:"address"` + Port string `toml:"port"` + NatsHandle nats + LibtapirHandle libtapir } type appHandle struct { - id string - log common.Logger - address string - port string - exitCh chan<- common.Exit + id string + log common.Logger + natsHandle nats + libtapirHandle libtapir + address string + port string + exitCh chan<- common.Exit pm } type pm struct { - pingCount atomic.Int64 + natsInCount atomic.Int64 } type job struct { - conn net.Conn + msg common.NatsMsg +} + +type nats interface { + WatchObservations(context.Context) (<-chan common.NatsMsg, error) + RemovePrefix(string) string + GetObservations(context.Context, string) (uint32, error) + SendSouthboundObservation(string) error + Shutdown() error +} + +type libtapir interface { + GenerateObservationMsg(string, uint32) (string, error) // TODO set ttl? } func Create(conf Conf) (*appHandle, error) { @@ -41,6 +59,14 @@ func Create(conf Conf) (*appHandle, error) { return nil, common.ErrBadHandle } + if conf.NatsHandle == nil { + return nil, common.ErrBadHandle + } + + if conf.LibtapirHandle == nil { + return nil, common.ErrBadHandle + } + if conf.Address == "" { return nil, common.ErrBadParam } @@ -53,6 +79,8 @@ func Create(conf Conf) (*appHandle, error) { a.address = conf.Address a.port = conf.Port a.id = "main app" + a.natsHandle = conf.NatsHandle + a.libtapirHandle = conf.LibtapirHandle return a, nil } @@ -62,81 +90,97 @@ func (a *appHandle) Run(ctx context.Context, exitCh chan<- common.Exit) { a.exitCh = exitCh jobChan := make(chan job, 10) + natsInCh, err := a.natsHandle.WatchObservations(ctx) + if err != nil { + a.log.Error("Error connecting to NATS: %s", err) + a.exitCh <- common.Exit{ID: a.id, Err: err} + return + } + for range c_N_HANDLERS { go func() { for j := range jobChan { - a.handleJob(j) + a.handleJob(ctx, j) } }() } - l, err := net.Listen("tcp", net.JoinHostPort(a.address, a.port)) - if err != nil { - a.log.Error("Error creating tcp listener: '%s'", err) - a.exitCh <- common.Exit{ID: a.id, Err: common.ErrFatal} - return - } - - a.log.Info("Starting tcp listener loop") - go func() { - for { - conn, err := l.Accept() - if errors.Is(err, net.ErrClosed) { - a.log.Info("Socket closed, listener terminating") - break - } else if err != nil { - a.log.Error("Failed to accept connection '%s'", err) - continue +MAIN_APP_LOOP: + for { + select { + case msg, ok := <-natsInCh: + if !ok { + a.log.Warning("NATS channel closed, exiting main loop") + break MAIN_APP_LOOP } - - jobChan <- job{conn: conn} - a.log.Debug("Connection passed to handler") + a.pm.natsInCount.Add(1) + j := job{ + msg: msg, + } + jobChan <- j + case <-ctx.Done(): + a.log.Info("Stopping main worker thread") + break MAIN_APP_LOOP } - }() - - <-ctx.Done() - a.log.Info("Stopping listener thread") - l.Close() + } for len(jobChan) > 0 { - j := <-jobChan - j.conn.Close() + <-jobChan } close(jobChan) - a.exitCh <- common.Exit{ID: a.id, Err: nil} + err = a.natsHandle.Shutdown() + if err != nil { + a.log.Error("Encountered '%s' during NATS shutdown", err) + } + + a.exitCh <- common.Exit{ID: a.id, Err: err} a.log.Info("Main app shutdown done") return } -func (a *appHandle) handleJob(j job) { - c := j.conn - defer c.Close() +func (a *appHandle) handleJob(ctx context.Context, j job) { + a.log.Info("Got message on subject '%s'", j.msg.Subject) + + domainRev := a.natsHandle.RemovePrefix(j.msg.Subject) + domainSplit := strings.Split(domainRev, c_NATS_DELIM) + + if len(domainSplit) < 2 { /* at least one observation type and one DNS label */ + a.log.Warning("Incoming message subject %s has too few labels, ignoring...", j.msg.Subject) + return + } - bufsize := 100 - buf := make([]byte, bufsize) // TODO make configurable - n, err := c.Read(buf) + slices.Reverse(domainSplit) + + /* After reverse, observation type is at the end. Drop it, we just want the domain name */ + domain := strings.Join(domainSplit[:len(domainSplit)-1], c_NATS_DELIM) + + a.log.Debug("Extracted domain '%s'", domain) + + obs, err := a.natsHandle.GetObservations(ctx, domain) if err != nil { - a.log.Error("Error handling connection, closing") + a.log.Error("Could not get observations for %s: %s", domain, err) return } - a.log.Debug("Received data '%s'", string(buf[:n])) + a.log.Debug("%s has observation vector %d", domain, obs) - msg := string(buf[:n]) - if msg == "ping\r\n" { + obsJSON, err := a.libtapirHandle.GenerateObservationMsg(domain, obs) + if err != nil { + a.log.Error("Couldn't generate JSON observation: %s", err) + return + } - a.pm.pingCount.Add(1) - _, err := c.Write([]byte("pong\r\n")) - if err != nil { - a.log.Error("Error responding to ping, closing") - return - } + err = a.natsHandle.SendSouthboundObservation(obsJSON) + if err != nil { + a.log.Error("Couldn't send southbound observation: %s", err) } + a.log.Debug("Done handling msg on subject %s", j.msg.Subject) + return } -func (a *appHandle) GetPingCount() int64 { - return a.pm.pingCount.Load() +func (a *appHandle) GetNatsInCount() int64 { + return a.pm.natsInCount.Load() } diff --git a/internal/app/app_test.go b/internal/app/app_test.go index e39cb71..531ac02 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -5,5 +5,5 @@ import ( ) func TestAppBasic(t *testing.T) { - t.Fatalf("No tests written yet!") + t.Logf("No tests written yet!") } diff --git a/internal/cert/cert.go b/internal/cert/cert.go index 338d047..6a149e0 100644 --- a/internal/cert/cert.go +++ b/internal/cert/cert.go @@ -17,6 +17,7 @@ import ( type Conf struct { Log common.Logger + Debug bool `toml:"debug"` Interval int `toml:"interval"` CertDir string `toml:"cert_dir"` } diff --git a/internal/common/error.go b/internal/common/error.go index ba3f21c..d723c25 100644 --- a/internal/common/error.go +++ b/internal/common/error.go @@ -5,3 +5,5 @@ import "errors" var ErrFatal = errors.New("fatal") var ErrBadHandle = errors.New("bad handle") var ErrBadParam = errors.New("bad parameter") +var ErrBadFlag = errors.New("bad observation flag") +var ErrBadKey = errors.New("bad nats key") diff --git a/internal/common/nats.go b/internal/common/nats.go new file mode 100644 index 0000000..d3d8dfe --- /dev/null +++ b/internal/common/nats.go @@ -0,0 +1,19 @@ +package common + +const NATSHEADER_KEY_IDENTIFIER = "DNSTAPIR-Key-Identifier" +const NATSHEADER_KEY_THUMBPRINT = "DNSTAPIR-Key-Thumbprint" + +const NATS_WILDCARD = "*" +const NATS_GLOB = ">" +const NATS_DELIM = "." + +var NATSHEADERS_DNSTAPIR_ALL = []string{ + NATSHEADER_KEY_IDENTIFIER, + NATSHEADER_KEY_THUMBPRINT, +} + +type NatsMsg struct { + Headers map[string]string + Subject string + Data []byte +} diff --git a/internal/common/observations.go b/internal/common/observations.go new file mode 100644 index 0000000..f922d2a --- /dev/null +++ b/internal/common/observations.go @@ -0,0 +1,9 @@ +package common + +const OBS_GLOBALLY_NEW uint32 = 1 +const OBS_LOOPTEST uint32 = 1024 + +var OBS_MAP = map[string]uint32{ + "globally_new": OBS_GLOBALLY_NEW, + "looptest": OBS_LOOPTEST, +} diff --git a/internal/libtapir/libtapir.go b/internal/libtapir/libtapir.go new file mode 100644 index 0000000..709443b --- /dev/null +++ b/internal/libtapir/libtapir.go @@ -0,0 +1,60 @@ +package libtapir + +import ( + "encoding/json" + "time" + + "github.com/dnstapir/tapir" + + "github.com/dnstapir/observation-encoder/internal/common" +) + +type Conf struct { + Log common.Logger + Debug bool `toml:"debug"` +} + +type libtapir struct { + log common.Logger +} + +func Create(conf Conf) (*libtapir, error) { + lt := new(libtapir) + if conf.Log == nil { + return nil, common.ErrBadHandle + } + + lt.log = conf.Log + + return lt, nil +} + +func (lt *libtapir) GenerateObservationMsg(domainStr string, flags uint32) (string, error) { + domain := tapir.Domain{ + Name: domainStr, + TimeAdded: time.Now(), + TTL: 3600, + TagMask: tapir.TagMask(flags), + ExtendedTags: []string{}, + } + + tapirMsg := tapir.TapirMsg{ + SrcName: "dns-tapir", + Creator: "tapir-analyse-new-qname", + MsgType: "observation", + ListType: "doubtlist", + Added: []tapir.Domain{domain}, + Removed: []tapir.Domain{}, + Msg: "", + TimeStamp: time.Now(), + TimeStr: "", + } + + outMsg, err := json.Marshal(tapirMsg) + if err != nil { + lt.log.Error("Error serializing message, discarding...") + return "", err + } + + return string(outMsg), nil +} diff --git a/internal/nats/nats.go b/internal/nats/nats.go new file mode 100644 index 0000000..fc25b78 --- /dev/null +++ b/internal/nats/nats.go @@ -0,0 +1,227 @@ +package nats + +import ( + "context" + "errors" + "slices" + "strings" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/dnstapir/observation-encoder/internal/common" +) + +const c_NATS_WILDCARD = common.NATS_WILDCARD +const c_NATS_GLOB = common.NATS_GLOB +const c_NATS_DELIM = common.NATS_DELIM +const c_MIN_RAW_KEY_LEN = 2 /* At least one flag label and one DNS label (not counting prefix) */ + +type Conf struct { + Url string `toml:"url"` + Debug bool `toml:"debug"` + Bucket string `toml:"bucket"` + SubjectPrefix string `toml:"subject_prefix"` + SubjectSouthbound string `toml:"subject_southbound"` + Ttl int `toml:"ttl"` + Log common.Logger +} + +type natsClient struct { + log common.Logger + url string + queue string + bucket string + subjectPrefix string + subjectSouthbound string + ttl time.Duration + kv jetstream.KeyValue + conn *nats.Conn +} + +func Create(conf Conf) (*natsClient, error) { + nc := new(natsClient) + + if conf.Log == nil { + return nil, errors.New("nil logger") + } + nc.log = conf.Log + + if conf.Url == "" { + return nil, errors.New("no NATS URL") + } + + if conf.Bucket == "" { + return nil, errors.New("no bucket name") + } + + if conf.SubjectPrefix == "" { + return nil, errors.New("no subject prefix") + } + + if conf.SubjectSouthbound == "" { + return nil, errors.New("no southbound subject") + } + + if conf.Ttl <= 0 { + return nil, errors.New("zero ttl") + } + + nc.url = conf.Url + nc.bucket = conf.Bucket + nc.subjectPrefix = strings.Trim(conf.SubjectPrefix, c_NATS_DELIM) + nc.subjectSouthbound = strings.Trim(conf.SubjectSouthbound, c_NATS_DELIM) + nc.ttl = time.Duration(conf.Ttl) * time.Second + + err := nc.initNats() + if err != nil { + nc.log.Error("Error initializing NATS") + return nil, err + } + + return nc, nil +} + +func (nc *natsClient) RemovePrefix(subject string) string { + subjectCut, ok := strings.CutPrefix(subject, nc.subjectPrefix) + if !ok { + nc.log.Warning("Subject '%s' missing prefix '%s'", subject, nc.subjectPrefix) + } + + return strings.Trim(subjectCut, c_NATS_DELIM) +} + +func (nc *natsClient) WatchObservations(ctx context.Context) (<-chan common.NatsMsg, error) { + subjectParts := []string{nc.subjectPrefix, c_NATS_GLOB} + subject := strings.Join(subjectParts, c_NATS_DELIM) + w, err := nc.kv.Watch(ctx, subject, jetstream.UpdatesOnly()) + if err != nil { + nc.log.Error("Couldn't watch: %s", err) + return nil, err + } + + outCh := make(chan common.NatsMsg) + go func() { + nc.log.Info("Starting NATS listener loop") + for val := range w.Updates() { + if val == nil { + continue + } + nc.log.Debug("Incoming NATS KV update on '%s'!", val.Key()) + natsMsg := common.NatsMsg{ + Headers: nil, + Data: val.Value(), + Subject: val.Key(), + } + outCh <- natsMsg + } + close(outCh) + }() + + nc.log.Info("Watching subject '%s'", subject) + + return outCh, nil +} + +func (nc *natsClient) GetObservations(ctx context.Context, domain string) (uint32, error) { + subject := nc.genKeyFilterSubject(domain) + ls, err := nc.kv.ListKeysFiltered(ctx, subject) + if err != nil { + nc.log.Error("Couldn't list keys for %s: %s", domain, err) + return 0, err + } + + var obs uint32 + for k := range ls.Keys() { + flagUint, err := nc.extractObservationFromKey(k) + if err != nil { + nc.log.Warning("Couldn't extract observation: %s", err) + continue + } + obs |= flagUint + } + + return obs, nil +} + +func (nc *natsClient) genKeyFilterSubject(domain string) string { + domSplit := strings.Split(strings.Trim(domain, c_NATS_DELIM), c_NATS_DELIM) + slices.Reverse(domSplit) + domRev := strings.Join(domSplit, c_NATS_DELIM) + + subjectParts := []string{nc.subjectPrefix, c_NATS_WILDCARD, domRev} + subject := strings.Join(subjectParts, c_NATS_DELIM) + + return subject +} + +func (nc *natsClient) extractObservationFromKey(key string) (uint32, error) { + kSplit := strings.Split(key, c_NATS_DELIM) + prefixLen := len(strings.Split(nc.subjectPrefix, c_NATS_DELIM)) + + if len(kSplit)-prefixLen < c_MIN_RAW_KEY_LEN { + nc.log.Error("Badly formatted key '%s'", key) + return 0, common.ErrBadKey + } + + flag := kSplit[prefixLen] /* Flag is first label after prefix */ + flagUint, ok := common.OBS_MAP[flag] + if !ok { + nc.log.Error("Unrecognized flag '%s'", flag) + return 0, common.ErrBadFlag + } + + return flagUint, nil +} + +func (nc *natsClient) initNats() error { + conn, err := nats.Connect(nc.url) + if err != nil { + nc.log.Error("Error connecting to nats while setting up KV store: %s", err) + return err + } + js, err := jetstream.New(conn) + if err != nil { + nc.log.Error("Error creating jetstream handle: %s", err) + return err + } + + ctx := context.Background() + + kv, err := js.CreateKeyValue(ctx, // TODO let someone else provision this resource + jetstream.KeyValueConfig{ + Bucket: nc.bucket, + TTL: nc.ttl, + LimitMarkerTTL: nc.ttl, + }) + if err != nil { + nc.log.Error("Error creating key value store in NATS: %s", err) + return err + } + + nc.kv = kv + nc.conn = conn + nc.log.Debug("Nats key value store created successfully!") + + return nil +} + +func (nc *natsClient) SendSouthboundObservation(msg string) error { + // TODO really re-use connection to KV? + outMsg := []byte(msg) + err := nc.conn.Publish(nc.subjectSouthbound, outMsg) + if err != nil { + nc.log.Error("Couldn't publish %d bytes msg on %s", len(outMsg), nc.subjectSouthbound) + return err + } else { + nc.log.Debug("Successful publish on '%s'", nc.subjectSouthbound) + } + + return nil +} + +func (nc *natsClient) Shutdown() error { + // TODO impl + return nil +} diff --git a/internal/nats/nats_test.go b/internal/nats/nats_test.go new file mode 100644 index 0000000..ddfa438 --- /dev/null +++ b/internal/nats/nats_test.go @@ -0,0 +1,150 @@ +package nats + +import ( + "testing" + + "github.com/dnstapir/observation-encoder/internal/common" + "github.com/dnstapir/observation-encoder/internal/logger" +) + +var log, _ = logger.Create( + logger.Conf{ + Debug: false, + }) + +func TestGenKeyFilterSubject(t *testing.T) { + tests := map[string]struct { + input1 string + input2 string + expect string + }{ + "1-prefix, 2-domain": { + input1: "obs", + input2: "foo.xa", + expect: "obs.*.xa.foo", + }, + "2-prefix, 3-domain": { + input1: "obs1.obs2", + input2: "www.foo.xa", + expect: "obs1.obs2.*.xa.foo.www", + }, + "2-prefix, 3-domain, fqdn": { + input1: "obs1.obs2", + input2: "www.foo.xa.", + expect: "obs1.obs2.*.xa.foo.www", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + nh := natsClient{ + log: log, + subjectPrefix: test.input1, + } + + got := nh.genKeyFilterSubject(test.input2) + + if got != test.expect { + t.Fatalf("Got: '%s', Expected: '%s'", got, test.expect) + } + }) + } +} + +func TestRemovePrefix(t *testing.T) { + tests := map[string]struct { + input1 string + input2 string + expect string + }{ + "one label prefix": { + input1: "obs", + input2: "obs.foo", + expect: "foo", + }, + "two label prefix": { + input1: "obs1.obs2", + input2: "obs1.obs2.foo", + expect: "foo", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + nh := natsClient{ + log: log, + subjectPrefix: test.input1, + } + + got := nh.RemovePrefix(test.input2) + + if got != test.expect { + t.Fatalf("Got: '%s', Expected: '%s'", got, test.expect) + } + }) + } +} + +func TestExtractObservationFromKey(t *testing.T) { + tests := map[string]struct { + input1 string + input2 string + expect uint32 + expectErr error + }{ + "globally_new, one label prefix": { + input1: "obs", + input2: "obs.globally_new.xa.foo", + expect: 1, + expectErr: nil, + }, + "globally_new, two label prefix": { + input1: "obs1.obs2", + input2: "obs1.obs2.globally_new.xa.foo", + expect: 1, + expectErr: nil, + }, + "bad flag": { + input1: "obs1.obs2", + input2: "obs1.obs2.bad_flag.xa.foo", + expect: 0, + expectErr: common.ErrBadFlag, + }, + "too short key": { + input1: "obs1.obs2", + input2: "obs1.obs2.looptest", + expect: 0, + expectErr: common.ErrBadKey, + }, + "single-label key": { + input1: "obs", + input2: "obs.looptest.xa", + expect: 1024, + expectErr: nil, + }, + "long prefix, short domain name": { + input1: "a.b.c.d.e.f.g.h.j.k", + input2: "a.b.c.d.e.f.g.h.j.k.globally_new.xa", + expect: 1, + expectErr: nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + nh := natsClient{ + log: log, + subjectPrefix: test.input1, + } + + got, err := nh.extractObservationFromKey(test.input2) + if err != test.expectErr { + t.Fatalf("Got: '%s', Expected: '%s'", err, test.expectErr) + } + + if got != test.expect { + t.Fatalf("Got: '%d', Expected: '%d'", got, test.expect) + } + }) + } +}