1- java_import java . util . concurrent . locks . ReentrantLock
2-
31class LogStash ::Filters ::Grok ::TimeoutEnforcer
42 attr_reader :running
53
@@ -10,15 +8,14 @@ def initialize(logger, timeout_nanos)
108
119 # Stores running matches with their start time, this is used to cancel long running matches
1210 # Is a map of Thread => start_time
13- @threads_to_start_time = { }
14- @state_lock = ReentrantLock . new
11+ @threads_to_start_time = java . util . concurrent . ConcurrentHashMap . new
1512 end
1613
17- def grok_till_timeout ( event , grok , field , value )
14+ def grok_till_timeout ( grok , field , value )
1815 begin
1916 thread = java . lang . Thread . currentThread ( )
2017 start_thread_groking ( thread )
21- yield
18+ grok . execute ( value )
2219 rescue InterruptedRegexpError => e
2320 raise ::LogStash ::Filters ::Grok ::TimeoutException . new ( grok , field , value )
2421 ensure
@@ -27,7 +24,7 @@ def grok_till_timeout(event, grok, field, value)
2724 # It may appear that this should go in #stop_thread_groking but that would actually
2825 # break functionality! If this were moved there we would clear the interrupt
2926 # immediately after setting it in #cancel_timed_out, hence this MUST be here
30- thread . interrupted
27+ java . lang . Thread . interrupted
3128 end
3229 end
3330
@@ -64,44 +61,26 @@ def stop!
6461
6562 def start_thread_groking ( thread )
6663 # Clear any interrupts from any previous invocations that were not caught by Joni
67- thread . interrupted
68- synchronize do
69- @threads_to_start_time [ thread ] = java . lang . System . nanoTime ( )
70- end
64+ java . lang . Thread . interrupted
65+ @threads_to_start_time . put ( thread , java . lang . System . nanoTime )
7166 end
7267
7368 def stop_thread_groking ( thread )
74- synchronize do
75- @threads_to_start_time . delete ( thread )
76- end
69+ @threads_to_start_time . delete ( thread )
7770 end
7871
7972 def cancel_timed_out!
80- synchronize do
81- @threads_to_start_time . each do |thread , start_time |
82- now = java . lang . System . nanoTime # save ourselves some nanotime calls
83- elapsed = java . lang . System . nanoTime - start_time
84- if elapsed > @timeout_nanos
85- elapsed_millis = elapsed / 1000
86- thread . interrupt ( )
87- # Ensure that we never attempt to cancel this thread twice in the event
88- # of weird races
89- stop_thread_groking ( thread )
90- end
73+ now = java . lang . System . nanoTime # save ourselves some nanotime calls
74+ @threads_to_start_time . entry_set . each do |entry |
75+ start_time = entry . get_value
76+ if start_time < now && now - start_time > @timeout_nanos
77+ thread = entry . get_key
78+ thread . interrupt ( )
79+ # Ensure that we never attempt to cancel this thread twice in the event
80+ # of weird races
81+ stop_thread_groking ( thread )
9182 end
9283 end
9384 end
9485
95- # We use this instead of a Mutex because JRuby mutexes are interruptible
96- # We actually don't want that behavior since we always clear the interrupt in
97- # grok_till_timeout
98- def synchronize
99- # The JRuby Mutex uses lockInterruptibly which is what we DO NOT want
100- @state_lock . lock ( )
101- yield
102- ensure
103- @state_lock . unlock ( )
104- end
105-
106-
10786end
0 commit comments