|
3 | 3 | # Released under the MIT License. |
4 | 4 | # Copyright, 2019-2024, by Samuel Williams. |
5 | 5 |
|
6 | | -require "process/metrics" |
7 | | -require "json" |
8 | | - |
9 | | -require "async/service/generic" |
10 | | - |
11 | | -require "io/endpoint/bound_endpoint" |
12 | | -require "io/stream" |
| 6 | +require "async/container/supervisor/service" |
13 | 7 |
|
14 | 8 | module Falcon |
15 | 9 | module Service |
16 | 10 | # Implements a host supervisor which can restart the host services and provide various metrics about the running processes. |
17 | | - class Supervisor < Async::Service::Generic |
18 | | - # Initialize the supervisor using the given environment. |
19 | | - # @parameter environment [Build::Environment] |
20 | | - def initialize(...) |
21 | | - super |
22 | | - |
23 | | - @bound_endpoint = nil |
24 | | - end |
25 | | - |
26 | | - # The endpoint which the supervisor will bind to. |
27 | | - # Typically a unix pipe in the same directory as the host. |
28 | | - def endpoint |
29 | | - @evaluator.endpoint |
30 | | - end |
31 | | - |
32 | | - # Restart the process group that the supervisor belongs to. |
33 | | - def do_restart(message) |
34 | | - # Tell the parent of this process group to spin up a new process group/container. |
35 | | - # Wait for that to start accepting new connections. |
36 | | - # Stop accepting connections. |
37 | | - # Wait for existing connnections to drain. |
38 | | - # Terminate this process group. |
39 | | - signal = message[:signal] || :INT |
40 | | - |
41 | | - Process.kill(signal, Process.ppid) |
42 | | - end |
43 | | - |
44 | | - # Capture process metrics relating to the process group that the supervisor belongs to. |
45 | | - def do_metrics(message) |
46 | | - Process::Metrics::General.capture(pid: Process.ppid, ppid: Process.ppid) |
47 | | - end |
48 | | - |
49 | | - # Handle an incoming request. |
50 | | - # @parameter message [Hash] The decoded message. |
51 | | - def handle(message) |
52 | | - case message[:please] |
53 | | - when "restart" |
54 | | - self.do_restart(message) |
55 | | - when "metrics" |
56 | | - self.do_metrics(message) |
57 | | - end |
58 | | - end |
59 | | - |
60 | | - # Bind the supervisor to the specified endpoint. |
61 | | - def start |
62 | | - Console.logger.info(self) {"Binding to #{self.endpoint}..."} |
63 | | - |
64 | | - @bound_endpoint = Sync{self.endpoint.bound} |
65 | | - |
66 | | - super |
67 | | - end |
68 | | - |
69 | | - # Start the supervisor process which accepts connections from the bound endpoint and processes JSON formatted messages. |
70 | | - # @parameter container [Async::Container::Generic] |
71 | | - def setup(container) |
72 | | - container_options = @evaluator.container_options |
73 | | - health_check_timeout = container_options[:health_check_timeout] |
74 | | - |
75 | | - container.run(name: self.name, **container_options) do |instance| |
76 | | - Async do |
77 | | - @bound_endpoint.accept do |peer| |
78 | | - stream = ::IO::Stream(peer) |
79 | | - |
80 | | - while message = stream.read_until("\0") |
81 | | - response = handle(JSON.parse(message, symbolize_names: true)) |
82 | | - stream.puts(response.to_json, separator: "\0") |
83 | | - end |
84 | | - end |
85 | | - |
86 | | - instance.ready! |
87 | | - |
88 | | - if health_check_timeout |
89 | | - Async(transient: true) do |
90 | | - while true |
91 | | - sleep(health_check_timeout / 2) |
92 | | - instance.ready! |
93 | | - end |
94 | | - end |
95 | | - end |
96 | | - end |
97 | | - end |
98 | | - |
99 | | - super |
100 | | - end |
101 | | - |
102 | | - # Release the bound endpoint. |
103 | | - def stop |
104 | | - @bound_endpoint&.close |
105 | | - @bound_endpoint = nil |
106 | | - |
107 | | - super |
108 | | - end |
109 | | - |
110 | | - def invoke(command) |
111 | | - @bound_endpoint.local_address_endpoint.connect do |peer| |
112 | | - stream = ::IO::Stream(peer) |
113 | | - |
114 | | - stream.puts(command.to_json, separator: "\0") |
| 11 | + class Supervisor < Async::Container::Supervisor::Service |
| 12 | + def invoke(message) |
| 13 | + Sync do |
| 14 | + client = Async::Container::Supervisor::Client.new(endpoint: self.endpoint) |
115 | 15 |
|
116 | | - response = JSON.parse(stream.read_until("\0"), symbolize_names: true) |
117 | | - |
118 | | - return response |
| 16 | + client.connect do |connection| |
| 17 | + connection.call(**message) |
| 18 | + end |
119 | 19 | end |
120 | 20 | end |
121 | 21 | end |
|
0 commit comments