5
5
require "socket" # for Socket.gethostname
6
6
require "manticore"
7
7
require "rufus/scheduler"
8
+ require "yaml" # persistence
8
9
9
10
# This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and
10
11
# send them on their merry way. The idea behind this plugins came from a need to read springboot
@@ -132,10 +133,15 @@ def setup_requests!
132
133
@requests = Hash [ @urls . map { |name , url | [ name , normalize_request ( url ) ] } ]
133
134
end
134
135
136
+ private
137
+ def filter_dynamic_params ( allowed_keys , params )
138
+ params . slice ( *allowed_keys )
139
+ end
140
+
135
141
private
136
142
def normalize_request ( url_or_spec )
137
143
if url_or_spec . is_a? ( String )
138
- res = [ :get , url_or_spec ]
144
+ res = [ :get , url_or_spec , { } ]
139
145
elsif url_or_spec . is_a? ( Hash )
140
146
# The client will expect keys / values
141
147
spec = Hash [ url_or_spec . clone . map { |k , v | [ k . to_sym , v ] } ] # symbolize keys
@@ -144,6 +150,19 @@ def normalize_request(url_or_spec)
144
150
method = ( spec . delete ( :method ) || :get ) . to_sym . downcase
145
151
url = spec . delete ( :url )
146
152
153
+ if spec . delete ( :use_dynamic_params )
154
+ last_dynamic_params_location = spec [ :last_dynamic_params ]
155
+ dynamic_params_map = spec [ :dynamic_params_map ]
156
+
157
+ if last_dynamic_params_location . is_a? ( String ) && File . exist? ( last_dynamic_params_location )
158
+ dynamic_params = YAML . load ( File . read ( last_dynamic_params_location ) )
159
+ allowed_keys = dynamic_params_map . is_a? ( Hash ) ? dynamic_params_map . keys : [ ]
160
+ spec [ :dynamic_params ] = filter_dynamic_params ( allowed_keys , dynamic_params )
161
+ else
162
+ spec [ :dynamic_params ] = { }
163
+ end
164
+ end
165
+
147
166
# We need these strings to be keywords!
148
167
spec [ :auth ] = { user : spec [ :auth ] [ "user" ] , pass : spec [ :auth ] [ "password" ] } if spec [ :auth ]
149
168
@@ -212,13 +231,23 @@ def setup_schedule(queue)
212
231
213
232
@scheduler = Rufus ::Scheduler . new ( :max_work_threads => 1 )
214
233
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
215
- opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
234
+ opts = schedule_type == "every" ? { :first_in => 0.01 } : { }
216
235
@scheduler . send ( schedule_type , schedule_value , opts ) { run_once ( queue ) }
217
236
@scheduler . join
218
237
end
219
238
239
+ private
240
+ def assign_dynamic_params ( request )
241
+ params = request [ 2 ] [ :dynamic_params ]
242
+ request [ 2 ] [ :query ] = { } if !request [ 2 ] [ :query ]
243
+ params . keys . each do |key |
244
+ request [ 2 ] [ :query ] [ key ] = params [ key ]
245
+ end
246
+ end
247
+
220
248
def run_once ( queue )
221
249
@requests . each do |name , request |
250
+ assign_dynamic_params ( request ) if request [ 2 ] [ :dynamic_params ]
222
251
request_async ( queue , name , request )
223
252
end
224
253
@@ -246,11 +275,23 @@ def handle_success(queue, name, request, response, execution_time)
246
275
end
247
276
end
248
277
278
+ private
279
+ def update_dynamic_params ( request , event )
280
+ request [ 2 ] [ :dynamic_params_map ] . keys . each do |key |
281
+ value = request [ 2 ] [ :dynamic_params_map ] [ key ]
282
+ event_value = event . get ( value )
283
+ request [ 2 ] [ :dynamic_params ] [ key ] = event_value if event_value
284
+ end
285
+ File . write ( request [ 2 ] [ :last_dynamic_params ] , YAML . dump ( request [ 2 ] [ :dynamic_params ] ) )
286
+ end
287
+
249
288
private
250
289
def handle_decoded_event ( queue , name , request , response , event , execution_time )
251
290
apply_metadata ( event , name , request , response , execution_time )
252
291
decorate ( event )
253
292
queue << event
293
+
294
+ update_dynamic_params ( request , event ) if request [ 2 ] [ :dynamic_params ]
254
295
rescue StandardError , java . lang . Exception => e
255
296
@logger . error? && @logger . error ( "Error eventifying response!" ,
256
297
:exception => e ,
0 commit comments