diff --git a/go.mod b/go.mod index 2837489..d4bc73e 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,12 @@ go 1.24 toolchain go1.24.0 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/ethereum/go-ethereum v1.15.3 github.com/hashicorp/go-plugin v1.6.3 github.com/leanovate/gopter v0.2.11 + github.com/nats-io/nats-server/v2 v2.11.0 + github.com/nats-io/nats.go v1.40.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250130202959-6f1f48342e36 github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 @@ -26,7 +29,6 @@ require ( github.com/bits-and-blooms/bitset v1.17.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/consensys/bavard v0.1.22 // indirect github.com/consensys/gnark-crypto v0.14.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect @@ -52,6 +54,7 @@ require ( github.com/gogo/protobuf v1.3.3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-tpm v0.9.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect @@ -78,9 +81,13 @@ require ( github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + github.com/nats-io/nkeys v0.4.10 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -124,11 +131,12 @@ require ( go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.36.0 // indirect golang.org/x/net v0.35.0 // indirect - golang.org/x/sync v0.11.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.22.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250219182151-9fdb1cabc7b2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 3aa376e..e6ac029 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkT github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc= github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM= github.com/apache/arrow-go/v18 v18.0.0/go.mod h1:t6+cWRSmKgdQ6HsxisQjok+jBpKGhRDiqcf3p0p/F+A= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -246,6 +248,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -441,6 +445,8 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -463,6 +469,16 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE= +github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI= +github.com/nats-io/nats.go v1.40.1 h1:MLjDkdsbGUeCMKFyCFoLnNn/HDTqcgVa3EQm+pMNDPk= +github.com/nats-io/nats.go v1.40.1/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -695,8 +711,8 @@ golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -814,8 +830,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -885,8 +901,9 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -907,13 +924,13 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/pkg/nats/client.go b/pkg/nats/client.go new file mode 100644 index 0000000..b77a8e3 --- /dev/null +++ b/pkg/nats/client.go @@ -0,0 +1,194 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "encoding/hex" + "errors" + "fmt" + "strings" + "time" + + "github.com/nats-io/nats.go" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-data-streams/rpc" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" +) + +type Client interface { + services.Service + Transmit(ctx context.Context, payload []byte, dedupKey string, donID uint32) error +} + +var _ Client = (*client)(nil) + +type client struct { + services.Service + + lggr logger.Logger + clientSigner crypto.Signer + serverPubKey ed25519.PublicKey + serverURLs []string + clientPubKeyHex string + + conn *nats.Conn + js nats.JetStreamContext +} + +func NewClient(opts ClientOpts) (Client, error) { + if err := opts.verifyConfig(); err != nil { + return nil, fmt.Errorf("invalid configuration: %w", err) + } + + c := &client{ + lggr: opts.Logger, + clientSigner: opts.ClientSigner, + serverPubKey: opts.ServerPubKey, + serverURLs: opts.ServerURLs, + } + + svc, _ := services.Config{ + Name: "NATSClient", + Start: c.start, + Close: c.close, + }.NewServiceEngine(opts.Logger) + c.Service = svc + + return c, nil +} + +// connect creates a new NATS connection with the given configuration +func (c *client) connect() (*nats.Conn, error) { + cMtls, err := mtls.NewTLSTransportSigner(c.clientSigner, []ed25519.PublicKey{c.serverPubKey}) + if err != nil { + return nil, fmt.Errorf("failed to create client mTLS credentials: %w", err) + } + options := []nats.Option{ + // Connection settings + nats.ReconnectWait(1 * time.Second), + nats.RetryOnFailedConnect(true), + nats.MaxReconnects(-1), + nats.ReconnectBufSize(256 * 1024 * 1024), // 256MB + // Timeouts and keepalive + nats.PingInterval(1 * time.Second), + nats.Timeout(5 * time.Second), + nats.TLSHandshakeFirst(), + nats.Secure(cMtls), + nats.Name(c.getClientPubKeyHex()), + // Connection handlers for various NATS events + nats.ConnectHandler(func(nc *nats.Conn) { + c.lggr.Info("NATS client connection established", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl()) + }), + nats.ReconnectHandler(func(nc *nats.Conn) { + c.lggr.Info("NATS client reconnected", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "total_reconnects", nc.Reconnects) + }), + nats.ReconnectErrHandler(func(nc *nats.Conn, err error) { + c.lggr.Error("NATS client reconnected with error", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "error", err) + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + c.lggr.Error("NATS client disconnected with error", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "total_reconnects", nc.Reconnects, "error", err) + }), + nats.ClosedHandler(func(nc *nats.Conn) { + c.lggr.Warn("NATS client closed", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl()) + }), + // Error handler for subscriptions + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + c.lggr.Error("NATS client subscription error", "server_id", nc.ConnectedServerId(), "server_url", nc.ConnectedUrl(), "error", err, "subject", sub.Subject, "queue", sub.Queue) + }), + } + + nc, err := nats.Connect(strings.Join(c.serverURLs, ","), options...) + if err != nil { + return nil, fmt.Errorf("failed to create NATS connection: %w", err) + } + + jsOptions := []nats.JSOpt{ + nats.PublishAsyncMaxPending(256 * 1024), // Allow large number of async publishes + nats.PublishAsyncTimeout(100 * time.Millisecond), + nats.MaxWait(100 * time.Millisecond), + } + + // Create the JetStream context + c.js, err = nc.JetStream(jsOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + return nc, nil +} + +func (c *client) start(context.Context) error { + nc, err := c.connect() + // if there is only one server URL, use it + if err != nil { + return err + } + c.conn = nc + return nil +} + +func (c *client) close() error { + if c.conn != nil { + return c.conn.Drain() + } + return nil +} + +func (c *client) Transmit(ctx context.Context, payload []byte, dedupKey string, donID uint32) error { + subject := fmt.Sprintf("%d.%s", donID, c.getClientPubKeyHex()) + + pubOpts := []nats.PubOpt{ + nats.MsgId(dedupKey), + nats.StallWait(200 * time.Millisecond), + nats.MsgTTL(24 * time.Hour), + } + ack, err := c.js.PublishAsync(subject, payload, pubOpts...) + if err != nil { + return fmt.Errorf("failed to publish to fast lane: %w", err) + } + select { + case <-ack.Ok(): + return nil + case <-time.After(1 * time.Second): + return fmt.Errorf("fast lane publish timed out") + } +} + +func (c *client) LatestReport(ctx context.Context, req *rpc.LatestReportRequest) (resp *rpc.LatestReportResponse, err error) { + return nil, errors.New("LatestReport is not supported in nats mode") +} + +func (c *client) Name() string { + if c.lggr == nil { + return "NATSClient" + } + return c.lggr.Name() +} + +func (c *client) Healthy() error { + switch { + case c.conn == nil: + return fmt.Errorf("NATS connection is nil") + case !c.conn.IsConnected(): + return fmt.Errorf("NATS connection is %s", c.conn.Status()) + default: + return nil + } +} + +func (c *client) Ready() error { + if c.conn == nil || !c.conn.IsConnected() { + return errors.New("NATS connection is not ready") + } + return nil +} + +func (c *client) HealthReport() map[string]error { + return map[string]error{c.Name(): c.Healthy()} +} + +func (c *client) getClientPubKeyHex() string { + return hex.EncodeToString(c.clientSigner.Public().(ed25519.PublicKey)) +} diff --git a/pkg/nats/client_test.go b/pkg/nats/client_test.go new file mode 100644 index 0000000..f33c30a --- /dev/null +++ b/pkg/nats/client_test.go @@ -0,0 +1,196 @@ +package nats + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "fmt" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" + "github.com/stretchr/testify/require" +) + +func TestClient_Connect(t *testing.T) { + // Generate server and client keypairs + serverPub, serverPriv, _ := ed25519.GenerateKey(rand.Reader) + clientPub, clientPriv, _ := ed25519.GenerateKey(nil) + clientPubHex := hex.EncodeToString(clientPub) + + serverTLSConfig, err := mtls.NewTLSConfig(serverPriv, []ed25519.PublicKey{clientPub}) + require.NoError(t, err) + + opts := &server.Options{ + Host: "0.0.0.0", // Listen on all interfaces since it's public + Port: 4222, + NoAuthUser: "", + NoLog: false, // Keep logs for monitoring + NoSigs: true, // Disable signal handling since we handle it ourselves + Logtime: true, // Include timestamps in logs + Debug: false, // Disable debug mode in production + Trace: false, // Disable trace mode in production + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, // Require TLS handshake before any other communication + AllowNonTLS: false, // Only allow TLS connections + TLSMap: true, // Enable TLS certificate mapping + // Connection limits and DDoS protection + MaxConn: 1000, // Limit total connections + MaxSubs: 100, // Limit subscriptions per connection + MaxControlLine: 4096, // Limit control line size (4KB) + MaxPayload: 512 * 1024, // 512KB max payload for reports + MaxPending: 1024 * 1024 * 2, // 2MB total pending messages + MaxClosedClients: 1000, // Keep track of closed clients + // Rate limiting + WriteDeadline: 1 * time.Second, // Timeout for write operations + // Connection timeouts + AuthTimeout: 2.0, // 2 seconds for auth + TLSTimeout: 2.0, // 2 seconds for TLS handshake + PingInterval: 2 * time.Second, + MaxPingsOut: 3, // Disconnect after 3 missed pings + // Security hardening + NoHeaderSupport: false, // Disable header support for simpler protocol + NoFastProducerStall: true, // Prevent fast producer stall + // Graceful shutdown + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // Define users with specific permissions + Users: []*server.User{ + // User 1 with restricted permissions + { + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", clientPubHex[:32], clientPubHex), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + }, + }, + // Insecure client with no permissions + }, + } + + // Start the server + ns, err := server.NewServer(opts) + + require.NoError(t, err) + ns.Start() + defer ns.Shutdown() + + // Wait for server to be ready + for i := 0; i < 10; i++ { + if ns.ReadyForConnections(1 * time.Second) { + break + } + if i == 9 { + t.Fatal("NATS server did not start in time") + } + time.Sleep(100 * time.Millisecond) + } + + serverURL := fmt.Sprintf("tls://localhost:4222") + + testCases := []struct { + name string + clientSigner ed25519.PrivateKey + serverPubKey ed25519.PublicKey + serverURLs []string + expectSuccess bool + errorContains string + }{ + { + name: "successful connection", + clientSigner: clientPriv, + serverPubKey: serverPub, + serverURLs: []string{serverURL}, + expectSuccess: true, + }, + { + name: "invalid server URL", + clientSigner: clientPriv, + serverPubKey: serverPub, + serverURLs: []string{"tls://invalid:9999"}, + expectSuccess: false, + errorContains: "failed to create NATS connection", + }, + { + name: "wrong server public key", + clientSigner: clientPriv, + serverPubKey: make([]byte, ed25519.PublicKeySize), // Invalid key + serverURLs: []string{serverURL}, + expectSuccess: false, + errorContains: "failed to create client mTLS credentials", + }, + } + + clientOptions := ClientOpts{ + Logger: logger.Test(t), + ClientSigner: clientPriv, + ServerPubKey: serverPub, + ServerURLs: []string{serverURL}, + } + + err = clientOptions.verifyConfig() + require.NoError(t, err) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + c, err := NewClient(clientOptions) + + servicetest.Run(t, c) + require.NoError(t, err) + + if tc.expectSuccess { + require.NoError(t, err) + } + }) + } +} + +// // Helper to create a test NATS server with mTLS support +// func createTestServer(t *testing.T, allowedClientCerts []string) (*server.Server, string) { +// opts := &server.Options{ +// Host: "127.0.0.1", +// Port: -1, // Use a random port +// NoLog: true, +// NoSigs: true, +// TLS: true, +// TLSVerify: true, +// TLSMap: true, +// TLSTimeout: 5, +// TLSPinnedCerts: makePinnedCertSet(allowedClientCerts), +// } + +// s, err := server.NewServer(opts) +// require.NoError(t, err) +// go s.Start() + +// // Wait for server to be ready +// for i := 0; i < 10; i++ { +// if s.ReadyForConnections(1 * time.Second) { +// break +// } +// if i == 9 { +// t.Fatal("NATS server did not start in time") +// } +// time.Sleep(100 * time.Millisecond) +// } + +// serverURL := fmt.Sprintf("tls://%s:%d", opts.Host, s.ClusterAddr().Port) +// return s, serverURL +// } + +// // Helper function to convert string slice to PinnedCertSet +// func makePinnedCertSet(certs []string) server.PinnedCertSet { +// set := server.PinnedCertSet{} +// for _, cert := range certs { +// set[cert] = struct{}{} +// } +// return set +// } diff --git a/pkg/nats/config.go b/pkg/nats/config.go new file mode 100644 index 0000000..64aac36 --- /dev/null +++ b/pkg/nats/config.go @@ -0,0 +1,40 @@ +package nats + +import ( + "crypto" + "crypto/ed25519" + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type ClientOpts struct { + Logger logger.Logger + ClientSigner crypto.Signer + ServerPubKey ed25519.PublicKey + ServerURLs []string +} + +// verifyConfig validates all required fields are properly set +func (c *ClientOpts) verifyConfig() error { + var errs []error + + if c.Logger == nil { + errs = append(errs, fmt.Errorf("logger is required for NATS client")) + } + if c.ClientSigner == nil { + errs = append(errs, fmt.Errorf("client signer is required for NATS client")) + } + if len(c.ServerPubKey) == 0 { + errs = append(errs, fmt.Errorf("server public key is required for NATS client")) + } + if len(c.ServerURLs) == 0 { + errs = append(errs, fmt.Errorf("at least one server URL is required for NATS client")) + } + + if len(errs) > 0 { + return fmt.Errorf("invalid NATS client configuration: %v", errs) + } + + return nil +} diff --git a/pkg/nats/example/main.go b/pkg/nats/example/main.go new file mode 100644 index 0000000..d923646 --- /dev/null +++ b/pkg/nats/example/main.go @@ -0,0 +1,213 @@ +package main + +import ( + "crypto" + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" + // Your custom TLS package +) + +func startServer(opts *server.Options) (*server.Server, error) { + // Create a new NATS server + ns, err := server.NewServer(opts) + if err != nil { + return nil, fmt.Errorf("error creating server: %w", err) + } + + // Start the server in a goroutine + go func() { + ns.Start() + }() + + // Wait for the server to be ready + if !ns.ReadyForConnections(4 * time.Second) { + return nil, fmt.Errorf("NATS server failed to start") + } + + log.Printf("NATS server is running on %s", ns.ClientURL()) + return ns, nil +} + +func startClientandSendHello(clientName string, clientPriv ed25519.PrivateKey, serverPubKeys []ed25519.PublicKey, serverURL string) { + // Set up client TLS config + clientTLSSigner := crypto.Signer(clientPriv) + clientTLSConfig, err := mtls.NewTLSTransportSigner(clientTLSSigner, serverPubKeys) + if err != nil { + log.Fatalf("Client %s: Error creating TLS config: %v", clientName, err) + } + + natsOpts := []nats.Option{ + nats.Secure(clientTLSConfig), + nats.ReconnectWait(500 * time.Millisecond), + nats.Compression(true), + nats.MaxReconnects(-1), + nats.TLSHandshakeFirst(), + nats.FlusherTimeout(1 * time.Second), + nats.PingInterval(1 * time.Second), + nats.Timeout(2 * time.Second), + nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + log.Printf("Client %s: Error: %v", clientName, err) + }), + nats.Name(clientName), + } + nc, err := nats.Connect(serverURL, natsOpts...) + if err != nil { + log.Fatalf("Client %s: Error connecting to NATS: %v", clientName, err) + } + defer nc.Close() + + // Subscribe to the test subject + sub, err := nc.Subscribe("test.*", func(msg *nats.Msg) { + log.Printf("Client %s received: %s", clientName, string(msg.Data)) + }) + if err != nil { + log.Fatalf("Client %s: Error subscribing: %v", clientName, err) + } + defer sub.Unsubscribe() + + // Publish a message + err = nc.Publish("test.*", []byte(fmt.Sprintf("Hello world from %s", clientName))) + if err != nil { + log.Fatalf("Client %s: Error publishing message: %v", clientName, err) + } + + // Ensure the publish message is sent to the server. + if err := nc.Flush(); err != nil { + log.Fatalf("Client %s: Error flushing connection: %v", clientName, err) + } + + // Wait to ensure the message is received before closing. + time.Sleep(2 * time.Second) +} + +func main() { + serverPub, serverPriv, _ := ed25519.GenerateKey(rand.Reader) + log.Printf("Server public key: %x", serverPub) + + client1Pub, client1Priv, _ := ed25519.GenerateKey(rand.Reader) + log.Printf("Client1 public key: %x", client1Pub) + + client2Pub, client2Priv, _ := ed25519.GenerateKey(rand.Reader) + log.Printf("Client2 public key: %x", client2Pub) + + insecureClientPub, insecureClientPriv, _ := ed25519.GenerateKey(rand.Reader) + log.Printf("Insecure client public key: %x", insecureClientPub) + + clientPubKeys := []ed25519.PublicKey{client1Pub, client2Pub} + + serverTLSConfig, err := mtls.NewTLSConfig(serverPriv, clientPubKeys) + for _, key := range clientPubKeys { + log.Printf("Client public key: %x", key) + } + + if err != nil { + log.Fatalf("Error creating TLS config: %v", err) + } + + // Options block for nats-server. + // NOTE: This structure is no longer used for monitoring endpoints + // and json tags are deprecated and may be removed in the future. + // Create an embedded NATS server with least privilege permissions + + client1PubHex := hex.EncodeToString(client1Pub) + client2PubHex := hex.EncodeToString(client2Pub) + // Create a new NATS server options + opts := &server.Options{ + Host: "0.0.0.0", // Listen on all interfaces since it's public + Port: 4222, + NoAuthUser: "", + NoLog: false, // Keep logs for monitoring + NoSigs: true, // Disable signal handling since we handle it ourselves + Logtime: true, // Include timestamps in logs + Debug: false, // Disable debug mode in production + Trace: false, // Disable trace mode in production + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, // Require TLS handshake before any other communication + AllowNonTLS: false, // Only allow TLS connections + TLSMap: true, // Enable TLS certificate mapping + // Connection limits and DDoS protection + MaxConn: 1000, // Limit total connections + MaxSubs: 100, // Limit subscriptions per connection + MaxControlLine: 4096, // Limit control line size (4KB) + MaxPayload: 512 * 1024, // 512KB max payload for reports + MaxPending: 1024 * 1024 * 2, // 2MB total pending messages + MaxClosedClients: 1000, // Keep track of closed clients + // Rate limiting + WriteDeadline: 1 * time.Second, // Timeout for write operations + // Connection timeouts + AuthTimeout: 2.0, // 2 seconds for auth + TLSTimeout: 2.0, // 2 seconds for TLS handshake + PingInterval: 2 * time.Second, + MaxPingsOut: 3, // Disconnect after 3 missed pings + // Security hardening + NoHeaderSupport: false, // Disable header support for simpler protocol + NoFastProducerStall: true, // Prevent fast producer stall + // Graceful shutdown + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // Define users with specific permissions + Users: []*server.User{ + // User 1 with restricted permissions + { + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", client1PubHex[:32], client1PubHex), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"test.*"}, // Allows test.anything + }, + }, + }, + + // User 2 with different permissions + { + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", client2PubHex[:32], client2PubHex), + Permissions: &server.Permissions{ + Publish: &server.SubjectPermission{ + Allow: []string{"other"}, + Deny: []string{"service.admin.>", "service.internal.>"}, + }, + Subscribe: &server.SubjectPermission{ + Allow: []string{"other", "other"}, + Deny: []string{"service.admin.>", "service.internal.>"}, + }, + }, + }, + + // Insecure client with no permissions + }, + } + + // Start the server + ns, err := startServer(opts) + if err != nil { + log.Fatalf("Failed to start server: %v", err) + } + + serverPubKeys := []ed25519.PublicKey{serverPub} + + // Start client in a goroutine + startClientandSendHello("client1", client1Priv, serverPubKeys, ns.ClientURL()) + startClientandSendHello("client2", client2Priv, serverPubKeys, ns.ClientURL()) + startClientandSendHello("insecureClient", insecureClientPriv, serverPubKeys, ns.ClientURL()) + + // Handle graceful shutdown + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + <-sigCh + log.Println("\nShutting down...") + ns.Shutdown() +} diff --git a/pkg/nats/server.go b/pkg/nats/server.go new file mode 100644 index 0000000..aa27469 --- /dev/null +++ b/pkg/nats/server.go @@ -0,0 +1,267 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "fmt" + "time" + + natssrv "github.com/nats-io/nats-server/v2/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-data-streams/rpc/mtls" +) + +// Server is the interface for a NATS server service, mirroring the pattern used by the client. +type Server interface { + services.Service + + // URL returns the server connect URL(s). Useful if you want to pass the + // string to clients so they know how to connect. + URL() []string +} + +// Make sure *serverImpl implements Server. +var _ Server = (*serverImpl)(nil) + +// serverImpl is the concrete implementation that holds the server instance and configuration. +type serverImpl struct { + services.Service + + lggr logger.Logger + opts ServerOpts + srv *natssrv.Server + urls []string +} + +// ServerOpts is the set of options required to stand up a NATS Server with mTLS. +type ServerOpts struct { + // Logging + Logger logger.Logger + + // The server’s private key. Must be an ed25519 key, used to prove identity to clients. + ServerPrivKey ed25519.PrivateKey + + // Which client public keys are allowed to connect via mTLS. Typically includes + // the set of ed25519.PublicKey(s) for the clients you'd like to authorize. + AllowedClientPubKeys []ed25519.PublicKey + + // Where the server should listen (e.g. "0.0.0.0" or "127.0.0.1") + Host string + + // The TCP port on which NATS server will listen (e.g. 4222). + Port int + + // (Optional) Custom NATS permissions. If empty, this example code + // defaults to some basic subject permissions. + AllowedPublish []string + AllowedSubscribe []string +} + +// NewServer constructs a new NATS Server service but does not start it. It mirrors +// the client pattern: we build a *serverImpl, then wrap it in a ServiceEngine so +// we get Start/Close/Health checks. +func NewServer(opts ServerOpts) (Server, error) { + if err := verifyServerOpts(opts); err != nil { + return nil, fmt.Errorf("invalid server configuration: %w", err) + } + + s := &serverImpl{ + lggr: opts.Logger, + opts: opts, + } + + // Create a service lifecycle engine, using the same pattern as client.go. + svc, err := services.Config{ + Name: "NATSServer", + Start: s.start, + Close: s.close, + }.NewServiceEngine(s.lggr) + if err != nil { + return nil, fmt.Errorf("failed to create NATSServer service engine: %w", err) + } + s.Service = svc + + return s, nil +} + +// verifyServerOpts does basic checks on your server configuration to +// avoid returning an incorrectly configured server. +func verifyServerOpts(opts ServerOpts) error { + if opts.Logger == nil { + return fmt.Errorf("logger must not be nil") + } + if opts.ServerPrivKey == nil { + return fmt.Errorf("server private key is required") + } + if len(opts.AllowedClientPubKeys) == 0 { + return fmt.Errorf("at least one allowed client public key is required") + } + if opts.Host == "" { + return fmt.Errorf("host must not be empty") + } + if opts.Port <= 0 { + return fmt.Errorf("port must be > 0") + } + return nil +} + +// start spins up the embedded NATS server with an mTLS config and begins listening. +func (s *serverImpl) start(ctx context.Context) error { + // Build the server TLSConfig from the server’s private key & allowed client pub keys. + serverTLSConfig, err := mtls.NewTLSConfig(s.opts.ServerPrivKey, s.opts.AllowedClientPubKeys) + if err != nil { + return fmt.Errorf("failed to create server TLS config: %w", err) + } + + // Derive a default set of allowed subjects for testing or production usage. + // If you'd like more advanced mapping, you can dynamically build natssrv.Users + // with specific Permissions, etc. + pubAllows := s.opts.AllowedPublish + if len(pubAllows) == 0 { + pubAllows = []string{"test.*"} + } + subAllows := s.opts.AllowedSubscribe + if len(subAllows) == 0 { + subAllows = []string{"test.*"} + } + + // For each allowed client pubkey, build a user stanza with the same pattern + // from your test code. This approach uses NATS TLSMap to tie the subject to + // the client's certificate identity, so the client’s CN is typically: + // "CN=,OU=,O=Chainlink Data Streams" + // This is an example pattern. You can adapt it to your environment. + var users []*natssrv.User + for _, clientPub := range s.opts.AllowedClientPubKeys { + clientHex := prettyKeyHex(clientPub) + user := &natssrv.User{ + Username: fmt.Sprintf("CN=%s,OU=%s,O=Chainlink Data Streams", + clientHex[:32], // maybe first 32 chars for brevity + clientHex), + Permissions: &natssrv.Permissions{ + Publish: &natssrv.SubjectPermission{ + Allow: pubAllows, + }, + Subscribe: &natssrv.SubjectPermission{ + Allow: subAllows, + }, + }, + } + users = append(users, user) + } + + // Configure the embedded server. + natsOpts := &natssrv.Options{ + Host: s.opts.Host, + Port: s.opts.Port, + NoLog: false, + NoSigs: true, + Logtime: true, + Debug: false, + Trace: false, + TLSConfig: serverTLSConfig, + TLSHandshakeFirst: true, + AllowNonTLS: false, + TLSMap: true, + // Connection limits & server resource constraints + MaxConn: 1000, + MaxSubs: 100, + MaxPayload: 512 * 1024, // 512KB + MaxPending: 2 * 1024 * 1024, + // Timeouts + WriteDeadline: 1 * time.Second, + AuthTimeout: 2.0, + TLSTimeout: 2.0, + // Ping intervals + PingInterval: 2 * time.Second, + MaxPingsOut: 3, + // Lame-duck + LameDuckDuration: 30 * time.Second, + LameDuckGracePeriod: 10 * time.Second, + // User / permissions + Users: users, + } + + // Spin up the NATS server instance (this *does not* block). + ns, err := natssrv.NewServer(natsOpts) + if err != nil { + return fmt.Errorf("failed to create embedded NATS server: %w", err) + } + + // Start listening for connections in a goroutine. Because NATSServer doesn’t block on Start(), + // you can proceed to do readiness checks to see if the server is ready for connections. + go ns.Start() + + s.srv = ns + // Prepare our final "URL()" that clients can connect to + // e.g. "tls://HOST:PORT" + addr := fmt.Sprintf("tls://%s:%d", s.opts.Host, s.opts.Port) + s.urls = []string{addr} + + s.lggr.Info("NATS server is starting", "host", s.opts.Host, "port", s.opts.Port) + return nil +} + +// close gracefully shuts down the NATS server. +func (s *serverImpl) close() error { + if s.srv == nil { + return nil + } + s.lggr.Info("Shutting down NATS server", "host", s.opts.Host, "port", s.opts.Port) + s.srv.Shutdown() + return nil +} + +// Healthy implements the Service interface and indicates if the NATS server +// is logically healthy (i.e. we have a non-nil server pointer). +func (s *serverImpl) Healthy() error { + if s.srv == nil { + return fmt.Errorf("NATS server is nil") + } + // Optionally, check internal server status or track metrics. + return nil +} + +// Ready implements the Service interface and indicates if the server is +// ready to accept connections. We can use the server's built-in readiness check. +func (s *serverImpl) Ready() error { + if s.srv == nil { + return fmt.Errorf("NATS server is nil") + } + // If we fail readiness (e.g. server not started or ephemeral failure), return an error. + // 0-second wait means immediate check. + if !s.srv.ReadyForConnections(0 * time.Second) { + return fmt.Errorf("NATS server is not ready for connections") + } + return nil +} + +// HealthReport aggregates the server's health in a map with the key as the +// service name. +func (s *serverImpl) HealthReport() map[string]error { + return map[string]error{s.Name(): s.Healthy()} +} + +// Name returns this service's name. If the logger has a name, use it. Otherwise "NATSServer". +func (s *serverImpl) Name() string { + if s.lggr == nil { + return "NATSServer" + } + return s.lggr.Name() +} + +// URL returns the array of server URLs that clients can connect to, e.g. ["tls://localhost:4222"]. +func (s *serverImpl) URL() []string { + return s.urls +} + +// prettyKeyHex helper function to turn ed25519.PublicKey or generic crypto.PublicKey into hex. +func prettyKeyHex(pub crypto.PublicKey) string { + switch k := pub.(type) { + case ed25519.PublicKey: + return fmt.Sprintf("%x", k) + default: + return "unknown_public_key" + } +} diff --git a/pkg/nats/transmitter.go b/pkg/nats/transmitter.go new file mode 100644 index 0000000..423a782 --- /dev/null +++ b/pkg/nats/transmitter.go @@ -0,0 +1,138 @@ +package nats + +import ( + "context" + "crypto" + "crypto/ed25519" + "encoding/hex" + "fmt" + "sync" + + "github.com/cespare/xxhash/v2" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" +) + +type TransmitterOpts struct { + Logger logger.Logger + FromAccount string + DonID uint32 + ServerURLs []string + ClientSigner crypto.Signer + ServerPubKey ed25519.PublicKey +} + +type transmitter struct { + services.StateMachine + services.Service + + lggr logger.Logger + fromAccount string + donID uint32 + serverURL []string + client Client + + // Reusable hash instance with mutex for thread safety + hashPool sync.Pool +} + +func NewTransmitter(opts TransmitterOpts) (llotypes.Transmitter, error) { + t := &transmitter{ + lggr: opts.Logger, + fromAccount: opts.FromAccount, + donID: opts.DonID, + serverURL: opts.ServerURLs, + } + + // Initialize the hash pool + t.hashPool.New = func() interface{} { + return xxhash.New() + } + + // Create NATS client + clientOpts := ClientOpts{ + Logger: opts.Logger, + ClientSigner: opts.ClientSigner, + ServerPubKey: opts.ServerPubKey, + ServerURLs: opts.ServerURLs, + } + client, err := NewClient(clientOpts) + if err != nil { + return nil, fmt.Errorf("failed to create NATS client: %w", err) + } + t.client = client + + // Initialize service with subservices + svc, _ := services.Config{ + Name: "NATSTransmitter", + Start: func(ctx context.Context) error { return nil }, + Close: func() error { return nil }, + NewSubServices: func(lggr logger.Logger) []services.Service { + return []services.Service{client} + }, + }.NewServiceEngine(opts.Logger) + t.Service = svc + + return t, nil +} + +func (t *transmitter) Transmit( + ctx context.Context, + digest ocr2types.ConfigDigest, + seqNr uint64, + report ocr3types.ReportWithInfo[llotypes.ReportInfo], + sigs []ocr2types.AttributedOnchainSignature, +) error { + if !t.IfStarted(func() {}) { + return fmt.Errorf("transmitter is not started") + } + + // Get a hash instance from the pool + h := t.hashPool.Get().(*xxhash.Digest) + defer t.hashPool.Put(h) + + // Reset the hash instance before use + h.Reset() + h.Write(report.Report) + dedupeKey := hex.EncodeToString(h.Sum(nil)) + + err := t.client.Transmit(ctx, report.Report, dedupeKey, t.donID) + if err != nil { + t.lggr.Errorw("Failed to transmit report", + "error", err, + "digest", digest, + "seqNr", seqNr, + "reportFormat", report.Info.ReportFormat, + ) + return err + } + + t.lggr.Debugw("Successfully transmitted report", + "digest", digest, + "seqNr", seqNr, + "reportFormat", report.Info.ReportFormat, + ) + return nil +} + +func (t *transmitter) FromAccount(ctx context.Context) (ocr2types.Account, error) { + return ocr2types.Account(t.fromAccount), nil +} + +func (t *transmitter) Ready() error { + return t.Healthy() +} + +func (t *transmitter) HealthReport() map[string]error { + report := map[string]error{t.Name(): t.Healthy()} + services.CopyHealth(report, t.client.HealthReport()) + return report +} + +func (t *transmitter) Name() string { + return t.lggr.Name() +} diff --git a/rpc/mtls/mtls.go b/rpc/mtls/mtls.go index ccab601..af30b78 100644 --- a/rpc/mtls/mtls.go +++ b/rpc/mtls/mtls.go @@ -7,10 +7,13 @@ import ( "crypto/subtle" "crypto/tls" "crypto/x509" + "crypto/x509/pkix" + "encoding/hex" "errors" "fmt" "math/big" "sync" + "time" "google.golang.org/grpc/credentials" ) @@ -55,8 +58,32 @@ func NewTLSConfig(privKey ed25519.PrivateKey, pubKeys []ed25519.PublicKey) (*tls if err != nil { return nil, err } + c, err := newMutualTLSConfig(priv.key, pubs) - return newMutualTLSConfig(priv.key, pubs) + if err != nil { + return nil, err + } + c.InsecureSkipVerify = true + c.ClientAuth = tls.RequireAnyClientCert + + return c, nil +} + +func NewTLSTransportSigner(signer crypto.Signer, pubKeys []ed25519.PublicKey) (*tls.Config, error) { + pubs, err := ValidPublicKeysFromEd25519(pubKeys...) + if err != nil { + return nil, err + } + + c, err := newMutualTLSConfig(signer, pubs) + c.ClientAuth = tls.RequireAnyClientCert + c.InsecureSkipVerify = true + + if err != nil { + return nil, err + } + + return c, nil } // newMutualTLSConfig uses the private key and public keys to construct a mutual @@ -93,21 +120,47 @@ func newMutualTLSConfig(signer crypto.Signer, pubs *PublicKeys) (*tls.Config, er // Generates a minimal certificate (that wouldn't be considered valid outside of // this networking protocol) from an Ed25519 private key. +// This also sets the organization and organizational unit for the certificate used for User Mapping func newMinimalX509Cert(signer crypto.Signer) (tls.Certificate, error) { + pubKey, ok := signer.Public().(ed25519.PublicKey) + if !ok { + return tls.Certificate{}, fmt.Errorf("invalid public key type") + } + + pubKeyHex := hex.EncodeToString(pubKey) + + // Generate a random serial number. + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := rand.Int(rand.Reader, serialNumberLimit) + if err != nil { + return tls.Certificate{}, fmt.Errorf("failed to generate serial number: %v", err) + } + + now := time.Now() + // Set certificate validity (e.g., valid for 24 hours) template := x509.Certificate{ - SerialNumber: big.NewInt(0), // serial number must be set, so we set it to 0 + SerialNumber: serialNumber, + Subject: pkix.Name{ + CommonName: pubKeyHex[:32], + Organization: []string{"Chainlink Data Streams"}, + OrganizationalUnit: []string{pubKeyHex}, + }, + NotBefore: now, + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + BasicConstraintsValid: true, } encodedCert, err := x509.CreateCertificate(rand.Reader, &template, &template, signer.Public(), signer) if err != nil { return tls.Certificate{}, err } - - return tls.Certificate{ + cert := tls.Certificate{ Certificate: [][]byte{encodedCert}, PrivateKey: signer, SupportedSignatureAlgorithms: []tls.SignatureScheme{tls.Ed25519}, - }, nil + } + return cert, nil } type PrivateKey struct {