diff --git a/Gemfile b/Gemfile index cdd8aaa..4ee263f 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,3 @@ -source 'https://gems.ruby-china.com' +source 'https://rubygems.org' gem "logstash-devutils","=1.3.1" gemspec diff --git a/README.md b/README.md index a005b64..2da55cb 100644 --- a/README.md +++ b/README.md @@ -1,57 +1,107 @@ -# logstash-input-pulsar -- coding demo +# Logstash Input Pulsar Plugin -## Developing -### 1. Develop environment -- rvm -- jruby -- gem -- bundler -- rake +This is a Ruby plugin for [Logstash](https://github.com/elastic/logstash). + +It is fully free and fully open source. The license is Apache 2.0, meaning you are free to use it however you want. + +This input will read events from a Pulsar topic. + +The version of this plugin x.y.z.m conforms to Pulsar x.y.z, while m is the patch version number. For broker compatibility, see the official Pulsar compatibility reference. If the compatibility wiki is not up-to-date, please contact Pulsar support/community to confirm compatibility. + +If you require features not yet available in this plugin (including client version upgrades), please file an issue with details about what you need. + +# Pulsar Input Configuration Options + +This plugin supports these configuration options. + +| Settings | Input type | Default value | Required | +| ----------------------------- | :----------------------------------------------------------: | ----------------: | -------: | +| service_url | string | - | Yes | +| topics or topics_pattern | array | ["topic-1"] | No | +| subscription_name | string | "logstash-group" | No | +| client_id | string | "logstash-client" | No | +| subscription_type | string, one of["Shared","Exclusive","Failover","Key_shared"] | "Shared" | No | +| subscription_initial_position | string, one of["Latest","Earliest"] | "Earliest" | No | +| codec | codec | "plain" | No | +| consumer_threads | number | 1 | No | +| decorate_events | boolean | false | No | +| commit_async | boolean | false | No | +| auth_plugin_class_name | string | - | No | +| auth_params | password | - | No | + +# Example + +pulsar without tls & token -### 2. Install dependencies -```sh -bundle install -rake install_jars +``` +input{ + pulsar{ + service_url => "pulsar://127.0.0.1:6650" + codec => "json" + topics => [ + "persistent://public/default/topic1", + "persistent://public/default/topic2" + ] + subscription_name => "my_consumer" + subscription_type => "Shared" + subscription_initial_position => "Earliest" + } +} ``` -### 3. Running your unpublished Plugin in Logstash -#### 3.1 Run in a local Logstash clone +pulsar with token -- Edit Logstash `Gemfile` and add the local plugin path, for example: -```ruby -gem "logstash-input-pulsar", :path => "/your/local/logstash-input-pulsar" ``` -- Install plugin +input{ + pulsar{ + service_url => "pulsar://127.0.0.1:6650" + codec => "plain" + topics => [ + "persistent://public/default/topic1", + "persistent://public/default/topic2" + ] + subscription_name => "my_subscription" + subscription_type => "Shared" + subscription_initial_position => "Earliest" + auth_plugin_class_name => "org.apache.pulsar.client.impl.auth.AuthenticationToken" + auth_params => "token:${token}" + } +} +``` + +# Installation + +1. Install with gem file ```sh -# Logstash 2.3 and higher -bin/logstash-plugin install --no-verify + wget https://github.com/NiuBlibing/logstash-input-pulsar/releases/download/vx.y.z.m/logstash-input-pulsar-x.y.z.m.gem +bin/logstash-plugin install logstash-input-pulsar-x.y.z.m.gem +``` -# Prior to Logstash 2.3 -bin/plugin install --no-verify +2. Install from rubygems ``` -- Run Logstash with your plugin -```sh -bin/logstash -e 'input {pulsar {}}' +bin/logstash-plugin install logstash-input-pulsar ``` -At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash. -#### 3.2 Run in an installed Logstash +# Develop + +1. Develop environment + +- rvm +- jruby (**_Do not_** use ruby, it may fail to compile) +- gem +- bundler +- rake -You can use the same **3.1** method to run your plugin in an installed Logstash by editing its `Gemfile` and pointing the `:path` to your local plugin development directory or you can build the gem and install it using: +2. Install dependencies -- Build your plugin gem ```sh -gem build logstash-filter-awesome.gemspec +bundle install +rake install_jars ``` -- Install the plugin from the Logstash home -```sh -# Logstash 2.3 and higher -bin/logstash-plugin install --no-verify -# Prior to Logstash 2.3 -bin/plugin install --no-verify +3. Build +```sh +gem build logstash-input-pulsar.gemspec ``` -- Start Logstash and proceed to test the plugin \ No newline at end of file diff --git a/lib/logstash/inputs/pulsar.rb b/lib/logstash/inputs/pulsar.rb index f811341..abcecd8 100644 --- a/lib/logstash/inputs/pulsar.rb +++ b/lib/logstash/inputs/pulsar.rb @@ -9,14 +9,17 @@ class LogStash::Inputs::Pulsar < LogStash::Inputs::Base default :codec, 'plain' - config :service_url, :validate => :string + config :service_url, :validate => :string, :required => true + config :auth_plugin_class_name, :validate => :string + config :auth_params, :validate => :password - config :topics, :validate => :array, :default => ["logstash"] + config :topics, :validate => :array, :default => ["topic-1"] config :topics_pattern, :validate => :string - config :group_id, :validate => :string, :default => "logstash-group" + config :subscription_name, :validate => :string, :default => "logstash-group" config :client_id, :validate => :string, :default => "logstash-client" config :subscription_type, :validate => :string, :default => "Shared" + config :subscription_initial_position, :validate => :string, :default => "Earliest" config :consumer_threads, :validate => :number, :default => 1 @@ -52,17 +55,16 @@ def stop @runner_threads.each { |t| t.exit } end - public - def pulsar_consumers - @runner_consumers - end # 这个是什么用 - private def create_consumer(client_id) begin logger.info("client - ", :client => client_id) clientBuilder = org.apache.pulsar.client.api.PulsarClient.builder() clientBuilder.serviceUrl(@service_url) + if not @auth_plugin_class_name.nil? and not @auth_params.nil? + auth = org.apache.pulsar.client.api.AuthenticationFactory.create(@auth_plugin_class_name, @auth_params.value) + clientBuilder.authentication(auth) + end client = clientBuilder.build @runner_pulsar_clients.push(client) @@ -80,11 +82,20 @@ def create_consumer(client_id) consumerBuilder.subscriptionType(subscriptionType::Exclusive) elsif @subscription_type == "Failover" consumerBuilder.subscriptionType(subscriptionType::Failover) + elsif @subscription_type == "Key_Shared" + consumerBuilder.subscriptionType(subscriptionType::Key_Shared) else consumerBuilder.subscriptionType(subscriptionType::Shared) end - consumer = consumerBuilder.subscriptionName(@group_id) + subscriptionInitialPositionType = org.apache.pulsar.client.api.SubscriptionInitialPosition + if @subscription_initial_position == "Latest" + consumerBuilder.subscriptionInitialPosition(subscriptionInitialPositionType::Latest) + else + consumerBuilder.subscriptionInitialPosition(subscriptionInitialPositionType::Earliest) + end + + consumer = consumerBuilder.subscriptionName(@subscription_name) .consumerName(client_id) .subscribe(); diff --git a/logstash-input-pulsar.gemspec b/logstash-input-pulsar.gemspec index 166f06e..53b7ee5 100644 --- a/logstash-input-pulsar.gemspec +++ b/logstash-input-pulsar.gemspec @@ -1,12 +1,12 @@ Gem::Specification.new do |s| s.name = 'logstash-input-pulsar' - s.version = '2.3.0' - s.licenses = ['Apache License (2.0)'] + s.version = '2.11.0.3' + s.licenses = ['Apache-2.0'] s.summary = 'This input will read events from a pulsar topic.' s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" - s.authors = ['ShiyiLiang'] - s.email = 'test' - s.homepage = "https://github.com/se7enkings" + s.authors = ['ShiyiLiang', 'Tianyue Ren'] + s.email = 'rentianyue-jk@360shuke.com' + s.homepage = "https://github.com/NiuBlibing/logstash-input-pulsar" s.require_paths = ['lib'] # Files @@ -18,11 +18,11 @@ Gem::Specification.new do |s| # Special flag to let us know this is actually a logstash plugin s.metadata = { 'logstash_plugin' => 'true', 'group' => 'input'} - s.requirements << "jar 'org.apache.pulsar:pulsar-client', '2.3.0'" - s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.21'" - s.requirements << "jar 'org.apache.logging.log4j:log4j-1.2-api', '2.6.2'" + s.requirements << "jar 'org.apache.pulsar:pulsar-client', '2.11.0'" + s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.32'" + s.requirements << "jar 'org.apache.logging.log4j:log4j-1.2-api', '2.18.0'" - s.add_development_dependency 'jar-dependencies', '~> 0.3.2' + s.add_development_dependency 'jar-dependencies', '~> 0.4.1' # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"