Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
source 'https://gems.ruby-china.com'
source 'https://rubygems.org'
gem "logstash-devutils","=1.3.1"
gemspec
126 changes: 88 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,57 +1,107 @@
# logstash-input-pulsar
- coding demo
# Logstash Input Pulsar Plugin

## Developing
### 1. Develop environment
- rvm
- jruby
- gem
- bundler
- rake
This is a Ruby plugin for [Logstash](https://github.com/elastic/logstash).

It is fully free and fully open source. The license is Apache 2.0, meaning you are free to use it however you want.

This input will read events from a Pulsar topic.

The version of this plugin x.y.z.m conforms to Pulsar x.y.z, while m is the patch version number. For broker compatibility, see the official Pulsar compatibility reference. If the compatibility wiki is not up-to-date, please contact Pulsar support/community to confirm compatibility.

If you require features not yet available in this plugin (including client version upgrades), please file an issue with details about what you need.

# Pulsar Input Configuration Options

This plugin supports these configuration options.

| Settings | Input type | Default value | Required |
| ----------------------------- | :----------------------------------------------------------: | ----------------: | -------: |
| service_url | string | - | Yes |
| topics or topics_pattern | array | ["topic-1"] | No |
| subscription_name | string | "logstash-group" | No |
| client_id | string | "logstash-client" | No |
| subscription_type | string, one of["Shared","Exclusive","Failover","Key_shared"] | "Shared" | No |
| subscription_initial_position | string, one of["Latest","Earliest"] | "Earliest" | No |
| codec | codec | "plain" | No |
| consumer_threads | number | 1 | No |
| decorate_events | boolean | false | No |
| commit_async | boolean | false | No |
| auth_plugin_class_name | string | - | No |
| auth_params | password | - | No |

# Example

pulsar without tls & token

### 2. Install dependencies
```sh
bundle install
rake install_jars
```
input{
pulsar{
service_url => "pulsar://127.0.0.1:6650"
codec => "json"
topics => [
"persistent://public/default/topic1",
"persistent://public/default/topic2"
]
subscription_name => "my_consumer"
subscription_type => "Shared"
subscription_initial_position => "Earliest"
}
}
```

### 3. Running your unpublished Plugin in Logstash
#### 3.1 Run in a local Logstash clone
pulsar with token

- Edit Logstash `Gemfile` and add the local plugin path, for example:
```ruby
gem "logstash-input-pulsar", :path => "/your/local/logstash-input-pulsar"
```
- Install plugin
input{
pulsar{
service_url => "pulsar://127.0.0.1:6650"
codec => "plain"
topics => [
"persistent://public/default/topic1",
"persistent://public/default/topic2"
]
subscription_name => "my_subscription"
subscription_type => "Shared"
subscription_initial_position => "Earliest"
auth_plugin_class_name => "org.apache.pulsar.client.impl.auth.AuthenticationToken"
auth_params => "token:${token}"
}
}
```

# Installation

1. Install with gem file
```sh
# Logstash 2.3 and higher
bin/logstash-plugin install --no-verify
wget https://github.com/NiuBlibing/logstash-input-pulsar/releases/download/vx.y.z.m/logstash-input-pulsar-x.y.z.m.gem
bin/logstash-plugin install logstash-input-pulsar-x.y.z.m.gem
```

# Prior to Logstash 2.3
bin/plugin install --no-verify
2. Install from rubygems

```
- Run Logstash with your plugin
```sh
bin/logstash -e 'input {pulsar {}}'
bin/logstash-plugin install logstash-input-pulsar
```
At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash.

#### 3.2 Run in an installed Logstash
# Develop

1. Develop environment

- rvm
- jruby (**_Do not_** use ruby, it may fail to compile)
- gem
- bundler
- rake

You can use the same **3.1** method to run your plugin in an installed Logstash by editing its `Gemfile` and pointing the `:path` to your local plugin development directory or you can build the gem and install it using:
2. Install dependencies

- Build your plugin gem
```sh
gem build logstash-filter-awesome.gemspec
bundle install
rake install_jars
```
- Install the plugin from the Logstash home
```sh
# Logstash 2.3 and higher
bin/logstash-plugin install --no-verify

# Prior to Logstash 2.3
bin/plugin install --no-verify
3. Build

```sh
gem build logstash-input-pulsar.gemspec
```
- Start Logstash and proceed to test the plugin
29 changes: 20 additions & 9 deletions lib/logstash/inputs/pulsar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ class LogStash::Inputs::Pulsar < LogStash::Inputs::Base

default :codec, 'plain'

config :service_url, :validate => :string
config :service_url, :validate => :string, :required => true
config :auth_plugin_class_name, :validate => :string
config :auth_params, :validate => :password

config :topics, :validate => :array, :default => ["logstash"]
config :topics, :validate => :array, :default => ["topic-1"]
config :topics_pattern, :validate => :string

config :group_id, :validate => :string, :default => "logstash-group"
config :subscription_name, :validate => :string, :default => "logstash-group"
config :client_id, :validate => :string, :default => "logstash-client"
config :subscription_type, :validate => :string, :default => "Shared"
config :subscription_initial_position, :validate => :string, :default => "Earliest"
config :consumer_threads, :validate => :number, :default => 1


Expand Down Expand Up @@ -52,17 +55,16 @@ def stop
@runner_threads.each { |t| t.exit }
end

public
def pulsar_consumers
@runner_consumers
end # 这个是什么用

private
def create_consumer(client_id)
begin
logger.info("client - ", :client => client_id)
clientBuilder = org.apache.pulsar.client.api.PulsarClient.builder()
clientBuilder.serviceUrl(@service_url)
if not @auth_plugin_class_name.nil? and not @auth_params.nil?
auth = org.apache.pulsar.client.api.AuthenticationFactory.create(@auth_plugin_class_name, @auth_params.value)
clientBuilder.authentication(auth)
end
client = clientBuilder.build
@runner_pulsar_clients.push(client)

Expand All @@ -80,11 +82,20 @@ def create_consumer(client_id)
consumerBuilder.subscriptionType(subscriptionType::Exclusive)
elsif @subscription_type == "Failover"
consumerBuilder.subscriptionType(subscriptionType::Failover)
elsif @subscription_type == "Key_Shared"
consumerBuilder.subscriptionType(subscriptionType::Key_Shared)
else
consumerBuilder.subscriptionType(subscriptionType::Shared)
end

consumer = consumerBuilder.subscriptionName(@group_id)
subscriptionInitialPositionType = org.apache.pulsar.client.api.SubscriptionInitialPosition
if @subscription_initial_position == "Latest"
consumerBuilder.subscriptionInitialPosition(subscriptionInitialPositionType::Latest)
else
consumerBuilder.subscriptionInitialPosition(subscriptionInitialPositionType::Earliest)
end

consumer = consumerBuilder.subscriptionName(@subscription_name)
.consumerName(client_id)
.subscribe();

Expand Down
18 changes: 9 additions & 9 deletions logstash-input-pulsar.gemspec
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-pulsar'
s.version = '2.3.0'
s.licenses = ['Apache License (2.0)']
s.version = '2.11.0.3'
s.licenses = ['Apache-2.0']
s.summary = 'This input will read events from a pulsar topic.'
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ['ShiyiLiang']
s.email = 'test'
s.homepage = "https://github.com/se7enkings"
s.authors = ['ShiyiLiang', 'Tianyue Ren']
s.email = '[email protected]'
s.homepage = "https://github.com/NiuBlibing/logstash-input-pulsar"
s.require_paths = ['lib']

# Files
Expand All @@ -18,11 +18,11 @@ Gem::Specification.new do |s|
# Special flag to let us know this is actually a logstash plugin
s.metadata = { 'logstash_plugin' => 'true', 'group' => 'input'}

s.requirements << "jar 'org.apache.pulsar:pulsar-client', '2.3.0'"
s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.21'"
s.requirements << "jar 'org.apache.logging.log4j:log4j-1.2-api', '2.6.2'"
s.requirements << "jar 'org.apache.pulsar:pulsar-client', '2.11.0'"
s.requirements << "jar 'org.slf4j:slf4j-log4j12', '1.7.32'"
s.requirements << "jar 'org.apache.logging.log4j:log4j-1.2-api', '2.18.0'"

s.add_development_dependency 'jar-dependencies', '~> 0.3.2'
s.add_development_dependency 'jar-dependencies', '~> 0.4.1'

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
Expand Down