Skip to content

Commit 1d4636b

Browse files
authored
Add readiness probe to await start (#16)
* Attempt to add rediness probe. * Fix probe. * Factor out readiness check.
1 parent f9657af commit 1d4636b

File tree

5 files changed

+46
-9
lines changed

5 files changed

+46
-9
lines changed

examples/slack/slack_operator_mode.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@
5555
# Wait for sink to reach running state.
5656
ray.get(sink)
5757

58-
time.sleep(15)
59-
6058
# Sends message to all sinks attached to this stream.
6159
stream << f'Sending message to Slack sink in run mode {run_mode}.'
6260

rayvens/core/common.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import time
2020
import threading
2121
import os
22+
import json
2223
from pathlib import Path
2324
from confluent_kafka import Consumer, Producer
2425
from rayvens.core import kamel
@@ -37,21 +38,51 @@ def get_run_mode(camel_mode):
3738
return mode
3839

3940

41+
def _wait_for_ready_integration(mode, integration):
42+
server_address = mode.server_address(integration)
43+
health_check_address = f"{server_address}/q/health"
44+
while True:
45+
response = requests.get(health_check_address, timeout=(5, None))
46+
json_response = json.loads(response.content)
47+
all_routes_are_up = True
48+
for check in json_response['checks']:
49+
if check['name'] == 'camel-readiness-checks' and check[
50+
'status'] == 'UP':
51+
data = check['data']
52+
if data['context'] == 'UP':
53+
# Ensure all routes are up.
54+
route_index = 1
55+
route = f'route:route{route_index}'
56+
while route in data:
57+
if data[route] != 'UP':
58+
all_routes_are_up = False
59+
break
60+
route_index += 1
61+
route = f'route:route{route_index}'
62+
if all_routes_are_up:
63+
break
64+
time.sleep(1)
65+
66+
4067
# Wait for an integration to reach its running state and not only that but
4168
# also be in a state where it can immediately execute incoming requests.
42-
def await_start(mode, integration_name):
69+
def await_start(mode, integration):
4370
# Only needed when operator is used.
4471
if mode.is_local():
4572
return True
4673

4774
# Check logs of the integration to make sure it was installed properly.
48-
invocation = kamel.log(mode, integration_name, "Installed features:")
75+
invocation = kamel.log(mode, integration.integration_name,
76+
"Installed features:")
4977
integration_is_running = invocation is not None
5078
if integration_is_running:
51-
print(f'Integration {integration_name} is running.')
79+
print(f'Integration {integration.integration_name} is running.')
5280
else:
5381
print('Integration did not start correctly.')
5482

83+
# Perform health check and wait for integration to be ready.
84+
_wait_for_ready_integration(mode, integration)
85+
5586
return integration_is_running
5687

5788

rayvens/core/kamel.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ def run(integration_content,
128128
command.append("-n")
129129
command.append(mode.namespace)
130130

131+
# Include health check:
132+
command.append("-d")
133+
command.append("camel:camel-quarkus-microprofile-health")
134+
131135
for envVar in envVars:
132136
if envVar not in os.environ:
133137
raise RuntimeError("Variable %s not set in current environment" %
@@ -193,6 +197,10 @@ def local_run(integration_content,
193197
command.append('--property')
194198
command.append(f'quarkus.http.port={port}')
195199

200+
# Include health check:
201+
command.append("-d")
202+
command.append("camel:camel-quarkus-microprofile-health")
203+
196204
return kamel_utils.invoke_kamel_command(
197205
command,
198206
mode,

rayvens/core/operator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def add_source(self, stream, source, source_name):
4949
# Set up source for the HTTP connector case.
5050
send_to(stream.actor, self.mode.server_address(integration), route)
5151

52-
if not await_start(self.mode, integration.integration_name):
52+
if not await_start(self.mode, integration):
5353
raise RuntimeError('Could not start source')
5454
return integration
5555

@@ -71,7 +71,7 @@ def add_sink(self, stream, sink, sink_name):
7171
stream.actor.send_to.remote(helper, sink_name)
7272

7373
# Wait for integration to finish.
74-
if not await_start(self.mode, integration.integration_name):
74+
if not await_start(self.mode, integration):
7575
raise RuntimeError('Could not start sink')
7676

7777
return integration

rayvens/core/ray_serve.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def add_source(self, stream, source, source_name):
7474
# Start running the source integration.
7575
integration.invoke_run(self.mode, integration_content)
7676

77-
if not await_start(self.mode, integration.integration_name):
77+
if not await_start(self.mode, integration):
7878
raise RuntimeError('Could not start source')
7979
return integration
8080

@@ -111,7 +111,7 @@ def add_sink(self, stream, sink, sink_name):
111111
stream.actor.send_to.remote(helper, sink_name)
112112

113113
# Wait for integration to finish.
114-
if not await_start(self.mode, integration_name):
114+
if not await_start(self.mode, integration):
115115
raise RuntimeError('Could not start sink')
116116

117117
return integration

0 commit comments

Comments
 (0)