Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ Gemfile.lock
.bundle
vendor
coverage/

Jars.lock
.vscode
1 change: 1 addition & 0 deletions .jrubyrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
debug.fullTrace=true
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

* Updated sincedb to track the high water mark timestamps at a stream level, rather than just at the group level. This is a fix for [#74](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/74)

## [v1.0.3] (2018-05-28)
* Update minimum version of `logstash-mixin-aws` to support assume role ([#9a4677f](https://github.com/lukewaite/logstash-input-cloudwatch-logs/commit/9a4677fef8bcbf291bd4b357be2a9568ea4f3fc1) - Fixes [#51](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/51), [#39](https://github.com/lukewaite/logstash-input-cloudwatch-logs/issues/39))

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Contributors:
* Ted Timmons (tedder)
* Ryan O'Keeffe (danielredoak)
* Luke Waite (lukewaite)
* Daniel Bray (daniel-bray-sonalake)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
Expand Down
159 changes: 119 additions & 40 deletions lib/logstash/inputs/cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def register
:sincedb_path => @sincedb_path, :log_group => @log_group)
end

@logger.info("Using sincedb_path #{@sincedb_path}")
end #def register

public
Expand All @@ -112,7 +113,6 @@ def run(queue)
@queue = queue
@priority = []
_sincedb_open
determine_start_position(find_log_groups, @sincedb)

while !stop?
begin
Expand Down Expand Up @@ -158,41 +158,34 @@ def priority_of(group)
@priority.index(group) || -1
end

public
# Seed the sincedb with an initial position for every group it has not
# seen before, derived from the configured @start_position:
#   'beginning' -> 0 (consume the whole group)
#   'end'       -> now, in epoch milliseconds
#   <Integer>   -> that many seconds before now
# Groups already present in the sincedb are left untouched.
def determine_start_position(groups, sincedb)
  groups.each do |group|
    next if sincedb.member?(group)

    sincedb[group] =
      case @start_position
      when 'beginning'
        0
      when 'end'
        DateTime.now.strftime('%Q')
      else
        DateTime.now.strftime('%Q').to_i - (@start_position * 1000)
      end
  end
end # def determine_start_position

private
def process_group(group)
next_token = nil
loop do
if [email protected]?(group)
@sincedb[group] = 0
end

# The log streams in the group can overlap, even if interleaved
# is true (as it is by default) so we restart each query from the earliest
# time for which we had records across the group.
# We will still filter out old records at the stream level in process_log
# We don't filter by the log streams because we'd have to go looking for them
# and AWS can't guarantee the last event time in the result of that query
# is accurate

# NOTE: if the group-level filter isn't set to anything it will
# be set by this method to the correct default
start_time, stream_positions = get_sincedb_group_values(group)

params = {
:log_group_name => group,
:start_time => @sincedb[group],
:start_time => start_time,
:interleaved => true,
:next_token => next_token
}
resp = @cloudwatch.filter_log_events(params)

resp.events.each do |event|
process_log(event, group)
process_log(event, group, stream_positions)
end

_sincedb_write
Expand All @@ -206,18 +199,25 @@ def process_group(group)

# def process_log
private
def process_log(log, group)

@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time))
event.set("[cloudwatch_logs][log_group]", group)
event.set("[cloudwatch_logs][log_stream]", log.log_stream_name)
event.set("[cloudwatch_logs][event_id]", log.event_id)
decorate(event)

@queue << event
@sincedb[group] = log.timestamp + 1
def process_log(log, group, stream_positions)
identity = identify(group, log.log_stream_name)
stream_position = stream_positions.fetch(identity, 0)

@logger.trace? && @logger.trace("Checking event time #{log.timestamp} (#{parse_time(log.timestamp)}) -> #{stream_position}")

if log.timestamp >= stream_position
@logger.trace? && @logger.trace("Processing event")
@codec.decode(log.message.to_str) do |event|
event.set("@timestamp", parse_time(log.timestamp))
event.set("[cloudwatch_logs][ingestion_time]", parse_time(log.ingestion_time))
event.set("[cloudwatch_logs][log_group]", group)
event.set("[cloudwatch_logs][log_stream]", log.log_stream_name)
event.set("[cloudwatch_logs][event_id]", log.event_id)
decorate(event)

@queue << event
set_sincedb_value(group, log.log_stream_name, log.timestamp + 1 )
end
end
end # def process_log

Expand All @@ -227,15 +227,94 @@ def parse_time(data)
LogStash::Timestamp.at(data.to_i / 1000, (data.to_i % 1000) * 1000)
end # def parse_time


private
# Build the sincedb key for a log stream within a group.
# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-loggroup.html
# A log group name cannot contain ':', so it is a safe separator.
def identify(group, log_stream_name)
  [group, log_stream_name].join(':')
