Skip to content

Commit 82ec779

Browse files
committed
Fix a possible race where Thread.interrupted was not properly cleared
This should keep perf even Fixes #131
1 parent 9931349 commit 82ec779

File tree

3 files changed

+26
-56
lines changed

3 files changed

+26
-56
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.0.1
2+
- Fix a potential race
3+
14
## 4.0.0
25
- Major performance improvements due to reduced locking
36

lib/logstash/filters/grok/timeout_enforcer.rb

Lines changed: 22 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,37 @@
11
class LogStash::Filters::Grok::TimeoutEnforcer
2-
attr_reader :running
3-
42
def initialize(logger, timeout_nanos)
53
@logger = logger
6-
@running = false
4+
@running = java.util.concurrent.atomic.AtomicBoolean.new(false)
75
@timeout_nanos = timeout_nanos
86

97
# Stores running matches with their start time, this is used to cancel long running matches
108
# Is a map of Thread => start_time
119
@threads_to_start_time = java.util.concurrent.ConcurrentHashMap.new
12-
@cancel_mutex = Mutex.new
10+
end
11+
12+
def running
13+
@running.get()
1314
end
1415

1516
def grok_till_timeout(grok, field, value)
1617
begin
1718
thread = java.lang.Thread.currentThread()
18-
start_thread_groking(thread)
19+
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
1920
grok.execute(value)
20-
rescue InterruptedRegexpError => e
21+
rescue InterruptedRegexpError, java.lang.InterruptedException => e
2122
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
2223
ensure
23-
unless stop_thread_groking(thread)
24-
@cancel_mutex.lock
25-
begin
26-
# Clear any interrupts from any previous invocations that were not caught by Joni
27-
# It may appear that this should go in #stop_thread_groking but that would actually
28-
# break functionality! If this were moved there we would clear the interrupt
29-
# immediately after setting it in #cancel_timed_out, hence this MUST be here
30-
java.lang.Thread.interrupted
31-
ensure
32-
@cancel_mutex.unlock
33-
end
34-
end
24+
# If the regexp finished, but interrupt was called after, we'll want to
25+
# clear the interrupted status anyway
26+
@threads_to_start_time.remove(thread)
27+
thread.interrupted
3528
end
3629
end
3730

3831
def start!
39-
@running = true
32+
@running.set(true)
4033
@timer_thread = Thread.new do
41-
while @running
34+
while @running.get()
4235
begin
4336
cancel_timed_out!
4437
rescue Exception => e
@@ -54,49 +47,23 @@ def start!
5447
end
5548

5649
def stop!
57-
@running = false
50+
@running.set(false)
5851
# Check for the thread mostly for a fast start/shutdown scenario
5952
@timer_thread.join if @timer_thread
6053
end
6154

6255
private
6356

64-
# These methods are private in large part because if they aren't called
65-
# in specific sequence and used together in specific ways the interrupt
66-
# behavior will be incorrect. Do NOT use or modify these methods unless
67-
# you know what you are doing!
68-
69-
def start_thread_groking(thread)
70-
# Clear any interrupts from any previous invocations that were not caught by Joni
71-
java.lang.Thread.interrupted
72-
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
73-
end
74-
75-
# Returns falsy in case there was no Grok execution in progress for the thread
76-
def stop_thread_groking(thread)
77-
@threads_to_start_time.remove(thread)
78-
end
79-
8057
def cancel_timed_out!
8158
now = java.lang.System.nanoTime # save ourselves some nanotime calls
82-
@threads_to_start_time.entry_set.each do |entry|
83-
start_time = entry.get_value
84-
if start_time < now && now - start_time > @timeout_nanos
85-
thread = entry.get_key
86-
# Ensure that we never attempt to cancel this thread unless a Grok execution is in progress
87-
# Theoretically there is a race condition here in case the entry's grok action changed
88-
# between evaluating the above condition on the start_time and calling stop_thread_groking
89-
# Practically this is impossible, since it would require a whole loop of writing to an
90-
# output, pulling new input events and starting a new Grok execution in worker thread
91-
# in between the above `if start_time < now && now - start_time > @timeout_nanos` and
92-
# the call to `stop_thread_groking`.
93-
if stop_thread_groking(thread)
94-
@cancel_mutex.lock
95-
begin
96-
thread.interrupt()
97-
ensure
98-
@cancel_mutex.unlock
99-
end
59+
@threads_to_start_time.forEach do |thread, start_time|
60+
# Use compute to lock this value
61+
@threads_to_start_time.computeIfPresent(thread) do |thread, start_time|
62+
if start_time < now && now - start_time > @timeout_nanos
63+
thread.interrupt
64+
nil # Delete the key
65+
else
66+
start_time # preserve the key
10067
end
10168
end
10269
end

logstash-filter-grok.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-filter-grok'
4-
s.version = '4.0.0'
4+
s.version = '4.0.1'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Parses unstructured event data into fields"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)