Skip to content

Commit 67bd611

Browse files
committed
Groundwork for pull provider
1 parent cdc55cd commit 67bd611

File tree

6 files changed

+195
-0
lines changed

6 files changed

+195
-0
lines changed

lib/smart_proxy_remote_execution_ssh.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def validate_ssh_log_level!
5151

5252
Plugin.settings.ssh_log_level = Plugin.settings.ssh_log_level.to_sym
5353
end
54+
55+
def job_storage
56+
@job_storage ||= Proxy::MemoryStore.new
57+
end
5458
end
5559
end
5660
end
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
require 'mqtt'
2+
require 'json'
3+
4+
module Proxy::RemoteExecution::Ssh
5+
class PullScript < Proxy::Dynflow::Action::Runner
6+
JobDelivered = Class.new
7+
8+
execution_plan_hooks.use :cleanup, :on => :stopped
9+
10+
def plan(action_input, mqtt: false)
11+
super(action_input)
12+
input[:with_mqtt] = mqtt
13+
end
14+
15+
def run(event = nil)
16+
if event == JobDelivered
17+
output[:state] = :delivered
18+
suspend
19+
else
20+
super
21+
end
22+
end
23+
24+
def init_run
25+
Proxy::RemoteExecution::Ssh::Plugin.job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script]
26+
output[:state] = :ready_for_pickup
27+
mqtt_start if input[:with_mqtt]
28+
suspend
29+
end
30+
31+
def cleanup(_plan = nil)
32+
Proxy::RemoteExecution::Ssh::Plugin.job_storage.delete("#{input[:hostname]}-#{execution_plan_id}")
33+
end
34+
35+
def process_external_event(event)
36+
output[:state] = :running
37+
data = event.data
38+
continuous_output = Proxy::Dynflow::ContinuousOutput.new
39+
continuous_output.add_output(lines, 'stdout') if data.key?('output')
40+
exit_code = data['exit_code'].to_i if data['exit_code']
41+
process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code))
42+
end
43+
44+
def kill_run
45+
case output[:state]
46+
when :ready_for_pickup
47+
# If the job is not running yet on the client, wipe it from storage
48+
cleanup
49+
# TODO: Stop the action
50+
when :notified, :running
51+
# Client was notified or is already running, dealing with this situation
52+
# is only supported if mqtt is available
53+
# Otherwise we have to wait it out
54+
if input[:with_mqtt]
55+
cleanup
56+
payload = {} # TODO
57+
mqtt_notify payload
58+
end
59+
end
60+
suspend
61+
end
62+
63+
def mqtt_start
64+
payload = {
65+
type: 'data',
66+
message_id: SecureRandom.uuid,
67+
version: 1,
68+
sent: DateTime.now.iso8601,
69+
directive: 'foreman',
70+
content: "#{input[:proxy_url]}/job/store/#{execution_plan_id}/#{run_step_id}/script.sh",
71+
}
72+
mqtt_notify payload
73+
output[:state] = :notified
74+
end
75+
76+
def mqtt_notify(payload)
77+
broker = 'localhost' # TODO
78+
broker_port = 1883 # TODO
79+
MQTT::Client.connect(broker, broker_port) do |c|
80+
c.publish("yggdrasil/#{input[:hostname]}/data/in", JSON.dump(payload), false, 1)
81+
end
82+
end
83+
end
84+
end

lib/smart_proxy_remote_execution_ssh/api.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'net/ssh'
22
require 'base64'
3+
require 'smart_proxy_dynflow/runner'
34

45
module Proxy::RemoteExecution
56
module Ssh
@@ -37,6 +38,43 @@ class Api < ::Sinatra::Base
3738
end
3839
204
3940
end
41+
42+
# Payload is a hash where
43+
# exit_code: Integer | NilClass
44+
# output: String
45+
post '/job/:task_id/:step_id/update' do |task_id, step_id|
46+
do_authorize_with_ssl_client
47+
48+
path = job_path(https_cert_cn, task_id, nil, nil).first
49+
if Proxy::RemoteExecution::Ssh.job_storage[path].nil?
50+
status 404
51+
return ''
52+
end
53+
54+
data = MultiJson.load(request.body.read)
55+
world.event(task_id, step_id, ::Proxy::Dynflow::Runner::ExternalEvent.new(data))
56+
end
57+
58+
get "/job/store/:task_id/:step_id/:file" do |task_id, step_id, file|
59+
do_authorize_with_ssl_client
60+
61+
path = job_path(https_cert_cn, task_id, step_id.to_i, file)
62+
content = Proxy::RemoteExecution::Ssh.job_storage[*path]
63+
if content
64+
world.event(task_id, step_id.to_i, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered)
65+
return content
66+
end
67+
68+
status 404
69+
''
70+
end
71+
72+
def job_path(hostname, task_id, step_id, file)
73+
["#{hostname}-#{task_id}",
74+
step_id,
75+
file,
76+
]
77+
end
4078
end
4179
end
4280
end

