Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ require (
github.com/microcosm-cc/bluemonday v1.0.27
github.com/microsoft/go-mssqldb v1.8.2
github.com/microsoft/gocosmos v1.1.1
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/stan.go v0.10.4
github.com/neo4j/neo4j-go-driver/v5 v5.24.0
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1662,11 +1662,11 @@ github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/
github.com/nats-io/nats.go v1.14.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0=
Expand Down
37 changes: 30 additions & 7 deletions internal/impl/nats/cache_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package nats
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -34,7 +35,12 @@ func natsKVCacheConfig() *service.ConfigSpec {
Version("4.27.0").
Summary("Cache key/values in a NATS key-value bucket.").
Description(connectionNameDescription() + authDescription()).
Fields(kvDocs()...)
Fields(Docs("KV",
service.NewBoolField("create_bucket").
Description("Whether to automatically create the bucket if it doesn't exist.").
Advanced().
Default(false),
)...)
}

func init() {
Expand All @@ -47,15 +53,17 @@ func init() {
}

type kvCache struct {
connDetails connectionDetails
bucket string
connDetails connectionDetails
bucket string
createBucket bool

log *service.Logger

shutSig *shutdown.Signaller

connMut sync.RWMutex
natsConn *nats.Conn
js jetstream.JetStream
kv jetstream.KeyValue
}

Expand All @@ -74,6 +82,10 @@ func newKVCache(conf *service.ParsedConfig, mgr *service.Resources) (*kvCache, e
return nil, err
}

if p.createBucket, err = conf.FieldBool("create_bucket"); err != nil {
return nil, err
}

err = p.connect(context.Background())
return p, err
}
Expand All @@ -86,6 +98,7 @@ func (p *kvCache) disconnect() {
p.natsConn.Close()
p.natsConn = nil
}
p.js = nil
p.kv = nil
}

Expand All @@ -109,13 +122,23 @@ func (p *kvCache) connect(ctx context.Context) error {
}
}()

var js jetstream.JetStream
if js, err = jetstream.New(p.natsConn); err != nil {
if p.js, err = jetstream.New(p.natsConn); err != nil {
return err
}

if p.kv, err = js.KeyValue(ctx, p.bucket); err != nil {
return err
// Check if bucket exists first, create only if config allows
p.kv, err = p.js.KeyValue(ctx, p.bucket)
if err != nil {
if p.createBucket {
if p.kv, err = p.js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: p.bucket,
}); err != nil {
return fmt.Errorf("failed to create bucket %s: %w", p.bucket, err)
}
p.log.Infof("Created bucket %s", p.bucket)
} else {
return fmt.Errorf("bucket %s does not exist and create_bucket is false", p.bucket)
}
}
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions internal/impl/nats/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,18 @@ func outputTracingDocs() *service.ConfigField {
return service.NewInjectTracingSpanMappingField().Version(tracingVersion)
}

