@@ -75,14 +75,14 @@ module KCL
75
75
class LogStash ::Inputs ::DynamoDB < LogStash ::Inputs ::Base
76
76
config_name "dynamodb"
77
77
78
- LF_DYNAMODB = "dymamodb"
79
- LF_JSON_NO_BIN = "json_drop_binary"
80
- LF_PLAIN = "plain"
81
- LF_JSON_BIN_AS_TEXT = "json_binary_as_text"
82
- VT_KEYS_ONLY = "keys_only"
83
- VT_OLD_IMAGE = "old_image"
84
- VT_NEW_IMAGE = "new_image"
85
- VT_ALL_IMAGES = "new_and_old_images"
78
+ LF_DYNAMODB = "dymamodb" . freeze
79
+ LF_JSON_NO_BIN = "json_drop_binary" . freeze
80
+ LF_PLAIN = "plain" . freeze
81
+ LF_JSON_BIN_AS_TEXT = "json_binary_as_text" . freeze
82
+ VT_KEYS_ONLY = "keys_only" . freeze
83
+ VT_OLD_IMAGE = "old_image" . freeze
84
+ VT_NEW_IMAGE = "new_image" . freeze
85
+ VT_ALL_IMAGES = "new_and_old_images" . freeze
86
86
87
87
default :codec , 'json'
88
88
@@ -129,7 +129,6 @@ class LogStash::Inputs::DynamoDB < LogStash::Inputs::Base
129
129
# Number of threads to use when scanning the specified table
130
130
config :number_of_scan_threads , :validate => :number , :default => 1
131
131
132
-
133
132
# Number of threads to write to the logstash queue when scanning the table
134
133
config :number_of_write_threads , :validate => :number , :default => 1
135
134
@@ -155,11 +154,6 @@ def build_credentials
155
154
156
155
public
157
156
def register
158
- if not LogStash ::Environment . jruby?
159
- raise ( LogStash ::PluginLoadingError , "KCL requires JRuby for streams processing" )
160
- end # if not LogStash::Environment.jruby?
161
-
162
-
163
157
LogStash ::Logger . setup_log4j ( @logger )
164
158
165
159
@host = Socket . gethostname
@@ -168,10 +162,10 @@ def register
168
162
@credentials = build_credentials ( )
169
163
@logger . info ( "Checkpointer: " + @checkpointer )
170
164
171
- if @perform_scan and @view_type === VT_OLD_IMAGE
165
+ if @perform_scan and @view_type == VT_OLD_IMAGE
172
166
raise ( LogStash ::ConfigurationError , "Cannot perform scan with view type: " + @view_type + " configuration" )
173
167
end
174
- if @view_type === VT_ALL_IMAGES and ( not @log_format = == LF_PLAIN )
168
+ if @view_type == VT_ALL_IMAGES and ( not @log_format == LF_PLAIN )
175
169
raise ( LogStash ::ConfigurationError , "Cannot show view_type: " + @view_type + ", with log_format: " + @log_format )
176
170
end
177
171
@@ -198,6 +192,34 @@ def register
198
192
end # unless @perform_stream
199
193
end # def register
200
194
195
+ public
196
+ def run ( logstash_queue )
197
+ begin
198
+ run_with_catch ( logstash_queue )
199
+ rescue LogStash ::ShutdownSignal
200
+ exit_threads
201
+ until @queue . empty?
202
+ @logger . info ( "Flushing rest of events in logstash queue" )
203
+ event = @queue . pop ( )
204
+ queue_event ( @parser . parse_stream ( event ) , logstash_queue , @host )
205
+
206
+ end # begin
207
+ end # def run(logstash_queue)
208
+
209
+ # Starts KCL app in a background thread
210
+ # Starts parallel scan if need be in a background thread
211
+ private
212
+ def run_with_catch ( logstash_queue )
213
+ if @perform_scan
214
+ scan ( logstash_queue )
215
+ end # if @perform_scan
216
+
217
+ # Once scan is finished, start kcl thread to read from streams
218
+ if @perform_stream
219
+ stream ( logstash_queue )
220
+ end # unless @perform_stream
221
+ end # def run
222
+
201
223
private
202
224
def setup_stream
203
225
worker_id = SecureRandom . uuid ( )
@@ -222,21 +244,21 @@ def setup_stream
222
244
stream_status = stream_description . getStreamStatus ( )
223
245
224
246
stream_view_type = stream_description . getStreamViewType ( ) . to_s . downcase
225
- unless ( stream_view_type === @view_type or @view_type === VT_KEYS_ONLY or stream_view_type = == VT_ALL_IMAGES )
247
+ unless ( stream_view_type == @view_type or @view_type == VT_KEYS_ONLY or stream_view_type == VT_ALL_IMAGES )
226
248
raise ( LogStash ::ConfigurationError , "Cannot stream " + @view_type + " when stream is setup for " + stream_view_type )
227
249
end
228
250
229
- while stream_status === "ENABLING"
230
- if ( stream_status === "ENABLING" )
251
+ while stream_status == "ENABLING"
252
+ if ( stream_status == "ENABLING" )
231
253
@logger . info ( "Sleeping until stream is enabled" )
232
254
sleep ( 1 )
233
- end # if stream_status === "ENABLING"
255
+ end # if stream_status == "ENABLING"
234
256
stream_description = dynamodb_streams_client . describeStream ( AmazonDynamoDB ::DescribeStreamRequest . new ( ) \
235
257
. withStreamArn ( stream_arn ) ) . getStreamDescription ( )
236
258
stream_status = stream_description . getStreamStatus ( )
237
259
end # while not active
238
260
239
- if not stream_status === "ENABLED"
261
+ if not stream_status == "ENABLED"
240
262
raise ( LogStash ::PluginLoadingError , "No streams are enabled" )
241
263
end # if not active
242
264
@logger . info ( "Stream Id: " + stream_arn )
@@ -259,34 +281,6 @@ def setup_stream
259
281
@worker = KCL ::Worker . new ( LogStashRecordProcessorFactory . new ( @queue ) , kcl_config , adapter , @dynamodb_client , cloudwatch_client )
260
282
end # def setup_stream
261
283
262
- public
263
- def run ( logstash_queue )
264
- begin
265
- run_with_catch ( logstash_queue )
266
- rescue LogStash ::ShutdownSignal
267
- exit_threads
268
- until @queue . empty?
269
- @logger . info ( "Flushing rest of events in logstash queue" )
270
- event = @queue . pop ( )
271
- queue_event ( @parser . parse_stream ( event ) , logstash_queue , @host )
272
-
273
- end # begin
274
- end # def run(logstash_queue)
275
-
276
- # Starts KCL app in a background thread
277
- # Starts parallel scan if need be in a background thread
278
- private
279
- def run_with_catch ( logstash_queue )
280
- if @perform_scan
281
- scan ( logstash_queue )
282
- end # if @perform_scan
283
-
284
- # Once scan is finished, start kcl thread to read from streams
285
- if @perform_stream
286
- stream ( logstash_queue )
287
- end # unless @perform_stream
288
- end # def run
289
-
290
284
private
291
285
def scan ( logstash_queue )
292
286
@logger . info ( "Starting scan..." )
@@ -298,7 +292,7 @@ def scan(logstash_queue)
298
292
scan_queue = @logstash_writer . getQueue ( )
299
293
while true
300
294
event = scan_queue . take ( )
301
- if event . getEntry ( ) . nil? and event . getSize ( ) === -1
295
+ if event . getEntry ( ) . nil? and event . getSize ( ) == -1
302
296
break
303
297
end # if event.isEmpty()
304
298
queue_event ( @parser . parse_scan ( event . getEntry ( ) , event . getSize ( ) ) , logstash_queue , @host )
0 commit comments