Skip to content

Commit 566c7fd

Browse files
robert-ohOhlicher RobertItalyPaleAle
authored
use rabbitmq message's header values as metadata values in the binding (#3031)
Signed-off-by: Ohlicher Robert <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]> Co-authored-by: Ohlicher Robert <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]>
1 parent 60322a1 commit 566c7fd

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

bindings/rabbitmq/rabbitmq.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"math"
24+
"net/url"
2425
"reflect"
2526
"strconv"
2627
"sync"
@@ -464,8 +465,20 @@ func (r *RabbitMQ) handleMessage(ctx context.Context, handler bindings.Handler,
464465
r.logger.Info("Input binding channel closed")
465466
return
466467
}
468+
469+
metadata := make(map[string]string, len(d.Headers))
470+
// Passthrough any custom metadata to the handler.
471+
for k, v := range d.Headers {
472+
if s, ok := v.(string); ok {
473+
// Escape the key and value to ensure they are valid URL query parameters.
474+
// This is necessary for them to be sent as HTTP Metadata.
475+
metadata[url.QueryEscape(k)] = url.QueryEscape(s)
476+
}
477+
}
478+
467479
_, err := handler(ctx, &bindings.ReadResponse{
468-
Data: d.Body,
480+
Data: d.Body,
481+
Metadata: metadata,
469482
})
470483
if err != nil {
471484
ch.Nack(d.DeliveryTag, false, true)

0 commit comments

Comments
 (0)