Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions docs/modules/components/pages/outputs/redis_hash.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ output:
walk_json_object: false
fields: {}
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
```

--
Expand Down Expand Up @@ -70,6 +75,12 @@ output:
walk_json_object: false
fields: {}
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
```

--
Expand Down Expand Up @@ -382,4 +393,106 @@ The maximum number of messages to have in flight at a given time. Increase this

*Default*: `64`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```

=== `batching.count`

The number of messages at which the batch should be flushed. If set to `0`, count-based batching is disabled.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

The number of bytes at which the batch should be flushed. If set to `0`, size-based batching is disabled.


*Type*: `int`

*Default*: `0`

=== `batching.period`

The period of time after which an incomplete batch is flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.


*Type*: `array`


```yml
# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array
```


106 changes: 82 additions & 24 deletions internal/impl/redis/output_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
hoFieldWalkMetadata = "walk_metadata"
hoFieldWalkJSON = "walk_json_object"
hoFieldFields = "fields"
hoFieldBatching = "batching"
)

func redisHashOutputConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -80,13 +81,17 @@ Where latter stages will overwrite matching field names of a former stage.`+serv
Description("A map of key/value pairs to set as hash fields.").
Default(map[string]any{}),
service.NewOutputMaxInFlightField(),
service.NewBatchPolicyField(loFieldBatching),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hoFieldBatching is defined at line 34 but loFieldBatching (from the redis list output) is used here and at line 92. Both resolve to "batching" so it works at runtime, but this should use the hash output's own constant hoFieldBatching.

Rule: field names must use the component-prefix convention <componentAbbrev>Field<Name>godev agent.

)
}

func init() {
service.MustRegisterOutput(
service.MustRegisterBatchOutput(
"redis_hash", redisHashOutputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) {
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, maxInFlight int, err error) {
if batchPol, err = conf.FieldBatchPolicy(loFieldBatching); err != nil {
return
}
if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
return
}
Expand Down Expand Up @@ -168,7 +173,7 @@ func walkForHashFields(msg *service.Message, fields map[string]any) error {
return nil
}

func (r *redisHashWriter) Write(ctx context.Context, msg *service.Message) error {
func (r *redisHashWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
r.connMut.RLock()
client := r.client
r.connMut.RUnlock()
Expand All @@ -177,34 +182,87 @@ func (r *redisHashWriter) Write(ctx context.Context, msg *service.Message) error
return service.ErrNotConnected
}

key, err := r.key.TryString(msg)
if err != nil {
return fmt.Errorf("key interpolation error: %w", err)
}
fields := map[string]any{}
if r.walkMetadata {
_ = msg.MetaWalkMut(func(k string, v any) error {
fields[k] = v
return nil
})
}
if r.walkJSON {
if err := walkForHashFields(msg, fields); err != nil {
err = fmt.Errorf("failed to walk JSON object: %v", err)
r.log.Errorf("HSET error: %v\n", err)
return err
if len(batch) == 1 {
key, err := r.key.TryString(batch[0])
if err != nil {
return fmt.Errorf("key interpolation error: %w", err)
}
fields := map[string]any{}
if r.walkMetadata {
_ = batch[0].MetaWalkMut(func(k string, v any) error {
fields[k] = v
return nil
})
}
if r.walkJSON {
if err := walkForHashFields(batch[0], fields); err != nil {
err = fmt.Errorf("failed to walk JSON object: %v", err)
r.log.Errorf("HSET error: %v\n", err)
return err
}
}
for k, v := range r.fields {
if fields[k], err = v.TryString(batch[0]); err != nil {
return fmt.Errorf("field %v interpolation error: %w", k, err)
}
}
if err := client.HSet(ctx, key, fields).Err(); err != nil {
_ = r.disconnect()
r.log.Errorf("Error from redis: %v\n", err)
return service.ErrNotConnected
}
return nil
}
for k, v := range r.fields {
if fields[k], err = v.TryString(msg); err != nil {
return fmt.Errorf("field %v interpolation error: %w", k, err)

pipe := client.Pipeline()

for i := range batch {
key, err := batch.TryInterpolatedString(i, r.key)
if err != nil {
return fmt.Errorf("key interpolation error: %w", err)
}

fields := map[string]any{}
if r.walkMetadata {
_ = batch[i].MetaWalkMut(func(k string, v any) error {
fields[k] = v
return nil
})
}
if r.walkJSON {
if err := walkForHashFields(batch[i], fields); err != nil {
err = fmt.Errorf("failed to walk JSON object: %v", err)
r.log.Errorf("HSET error: %v\n", err)
return err
}
}
for k, v := range r.fields {
if fields[k], err = v.TryString(batch[i]); err != nil {
return fmt.Errorf("field %v interpolation error: %w", k, err)
}
}
_ = pipe.HSet(ctx, key, fields)
}
if err := client.HSet(ctx, key, fields).Err(); err != nil {

cmders, err := pipe.Exec(ctx)
if err != nil {
_ = r.disconnect()
r.log.Errorf("Error from redis: %v\n", err)
r.log.Errorf("Errorf from redis: %v\n", err)
return service.ErrNotConnected
}

var batchErr *service.BatchError
for i, res := range cmders {
if res.Err() != nil {
if batchErr == nil {
batchErr = service.NewBatchError(batch, res.Err())
}
batchErr.Failed(i, res.Err())
}
}
if batchErr != nil {
return batchErr
}
return nil
}

Expand Down