Skip to content

Commit 9822337

Browse files
authored
Merge pull request #522 from agrare/pure_ruby_event_catcher
Add a pure-ruby event catcher for VMware
2 parents def83fb + d2adf16 commit 9822337

File tree

3 files changed

+291
-0
lines changed

3 files changed

+291
-0
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
require_relative "event_parser"
2+
3+
class EventCatcher
4+
def initialize(ems, endpoint, authentication, messaging, logger, page_size = 20)
5+
@ems = ems
6+
@endpoint = endpoint
7+
@authentication = authentication
8+
@logger = logger
9+
@messaging = messaging
10+
@page_size = page_size
11+
end
12+
13+
def run!
14+
vim = connect
15+
event_history_collector = create_event_history_collector(vim, page_size)
16+
property_filter = create_property_filter(vim, event_history_collector)
17+
18+
notify_started
19+
20+
logger.info("Collecting events...")
21+
22+
wait_for_updates(vim) do |property_change|
23+
logger.info(property_change.name)
24+
next unless property_change.name.match?(/latestPage.*/)
25+
26+
events = Array(property_change.val).map do |event|
27+
EventParser.parse_event(event).merge(:ems_id => ems["id"])
28+
end
29+
30+
logger.info(events.to_json)
31+
32+
publish_events(events)
33+
end
34+
rescue Interrupt
35+
# Catch SIGINT
36+
ensure
37+
notify_stopping
38+
property_filter&.DestroyPropertyFilter
39+
event_history_collector&.DestroyCollector
40+
vim&.close
41+
end
42+
43+
def stop!
44+
end
45+
46+
private
47+
48+
attr_reader :ems, :endpoint, :authentication, :logger, :messaging, :page_size
49+
50+
def connect
51+
vim_opts = {
52+
:ns => 'urn:vim25',
53+
:ssl => true,
54+
:host => endpoint["hostname"],
55+
:port => endpoint["port"] || 443,
56+
:insecure => endpoint["verify_ssl"] == OpenSSL::SSL::VERIFY_NONE,
57+
:path => '/sdk',
58+
:rev => '7.0',
59+
}
60+
61+
RbVmomi::VIM.new(vim_opts).tap do |vim|
62+
vim.rev = vim.serviceContent.about.apiVersion
63+
vim.serviceContent.sessionManager.Login(
64+
:userName => authentication["userid"],
65+
:password => authentication["password"]
66+
)
67+
end
68+
end
69+
70+
def create_event_history_collector(vim, page_size)
71+
filter = RbVmomi::VIM.EventFilterSpec()
72+
73+
event_manager = vim.serviceContent.eventManager
74+
event_manager.CreateCollectorForEvents(:filter => filter).tap do |c|
75+
c.SetCollectorPageSize(:maxCount => page_size)
76+
end
77+
end
78+
79+
def create_property_filter(vim, event_history_collector)
80+
vim.propertyCollector.CreateFilter(
81+
:spec => RbVmomi::VIM.PropertyFilterSpec(
82+
:objectSet => [
83+
RbVmomi::VIM.ObjectSpec(
84+
:obj => event_history_collector
85+
)
86+
],
87+
:propSet => [
88+
RbVmomi::VIM.PropertySpec(
89+
:type => event_history_collector.class.wsdl_name,
90+
:all => false,
91+
:pathSet => ["latestPage"]
92+
)
93+
]
94+
),
95+
:partialUpdates => true
96+
)
97+
end
98+
99+
def wait_for_updates(vim, &block)
100+
version = nil
101+
options = RbVmomi::VIM.WaitOptions(:maxWaitSeconds => 60)
102+
103+
loop do
104+
update_set = vim.propertyCollector.WaitForUpdatesEx(:version => version, :options => options)
105+
heartbeat
106+
next if update_set.nil?
107+
108+
version = update_set.version
109+
110+
Array(update_set.filterSet).each do |property_filter_update|
111+
Array(property_filter_update.objectSet).each do |object_update|
112+
next unless object_update.kind == "modify"
113+
114+
Array(object_update.changeSet).each(&block)
115+
end
116+
end
117+
end
118+
end
119+
120+
def publish_events(events)
121+
events.each do |event|
122+
messaging_client.publish_topic(
123+
:service => "manageiq.ems",
124+
:sender => ems["id"],
125+
:event => event[:event_type],
126+
:payload => event
127+
)
128+
end
129+
end
130+
131+
def messaging_client
132+
@messaging_client ||= ManageIQ::Messaging::Client.open(
133+
messaging.merge(:client_ref => "vmware-event-catcher-#{ems["id"]}")
134+
)
135+
end
136+
137+
def notify_started
138+
SdNotify.ready if ENV.fetch("NOTIFY_SOCKET", nil)
139+
end
140+
141+
def heartbeat
142+
SdNotify.watchdog if ENV.fetch("NOTIFY_SOCKET", nil)
143+
end
144+
145+
def notify_stopping
146+
SdNotify.stopping if ENV.fetch("NOTIFY_SOCKET", nil)
147+
end
148+
end
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
class EventParser
2+
def self.parse_event(event)
3+
event_type = event.class.wsdl_name
4+
is_task = event_type == "TaskEvent"
5+
6+
if is_task
7+
sub_event_type = event.info&.name
8+
9+
# Handle special cases
10+
case sub_event_type
11+
when nil
12+
# Handle cases wehre event name is missing
13+
sub_event_type = "PowerOnVM_Task" if event.fullFormattedMessage.to_s.downcase == "task: power on virtual machine"
14+
sub_event_type ||= "DrsMigrateVM_Task" if event.info&.descriptionId == "Drm.ExecuteVMotionLRO"
15+
if sub_event_type.nil?
16+
sub_event_type = "TaskEvent"
17+
end
18+
when "Rename_Task", "Destroy_Task"
19+
# Handle case where event name is overloaded
20+
sub_event_name = event.info.descriptionId.split(".").first
21+
sub_event_name = "VM" if sub_event_name == "VirtualMachine"
22+
sub_event_name = "Cluster" if sub_event_name == "ClusterComputeResource"
23+
sub_event_type.gsub!(/_/, "#{sub_event_name}_")
24+
end
25+
26+
event_type = sub_event_type
27+
elsif event_type == "EventEx"
28+
sub_event_type = event.eventTypeId
29+
event_type = sub_event_type if sub_event_type.present?
30+
end
31+
32+
result = {
33+
:event_type => event_type,
34+
:chain_id => event.chainId,
35+
:is_task => is_task,
36+
:source => "VC",
37+
:message => event.fullFormattedMessage,
38+
:timestamp => event.createdTime,
39+
:full_data => event.props
40+
}
41+
42+
result[:username] = event.userName if event.userName.present?
43+
44+
# Get the vm information
45+
vm_key = "vm" if event.props.key?("vm")
46+
vm_key = "sourceVm" if event.props.key?("sourceVm")
47+
vm_key = "srcTemplate" if event.props.key?("srcTemplate")
48+
if vm_key
49+
vm_data = event.send(vm_key)
50+
51+
result[:vm_ems_ref] = vm_data&.vm if vm_data&.vm
52+
result[:vm_name] = CGI.unescape(vm_data&.name) if vm_data&.name
53+
result[:vm_location] = vm_data&.path if vm_data&.path
54+
result[:vm_uid_ems] = vm_data&.uuid if vm_data&.uuid
55+
56+
result
57+
end
58+
59+
# Get the dest vm information
60+
has_dest = false
61+
if %w[sourceVm srcTemplate].include?(vm_key)
62+
vm_data = event.vm
63+
if vm_data
64+
result[:dest_vm_ems_ref] = vm_data&.vm if vm_data&.vm
65+
result[:dest_vm_name] = CGI.unescape(vm_data&.name) if vm_data&.name
66+
result[:dest_vm_location] = vm_data&.path if vm_data&.path
67+
end
68+
69+
has_dest = true
70+
elsif event.props.key?("destName")
71+
result[:dest_vm_name] = event.destName
72+
has_dest = true
73+
end
74+
75+
if event.props.key?(:host)
76+
result[:host_name] = event.host.name
77+
result[:host_ems_ref] = event.host.host
78+
end
79+
80+
if has_dest
81+
host_data = event.props["destHost"] || event.props["host"]
82+
if host_data
83+
result[:dest_host_ems_ref] = host_data["host"]
84+
result[:dest_host_name] = host_data["name"]
85+
end
86+
end
87+
88+
result
89+
end
90+
end