end

private
# Record the high water mark for a single log stream, and advance the
# group-level high water mark when this event is newer than it.
# String timestamps (as read back from the sincedb file) are coerced to
# integer milliseconds before being stored or compared; the group-level
# mark only ever moves forward.
def set_sincedb_value(group, log_stream_name, timestamp)
  @logger.debug("Setting sincedb #{group}:#{log_stream_name} -> #{timestamp}")

  # normalize to integer milliseconds
  timestamp = timestamp.to_i if timestamp.is_a?(String)

  # per-stream high water mark
  @sincedb[identify(group, log_stream_name)] = timestamp

  # group-level high water mark: only move it forward
  group_pos = @sincedb.fetch(group, 0)
  group_pos = group_pos.to_i if group_pos.is_a?(String)
  @sincedb[group] = timestamp if group_pos < timestamp
end # def set_sincedb_value



# scan over all the @sincedb entries for this group and pick the _earliest_ value
# Returns a list of [earliest last position in group, the last position of each log stream in that group ]
# the map is of the form [group:logstream] -> timestamp (long ms) and also [group] -> timestamp (long ms)
private
# Compute the restart position for a group-level filter_log_events query.
#
# Returns a two-element array:
#   [earliest per-stream high water mark in the group (falling back to the
#    group-level mark when no stream entries exist),
#    map of "group:stream" identity -> timestamp (epoch ms)]
#
# The sincedb holds both group-level entries ("group" -> ms) and
# stream-level entries ("group:stream" -> ms). ':' cannot appear in a log
# group name, so "#{group}:" unambiguously prefixes this group's streams.
def get_sincedb_group_values(group)
  # if we've no group, or stream, then we have never seen these events
  @logger.debug("Getting sincedb #{group} from #{@sincedb}")

  # seed the group-level entry on first sight of this group
  if @sincedb[group].nil?
    @sincedb[group] = get_default_start_time
  end

  ## assume the group level min value
  min_value = @sincedb[group]
  min_map = {}

  # Collect only the stream entries belonging to *this* group. Matching on
  # "#{group}:" (rather than the bare group name) prevents a group whose
  # name is a prefix of another group's name (e.g. "app" vs "app2") from
  # picking up that other group's entries and corrupting the minimum.
  stream_prefix = "#{group}:"
  @sincedb.each do |identity, pos|
    next unless identity.start_with?(stream_prefix)

    min_map[identity] = pos
    min_value = pos if min_value.nil? || min_value > pos
  end

  @logger.debug("Got sincedb #{group} as #{min_value} -> #{min_map}")

  return [min_value, min_map]
end # def get_sincedb_group_values

private
# Default start time for a group with no sincedb entry yet, chosen from
# the @start_position config:
#   'beginning' -> 0 (read the whole group)
#   'end'       -> now (only new events)
#   <Integer>   -> that many seconds before now
#
# Always returns an Integer (epoch milliseconds) so the value compares
# cleanly against the integer stream marks read back from the sincedb.
def get_default_start_time()
  # chose the start time based on the configs
  case @start_position
  when 'beginning'
    0
  when 'end'
    # .to_i keeps the return type consistent with the other branches;
    # strftime returns a String, which would later raise on numeric
    # comparison against integer stream positions
    DateTime.now.strftime('%Q').to_i
  else
    DateTime.now.strftime('%Q').to_i - (@start_position * 1000)
  end # case @start_position
end

private
def _sincedb_open
begin
File.open(@sincedb_path) do |db|
@logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}")
db.each do |line|
group, pos = line.split(" ", 2)
@logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}")
@sincedb[group] = pos.to_i
identity, pos = line.split(" ", 2)
@logger.debug? && @logger.debug("_sincedb_open: setting #{identity} to #{pos.to_i}")
@sincedb[identity] = pos.to_i
end
end
rescue
Expand All @@ -258,8 +337,8 @@ def _sincedb_write

private
def serialize_sincedb
@sincedb.map do |group, pos|
[group, pos].join(" ")
@sincedb.map do |identity, pos|
[identity, pos].join(" ")
end.join("\n") + "\n"
end
end # class LogStash::Inputs::CloudWatch_Logs
4 changes: 2 additions & 2 deletions logstash-input-cloudwatch_logs.gemspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-cloudwatch_logs'
s.version = '1.0.3'
s.version = '1.0.4-candidate'
s.licenses = ['Apache License (2.0)']
s.summary = 'Stream events from CloudWatch Logs.'
s.description = 'This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program'
s.authors = ['Luke Waite']
s.email = '[email protected]'
s.email = ['[email protected]']
s.homepage = ''
s.require_paths = ['lib']

Expand Down
Loading