Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions docs/modules/components/pages/outputs/redis_hash.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ output:
walk_json_object: false
fields: {}
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
```

--
Expand Down Expand Up @@ -70,6 +75,12 @@ output:
walk_json_object: false
fields: {}
max_in_flight: 64
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
```

--
Expand Down Expand Up @@ -382,4 +393,106 @@ The maximum number of messages to have in flight at a given time. Increase this

*Default*: `64`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. Setting this to `0` disables count-based batching.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

A number of bytes at which the batch should be flushed. Setting this to `0` disables size-based batching.


*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.


*Type*: `array`


```yml
# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array
```


82 changes: 65 additions & 17 deletions internal/impl/redis/output_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
hoFieldWalkMetadata = "walk_metadata"
hoFieldWalkJSON = "walk_json_object"
hoFieldFields = "fields"
hoFieldBatching = "batching"
)

func redisHashOutputConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -80,13 +81,17 @@ Where latter stages will overwrite matching field names of a former stage.`+serv
Description("A map of key/value pairs to set as hash fields.").
Default(map[string]any{}),
service.NewOutputMaxInFlightField(),
service.NewBatchPolicyField(loFieldBatching),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hoFieldBatching is defined at line 34 but loFieldBatching (from the redis list output) is used here and at line 92. Both resolve to "batching" so it works at runtime, but this should use the hash output's own constant hoFieldBatching.

Rule: field names must use the component-prefix convention <componentAbbrev>Field<Name>godev agent.

)
}

func init() {
service.MustRegisterOutput(
service.MustRegisterBatchOutput(
"redis_hash", redisHashOutputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) {
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, maxInFlight int, err error) {
if batchPol, err = conf.FieldBatchPolicy(loFieldBatching); err != nil {
return
}
if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
return
}
Expand Down Expand Up @@ -168,20 +173,14 @@ func walkForHashFields(msg *service.Message, fields map[string]any) error {
return nil
}

func (r *redisHashWriter) Write(ctx context.Context, msg *service.Message) error {
r.connMut.RLock()
client := r.client
r.connMut.RUnlock()

if client == nil {
return service.ErrNotConnected
}

func (r *redisHashWriter) buildMessage(msg *service.Message) (string, map[string]any, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: buildMessage takes a single *service.Message and uses r.key.TryString(msg) (line 177) and v.TryString(msg) (line 196) for interpolation. These create a faux batch of size 1 internally, so batch-context interpolation functions (batch_size(), batch_index(), windowed aggregations) will return incorrect results when used in key or fields expressions.

The batch path should use batch.TryInterpolatedString(i, r.key) and the batch-aware equivalent for fields, as output_list.go does in its WriteBatch.

Fix: either make buildMessage accept the full batch and index, or inline the batch-aware interpolation calls in the batch path and keep buildMessage only for the single-message fast path.

key, err := r.key.TryString(msg)
if err != nil {
return fmt.Errorf("key interpolation error: %w", err)
return "", nil, fmt.Errorf("key interpolation error: %w", err)
}

fields := map[string]any{}

if r.walkMetadata {
_ = msg.MetaWalkMut(func(k string, v any) error {
fields[k] = v
Expand All @@ -190,21 +189,70 @@ func (r *redisHashWriter) Write(ctx context.Context, msg *service.Message) error
}
if r.walkJSON {
if err := walkForHashFields(msg, fields); err != nil {
err = fmt.Errorf("failed to walk JSON object: %v", err)
r.log.Errorf("HSET error: %v\n", err)
return err
return "", nil, fmt.Errorf("HSET error: failed to walk JSON object: %v", err)
}
}
for k, v := range r.fields {
if fields[k], err = v.TryString(msg); err != nil {
return fmt.Errorf("field %v interpolation error: %w", k, err)
return "", nil, fmt.Errorf("field %v interpolation error: %w", k, err)
}
}
if err := client.HSet(ctx, key, fields).Err(); err != nil {
return key, fields, nil
}

func (r *redisHashWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
r.connMut.RLock()
client := r.client
r.connMut.RUnlock()

if client == nil {
return service.ErrNotConnected
}

if len(batch) == 1 {
key, fields, err := r.buildMessage(batch[0])
if err != nil {
err = fmt.Errorf("failed to create message: %v", err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%v breaks the error chain. Use %w so callers can use errors.Is/errors.As on the inner error. Same issue at line 231.

Rule: "Use %w for wrapping. Use %v only when you intentionally want to break the error chain." — godev agent.

return err
}
if err := client.HSet(ctx, key, fields).Err(); err != nil {
_ = r.disconnect()
r.log.Errorf("Error from redis: %v\n", err)
return service.ErrNotConnected
}
return nil
}

pipe := client.Pipeline()

for i := range batch {
key, fields, err := r.buildMessage(batch[i])
if err != nil {
err = fmt.Errorf("failed to create message: %v", err)
return err
}
_ = pipe.HSet(ctx, key, fields)
}

cmders, err := pipe.Exec(ctx)
if err != nil {
_ = r.disconnect()
r.log.Errorf("Error from redis: %v\n", err)
return service.ErrNotConnected
}

var batchErr *service.BatchError
for i, res := range cmders {
if res.Err() != nil {
if batchErr == nil {
batchErr = service.NewBatchError(batch, res.Err())
}
batchErr.Failed(i, res.Err())
}
}
if batchErr != nil {
return batchErr
}
return nil
}

Expand Down