loops? #3728
I'm trying to build a stream processor which reads messages from a NATS subject, extracts a file list from those messages, executes a processor for each file, and eventually creates a new message and sends it back to another NATS subject. Currently I'm struggling with the "for each file" part. Is there some kind of loop I can use to go over this?

{
  "files": [
    {"path":"a","size":1},
    {"path":"b","size":2},
    {"path":"c","size":3}
  ]
}

This JSON was extracted from the message using the mapping processor, meaning I can adjust it if necessary. This is my current configuration:

input:
  nats_jetstream:
    urls: [nats://${NATS_USERNAME}:${NATS_PASSWORD}@${NATS_ENDPOINT}]
    subject: ${NATS_INPUT_SUBJECT}
    stream: ${NATS_INPUT_STREAM}
    deliver: ${NATS_INPUT_DELIVER}
    durable: ${NATS_INPUT_CONSUMER_NAME}

pipeline:
  processors:
    - mapping: |
        root.files = this.package.files
    # for each file???

output:
  nats_jetstream:
    urls: [nats://${NATS_USERNAME}:${NATS_PASSWORD}@${NATS_ENDPOINT}]
    subject: ${NATS_OUTPUT_SUBJECT}

I could use a little push in the right direction :)
You can use the fold method in Bloblang: https://docs.redpanda.com/redpanda-connect/guides/bloblang/methods/#fold or the while processor: https://docs.redpanda.com/redpanda-connect/components/processors/while/
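For example, a minimal sketch (my own illustration, assuming the files array shape from the question) that folds over the list to total up the sizes:

pipeline:
  processors:
    - mapping: |
        # Fold over the files array: item.tally is the accumulated value,
        # item.value is the current array element.
        root.total_size = this.files.fold(0, item -> item.tally + item.value.size)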
This works like a charm, thanks :)

pipeline:
  processors:
    - mapping: |
        root = this.package.files.map_each(f -> f.path)
    - unarchive:
        format: json_array
    - for_each:
        - log:
            level: INFO
            message: transfer file
            fields_mapping: |
              root.path = this
Yeah, for_each doesn't do what you want; you might just want unarchive with the json_array format: https://docs.redpanda.com/redpanda-connect/components/processors/unarchive/
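A minimal sketch of that suggestion (my own illustration, assuming the package.files structure from the question): the mapping pulls out the array, and unarchive then splits it into one message per file:

pipeline:
  processors:
    - mapping: |
        # Extract the array of file objects from the incoming message
        root = this.package.files
    - unarchive:
        # Split the JSON array into a batch with one message per element
        format: json_array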