66require "fluent/plugin/otlp/response"
77require "fluent/plugin/otlp/service_handler"
88require "fluent/plugin_helper/http_server"
9+ require "fluent/plugin_helper/thread"
910
1011require "zlib"
1112
@@ -30,7 +31,7 @@ module Fluent::Plugin
3031 class OtlpInput < Input
3132 Fluent ::Plugin . register_input ( "otlp" , self )
3233
33- helpers :http_server
34+ helpers :thread , : http_server
3435
3536 desc "The tag of the event."
3637 config_param :tag , :string
@@ -67,7 +68,7 @@ def start
6768
6869 if @http_config
6970 http_handler = HttpHandler . new
70- http_server_create_http_server ( :in_otlp_http_server_helper , addr : @http_config . bind , port : @http_config . port , logger : log ) do |serv |
71+ http_server_create_http_server ( :in_otlp_http_server , addr : @http_config . bind , port : @http_config . port , logger : log ) do |serv |
7172 serv . post ( "/v1/logs" ) do |req |
7273 http_handler . logs ( req ) { |record | router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_LOGS , message : record } ) }
7374 end
@@ -81,18 +82,20 @@ def start
8182 end
8283
8384 if @grpc_config
84- grpc_handler = GrpcHandler . new ( @grpc_config )
85- grpc_handler . run (
86- logs : lambda { |record |
87- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_LOGS , message : record } )
88- } ,
89- metrics : lambda { |record |
90- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_METRICS , message : record } )
91- } ,
92- traces : lambda { |record |
93- router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_TRACES , message : record } )
94- }
95- )
85+ thread_create ( :in_otlp_grpc_server ) do
86+ grpc_handler = GrpcHandler . new ( @grpc_config )
87+ grpc_handler . run (
88+ logs : lambda { |record |
89+ router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_LOGS , message : record } )
90+ } ,
91+ metrics : lambda { |record |
92+ router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_METRICS , message : record } )
93+ } ,
94+ traces : lambda { |record |
95+ router . emit ( @tag , Fluent ::EventTime . now , { type : Otlp ::RECORD_TYPE_TRACES , message : record } )
96+ }
97+ )
98+ end
9699 end
97100 end
98101
@@ -177,33 +180,31 @@ def initialize(grpc_config)
177180 end
178181
179182 def run ( logs :, metrics :, traces :)
180- Thread . new do
181- server = GRPC ::RpcServer . new ( interceptors : [ ExceptionInterceptor . new ] )
182- server . add_http2_port ( "#{ @grpc_config . host } :#{ @grpc_config . port } " , :this_port_is_insecure )
183-
184- logs_handler = Otlp ::ServiceHandler ::Logs . new
185- logs_handler . callback = lambda { |request |
186- logs . call ( request . to_json )
187- Otlp ::Response ::Logs . build
188- }
189- server . handle ( logs_handler )
190-
191- metrics_handler = Otlp ::ServiceHandler ::Metrics . new
192- metrics_handler . callback = lambda { |request |
193- metrics . call ( request . to_json )
194- Otlp ::Response ::Metrics . build
195- }
196- server . handle ( metrics_handler )
197-
198- traces_handler = Otlp ::ServiceHandler ::Traces . new
199- traces_handler . callback = lambda { |request |
200- traces . call ( request . to_json )
201- Otlp ::Response ::Traces . build
202- }
203- server . handle ( traces_handler )
204-
205- server . run_till_terminated
206- end
183+ server = GRPC ::RpcServer . new ( interceptors : [ ExceptionInterceptor . new ] )
184+ server . add_http2_port ( "#{ @grpc_config . host } :#{ @grpc_config . port } " , :this_port_is_insecure )
185+
186+ logs_handler = Otlp ::ServiceHandler ::Logs . new
187+ logs_handler . callback = lambda { |request |
188+ logs . call ( request . to_json )
189+ Otlp ::Response ::Logs . build
190+ }
191+ server . handle ( logs_handler )
192+
193+ metrics_handler = Otlp ::ServiceHandler ::Metrics . new
194+ metrics_handler . callback = lambda { |request |
195+ metrics . call ( request . to_json )
196+ Otlp ::Response ::Metrics . build
197+ }
198+ server . handle ( metrics_handler )
199+
200+ traces_handler = Otlp ::ServiceHandler ::Traces . new
201+ traces_handler . callback = lambda { |request |
202+ traces . call ( request . to_json )
203+ Otlp ::Response ::Traces . build
204+ }
205+ server . handle ( traces_handler )
206+
207+ server . run_till_terminated
207208 end
208209 end
209210 end
0 commit comments