22
33require "fluent/plugin/otlp/constant"
44require "fluent/plugin/otlp/request"
5+ require "fluent/plugin/otlp/service"
56require "fluent/plugin/output"
67
78require "excon"
9+ require "grpc"
810require "json"
911require "stringio"
1012require "zlib"
@@ -23,7 +25,7 @@ class RetryableResponse < StandardError; end
2325 config_set_default :chunk_limit_size , 10 * 1024
2426 end
2527
26- config_section :http , required : false , multi : false , init : true , param_name : :http_config do
28+ config_section :http , required : false , multi : false , init : false , param_name : :http_config do
2729 desc "The endpoint"
2830 config_param :endpoint , :string , default : "http://127.0.0.1:4318"
2931 desc "The proxy for HTTP request"
@@ -38,21 +40,20 @@ class RetryableResponse < StandardError; end
3840 config_param :compress , :enum , list : %i[ text gzip ] , default : :text
3941 end
4042
43+ config_section :grpc , required : false , multi : false , init : false , param_name : :grpc_config do
44+ desc "The endpoint"
45+ config_param :endpoint , :string , default : "127.0.0.1:4317"
46+ end
47+
4148 config_section :transport , required : false , multi : false , init : true , param_name : :transport_config do
4249 config_argument :protocol , :enum , list : [ :tls ] , default : nil
4350 end
4451
4552 def configure ( conf )
4653 super
4754
48- @tls_settings = { }
49- if @transport_config . protocol == :tls
50- @tls_settings [ :client_cert ] = @transport_config . cert_path
51- @tls_settings [ :client_key ] = @transport_config . private_key_path
52- @tls_settings [ :client_key_pass ] = @transport_config . private_key_passphrase
53- @tls_settings [ :ssl_min_version ] = Otlp ::TLS_VERSIONS_MAP [ @transport_config . min_version ]
54- @tls_settings [ :ssl_max_version ] = Otlp ::TLS_VERSIONS_MAP [ @transport_config . max_version ]
55- end
55+ @http_handler = HttpHandler . new ( @http_config , @transport_config , log ) if @http_config
56+ @grpc_handler = GrpcHandler . new ( @grpc_config , @transport_config , log ) if @grpc_config
5657 end
5758
5859 def multi_workers_ready?
@@ -64,67 +65,129 @@ def format(tag, time, record)
6465 end
6566
6667 def write ( chunk )
67- uri , connection = create_connection ( chunk )
68- response = connection . post
68+ if @http_handler
69+ @http_handler . export ( chunk )
6970
70- if response . status != 200
71- if @http_config . retryable_response_codes &.include? ( response . status )
72- raise RetryableResponse , "got retryable error response from '#{ uri } ', response code is #{ response . status } "
73- end
74- if @http_config . error_response_as_unrecoverable
75- raise Fluent ::UnrecoverableError , "got unrecoverable error response from '#{ uri } ', response code is #{ response . status } "
76- else
77- log . error "got error response from '#{ uri } ', response code is #{ response . status } "
78- end
71+ return
72+ end
73+
74+ if @grpc_handler
75+ @grpc_handler . export ( chunk )
7976 end
8077 end
8178
8279 private
8380
84- def http_logs_endpoint
85- "#{ @http_config . endpoint } /v1/logs"
86- end
81+ class HttpHandler
82+ def initialize ( http_config , transport_config , logger )
83+ @http_config = http_config
84+ @transport_config = transport_config
85+ @logger = logger
86+
87+ @tls_settings = { }
88+ if @transport_config . protocol == :tls
89+ @tls_settings [ :client_cert ] = @transport_config . cert_path
90+ @tls_settings [ :client_key ] = @transport_config . private_key_path
91+ @tls_settings [ :client_key_pass ] = @transport_config . private_key_passphrase
92+ @tls_settings [ :ssl_min_version ] = Otlp ::TLS_VERSIONS_MAP [ @transport_config . min_version ]
93+ @tls_settings [ :ssl_max_version ] = Otlp ::TLS_VERSIONS_MAP [ @transport_config . max_version ]
94+ end
95+ end
8796
88- def http_metrics_endpoint
89- "#{ @http_config . endpoint } /v1/metrics"
90- end
97+ def export ( chunk )
98+ uri , connection = create_http_connection ( chunk )
99+ response = connection . post
100+
101+ if response . status != 200
102+ if @http_config . retryable_response_codes &.include? ( response . status )
103+ raise RetryableResponse , "got retryable error response from '#{ uri } ', response code is #{ response . status } "
104+ end
105+ if @http_config . error_response_as_unrecoverable
106+ raise Fluent ::UnrecoverableError , "got unrecoverable error response from '#{ uri } ', response code is #{ response . status } "
107+ else
108+ @logger . error "got error response from '#{ uri } ', response code is #{ response . status } "
109+ end
110+ end
111+ end
112+
113+ private
114+
115+ def http_logs_endpoint
116+ "#{ @http_config . endpoint } /v1/logs"
117+ end
118+
119+ def http_metrics_endpoint
120+ "#{ @http_config . endpoint } /v1/metrics"
121+ end
122+
123+ def http_traces_endpoint
124+ "#{ @http_config . endpoint } /v1/traces"
125+ end
126+
127+ def create_http_connection ( chunk )
128+ record = JSON . parse ( chunk . read )
129+ msg = record [ "message" ]
130+
131+ begin
132+ case record [ "type" ]
133+ when Otlp ::RECORD_TYPE_LOGS
134+ uri = http_logs_endpoint
135+ body = Otlp ::Request ::Logs . new ( msg ) . encode
136+ when Otlp ::RECORD_TYPE_METRICS
137+ uri = http_metrics_endpoint
138+ body = Otlp ::Request ::Metrics . new ( msg ) . encode
139+ when Otlp ::RECORD_TYPE_TRACES
140+ uri = http_traces_endpoint
141+ body = Otlp ::Request ::Traces . new ( msg ) . encode
142+ end
143+ rescue Google ::Protobuf ::ParseError => e
144+ # The message format does not comply with the OpenTelemetry protocol.
145+ raise ::Fluent ::UnrecoverableError , e . message
146+ end
147+
148+ headers = { Otlp ::CONTENT_TYPE => Otlp ::CONTENT_TYPE_PROTOBUF }
149+ if @http_config . compress == :gzip
150+ headers [ Otlp ::CONTENT_ENCODING ] = Otlp ::CONTENT_ENCODING_GZIP
151+ gz = Zlib ::GzipWriter . new ( StringIO . new )
152+ gz << body
153+ body = gz . close . string
154+ end
91155
92- def http_traces_endpoint
93- "#{ @http_config . endpoint } /v1/traces"
156+ Excon . defaults [ :ssl_verify_peer ] = false if @transport_config . insecure
157+ connection = Excon . new ( uri , body : body , headers : headers , proxy : @http_config . proxy , persistent : true , **@tls_settings )
158+ [ uri , connection ]
159+ end
94160 end
95161
96- def create_connection ( chunk )
97- record = JSON . parse ( chunk . read )
98- msg = record [ "message" ]
162+ class GrpcHandler
163+ def initialize ( grpc_config , transport_config , logger )
164+ @grpc_config = grpc_config
165+ @transport_config = transport_config
166+ @logger = logger
167+ end
168+
169+ def export ( chunk )
170+ record = JSON . parse ( chunk . read )
171+ msg = record [ "message" ]
172+
173+ credential = :this_channel_is_insecure
99174
100- begin
101175 case record [ "type" ]
102176 when Otlp ::RECORD_TYPE_LOGS
103- uri = http_logs_endpoint
104- body = Otlp ::Request ::Logs . new ( msg ) . encode
177+ service = Otlp ::Service ::Logs . new ( @grpc_config . endpoint , credential )
105178 when Otlp ::RECORD_TYPE_METRICS
106- uri = http_metrics_endpoint
107- body = Otlp ::Request ::Metrics . new ( msg ) . encode
179+ service = Otlp ::Service ::Metrics . new ( @grpc_config . endpoint , credential )
108180 when Otlp ::RECORD_TYPE_TRACES
109- uri = http_traces_endpoint
110- body = Otlp ::Request ::Traces . new ( msg ) . encode
181+ service = Otlp ::Service ::Traces . new ( @grpc_config . endpoint , credential )
111182 end
112- rescue Google ::Protobuf ::ParseError => e
113- # The message format does not comply with the OpenTelemetry protocol.
114- raise ::Fluent ::UnrecoverableError , e . message
115- end
116183
117- headers = { Otlp :: CONTENT_TYPE => Otlp :: CONTENT_TYPE_PROTOBUF }
118- if @http_config . compress == :gzip
119- headers [ Otlp :: CONTENT_ENCODING ] = Otlp :: CONTENT_ENCODING_GZIP
120- gz = Zlib :: GzipWriter . new ( StringIO . new )
121- gz << body
122- body = gz . close . string
184+ begin
185+ service . export ( msg )
186+ rescue Google :: Protobuf :: ParseError => e
187+ # The message format does not comply with the OpenTelemetry protocol.
188+ raise :: Fluent :: UnrecoverableError , e . message
189+ end
123190 end
124-
125- Excon . defaults [ :ssl_verify_peer ] = false if @transport_config . insecure
126- connection = Excon . new ( uri , body : body , headers : headers , proxy : @http_config . proxy , persistent : true , **@tls_settings )
127- [ uri , connection ]
128191 end
129192 end
130193end
0 commit comments