func kvDocs(extraFields ...*service.ConfigField) []*service.ConfigField {
func Docs(natsComponentType string, extraFields ...*service.ConfigField) []*service.ConfigField {
// TODO: Use `slices.Concat()` after switching to Go 1.22
bucketName := "my_bucket"
if natsComponentType == "KV" {
bucketName = "my_kv_bucket"
}

fields := append(
connectionHeadFields(),
[]*service.ConfigField{
service.NewStringField(kvFieldBucket).
Description("The name of the KV bucket.").Example("my_kv_bucket"),
Description("The name of the " + natsComponentType + " bucket.").Example(bucketName),
}...,
)
fields = append(fields, extraFields...)
Expand Down
25 changes: 23 additions & 2 deletions internal/impl/nats/input_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nats

import (
"context"
"fmt"
"sync"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -54,11 +55,15 @@ This input adds the following metadata fields to each message:
` + "```" + `

` + connectionNameDescription() + authDescription()).
Fields(kvDocs([]*service.ConfigField{
Fields(Docs("KV", []*service.ConfigField{
service.NewStringField(kviFieldKey).
Description("Key to watch for updates, can include wildcards.").
Default(">").
Example("foo.bar.baz").Example("foo.*.baz").Example("foo.bar.*").Example("foo.>"),
service.NewBoolField("create_bucket").
Description("Whether to automatically create the bucket if it doesn't exist.").
Advanced().
Default(false),
service.NewAutoRetryNacksToggleField(),
service.NewBoolField(kviFieldIgnoreDeletes).
Description("Do not send delete markers as messages.").
Expand Down Expand Up @@ -92,6 +97,7 @@ type kvReader struct {
connDetails connectionDetails
bucket string
key string
createBucket bool
ignoreDeletes bool
includeHistory bool
metaOnly bool
Expand Down Expand Up @@ -120,6 +126,10 @@ func newKVReader(conf *service.ParsedConfig, mgr *service.Resources) (*kvReader,
return nil, err
}

if r.createBucket, err = conf.FieldBool("create_bucket"); err != nil {
return nil, err
}

if r.key, err = conf.FieldString(kviFieldKey); err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,9 +177,20 @@ func (r *kvReader) Connect(ctx context.Context) (err error) {
return err
}

// Check if bucket exists first, create only if config allows
kv, err := js.KeyValue(ctx, r.bucket)
if err != nil {
return err
if r.createBucket {
kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: r.bucket,
})
if err != nil {
return fmt.Errorf("failed to create bucket %s: %w", r.bucket, err)
}
r.log.Infof("Created bucket %s", r.bucket)
} else {
return fmt.Errorf("bucket %s does not exist and create_bucket is false", r.bucket)
}
}

var watchOpts []jetstream.WatchOpt
Expand Down
33 changes: 27 additions & 6 deletions internal/impl/nats/output_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nats

import (
"context"
"fmt"
"sync"

"github.com/nats-io/nats.go"
Expand All @@ -42,12 +43,16 @@ xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions],
you to create a unique key for each message.

` + connectionNameDescription() + authDescription()).
Fields(kvDocs([]*service.ConfigField{
Fields(Docs("KV", []*service.ConfigField{
service.NewInterpolatedStringField(kvoFieldKey).
Description("The key for each message.").
Example("foo").
Example("foo.bar.baz").
Example(`foo.${! json("meta.type") }`),
service.NewBoolField("create_bucket").
Description("Whether to automatically create the bucket if it doesn't exist.").
Advanced().
Default(false),
service.NewOutputMaxInFlightField().Default(1024),
}...)...)
}
Expand All @@ -68,10 +73,11 @@ func init() {
//------------------------------------------------------------------------------

type kvOutput struct {
connDetails connectionDetails
bucket string
key *service.InterpolatedString
keyRaw string
connDetails connectionDetails
bucket string
key *service.InterpolatedString
keyRaw string
createBucket bool

log *service.Logger

Expand All @@ -97,6 +103,10 @@ func newKVOutput(conf *service.ParsedConfig, mgr *service.Resources) (*kvOutput,
return nil, err
}

if kv.createBucket, err = conf.FieldBool("create_bucket"); err != nil {
return nil, err
}

if kv.keyRaw, err = conf.FieldString(kvoFieldKey); err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,9 +144,20 @@ func (kv *kvOutput) Connect(ctx context.Context) (err error) {
return err
}

// Check if bucket exists first, create only if config allows
kv.keyValue, err = jsc.KeyValue(ctx, kv.bucket)
if err != nil {
return err
if kv.createBucket {
kv.keyValue, err = jsc.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: kv.bucket,
})
if err != nil {
return fmt.Errorf("failed to create bucket %s: %w", kv.bucket, err)
}
kv.log.Infof("Created bucket %s", kv.bucket)
} else {
return fmt.Errorf("bucket %s does not exist and create_bucket is false", kv.bucket)
}
}

kv.natsConn = natsConn
Expand Down
44 changes: 34 additions & 10 deletions internal/impl/nats/processor_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nats

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -101,7 +102,7 @@ This processor adds the following metadata fields to each message, depending on
` + "```" + `

` + connectionNameDescription() + authDescription()).
Fields(kvDocs([]*service.ConfigField{
Fields(Docs("KV", []*service.ConfigField{
service.NewStringAnnotatedEnumField(kvpFieldOperation, kvpOperations).
Description("The operation to perform on the KV bucket."),
service.NewInterpolatedStringField(kvpFieldKey).
Expand All @@ -111,6 +112,10 @@ This processor adds the following metadata fields to each message, depending on
Example("foo.*").
Example("foo.>").
Example(`foo.${! json("meta.type") }`).LintRule(`if this == "" {[ "'key' must be set to a non-empty string" ]}`),
service.NewBoolField("create_bucket").
Description("Whether to automatically create the bucket if it doesn't exist.").
Advanced().
Default(false),
service.NewInterpolatedStringField(kvpFieldRevision).
Description("The revision of the key to operate on. Used for `get_revision` and `update` operations.").
Example("42").
Expand All @@ -137,12 +142,13 @@ func init() {
}

type kvProcessor struct {
connDetails connectionDetails
bucket string
operation kvpOperationType
key *service.InterpolatedString
revision *service.InterpolatedString
timeout time.Duration
connDetails connectionDetails
bucket string
operation kvpOperationType
key *service.InterpolatedString
revision *service.InterpolatedString
timeout time.Duration
createBucket bool

log *service.Logger

Expand All @@ -168,6 +174,10 @@ func newKVProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*kvProc
return nil, err
}

if p.createBucket, err = conf.FieldBool("create_bucket"); err != nil {
return nil, err
}

if operation, err := conf.FieldString(kvpFieldOperation); err != nil {
return nil, err
} else {
Expand Down Expand Up @@ -395,9 +405,23 @@ func (p *kvProcessor) Connect(ctx context.Context) (err error) {
return err
}

p.kv, err = js.KeyValue(ctx, p.bucket)
if err != nil {
return err
// Try to get existing bucket first
if p.kv, err = js.KeyValue(ctx, p.bucket); err != nil {
if errors.Is(err, jetstream.ErrBucketNotFound) {
if p.createBucket {
// Create the bucket if it doesn't exist
p.log.Infof("Creating KV bucket: %s", p.bucket)
if p.kv, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: p.bucket,
}); err != nil {
return fmt.Errorf("failed to create bucket %s: %w", p.bucket, err)
}
} else {
return fmt.Errorf("bucket %s does not exist and create_bucket is false", p.bucket)
}
} else {
return err
}
}
return nil
}
Expand Down