Skip to content

Commit 0265644

Browse files
authored
Make gRPC sending and receiving optional (#4)
* in_opentelemetry: split the Handler class into proper files Signed-off-by: Shizuo Fujita <[email protected]> * out_opentelemetry: split the Handler class into proper files Signed-off-by: Shizuo Fujita <[email protected]> * gemspec: move grpc to development dependency Signed-off-by: Shizuo Fujita <[email protected]> * rubocop: update rubocop.yml Signed-off-by: Shizuo Fujita <[email protected]> * README: update Signed-off-by: Shizuo Fujita <[email protected]> * Add tests Signed-off-by: Shizuo Fujita <[email protected]> * gemspec: remove grpc dependency Signed-off-by: Shizuo Fujita <[email protected]> * CI: test separately when grpc gem is installed. Signed-off-by: Shizuo Fujita <[email protected]> * example: install grpc gem manually Signed-off-by: Shizuo Fujita <[email protected]> * Gemfile: add grpc Signed-off-by: Shizuo Fujita <[email protected]> * CI: use io-stream 0.7.0 to avoid error on Windows Signed-off-by: Shizuo Fujita <[email protected]> --------- Signed-off-by: Shizuo Fujita <[email protected]>
1 parent 0737ef7 commit 0265644

19 files changed

+468
-379
lines changed

.github/workflows/test.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,14 @@ jobs:
3535
matrix:
3636
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
3737
ruby-version: ['3.2', '3.3', '3.4']
38+
gemfile:
39+
- grpc
40+
- http
3841

39-
name: Ruby ${{ matrix.ruby-version }} on ${{ matrix.os }}
42+
env:
43+
BUNDLE_GEMFILE: gemfiles/${{ matrix.gemfile }}.gemfile
44+
45+
name: ${{ matrix.gemfile }} - Ruby ${{ matrix.ruby-version }} on ${{ matrix.os }}
4046
steps:
4147
- uses: actions/checkout@v4
4248
- name: Set up Ruby

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99

1010
/example/node_modules/
1111

12+
/gemfiles/*.lock
1213
Gemfile.lock

.rubocop.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@ AllCops:
66
SuggestExtensions: false
77
TargetRubyVersion: 3.2
88

9+
Gemspec/DevelopmentDependencies:
10+
Enabled: false
11+
912
Layout/LineLength:
1013
Enabled: false
1114

1215
Lint/UnusedMethodArgument:
1316
Enabled: false
1417

18+
Lint/SuppressedException:
19+
Enabled: false
20+
1521
Metrics:
1622
Enabled: false
1723

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ source "https://rubygems.org"
55
# Specify your gem's dependencies in fluent-plugin-opentelemetry.gemspec
66
gemspec
77

8+
gem "grpc", "~> 1.73"
89
gem "grpc-tools"
910
gem "irb"
1011
gem "rake", "~> 13.0"

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ This requires to receive data via HTTP/HTTPS.
4646
#### `<grpc>` section
4747

4848
This requires to receive data via gRPC.
49+
It needs to install `grpc` gem manually to use this feature.
4950

5051
> [!WARNING]
5152
> Now, gRPC feature status is experimental.
@@ -103,6 +104,7 @@ This requires to send data via HTTP/HTTPS.
103104
#### `<grpc>` section
104105

105106
This requires to send data via gRPC.
107+
It needs to install `grpc` gem manually to use this feature.
106108

107109
> [!WARNING]
108110
> Now, gRPC feature status is experimental.

example/fluentd/Dockerfile

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,4 @@ RUN apt-get update && \
77
make \
88
gcc \
99
git
10-
RUN git clone https://github.com/fluent-plugins-nursery/fluent-plugin-opentelemetry.git && \
11-
cd fluent-plugin-opentelemetry && \
12-
gem build fluent-plugin-opentelemetry.gemspec && \
13-
gem install fluent-plugin-opentelemetry-*.gem
10+
RUN gem install fluent-plugin-opentelemetry grpc

fluent-plugin-opentelemetry.gemspec

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ Gem::Specification.new do |spec|
2222
spec.files = IO.popen(%w[git ls-files -z], chdir: __dir__, err: IO::NULL) do |ls|
2323
ls.readlines("\x0", chomp: true).reject do |f|
2424
(f == gemspec) ||
25-
f.start_with?(*%w[test/ .git .github Gemfile example])
25+
f.start_with?(*%w[test/ .git .github gemfiles Gemfile example])
2626
end
2727
end
2828
spec.require_paths = ["lib"]
@@ -31,5 +31,4 @@ Gem::Specification.new do |spec|
3131
spec.add_dependency("excon", "~> 1.2")
3232
spec.add_dependency("fluentd", "~> 1.18")
3333
spec.add_dependency("google-protobuf", "~> 4.30")
34-
spec.add_dependency("grpc", "~> 1.71")
3534
end

gemfiles/grpc.gemfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# frozen_string_literal: true
2+
3+
source "https://rubygems.org"
4+
5+
gemspec path: "../"
6+
7+
gem "grpc", "~> 1.73"
8+
gem "io-stream", "= 0.7.0"
9+
gem "rake", "~> 13.0"
10+
gem "test-unit", "~> 3.0"
11+
gem "timecop"

gemfiles/http.gemfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# frozen_string_literal: true
2+
3+
source "https://rubygems.org"
4+
5+
gemspec path: "../"
6+
7+
# gem "grpc", "~> 1.73"
8+
gem "io-stream", "= 0.7.0"
9+
gem "rake", "~> 13.0"
10+
gem "test-unit", "~> 3.0"
11+
gem "timecop"

lib/fluent/plugin/in_opentelemetry.rb

Lines changed: 11 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,15 @@
22

33
require "fluent/plugin/input"
44
require "fluent/plugin/opentelemetry/constant"
5-
require "fluent/plugin/opentelemetry/request"
6-
require "fluent/plugin/opentelemetry/response"
7-
require "fluent/plugin/opentelemetry/service_handler"
5+
require "fluent/plugin/opentelemetry/http_input_handler"
86
require "fluent/plugin_helper/http_server"
97
require "fluent/plugin_helper/thread"
108

11-
require "zlib"
9+
begin
10+
require "grpc"
1211

13-
unless Fluent::PluginHelper::HttpServer::Request.method_defined?(:headers)
14-
# This API was introduced at fluentd v1.19.0.
15-
# Ref. https://github.com/fluent/fluentd/pull/4903
16-
# If we have supported v1.19.0+ only, we can remove this patch.
17-
module Fluent::PluginHelper::HttpServer
18-
module Extension
19-
refine Request do
20-
def headers
21-
@request.headers
22-
end
23-
end
24-
end
25-
end
26-
27-
using Fluent::PluginHelper::HttpServer::Extension
12+
require "fluent/plugin/opentelemetry/grpc_input_handler"
13+
rescue LoadError
2814
end
2915

3016
module Fluent::Plugin
@@ -57,6 +43,10 @@ class OpentelemetryInput < Input
5743
def configure(conf)
5844
super
5945

46+
if @grpc_config && !defined?(GRPC)
47+
raise Fluent::ConfigError, "To use gRPC feature, please install grpc gem such as 'fluent-gem install grpc'."
48+
end
49+
6050
unless [@http_config, @grpc_config].any?
6151
raise Fluent::ConfigError, "Please configure either <http> or <grpc> section, or both."
6252
end
@@ -66,7 +56,7 @@ def start
6656
super
6757

6858
if @http_config
69-
http_handler = HttpHandler.new
59+
http_handler = Opentelemetry::HttpInputHandler.new
7060
http_server_create_http_server(:in_opentelemetry_http_server, addr: @http_config.bind, port: @http_config.port, logger: log) do |serv|
7161
serv.post("/v1/logs") do |req|
7262
http_handler.logs(req) { |record| router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_LOGS, message: record }) }
@@ -82,7 +72,7 @@ def start
8272

8373
if @grpc_config
8474
thread_create(:in_opentelemetry_grpc_server) do
85-
grpc_handler = GrpcHandler.new(@grpc_config, log)
75+
grpc_handler = Opentelemetry::GrpcInputHandler.new(@grpc_config, log)
8676
grpc_handler.run(
8777
logs: lambda { |record|
8878
router.emit(@tag, Fluent::EventTime.now, { type: Opentelemetry::RECORD_TYPE_LOGS, message: record })
@@ -97,115 +87,5 @@ def start
9787
end
9888
end
9989
end
100-
101-
class HttpHandler
102-
def logs(req, &block)
103-
common(req, Opentelemetry::Request::Logs, Opentelemetry::Response::Logs, &block)
104-
end
105-
106-
def metrics(req, &block)
107-
common(req, Opentelemetry::Request::Metrics, Opentelemetry::Response::Metrics, &block)
108-
end
109-
110-
def traces(req, &block)
111-
common(req, Opentelemetry::Request::Traces, Opentelemetry::Response::Traces, &block)
112-
end
113-
114-
private
115-
116-
def common(req, request_class, response_class)
117-
content_type = req.headers["content-type"]
118-
content_encoding = req.headers["content-encoding"]&.first
119-
return response_unsupported_media_type unless valid_content_type?(content_type)
120-
return response_bad_request(content_type) unless valid_content_encoding?(content_encoding)
121-
122-
body = req.body
123-
body = Zlib::GzipReader.new(StringIO.new(body)).read if content_encoding == Opentelemetry::CONTENT_ENCODING_GZIP
124-
125-
begin
126-
record = request_class.new(body).record
127-
rescue Google::Protobuf::ParseError
128-
# The format in request body does not comply with the OpenTelemetry protocol.
129-
return response_bad_request(content_type)
130-
end
131-
132-
yield record
133-
134-
res = response_class.new
135-
response(200, content_type, res.body(type: Opentelemetry::Response.type(content_type)))
136-
end
137-
138-
def valid_content_type?(content_type)
139-
case content_type
140-
when Opentelemetry::CONTENT_TYPE_PROTOBUF, Opentelemetry::CONTENT_TYPE_JSON
141-
true
142-
else
143-
false
144-
end
145-
end
146-
147-
def valid_content_encoding?(content_encoding)
148-
return true if content_encoding.nil?
149-
150-
content_encoding == Opentelemetry::CONTENT_ENCODING_GZIP
151-
end
152-
153-
def response(code, content_type, body)
154-
[code, { Opentelemetry::CONTENT_TYPE => content_type }, body]
155-
end
156-
157-
def response_unsupported_media_type
158-
response(415, Opentelemetry::CONTENT_TYPE_PAIN, "415 unsupported media type, supported: [application/json, application/x-protobuf]")
159-
end
160-
161-
def response_bad_request(content_type)
162-
response(400, content_type, "") # TODO: fix body message
163-
end
164-
end
165-
166-
class GrpcHandler
167-
class ExceptionInterceptor < GRPC::ServerInterceptor
168-
def request_response(request:, call:, method:)
169-
# call actual service
170-
yield
171-
rescue StandardError => e
172-
puts "[#{method}] Error: #{e.message}"
173-
raise
174-
end
175-
end
176-
177-
def initialize(grpc_config, logger)
178-
@grpc_config = grpc_config
179-
@logger = logger
180-
end
181-
182-
def run(logs:, metrics:, traces:)
183-
server = GRPC::RpcServer.new(interceptors: [ExceptionInterceptor.new])
184-
server.add_http2_port("#{@grpc_config.bind}:#{@grpc_config.port}", :this_port_is_insecure)
185-
186-
logs_handler = Opentelemetry::ServiceHandler::Logs.new
187-
logs_handler.callback = lambda { |request|
188-
logs.call(request.to_json)
189-
Opentelemetry::Response::Logs.build
190-
}
191-
server.handle(logs_handler)
192-
193-
metrics_handler = Opentelemetry::ServiceHandler::Metrics.new
194-
metrics_handler.callback = lambda { |request|
195-
metrics.call(request.to_json)
196-
Opentelemetry::Response::Metrics.build
197-
}
198-
server.handle(metrics_handler)
199-
200-
traces_handler = Opentelemetry::ServiceHandler::Traces.new
201-
traces_handler.callback = lambda { |request|
202-
traces.call(request.to_json)
203-
Opentelemetry::Response::Traces.build
204-
}
205-
server.handle(traces_handler)
206-
207-
server.run_till_terminated
208-
end
209-
end
21090
end
21191
end

0 commit comments

Comments
 (0)