Skip to content

Commit d41a143

Browse files
authored
in_opentelemetry: add ${type} placeholder support in tag parameter (#8)
* in_opentelemetry: add ${type} placeholder support in tag parameter Signed-off-by: Shizuo Fujita <[email protected]> * Update example --------- Signed-off-by: Shizuo Fujita <[email protected]>
1 parent ddd6937 commit d41a143

File tree

7 files changed

+122
-16
lines changed

7 files changed

+122
-16
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,33 @@ To receive data, this plugin requires `<http>` or `<grpc>` section, or both.
3434
|-----------|--------|----------------------|----------|
3535
| tag | string | The tag of the event | required |
3636

37+
`tag` parameter supports `${type}` placeholder and it will be expanded with data type.
38+
39+
* log data
40+
* `${type}` will be replaced to `logs`
41+
* metric data
42+
* `${type}` will be replaced to `metrics`
43+
* trace data
44+
* `${type}` will be replaced to `traces`
45+
46+
This can be used to change the output destination for each message type.
47+
48+
Example:
49+
50+
```
51+
<source>
52+
@type opentelemetry
53+
54+
# Expand to opentelemetry.logs, opentelemetry.metrics, opentelemetry.traces according to received data.
55+
tag opentelemetry.${type}
56+
57+
<http>
58+
bind 127.0.0.1
59+
port 4318
60+
</http>
61+
</source>
62+
```
63+
3764
#### `<http>` section
3865

3966
This requires to receive data via HTTP/HTTPS.

example/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@ flowchart LR
1515
A[app/metrics.js] -->|metric data / otlp| C[Fluentd]
1616
B[app/tracing.js] -->|trace data / otlp| C[Fluentd]
1717
C[Fluentd] -->|metric data / otlp| D[Otel Collector]
18-
C[Fluentd] -->|trace data / otlp| D[Otel Collector]
18+
C[Fluentd] -->|trace data / otlp| F[Jaeger]
1919
C[Fluentd] -->|sample log / Fluentd Forward Protocol| D[Otel Collector]
2020
D[Otel Collector] -->|metric data| E[Prometheus / Grafana]
21-
D[Otel Collector] -->|trace data| F[Jaeger]
2221
D[Otel Collector] -->|sample log| G[Elasticsearch / Kibana]
2322
```
2423

example/collector/otel-collector-config.yaml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@ exporters:
1313
endpoint: http://elasticsearch:9200
1414
prometheus:
1515
endpoint: "0.0.0.0:8889"
16-
otlp:
17-
endpoint: "jaeger:4317"
18-
tls:
19-
insecure: true
16+
# otlp:
17+
# endpoint: "jaeger:4317"
18+
# tls:
19+
# insecure: true
2020
debug:
2121
verbosity: detailed
2222

2323
service:
2424
pipelines:
25-
traces:
26-
receivers: [otlp]
27-
exporters: [debug, otlp]
25+
# traces:
26+
# receivers: [otlp]
27+
# exporters: [debug, otlp]
2828
metrics:
2929
receivers: [otlp]
3030
exporters: [debug, prometheus]

example/fluentd/fluent.conf

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<source>
22
@type opentelemetry
3-
tag opentelemetry
3+
tag opentelemetry.${type}
44

55
# Experimental
66
# <grpc>
@@ -14,6 +14,21 @@
1414
</http>
1515
</source>
1616

17+
# Send traces data to jaeger directly using OpenTelemetry Protocol
18+
<match opentelemetry.traces>
19+
@type opentelemetry
20+
<buffer []>
21+
@type memory
22+
flush_mode immediate
23+
chunk_limit_size 1m
24+
</buffer>
25+
26+
<http>
27+
endpoint "http://jaeger:4318"
28+
</http>
29+
</match>
30+
31+
# Send other OpenTelemetry data to OpenTelemetry Collector
1732
<match opentelemetry.**>
1833
@type opentelemetry
1934
<buffer []>

lib/fluent/plugin/in_opentelemetry.rb

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class OpentelemetryInput < Input
4343
def configure(conf)
4444
super
4545

46+
expand_tag_placeholders(@tag)
47+
4648
if @grpc_config && !defined?(GRPC)
4749
raise Fluent::ConfigError, "To use gRPC feature, please install grpc gem such as 'fluent-gem install grpc'."
4850
end
@@ -59,13 +61,13 @@ def start
5961
http_handler = Opentelemetry::HttpInputHandler.new
6062
http_server_create_http_server(:in_opentelemetry_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
6163
serv.post("/v1/logs") do |req|
62-
http_handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record }) }
64+
http_handler.logs(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_LOGS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record }) }
6365
end
6466
serv.post("/v1/metrics") do |req|
65-
http_handler.metrics(req) { |record| router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record }) }
67+
http_handler.metrics(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_METRICS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record }) }
6668
end
6769
serv.post("/v1/traces") do |req|
68-
http_handler.traces(req) { |record| router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record }) }
70+
http_handler.traces(req) { |record| router.emit(tag_for(Opentelemetry::RECORD_TYPE_TRACES), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record }) }
6971
end
7072
end
7173
end
@@ -75,17 +77,31 @@ def start
7577
grpc_handler = Opentelemetry::GrpcInputHandler.new(@grpc_config, log)
7678
grpc_handler.run(
7779
logs: lambda { |record|
78-
router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record })
80+
router.emit(tag_for(Opentelemetry::RECORD_TYPE_LOGS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_LOGS, "message" => record })
7981
},
8082
metrics: lambda { |record|
81-
router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record })
83+
router.emit(tag_for(Opentelemetry::RECORD_TYPE_METRICS), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_METRICS, "message" => record })
8284
},
8385
traces: lambda { |record|
84-
router.emit(@tag, Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record })
86+
router.emit(tag_for(Opentelemetry::RECORD_TYPE_TRACES), Fluent::EventTime.now, { "type" => Opentelemetry::RECORD_TYPE_TRACES, "message" => record })
8587
}
8688
)
8789
end
8890
end
8991
end
92+
93+
private
94+
95+
def expand_tag_placeholders(tag)
96+
@expand_tag_placeholders ||= {
97+
Opentelemetry::RECORD_TYPE_LOGS => tag.gsub(Opentelemetry::PLACEHOLDER_TYPE, Opentelemetry::PLACEHOLDER_TYPE_LOGS),
98+
Opentelemetry::RECORD_TYPE_METRICS => tag.gsub(Opentelemetry::PLACEHOLDER_TYPE, Opentelemetry::PLACEHOLDER_TYPE_METRICS),
99+
Opentelemetry::RECORD_TYPE_TRACES => tag.gsub(Opentelemetry::PLACEHOLDER_TYPE, Opentelemetry::PLACEHOLDER_TYPE_TRACES)
100+
}
101+
end
102+
103+
def tag_for(type)
104+
@expand_tag_placeholders[type]
105+
end
90106
end
91107
end

lib/fluent/plugin/opentelemetry/constant.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ module Fluent::Plugin::Opentelemetry
1616
RECORD_TYPE_METRICS = "opentelemetry_metrics"
1717
RECORD_TYPE_TRACES = "opentelemetry_traces"
1818

19+
PLACEHOLDER_TYPE = "${type}"
20+
PLACEHOLDER_TYPE_LOGS = "logs"
21+
PLACEHOLDER_TYPE_METRICS = "metrics"
22+
PLACEHOLDER_TYPE_TRACES = "traces"
23+
1924
TLS_VERSIONS_MAP =
2025
begin
2126
map = {

test/fluent/plugin/test_in_opentelemetry.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,50 @@ def test_https_receive_protocol_buffers
287287
end
288288
end
289289

290+
sub_test_case "Placeholder" do
291+
def config
292+
<<~"CONFIG"
293+
tag opentelemetry.${type}
294+
<http>
295+
bind 127.0.0.1
296+
port #{@port}
297+
</http>
298+
CONFIG
299+
end
300+
301+
data("metrics" => {
302+
request_path: "/v1/metrics",
303+
request_data: TestData::JSON::METRICS,
304+
record_type: Fluent::Plugin::Opentelemetry::RECORD_TYPE_METRICS,
305+
record_data: TestData::JSON::METRICS,
306+
expanded_tag: "opentelemetry.metrics"
307+
},
308+
"traces" => {
309+
request_path: "/v1/traces",
310+
request_data: TestData::JSON::TRACES,
311+
record_type: Fluent::Plugin::Opentelemetry::RECORD_TYPE_TRACES,
312+
record_data: TestData::JSON::TRACES,
313+
expanded_tag: "opentelemetry.traces"
314+
},
315+
"logs" => {
316+
request_path: "/v1/logs",
317+
request_data: TestData::JSON::LOGS,
318+
record_type: Fluent::Plugin::Opentelemetry::RECORD_TYPE_LOGS,
319+
record_data: TestData::JSON::LOGS,
320+
expanded_tag: "opentelemetry.logs"
321+
})
322+
def test_type_placeholder(data)
323+
d = create_driver
324+
res = d.run(expect_records: 1) do
325+
post_json(data[:request_path], data[:request_data])
326+
end
327+
328+
expected_events = [[data[:expanded_tag], @event_time, { "type" => data[:record_type], "message" => data[:record_data] }]]
329+
assert_equal(200, res.status)
330+
assert_equal(expected_events, d.events)
331+
end
332+
end
333+
290334
def compress(data)
291335
gz = Zlib::GzipWriter.new(StringIO.new)
292336
gz << data

0 commit comments

Comments
 (0)