Skip to content

Commit cff8035

Browse files
lock cancel operations
Fixes #125
1 parent dfde844 commit cff8035

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

lib/logstash/filters/grok/timeout_enforcer.rb

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ def initialize(logger, timeout_nanos)
99
# Stores running matches with their start time, this is used to cancel long running matches
1010
# Is a map of Thread => start_time
1111
@threads_to_start_time = java.util.concurrent.ConcurrentHashMap.new
12+
@cancel_mutex = Mutex.new
1213
end
1314

1415
def grok_till_timeout(grok, field, value)
@@ -19,12 +20,18 @@ def grok_till_timeout(grok, field, value)
1920
rescue InterruptedRegexpError => e
2021
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
2122
ensure
22-
stop_thread_groking(thread)
23-
# Clear any interrupts from any previous invocations that were not caught by Joni
24-
# It may appear that this should go in #stop_thread_groking but that would actually
25-
# break functionality! If this were moved there we would clear the interrupt
26-
# immediately after setting it in #cancel_timed_out, hence this MUST be here
27-
java.lang.Thread.interrupted
23+
unless stop_thread_groking(thread)
24+
@cancel_mutex.lock
25+
begin
26+
# Clear any interrupts from any previous invocations that were not caught by Joni
27+
# It may appear that this should go in #stop_thread_groking but that would actually
28+
# break functionality! If this were moved there we would clear the interrupt
29+
# immediately after setting it in #cancel_timed_out, hence this MUST be here
30+
java.lang.Thread.interrupted
31+
ensure
32+
@cancel_mutex.unlock
33+
end
34+
end
2835
end
2936
end
3037

@@ -66,7 +73,7 @@ def start_thread_groking(thread)
6673
end
6774

6875
def stop_thread_groking(thread)
69-
@threads_to_start_time.delete(thread)
76+
@threads_to_start_time.remove(thread)
7077
end
7178

7279
def cancel_timed_out!
@@ -75,10 +82,16 @@ def cancel_timed_out!
7582
start_time = entry.get_value
7683
if start_time < now && now - start_time > @timeout_nanos
7784
thread = entry.get_key
78-
thread.interrupt()
7985
# Ensure that we never attempt to cancel this thread twice in the event
8086
# of weird races
81-
stop_thread_groking(thread)
87+
if stop_thread_groking(thread)
88+
@cancel_mutex.lock
89+
begin
90+
thread.interrupt()
91+
ensure
92+
@cancel_mutex.unlock
93+
end
94+
end
8295
end
8396
end
8497
end

0 commit comments

Comments
 (0)