Skip to content

Commit 8ec3fc5

Browse files
committed
add code and test to set eviction_block
update changelog and version Closes #16
1 parent 35d8d43 commit 8ec3fc5

File tree

4 files changed

+34
-9
lines changed

4 files changed

+34
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 2.0.4
2+
- Add constructional method to allow an eviction specific block to be set.
3+
14
## 2.0.3
25
- Add pseudo codec IdentityMapCodec. Support class for identity based multiline processing.
36

lib/logstash/codecs/identity_map_codec.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ def initialize(codec)
101101
@max_identities = MAX_IDENTITIES
102102
@evict_timeout = EVICT_TIMEOUT
103103
@cleaner = MapCleaner.new(self, CLEANER_INTERVAL)
104-
@decode_block = lambda {|*| }
104+
@decode_block = lambda {|*| true }
105+
@eviction_block = nil
105106
end
106107

107108
# ==============================================
@@ -126,6 +127,12 @@ def cleaner_interval(interval)
126127
@cleaner = MapCleaner.new(self, interval.to_i)
127128
self
128129
end
130+
131+
# used to add a non-default eviction block
132+
def eviction_block(block)
133+
@eviction_block = block
134+
self
135+
end
129136
# end Constructional/builder methods
130137
# ==============================================
131138

@@ -184,7 +191,7 @@ def map_cleanup
184191
# contents should not mutate during this call
185192
identity_map.delete_if do |identity, compo|
186193
if (flag = compo.timeout <= cut_off)
187-
compo.codec.flush(&@decode_block)
194+
compo.codec.flush(&(@eviction_block || @decode_block))
188195
end
189196
flag
190197
end

logstash-codec-multiline.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-codec-multiline'
4-
s.version = '2.0.3'
4+
s.version = '2.0.4'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "The multiline codec will collapse multiline messages and merge them into a single event."
77
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
@@ -20,7 +20,7 @@ Gem::Specification.new do |s|
2020
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "codec" }
2121

2222
# Gem dependencies
23-
s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0"
23+
s.add_runtime_dependency "logstash-core", ">= 2.0.0", "< 3.0.0"
2424

2525
s.add_runtime_dependency 'logstash-patterns-core'
2626
s.add_runtime_dependency 'jls-grok', '~> 0.11.1'

spec/codecs/identity_map_codec_spec.rb

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def initialize() @tracer = []; end
1818
def clone() self.class.new; end
1919
def decode(data) @tracer.push [:decode, data]; end
2020
def encode(event) @tracer.push [:encode, event]; end
21-
def flush(&block) @tracer.push [:flush, true]; end
21+
def flush(&block) @tracer.push [:flush, block.call]; end
2222
def close() @tracer.push [:close, true]; end
2323
def logger() @logger ||= LogTracer.new; end
2424

@@ -175,7 +175,7 @@ def trace_for(symbol)
175175
sleep(1.2)
176176
demuxer.decode(Object.new, "stream2")
177177
expect(demuxer.identity_count).to eq(limit)
178-
expect { demuxer.decode(Object.new, "stream4") }.not_to raise_error
178+
expect { demuxer.decode(Object.new, "stream4"){|*| 42 } }.not_to raise_error
179179
end
180180
end
181181
end
@@ -195,12 +195,27 @@ def trace_for(symbol)
195195
end
196196

197197
describe "codec eviction" do
198-
let(:demuxer) { described_class.new(codec).evict_timeout(1).cleaner_interval(1) }
199198
context "when an identity has become stale" do
199+
let(:demuxer) { described_class.new(codec).evict_timeout(1).cleaner_interval(1) }
200200
it "the cleaner evicts the codec and flushes it first" do
201-
demuxer.decode(Object.new, "stream1")
201+
demuxer.decode(Object.new, "stream1"){|*| 42}
202+
sleep(2.1)
203+
expect(codec.trace_for(:flush)).to eq(42)
204+
expect(demuxer.identity_map.keys).not_to include("stream1")
205+
end
206+
end
207+
208+
context "when an identity has become stale and an evition block is set" do
209+
let(:demuxer) do
210+
described_class.new(codec)
211+
.evict_timeout(1)
212+
.cleaner_interval(1)
213+
.eviction_block(lambda {|*| 24} )
214+
end
215+
it "the cleaner evicts the codec and flushes it first using the eviction_block" do
216+
demuxer.decode(Object.new, "stream1"){|*| 42}
202217
sleep(2.1)
203-
expect(codec.trace_for(:flush)).to be_truthy
218+
expect(codec.trace_for(:flush)).to eq(24)
204219
expect(demuxer.identity_map.keys).not_to include("stream1")
205220
end
206221
end

0 commit comments

Comments
 (0)