-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.yaml
More file actions
95 lines (83 loc) · 2.65 KB
/
config.yaml
File metadata and controls
95 lines (83 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# HTTP server of this instance. It listens on port 4196 on all interfaces,
# which is a different port from the localhost:4195 streams endpoint that
# the pipeline below calls.
http:
  enabled: true
  address: '0.0.0.0:4196'
# Control queue: each Pub/Sub message describes one job to launch.
input:
  gcp_pubsub:
    # Environment substitutions are quoted so that an empty or
    # number/boolean-looking value is still parsed as a string.
    project: "${CONTROL_QUEUE_PROJECT}"
    subscription: "${CONTROL_QUEUE}"
    # Ensures only one message is processed at a time.
    max_outstanding_messages: 1
# Control pipeline: for every control message, build a stream config, submit
# it to the streams endpoint on localhost:4195 under the id "k2p", poll until
# the stream reports inactive (or the poll budget runs out), then delete it.
pipeline:
  threads: 1
  processors:
    # Step 1: Log the incoming control message in full.
    - log:
        level: INFO
        message: 'Received control message'
        fields_mapping: |
          root.custom_attributes.received_message = this

    # Copy benthos_meta_info from the payload into metadata so the log steps
    # further down can read it through meta("benthos_meta_info").
    - mapping: |
        meta benthos_meta_info = this.benthos_meta_info

    # Step 2: Build your dynamic inline YAML (untouched)
    - switch:
        - check: this.processor == "kafka-to-pubsub"
          processors:
            - mapping: from "./blob_configs/kafka-to-pubsub/dynamic_k2p_config.blobl"
        # Default branch.
        # TODO: should error out with a log; for now it applies the same
        # kafka-to-pubsub mapping as the branch above.
        - processors:
            - mapping: from "./blob_configs/kafka-to-pubsub/dynamic_k2p_config.blobl"

    # Step 3: Execute YAML as benthos stream
    - http:
        url: "http://localhost:4195/streams/k2p"
        verb: POST
        headers:
          Content-Type: application/yaml
        retries: 3
        retry_period: "5s"
        max_retry_backoff: "5s"

    # Informational message, so it is logged at INFO rather than ERROR.
    - log:
        level: INFO
        message: 'Launched Benthos stream worker'
        fields_mapping: |
          root.custom_attributes.benthos_meta_info = meta("benthos_meta_info")

    # Step 4: Poll the stream status while it reports active.
    - while:
        check: this.active == true
        # The payload at this point is the POST response, so the first
        # status check has to run unconditionally.
        at_least_once: true
        # Maximum number of times to check status. Left unquoted so the
        # substituted value is parsed as an integer.
        max_loops: ${CHECK_STATUS_TIMES:30}
        processors:
          - sleep:
              duration: "${CHECK_STATUS_DELAY:5s}"  # Delay before checking status
          - http:
              url: "http://localhost:4195/streams/k2p"
              verb: GET
          # Keep only the "active" flag from the status response.
          - mapping: |
              root = {
                "active": this.active
              }

    # Step 5: Report the outcome of the polling loop.
    - switch:
        - check: this.active == false
          processors:
            - log:
                level: INFO
                message: 'Job successfully completed'
                fields_mapping: |
                  root.custom_attributes.benthos_meta_info = meta("benthos_meta_info")
        # Anything other than active == false lands here.
        - processors:
            - log:
                level: ERROR
                message: 'Job took more time than expected, job will be retried'
                fields_mapping: |
                  root.custom_attributes.benthos_meta_info = meta("benthos_meta_info")

    # Step 6: Remove the k2p stream.
    # NOTE(review): the original nesting was lost; this step is assumed to sit
    # at pipeline level so it runs after both outcomes above - confirm.
    - http:
        url: "http://localhost:4195/streams/k2p"
        verb: DELETE
# Structured JSON logging with a timestamp on every line.
logger:
  level: INFO
  format: json
  add_timestamp: true
  static_fields:
    # Quoted because the default is empty: an unquoted empty substitution
    # would leave a bare `service:` key, which YAML reads as null rather
    # than as an empty string.
    service: "${DEPLOYMENT_NAME:}"
# Prometheus metrics, with every metric name prefixed by "benthos_".
metrics:
  prometheus: {}
  # Prepend 'benthos' and an underscore to each metric name.
  mapping: 'root = "benthos" + "_" + this'
# All work happens in the pipeline processors; the message itself is
# discarded once they have run.
output:
  drop: {}