lib/smart_proxy_remote_execution_ssh/plugin.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class Plugin < Proxy::Plugin
2424
require 'smart_proxy_remote_execution_ssh/cockpit'
2525
require 'smart_proxy_remote_execution_ssh/api'
2626
require 'smart_proxy_remote_execution_ssh/actions/run_script'
27+
require 'smart_proxy_remote_execution_ssh/actions/pull_script'
2728
require 'smart_proxy_remote_execution_ssh/dispatcher'
2829
require 'smart_proxy_remote_execution_ssh/log_filter'
2930
require 'smart_proxy_remote_execution_ssh/runners'

smart_proxy_remote_execution_ssh.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ Gem::Specification.new do |gem|
3131

3232
gem.add_runtime_dependency('smart_proxy_dynflow', '~> 0.5')
3333
gem.add_runtime_dependency('net-ssh', '>= 4.2.0')
34+
gem.add_runtime_dependency('mqtt')
3435
end

test/api_test.rb

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'test_helper'
22
require 'tempfile'
3+
require 'smart_proxy_remote_execution_ssh/actions/pull_script'
34

45
KNOWN_HOSTS = <<EOF
56
c7s62.lxc,192.168.122.200 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBFAMEcFeBHeY8AD7xw2weF6vIE0BZXBk0oOm7sM+iJ4ld7BvQDf0mF6EeyyjzDmMUTyR2q9q0OdYiTbyEiKHQF4=
@@ -78,5 +79,71 @@ def with_known_hosts
7879
end
7980
end
8081
end
82+
83+
describe '/job/update' do
84+
let(:hostname) { 'myhost.example.com' }
85+
86+
before { Proxy::RemoteExecution::Ssh.job_storage["#{hostname}-12345", 1, 'message'] = 'hello' }
87+
after { Proxy::RemoteExecution::Ssh.job_storage.delete("#{hostname}-12345") }
88+
89+
it 'returns 403 if HTTPS is used and no cert is provided' do
90+
post '/job/12345/1/update', {}, 'HTTPS' => 1
91+
_(last_response.status).must_equal 403
92+
end
93+
94+
it 'returns 404 if job does not exist' do
95+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname)
96+
post '/job/12346/1/update'
97+
_(last_response.status).must_equal 404
98+
end
99+
100+
it 'dispatches an event' do
101+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname)
102+
fake_world = mock
103+
fake_world.expects(:event) do |task_id, step_id, _payload|
104+
task_id == '12345' && step_id == 1
105+
end
106+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world)
107+
108+
post '/job/12345/1/update', '{}'
109+
_(last_response.status).must_equal 200
110+
end
111+
end
112+
113+
describe '/job/store' do
114+
let(:hostname) { 'myhost.example.com' }
115+
116+
before { Proxy::RemoteExecution::Ssh.job_storage["#{hostname}-12345", 1, 'message'] = 'hello' }
117+
after { Proxy::RemoteExecution::Ssh.job_storage.delete("#{hostname}-12345") }
118+
119+
it 'returns 403 if HTTPS is used and no cert is provided' do
120+
get '/job/store/12345/1/message', {}, 'HTTPS' => 1
121+
_(last_response.status).must_equal 403
122+
end
123+
124+
it 'returns content if there is some and notifies the action' do
125+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname)
126+
fake_world = mock
127+
fake_world.expects(:event).with('12345', 1, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered)
128+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world)
129+
130+
get '/job/store/12345/1/message'
131+
_(last_response.status).must_equal 200
132+
_(last_response.body).must_equal 'hello'
133+
end
134+
135+
it 'returns 404 if there is no content' do
136+
Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).times(3).returns(hostname)
137+
138+
get '/job/store/12345/1/something.tar.gz'
139+
_(last_response.status).must_equal 404
140+
141+
get '/job/store/12345/2/something.tar.gz'
142+
_(last_response.status).must_equal 404
143+
144+
get '/job/store/12346/2/something.tar.gz'
145+
_(last_response.status).must_equal 404
146+
end
147+
end
81148
end
82149
end

0 commit comments

Comments
 (0)