diff --git a/.gitignore b/.gitignore
index bd6a70d..f9ac9f8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@ Gemfile.lock
 .bundle
 vendor
 coverage/
+
+Jars.lock
+.vscode
\ No newline at end of file
diff --git a/.jrubyrc b/.jrubyrc
new file mode 100644
index 0000000..030a38e
--- /dev/null
+++ b/.jrubyrc
@@ -0,0 +1 @@
+debug.fullTrace=true
\ No newline at end of file
diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
new file mode 100644
index 0000000..a3282ed
--- /dev/null
+++ b/ARCHITECTURE.md
@@ -0,0 +1,86 @@
+# Architecture
+
+## Complexities in the source data that can cause query problems
+
+The data source here is CloudWatch, whose `filter_log_events` method
+returns log data for a single log group, possibly split into multiple
+streams. It will attempt to interleave these so they are in event-time
+order, but it can't guarantee that.
+
+In a high-volume situation this results in two issues that can cause
+problems in queries:
+
+- Log messages in a group's query result may appear out of order,
+  although they will be in order within any single stream.
+- You may get multiple messages with the same timestamp.
+
+These cause a problem because there's no query model for tailing
+a log stream (it's possible, but only with architectural changes
+in your deployment), so the only way to get data is to record when
+the last message happened and search again from there.
+
+The above issues affect this as follows:
+
+- *Out-of-order messages* - when should the last event time be set
+  to avoid missing messages?
+- *Multiple messages with the same timestamp* - if you have
+  two messages in the original stream with the same timestamp,
+  and your last query returned the first one, how do you query
+  for the second without reprocessing the first?
+
+## Resolution using a log event tracking window
+
+This is resolved in the [LogEventTracker](lib/logstash/inputs/group_event_tracker.rb)
+by maintaining a window of log events, storing every event seen in that period.
+
+**NOTE:** all times are derived from the log event timestamp and
+not the current wall-clock time. They are accurate to the millisecond.
+
+The model is per log_group:
+
+- `min_time`: the earliest time for which we have a log event for this group
+- `max_time`: the latest time for which we have a log event for this group
+- `map[log_event_time] -> set[events]`: a record of all the events
+  for this group in the log window.
+
+In effect, we keep track of all the events we've seen in
+the window (e.g. a 15-minute period). Once we have more than, say,
+15 minutes' worth of events, we start dropping the older events.
+
+The window reports a record as _"new"_ only if:
+
+- It's an event we've never seen before, where an event is uniquely
+  identified by its `stream` name and its `eventId`, and
+- It's for a millisecond on or after the `min_time`.
+
+The process for querying the data for some group is:
+
+```ruby
+  # Get the earliest time for which we've seen any data
+  start_time = window.get_min_time(group)
+
+  # This might contain events we've already processed
+  events = filter_log_events(group, start_time)
+
+  # Loop through the events, skipping any we've already
+  # seen, and processing the rest
+  events.each do |event|
+    if !window.have_we_seen_this_before(group, event)
+      process_event(group, event)
+    end
+  end
+
+  # Once we've finished the search, purge any events that are too
+  # old (e.g. more than 15 minutes older than the maximum timestamp)
+  # and then save the data to a file so it's there if we restart
+  window.purge_events_too_old(group)
+  window.save_to_file(group)
+```
+
+In experiments I've found that a 15-minute window avoids any missed records.
+In our use cases, however, we've been routing through an aggregator that holds
+back data for a few minutes to make sure it has enough data to push out, so you
+can probably reduce this window to suit your own needs.
+
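+For reference, here is a minimal, self-contained sketch of the per-group
+bookkeeping described above (the names are illustrative; the real
+implementation is in `lib/logstash/inputs/group_event_tracker.rb`):
+
+```ruby
+# One group's window: the earliest/latest event times seen so far, plus the
+# event identities recorded for each millisecond ("stream_name:event_id").
+Window = Struct.new(:min_time, :max_time, :events_by_ms)
+
+def new_event?(window, timestamp, stream_name, event_id)
+  # too old: before the start of the window
+  return false if window.min_time && timestamp < window.min_time
+  # otherwise new only if we haven't recorded this stream/event id yet
+  seen = window.events_by_ms[timestamp] || []
+  !seen.include?("#{stream_name}:#{event_id}")
+end
+
+def record!(window, timestamp, stream_name, event_id)
+  (window.events_by_ms[timestamp] ||= []) << "#{stream_name}:#{event_id}"
+  window.min_time = timestamp if window.min_time.nil? || timestamp < window.min_time
+  window.max_time = timestamp if window.max_time.nil? || timestamp > window.max_time
+end
+
+def purge!(window, window_minutes)
+  return if window.max_time.nil?
+  # drop everything more than window_minutes older than the newest event
+  cutoff = window.max_time - (window_minutes * 60 * 1000)
+  window.events_by_ms.delete_if { |ts, _| ts < cutoff }
+  window.min_time = cutoff if window.min_time.nil? || window.min_time < cutoff
+end
+```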
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd8092d..4e06a87 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ## [Unreleased]
 
+* Updated sincedb to track the high water mark timestamps at a stream level, rather than just at the group level. This is a fix for [#74](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/74)
+
 ## [v1.0.3] (2018-05-28)
 * Update minimum version of `logstash-mixin-aws` to support assume role ([#9a4677f](https://github.com/lukewaite/logstash-input-cloudwatch-logs/commit/9a4677fef8bcbf291bd4b357be2a9568ea4f3fc1) - Fixes [#51](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/51), [#39](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/39))
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index a783254..1d39845 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -13,6 +13,7 @@ Contributors:
 * Ted Timmons (tedder)
 * Ryan O'Keeffe (danielredoak)
 * Luke Waite (lukewaite)
+* Daniel Bray (daniel-bray-sonalake)
 
 Note: If you've sent us patches, bug reports, or otherwise contributed to
 Logstash, and you aren't on the list above and want to be, please let us know
diff --git a/Gemfile b/Gemfile
index 851fabc..7dc0379 100644
--- a/Gemfile
+++ b/Gemfile
@@ -1,2 +1,4 @@
 source 'https://rubygems.org'
 gemspec
+gem 'simplecov', require: false, group: :test
+gem 'coveralls', require: false, group: :test
diff --git a/README.md b/README.md
index 98bb5d6..0359ddd 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,9 @@ Optionally, you may set the `log_group_prefix` parameter to true
 which will scan for all log groups matching the specified prefix(s)
 and ingest all logs available in all of the matching groups.
 
+The optional `log_group_suffix` parameter filters the results of the above
+CloudWatch query, keeping only those groups whose names end with one of the
+given suffixes (or excluding them when `negate_log_group_suffix` is set).
+It only applies when `log_group_prefix` is `true`.
+
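+For example, a configuration along the following lines (the group prefix and
+suffix here are purely illustrative) would ingest only the `-production`
+groups under the given prefix:
+
+```
+input {
+  cloudwatch_logs {
+    log_group => ["/aws/lambda/"]
+    log_group_prefix => true
+    log_group_suffix => ["-production"]
+  }
+}
+```
+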
 ## Usage
 
 ### Parameters
@@ -22,6 +25,7 @@ and ingest all logs available in all of the matching groups.
 |-----------|------------|----------|---------|
 | log_group | string or Array of strings | Yes | |
 | log_group_prefix | boolean | No | `false` |
+| log_group_suffix | string or Array of strings | No | |
 | start_position | `beginning`, `end`, or an Integer | No | `beginning` |
 | sincedb_path | string | No | `$HOME/.sincedb*` |
 | interval | number | No | 60 |
diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb
index 38d3cc6..c3878c5 100644
--- a/lib/logstash/inputs/cloudwatch_logs.rb
+++ b/lib/logstash/inputs/cloudwatch_logs.rb
@@ -8,6 +8,8 @@ require "aws-sdk"
 require "logstash/inputs/cloudwatch_logs/patch"
 require "fileutils"
 
+require 'logstash/inputs/group_event_tracker'
+
 Aws.eager_autoload!
 
@@ -36,6 +38,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
   # sincedb files to some path matching "$HOME/.sincedb*"
   # Should be a path with filename not just a directory.
   config :sincedb_path, :validate => :string, :default => nil
 
+  # The stream data grows over time, so we drop it after a configurable time,
+  # but only when a new value comes in for a group (i.e. we purge one group
+  # at a time).
+  config :prune_since_db_stream_minutes, :validate => :number, :default => 60
 
   # Interval to wait between to check the file list again after a run is finished.
   # Value is in seconds.
@@ -44,11 +50,17 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
   # Decide if log_group is a prefix or an absolute name
   config :log_group_prefix, :validate => :boolean, :default => false
 
+  # If present, the results of the log group query are filtered again to keep
+  # only the groups matching these suffixes. Only applicable if log_group_prefix = true
+  config :log_group_suffix, :validate => :string, :list => true, :default => nil
+  config :negate_log_group_suffix, :validate => :boolean, :default => false
+
   # When a new log group is encountered at initial plugin start (not already in
   # sincedb), allow configuration to specify where to begin ingestion on this group.
   # Valid options are: `beginning`, `end`, or an integer, representing number of
   # seconds before now to read back from.
   config :start_position, :default => 'beginning'
+
   # def register
@@ -57,8 +69,6 @@ def register
     require "digest/md5"
     @logger.debug("Registering cloudwatch_logs input", :log_group => @log_group)
     settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil
-    @sincedb = {}
-
     check_start_position_validity
 
     Aws::ConfigService::Client.new(aws_options_hash)
@@ -91,11 +101,14 @@ def register
       @sincedb_path = File.join(sincedb_dir, ".sincedb_" + Digest::MD5.hexdigest(@log_group.join(",")))
 
       @logger.info("No sincedb_path set, generating one based on the log_group setting",
-                   :sincedb_path => @sincedb_path, :log_group => @log_group)
+        :sincedb_path => @sincedb_path, :log_group => @log_group)
     end
-
+
+    @logger.info("Using sincedb_path #{@sincedb_path}")
+    @event_tracker = LogEventTracker.new(@sincedb_path, @prune_since_db_stream_minutes)
   end #def register
+
   public
   def check_start_position_validity
     raise LogStash::ConfigurationError, "No start_position specified!" unless @start_position
@@ -111,8 +124,7 @@ def check_start_position_validity
   def run(queue)
     @queue = queue
     @priority = []
-    _sincedb_open
-    determine_start_position(find_log_groups, @sincedb)
+    @event_tracker.load()
 
     while !stop?
       begin
@@ -139,7 +151,12 @@ def find_log_groups
       @log_group.each do |group|
         loop do
           log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: group, next_token: next_token)
-          groups += log_groups.log_groups.map {|n| n.log_group_name}
+          # use the group if we have no suffix setting, or if the candidate group name
+          # ends with one of the suffixes (inverted when negate_log_group_suffix is set)
+          groups += log_groups.log_groups
+            .select { |n| @log_group_suffix.nil? || (n.log_group_name.end_with?(*@log_group_suffix) ^ @negate_log_group_suffix)}
+            .map {|n| n.log_group_name}
+
           next_token = log_groups.next_token
           @logger.debug("found #{log_groups.log_groups.length} log groups matching prefix #{group}")
           break if next_token.nil?
@@ -158,67 +175,62 @@ def priority_of(group) @priority.index(group) || -1 end - public - def determine_start_position(groups, sincedb) - groups.each do |group| - if !sincedb.member?(group) - case @start_position - when 'beginning' - sincedb[group] = 0 - - when 'end' - sincedb[group] = DateTime.now.strftime('%Q') - - else - sincedb[group] = DateTime.now.strftime('%Q').to_i - (@start_position * 1000) - end # case @start_position - end - end - end # def determine_start_position private def process_group(group) next_token = nil loop do - if !@sincedb.member?(group) - @sincedb[group] = 0 - end + start_time = @event_tracker.get_or_set_min_time(group, get_default_start_time) + params = { :log_group_name => group, - :start_time => @sincedb[group], + :start_time => start_time, :interleaved => true, :next_token => next_token - } + } resp = @cloudwatch.filter_log_events(params) - + + actually_processed_count = 0 resp.events.each do |event| - process_log(event, group) + was_processed = process_log(event, group) + was_processed && actually_processed_count = actually_processed_count + 1 end - _sincedb_write + resp.events.length() > 0 && @logger.debug("Queried logs for #{group} from #{parse_time(start_time)} found #{resp.events.length()} events, processed #{actually_processed_count}") + # prune old records before saving + @event_tracker.purge(group) + @event_tracker.save() next_token = resp.next_token break if next_token.nil? end @priority.delete(group) @priority << group + end #def process_group - # def process_log + # def process_log - returns true if the message was actually processed private def process_log(log, group) - - @codec.decode(log.message.to_str) do |event| - event.set("@timestamp", parse_time(log.timestamp)) - event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time)) - event.set("[cloudwatch_logs][log_group]", group) - event.set("[cloudwatch_logs][log_stream]", log.log_stream_name) - event.set("[cloudwatch_logs][event_id]", log.event_id) - decorate(event) - - @queue << event - @sincedb[group] = log.timestamp + 1 + identity = identify(group, log.log_stream_name) + if @event_tracker.is_new_event(group, log) + @logger.trace? && @logger.trace("Processing event") + @codec.decode(log.message.to_str) do |event| + + event.set("@timestamp", parse_time(log.timestamp)) + event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time)) + event.set("[cloudwatch_logs][log_group]", group) + event.set("[cloudwatch_logs][log_stream]", log.log_stream_name) + event.set("[cloudwatch_logs][event_id]", log.event_id) + decorate(event) + + @queue << event + + @event_tracker.record_processed_event(group, log) + return true + end end + return false end # def process_log # def parse_time @@ -227,39 +239,31 @@ def parse_time(data) LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000) end # def parse_time - private - def _sincedb_open - begin - File.open(@sincedb_path) do |db| - @logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}") - db.each do |line| - group, pos = line.split(" ", 2) - @logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}") - @sincedb[group] = pos.to_i - end - end - rescue - #No existing sincedb to load - @logger.debug? 
&& @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}") - end - end # def _sincedb_open - private - def _sincedb_write - begin - IO.write(@sincedb_path, serialize_sincedb, 0) - rescue Errno::EACCES - # probably no file handles free - # maybe it will work next time - @logger.debug? && @logger.debug("_sincedb_write: error: #{@sincedb_path}: #{$!}") - end - end # def _sincedb_write + private + def identify(group, log_stream_name) + # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html + # ':' isn't allowed in a log group name, so we can use it safely + return "#{group}:#{log_stream_name}" + end + private + def is_stream_identifier(sincedb_name) + return sincedb_name.include? ":" + end private - def serialize_sincedb - @sincedb.map do |group, pos| - [group, pos].join(" ") - end.join("\n") + "\n" + def get_default_start_time() + # chose the start time based on the configs + case @start_position + when 'beginning' + return 0 + when 'end' + return DateTime.now.strftime('%Q').to_i + else + return DateTime.now.strftime('%Q').to_i - (@start_position.to_i * 1000) + end # case @start_position end + + end # class LogStash::Inputs::CloudWatch_Logs diff --git a/lib/logstash/inputs/group_event_tracker.rb b/lib/logstash/inputs/group_event_tracker.rb new file mode 100644 index 0000000..7d088fd --- /dev/null +++ b/lib/logstash/inputs/group_event_tracker.rb @@ -0,0 +1,244 @@ +java_import org.apache.logging.log4j.LogManager + +# +# Tracks if an event is new and stores this event. +# The process is to maintain a window of N minutes in which every event is stored by +# millisecond, and stream/event id. +# +# Events are "new" if they are after the beginning of this window, and if they are not +# already recorded in the window (as identified by the stream/event id) +# +# The window is purged once N minutes of events are processed (note, this is by +# log event time). It is, therefore, and assumption that log events are processed +# in as close to time order as possible (at least, within the N minute window). +# This is an automatic consequence of the AWS filter_log_events method. 
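+#
+# Typical use, mirroring the calls made in cloudwatch_logs.rb (illustrative sketch only):
+#
+#   tracker = LogEventTracker.new(sincedb_path, prune_minutes)
+#   tracker.load
+#   if tracker.is_new_event(group, log_event)
+#     # ... decode the event and push it to the queue ...
+#     tracker.record_processed_event(group, log_event)
+#   end
+#   tracker.purge(group)
+#   tracker.save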
+# +# In all cases log_event is one of the events from +# https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Aws/CloudWatchLogs/Client.html#filter_log_events-instance_method +class LogEventTracker + include LogStash::Util::Loggable + + def initialize(path_to_data, prune_since_db_stream_minutes) + @logger = LogManager.getLogger(LogEventTracker) + @path_to_data = path_to_data + @prune_since_db_stream_minutes = prune_since_db_stream_minutes + + # maps groups to GroupEventTracker + @group_trackers = {} + end + + # returns true if the event hasn't been processed yet + def is_new_event(group, log_event) + return ensure_group(group).is_new_event(log_event) + end + + # records the new event in the log + def record_processed_event(group, log_event) + ensure_group(group).record_processed_event(log_event) + end + + # wipe any events older than the prune time (using the last records + # in the window as the end time) + def purge (group) + ensure_group(group).purge + end + + def min_time (group, default_time = nil) + return ensure_group(group).min_time(default_time) + end + + def get_or_set_min_time(group, default_time = nil) + return ensure_group(group).get_or_set_min_time(default_time) + end + + def save() + # build the json model + save_data = {} + @group_trackers.each do |k,v| + save_data[k] = v.to_save_model + end + + # save it + begin + File.write(@path_to_data, save_data.to_json) + rescue Errno::EACCES + # probably no file handles free + # maybe it will work next time + @logger.debug? && @logger.debug("Failed to write to: #{@path_to_data}: #{$!}") + end + end + + def load() + # load the file into json + begin + load_new_format + rescue JSON::ParserError + load_old_format + rescue + # if we can't read the file, we just assume it's broken, and we'll ignore it + @logger.debug("Failed to read: #{@path_to_data}: #{$!}") + end + end + + private + def load_new_format() + @group_trackers.clear + if File.file?(@path_to_data) + data_hash = JSON.parse(File.read(@path_to_data)) + data_hash.each do |k, v| + group_entry = ensure_group(k).from_save_model(v) + end + end + end + + private + def load_old_format() + # group1:stream2 123 + # group1 456 + @group_trackers.clear + File.open(@path_to_data) do |db| + db.each do |line| + identity, pos = line.split(" ", 2) + if identity.include? ":" + identity = identity[0..identity.index(":") - 1] + end + ensure_group(identity).update_ranges(pos.to_i) + ensure_group(identity).set_to_tail + + end + end + + end + + private + def ensure_group(group) + if !@group_trackers.key?(group) + @group_trackers[group] = GroupEventTracker.new (@prune_since_db_stream_minutes) + end + return @group_trackers[group] + end + + +end +# Maintains the event window at the level of a single group +class GroupEventTracker + include LogStash::Util::Loggable + + def initialize( prune_since_db_stream_minutes) + @logger = LogManager.getLogger(GroupEventTracker) + + @prune_since_db_stream_minutes = prune_since_db_stream_minutes + + @min_time = nil + @max_time = nil + + # maps a log event timestamp (in millis) to the events in that millisecond + @events_by_ms = {} + end + + def min_time(default_time = nil) + if @min_time.nil? + return default_time + end + + return @min_time + end + + def get_or_set_min_time (default_time = nil) + + if @min_time.nil? + @min_time = default_time + end + + return @min_time + end + + # returns true if the event hasn't been processed yet + def is_new_event(log_event) + # we've seen no records at all + if @min_time.nil? 
+ return true + # the record is too old + elsif log_event.timestamp < @min_time + return false + # so either the timestamp is new or the event is + else + if !@events_by_ms.key?(log_event.timestamp) + return true + else + return !@events_by_ms[log_event.timestamp].include?( identify(log_event)) + end + end + end + + # records the new event in the log + def record_processed_event(log_event) + # update the min/max times + update_ranges(log_event.timestamp) + + # store the event in the ms part of the process window + if !@events_by_ms.key?(log_event.timestamp) + @events_by_ms[log_event.timestamp] = [] + end + @events_by_ms[log_event.timestamp].push(identify(log_event)) + end + + def purge () + # if we've gotten no data, there's nothing top do + if @max_time.nil? + return + end + + # if our window is all after the purge time we have nothing to do + purge_before = (@max_time - (60 * 1000 * @prune_since_db_stream_minutes)) + if @min_time > purge_before + return + else + # otherwise reset the min time and purge everything before it + @min_time = purge_before + @events_by_ms.clone.each do |k, v| + if k < purge_before + @events_by_ms.delete(k) + end + end + end + end + + def identify(log_event) + return "#{log_event.log_stream_name}:#{log_event.event_id}" + end + + def to_save_model() + save_data = {} + @events_by_ms.each do |k,v| + save_data[k] = v + end + return save_data + end + + def from_save_model(json_model) + @events_by_ms.clear() + json_model.each do |k, v| + ts = k.to_i + update_ranges(ts) + @events_by_ms[ts] = v + end + end + + def update_ranges(timestamp) + if @min_time.nil? || @min_time > timestamp + @min_time = timestamp + end + if @max_time.nil? || @max_time < timestamp + @max_time = timestamp + end + end + + def set_to_tail + if !@max_time.nil? + @min_time = @max_time + @events_by_ms.clear + end + end + +end \ No newline at end of file diff --git a/logstash-input-cloudwatch_logs.gemspec b/logstash-input-cloudwatch_logs.gemspec index 400962d..bfd0431 100644 --- a/logstash-input-cloudwatch_logs.gemspec +++ b/logstash-input-cloudwatch_logs.gemspec @@ -1,12 +1,12 @@ Gem::Specification.new do |s| s.name = 'logstash-input-cloudwatch_logs' - s.version = '1.0.3' + s.version = '1.1.0-candidate' s.licenses = ['Apache License (2.0)'] s.summary = 'Stream events from CloudWatch Logs.' s.description = 'This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. 
This gem is not a stand-alone program' s.authors = ['Luke Waite'] - s.email = 'lwaite@gmail.com' + s.email = ['lwaite@gmail.com'] s.homepage = '' s.require_paths = ['lib'] diff --git a/spec/inputs/cloudwatch_logs_spec.rb b/spec/inputs/cloudwatch_logs_spec.rb index 75f7cad..23b049f 100644 --- a/spec/inputs/cloudwatch_logs_spec.rb +++ b/spec/inputs/cloudwatch_logs_spec.rb @@ -3,8 +3,14 @@ require 'logstash/inputs/cloudwatch_logs' require 'aws-sdk-resources' require 'aws-sdk' +require "logstash/timestamp" describe LogStash::Inputs::CloudWatch_Logs do + def parse_time(data) + LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000) + end # def parse_time + + let(:config) { { 'access_key_id' => '1234', @@ -83,26 +89,171 @@ end end - describe '#determine_start_position' do - context 'start_position set to an integer' do - sincedb = {} - subject {LogStash::Inputs::CloudWatch_Logs.new(config.merge({'start_position' => 100}))} + + describe '#process_log' do + context 'with an array in the config' do + subject {LogStash::Inputs::CloudWatch_Logs.new(config.merge({ + 'start_position' => 0, + 'sincedb_path' => Dir.mktmpdir("rspec-") + '/sincedb.txt' + }))} + + it 'check default start time - beginning ' do + # given the config is for beginning + subject.instance_variable_set(:@start_position, 'beginning') + + # then the default time is epoch start + expect(subject.send(:get_default_start_time, *[])).to eq(0) + end + + it 'check default start time - end ' do + # given the config is for end + subject.instance_variable_set(:@start_position, 'end') - it 'successfully parses the start position' do - expect {subject.determine_start_position(['test'], sincedb)}.to_not raise_error + # then the default time nearly now + now = DateTime.now.strftime('%Q').to_i + expect(subject.send(:get_default_start_time, *[])).to be_within(100).of(now) end + + it 'check default start time - start position ' do + # given the config is for end + subject.instance_variable_set(:@start_position, '86400') + + # then the default time nearly the expected + now = DateTime.now.strftime('%Q').to_i + expected = now - 86400 * 1000 + expect(subject.send(:get_default_start_time, *[])).to be_within(100).of(expected) + end + + it 'process a log event - event is new' do + subject.register + event_tracker = subject.instance_variable_get(:@event_tracker) + + # given these times + old_timestamp = DateTime.now.strftime('%Q').to_i + new_timestamp = old_timestamp + 1000 + + # given we know about this group and stream from an old record + group = 'groupA' + # given we got the message for this group, for the known stream + # and where the record is "new enough" + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.message = 'this be the verse' + log.timestamp = old_timestamp + log.ingestion_time = 123 + log.log_stream_name = 'streamX' + log.event_id = 'event1' + + event_tracker.record_processed_event(group, log) + + # update this log to be a new event + log.timestamp = new_timestamp + log.event_id = 'event2' + + # when we send the log (assuming we have a queue) + queue = [] + subject.instance_variable_set(:@queue, queue) + + subject.send(:process_log, *[log, group]) + + # then a message was sent to the queue + expect(queue.length).to eq(1) + + + expect(queue[0].get('[@timestamp]')).to eq(LogStash::Timestamp.at(new_timestamp.to_i / 1000, (new_timestamp.to_i % 1000) * 1000)) + expect(queue[0].get('[message]')).to eq('this be the verse') + expect(queue[0].get('[cloudwatch_logs][log_group]')).to eq('groupA') + 
expect(queue[0].get('[cloudwatch_logs][log_stream]')).to eq('streamX') + expect(queue[0].get('[cloudwatch_logs][ingestion_time]').to_iso8601).to eq('1970-01-01T00:00:00.123Z') + + # then the timestamp should have been updated + start_time = event_tracker.min_time(group) + # and the new start time the earliest record + expect(start_time).to eq(old_timestamp) + end + + + + it 'process a log event - event is old' do + subject.register + event_tracker = subject.instance_variable_get(:@event_tracker) + + # given these times + old_timestamp = DateTime.now.strftime('%Q').to_i + new_timestamp = old_timestamp + 1000 + + + puts("old_timestamp: #{old_timestamp}") + puts("new_timestamp: #{new_timestamp}") + + + # given we previously got the old record + group = 'GroupA' + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.message = 'this be the verse' + log.timestamp = new_timestamp + log.ingestion_time = 123 + log.log_stream_name = 'streamX' + log.event_id = 'eventA' + event_tracker.record_processed_event(group, log) + + # given a new log message + log.timestamp = old_timestamp + log.event_id = 'eventB' + + + # when we send the log (assuming we have a queue) + queue = [] + subject.instance_variable_set(:@queue, queue) + + subject.send(:process_log, *[log, group]) + + # then no message was sent to the queue + expect(queue.length).to eq(0) + + # then the timestamp should not have been updated + start_time = event_tracker.min_time(group) + # and the new start time is 1 millisecond after the message time + expect(start_time).to eq(new_timestamp) + + end + + it 'process a log event - event has already been seen' do + subject.register + event_tracker = subject.instance_variable_get(:@event_tracker) + + # given these times + old_timestamp = DateTime.now.strftime('%Q').to_i + + + # given we previously got the old record + group = 'GroupA' + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.message = 'this be the verse' + log.timestamp = old_timestamp + log.ingestion_time = 123 + log.log_stream_name = 'streamX' + log.event_id = 'eventA' + event_tracker.record_processed_event(group, log) + + + # when we send the log (assuming we have a queue) + queue = [] + subject.instance_variable_set(:@queue, queue) + + subject.send(:process_log, *[log, group]) + + # then no message was sent to the queue + expect(queue.length).to eq(0) + + # then the timestamp should not have been updated + start_time = event_tracker.min_time(group) + # and the new start time is 1 millisecond after the message time + expect(start_time).to eq(old_timestamp) + + end end - end - # describe '#find_log_groups with prefix true' do - # subject {LogStash::Inputs::CloudWatch_Logs.new(config.merge({'log_group_prefix' => true}))} - # - # before(:each) {subject.register} - # - # it 'should create list of prefixes' do - # expect_any_instance_of(Aws::CloudWatchLogs::Resource).to receive(:describe_log_groups).and_return({'log_groups' => [{'log_group_name' => '1'},{'log_group_name' => '2'}]}) - # expect(subject.find_log_groups).to eq(['sample-log-group-1', 'sample-log-group-2']) - # end - # end + + end end diff --git a/spec/inputs/group_event_tracker_spec.rb b/spec/inputs/group_event_tracker_spec.rb new file mode 100644 index 0000000..a884a1e --- /dev/null +++ b/spec/inputs/group_event_tracker_spec.rb @@ -0,0 +1,183 @@ +# encoding: utf-8 +require 'logstash/devutils/rspec/spec_helper' +require 'logstash/inputs/group_event_tracker' +require 'aws-sdk-resources' +require 'aws-sdk' + +describe LogEventTracker do + + describe "test check 
events" do + + it "add new event to empty model" do + tracker = LogEventTracker.new(Dir.mktmpdir("rspec-") + '/sincedb.txt', 15) + + # given a new log event + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.message = 'this be the verse' + log.timestamp = 1 + log.log_stream_name = 'streamX' + log.event_id = 'event1' + + # when we push the first event, then it will not have been processed + expect(tracker.is_new_event('group', log)).to be_truthy + + # now when we add it it will processed + tracker.record_processed_event('group', log) + expect(tracker.is_new_event('group', log)).to be_falsey + end + + it "add multiple events" do + tracker = LogEventTracker.new(Dir.mktmpdir("rspec-") + '/sincedb.txt', 15) + + # given a new log event in some group + group = 'group' + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.timestamp = 1 + log.log_stream_name = 'streamX' + log.event_id = 'event1' + + logSameTime = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + logSameTime.timestamp = 1 + logSameTime.log_stream_name = log.log_stream_name + logSameTime.event_id = 'event2' + + logSameTimeDifferentStream = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + logSameTimeDifferentStream.timestamp = 1 + logSameTimeDifferentStream.log_stream_name = 'streamY' + logSameTimeDifferentStream.event_id = 'event2' + + # given the first log is in place + tracker.record_processed_event(group, log) + + # then it should already be in place + expect(tracker.is_new_event(group, log)).to be_falsey + # but others in the same time, but different ids or streams should be new + expect(tracker.is_new_event(group, logSameTime)).to be_truthy + expect(tracker.is_new_event(group, logSameTimeDifferentStream)).to be_truthy + + end + + it "check purge works" do + purge_minutes = 3 + tracker = LogEventTracker.new(Dir.mktmpdir("rspec-") + '/sincedb.txt', purge_minutes) + + # given a new log event + too_old = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + too_old.timestamp = 1 + too_old.log_stream_name = 'streamX' + too_old.event_id = 'event1' + + not_too_old = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + not_too_old.timestamp = too_old.timestamp + 60 * 1000 + not_too_old.log_stream_name = 'streamX' + not_too_old.event_id = 'event2' + + now = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + now.timestamp = too_old.timestamp + 60 * 1000 * purge_minutes + 1 + now.log_stream_name = 'streamX' + now.event_id = 'event3' + + # push in the three messages + group = 'group' + tracker.record_processed_event(group, too_old) + tracker.record_processed_event(group, not_too_old) + tracker.record_processed_event(group, now) + + # purge + tracker.purge(group) + + # now, get the group data out and check it + group_tracker = tracker.send(:ensure_group, *[group]) + group_data = group_tracker.instance_variable_get(:@events_by_ms) + + # then + expect(group_data.key?(too_old.timestamp)).to be_falsey + expect(group_data.key?(not_too_old.timestamp)).to be_truthy + expect(group_data.key?(now.timestamp)).to be_truthy + end + + it "check save data" do + # given an existing file at this location + purge_minutes = 3 + pathToFile = Dir.mktmpdir("rspec-") + '/sincedb.txt' + puts("pathToFile => #{pathToFile}") + tracker = LogEventTracker.new(pathToFile, purge_minutes) + + # write in some logs + log = Aws::CloudWatchLogs::Types::FilteredLogEvent.new() + log.timestamp = 1 + log.log_stream_name = 'streamX' + log.event_id = 'event1' + tracker.record_processed_event("groupA", log) + tracker.record_processed_event("groupB", 
log) + + log.timestamp = 2 + log.log_stream_name = 'streamY' + log.event_id = 'event2' + tracker.record_processed_event("groupA", log) + + + # save them + tracker.save + + # create a new tracker and reload the file + tracker = LogEventTracker.new(pathToFile, purge_minutes) + tracker.load + + # now, get the group data out and check it + group_tracker = tracker.send(:ensure_group, *['groupA']) + group_data = group_tracker.instance_variable_get(:@events_by_ms) + + expect(group_data[1]).to contain_exactly("streamX:event1") + expect(group_data[2]).to contain_exactly("streamY:event2") + + expect(group_tracker.instance_variable_get(:@min_time)).to eq(1) + expect(group_tracker.instance_variable_get(:@max_time)).to eq(2) + + # group b data + group_tracker = tracker.send(:ensure_group, *['groupB']) + group_data = group_tracker.instance_variable_get(:@events_by_ms) + + expect(group_data[1]).to contain_exactly("streamX:event1") + + expect(group_tracker.instance_variable_get(:@min_time)).to eq(1) + expect(group_tracker.instance_variable_get(:@max_time)).to eq(1) + + end + + it "check old save data format" do + # given an existing file at this location + purge_minutes = 3 + pathToFile = Dir.mktmpdir("rspec-") + '/sincedb.txt' + puts("pathToFile => #{pathToFile}") + tracker = LogEventTracker.new(pathToFile, purge_minutes) + + # given a file in the new "group:stream position" format + File.open(pathToFile, "w") { |f| + f.write "group1:stream1 1\n" + f.write "group1:stream2 2\n" + f.write "group1 3\n" + f.write "group2 4\n" + f.write "group2:stream1 5\n" + } + + # load the tracker + tracker.load + + # check the groups - when we load the old file we use the end as the min/max + group_tracker = tracker.send(:ensure_group, *['group1']) + expect(group_tracker.instance_variable_get(:@min_time)).to eq(3) + expect(group_tracker.instance_variable_get(:@max_time)).to eq(3) + + group_tracker = tracker.send(:ensure_group, *['group2']) + expect(group_tracker.instance_variable_get(:@min_time)).to eq(5) + expect(group_tracker.instance_variable_get(:@max_time)).to eq(5) + + + end + + + + end + +end