workers/event_catcher/worker

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env ruby
2+
3+
require "bundler/inline"
4+
5+
gemfile(false) do
6+
source "https://rubygems.org"
7+
8+
gem "manageiq-loggers", "~>1.0"
9+
gem "manageiq-messaging", "~> 1.0"
10+
gem "rbvmomi2", "~> 3.3"
11+
if ENV.fetch("APPLIANCE", nil)
12+
gem "sd_notify"
13+
gem "systemd-journal"
14+
end
15+
gem "json"
16+
end
17+
18+
require_relative "event_catcher"
19+
20+
def setproctitle
21+
proc_title = "MIQ: Vmware::InfraManager::EventCatcher guid: #{ENV.fetch("GUID", nil)}"
22+
Process.setproctitle(proc_title)
23+
end
24+
25+
def build_logger
26+
if ENV.fetch("CONTAINER", nil)
27+
ManageIQ::Loggers::Container.new
28+
elsif ENV.fetch("APPLIANCE", nil)
29+
ManageIQ::Loggers::Journald.new
30+
else
31+
ManageIQ::Loggers::Base.new($stdout)
32+
end
33+
end
34+
35+
def main(args)
36+
setproctitle
37+
38+
logger = build_logger
39+
ems = args["ems"].detect { |e| e["type"] == "ManageIQ::Providers::Vmware::InfraManager" }
40+
messaging = args["messaging"].symbolize_keys
41+
endpoint = ems["endpoints"].detect { |ep| ep["role"] == "default" }
42+
authentication = ems["authentications"].detect { |auth| auth["authtype"] == "default" }
43+
44+
EventCatcher.new(ems, endpoint, authentication, messaging, logger).run!
45+
end
46+
47+
def parse_args
48+
JSON.parse($stdin.read)
49+
rescue JSON::ParserError
50+
abort("Invalid argument format")
51+
end
52+
53+
main(parse_args)

0 commit comments

Comments
 (0)