diff --git a/CHANGELOG.md b/CHANGELOG.md index 02efd55..69ceb2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.3.0 + - Removed usage of jruby-stdin-channel which is not required anymore [#19](https://github.com/logstash-plugins/logstash-input-stdin/pull/19) + ## 3.2.6 - Docs: Set the default_codec doc attribute. diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 33d7ca4..9ce49c6 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -2,6 +2,7 @@ The following is a list of people who have contributed ideas, code, bug reports, or in general have helped logstash along its way. Contributors: +* Colin Surprenant (colinsurprenant) * Fredrik Gustafsson (jagheterfredrik) * John E. Vincent (lusis) * Jordan Sissel (jordansissel) diff --git a/lib/logstash/inputs/stdin.rb b/lib/logstash/inputs/stdin.rb index 2a7322e..8f50fd9 100644 --- a/lib/logstash/inputs/stdin.rb +++ b/lib/logstash/inputs/stdin.rb @@ -3,7 +3,6 @@ require "logstash/namespace" require "concurrent/atomics" require "socket" # for Socket.gethostname -require "jruby-stdin-channel" # Read events from standard input. # @@ -17,16 +16,6 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base READ_SIZE = 16384 def register - begin - @stdin = StdinChannel::Reader.new - self.class.module_exec { alias_method :stdin_read, :channel_read } - self.class.module_exec { alias_method :stop, :channel_stop} - rescue => e - @logger.debug("fallback to reading from regular $stdin", :exception => e) - self.class.module_exec { alias_method :stdin_read, :default_read } - self.class.module_exec { alias_method :stop, :default_stop } - end - @host = Socket.gethostname fix_streaming_codecs end @@ -52,15 +41,11 @@ def self.reloadable? private - def default_stop + def stop $stdin.close rescue nil end - def channel_stop - @stdin.close rescue nil - end - - def default_read + def stdin_read begin return $stdin.sysread(READ_SIZE) rescue IOError, EOFError @@ -71,16 +56,4 @@ def default_read end nil end - - def channel_read - begin - return @stdin.read(READ_SIZE) - rescue IOError, EOFError, StdinChannel::ClosedChannelError - do_stop - rescue => e - # ignore any exception in the shutdown process - raise(e) unless stop? - end - nil - end end diff --git a/logstash-input-stdin.gemspec b/logstash-input-stdin.gemspec index e285900..5a2dc40 100644 --- a/logstash-input-stdin.gemspec +++ b/logstash-input-stdin.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-stdin' - s.version = '3.2.6' + s.version = '3.3.0' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events from standard input" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -23,7 +23,9 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency "logstash-codec-line" s.add_runtime_dependency "concurrent-ruby" - s.add_runtime_dependency "jruby-stdin-channel" + + # for JRuby >= 9.1.15.0, see https://github.com/logstash-plugins/logstash-input-stdin/pull/19 + s.add_runtime_dependency "logstash-core", ">= 6.7.0" s.add_development_dependency "logstash-codec-plain" s.add_development_dependency "logstash-codec-json" diff --git a/spec/inputs/stdin_spec.rb b/spec/inputs/stdin_spec.rb index fbd866b..1654395 100644 --- a/spec/inputs/stdin_spec.rb +++ b/spec/inputs/stdin_spec.rb @@ -30,4 +30,47 @@ insist { plugin.codec }.is_a?(LogStash::Codecs::JSONLines) end end + + context "stdin close" do + # this spec tests for the interruptibility of $stdin + # for more context see https://github.com/logstash-plugins/logstash-input-stdin/pull/19 + # starting at JRuby 9.1.15.0 it is possible to interrupt $stdin.syscall with $stdin.close + # this spec is here to prevent regression on this behaviour + + let(:signal) { Queue.new } + + it "should interrupt sysread" do + + # launch a $stdin.sysread operation is a separate thread + # where the thread return value will be the sysread IOError + # caused by the close call. + sysread_thread = Thread.new do + result = nil + begin + signal << "starting read" + $stdin.sysread(1) + rescue => e + result = e + end + result + end + + # wait for thread to be ready to call sysread + signal.pop + # wait jsut a bit more to make sure the sysread call is made + sleep(0.5) + + # launch close in a separate thread because on Rubies which does not support interruptibility + # close will block + Thread.new do + $stdin.close + end + + Timeout.timeout(5) do + expect(sysread_thread.value).to be_a(IOError) + expect(sysread_thread.value.message).to match(/stream closed/) + end + end + end + end