Skip to content

Commit 09ee013

Browse files
committed
Publish events to kafka
1 parent 45e0916 commit 09ee013

File tree

2 files changed

+45
-15
lines changed

2 files changed

+45
-15
lines changed

workers/event_catcher/Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
source "https://rubygems.org"
22

3+
gem "manageiq-messaging", "~> 0.1"
34
gem "manageiq-password", "~> 0.3"
45
gem "optimist", "~> 3.0"
56
gem "rbvmomi", "~> 2.0"

workers/event_catcher/event_catcher.rb

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
#!/usr/bin/env ruby
22

3+
require "manageiq-messaging"
34
require "manageiq-password"
5+
require "pathname"
46

57
class EventCatcher
6-
def initialize(ems_id, hostname, username, password, port, page_size = 20)
7-
@ems_id = ems_id
8-
@hostname = hostname
9-
@username = username
10-
@password = ManageIQ::Password.try_decrypt(password)
11-
@port = port
12-
@page_size = page_size
8+
def initialize(ems_id, hostname, username, password, port, messaging_host, messaging_port, page_size = 20)
9+
@ems_id = ems_id
10+
@hostname = hostname
11+
@username = username
12+
@password = ManageIQ::Password.try_decrypt(password)
13+
@port = port
14+
@messaging_host = messaging_host
15+
@messaging_port = messaging_port
16+
@page_size = page_size
1317
end
1418

1519
def run!
@@ -21,7 +25,7 @@ def run!
2125
next unless property_change.name =~ /latestPage.*/
2226

2327
events = Array(property_change.val).map { |event| parse_event(event) }
24-
puts events
28+
publish_events(events)
2529
end
2630
ensure
2731
property_filter&.DestroyPropertyFilter
@@ -31,7 +35,7 @@ def run!
3135

3236
private
3337

34-
attr_reader :ems_id, :hostname, :password, :port, :page_size, :username
38+
attr_reader :ems_id, :hostname, :messaging_host, :messaging_port, :password, :port, :page_size, :username
3539

3640
def connect
3741
vim_opts = {
@@ -118,24 +122,49 @@ def parse_event(event)
118122

119123
result
120124
end
125+
126+
def publish_events(events)
127+
events.each do |event|
128+
messaging_client.publish_topic(
129+
:service => "manageiq.ems-events",
130+
:sender => ems_id,
131+
:event => event[:event_type],
132+
:payload => event
133+
)
134+
end
135+
end
136+
137+
def messaging_client
138+
@messaging_client ||= begin
139+
ManageIQ::Messaging::Client.open(
140+
:host => messaging_host,
141+
:port => messaging_port,
142+
:protocol => :Kafka,
143+
:encoding => "json",
144+
:client_ref => "vmware-event-catcher-#{ems_id}"
145+
)
146+
end
147+
end
121148
end
122149

123150
def main(args)
124151
ManageIQ::Password.key_root = Pathname.new(ENV["APP_ROOT"]).join("certs")
125152

126-
event_catcher = EventCatcher.new(*args.values_at(:ems_id, :hostname, :username, :password, :port))
153+
event_catcher = EventCatcher.new(*args.values_at(:ems_id, :hostname, :username, :password, :port, :messaging_host, :messaging_port))
127154
event_catcher.run!
128155
end
129156

130157
def parse_args
131158
require "optimist"
132159

133160
Optimist.options do
134-
opt :ems_id, "EMS ID", :type => :int, :default => ENV["EMS_ID"].to_i, :required => ENV["EMS_ID"].nil?
135-
opt :hostname, "Hostname", :type => :string, :default => ENV["HOSTNAME"], :required => ENV["HOSTNAME"].nil?
136-
opt :username, "Username", :type => :string, :default => ENV["USERNAME"], :required => ENV["USERNAME"].nil?
137-
opt :password, "Password", :type => :string, :default => ENV["PASSWORD"], :required => ENV["PASSWORD"].nil?
138-
opt :port, "Port", :type => :int, :default => (ENV["PORT"] || 443).to_i
161+
opt :ems_id, "EMS ID", :type => :int, :default => ENV["EMS_ID"]&.to_i, :required => ENV["EMS_ID"].nil?
162+
opt :hostname, "Hostname", :type => :string, :default => ENV["HOSTNAME"], :required => ENV["HOSTNAME"].nil?
163+
opt :username, "Username", :type => :string, :default => ENV["USERNAME"], :required => ENV["USERNAME"].nil?
164+
opt :password, "Password", :type => :string, :default => ENV["PASSWORD"], :required => ENV["PASSWORD"].nil?
165+
opt :messaging_host, "Messaging Host", :type => :string, :default => ENV["MESSAGING_HOST"], :required => ENV["MESSAGING_HOST"].nil?
166+
opt :messaging_port, "Messaging Port", :type => :int, :default => ENV["MESSAGING_PORT"]&.to_i, :required => ENV["MESSAGING_PORT"].nil?
167+
opt :port, "Port", :type => :int, :default => (ENV["PORT"] || 443).to_i
139168
end
140169
end
141170

0 commit comments

Comments
 (0)