Skip to content

Commit 1fbbf70

Browse files
authored
Enable non-blocking logging reads of kamel command output (#36)
* Fix logging of kamel commands. * Clean-up. * Remove inner timeout. * Refresh timeout when actual input is received. * Limit non-blocking log reads for timeout only.
1 parent 5fb0f69 commit 1fbbf70

File tree

2 files changed

+70
-17
lines changed

2 files changed

+70
-17
lines changed

rayvens/core/invocation.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -152,31 +152,56 @@ def _check_kamel_output(self,
152152
end_condition,
153153
with_output=False,
154154
with_timeout=False):
155-
# Implicit 30s timer.
156-
countdown = None
155+
# Implicit 2 minute timout in hundredths of a second:
156+
timout_duration = 2 * 60 * 100
157157
if with_timeout:
158-
countdown = 30
158+
countdown = timout_duration
159+
reading_thread = utils.LogThread(self.process.stdout)
160+
161+
# Kill thread when program ends in case it does not end before
162+
# that.
163+
reading_thread.daemon = True
164+
165+
# Start thread analyzing logs:
166+
reading_thread.start()
167+
168+
success = False
159169
while True:
160170
# Log progress of kamel subprocess:
161-
output = utils.print_log_from_subprocess(self.subprocess_name,
162-
self.process.stdout,
163-
with_output=with_output)
171+
if with_timeout:
172+
output = utils.print_log_from_queue(self.subprocess_name,
173+
reading_thread.queue,
174+
with_output)
175+
else:
176+
output = utils.print_log_from_subprocess(
177+
self.subprocess_name, self.process.stdout, with_output)
164178

165179
# Use the Kamel output to decide when Kamel instance is
166180
# ready to receive requests.
167-
if end_condition in output:
168-
return True
181+
if output is not None:
182+
if with_timeout:
183+
countdown = timout_duration
184+
if end_condition in output:
185+
success = True
186+
break
169187

170188
# Check process has not exited prematurely.
171189
if self.process.poll() is not None:
172190
break
191+
192+
# If timeout is enabled we decrement countdown.
173193
if with_timeout:
174194
countdown -= 1
175195
if countdown == 0:
176-
return False
177-
time.sleep(1)
196+
break
197+
time.sleep(0.01)
178198

179-
return False
199+
# Terminate log thread:
200+
if with_timeout:
201+
reading_thread.stop_flag.set()
202+
reading_thread.join()
203+
204+
return success
180205

181206

182207
#
@@ -240,8 +265,7 @@ def pod_is_running(self, integration_name):
240265
# for.
241266
# There should only be one new pod.
242267
output = utils.print_log_from_subprocess(self.subprocess_name,
243-
self.process.stdout,
244-
with_output=True)
268+
self.process.stdout, True)
245269
if self.pod_name == "":
246270
self.pod_name = kubernetes_utils.extract_pod_name(
247271
output, integration_name)
@@ -277,7 +301,7 @@ def _check_kubectl_output(self,
277301
while True:
278302
output = utils.print_log_from_subprocess(self.subprocess_name,
279303
self.process.stdout,
280-
with_output=with_output)
304+
with_output)
281305
if self.subcommand_type == \
282306
kubernetes_utils.KubectlCommand.GET_SERVICES:
283307
if kubernetes_utils.service_name_matches(output, service_name):
@@ -320,8 +344,7 @@ def _check_kafka_output(self, checked_topic):
320344
while True:
321345
# Log progress of kafka topic creation subprocess:
322346
output = utils.print_log_from_subprocess(self.subprocess_name,
323-
self.process.stdout,
324-
with_output=True)
347+
self.process.stdout, True)
325348
if checked_topic is not None and checked_topic in output:
326349
return True
327350

rayvens/core/utils.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
import os
1818
import random
19+
import time
20+
import threading
21+
from queue import Queue, Empty
1922

2023
# Port externalized by the cluster.
2124
externalized_cluster_port = "31095"
@@ -69,7 +72,7 @@ def subprocess_tag(subprocess_name):
6972
return "[%s subprocess]" % subprocess_name
7073

7174

72-
def print_log_from_subprocess(subprocess_name, stdout, with_output=False):
75+
def print_log_from_subprocess(subprocess_name, stdout, with_output):
7376
output = stdout.readline().decode("utf-8")
7477
output = output.strip()
7578

@@ -79,6 +82,33 @@ def print_log_from_subprocess(subprocess_name, stdout, with_output=False):
7982
return output
8083

8184

85+
class LogThread(threading.Thread):
86+
def __init__(self, stdout):
87+
threading.Thread.__init__(self)
88+
self.stop_flag = threading.Event()
89+
self.stdout = stdout
90+
self.queue = Queue()
91+
92+
def run(self):
93+
while not self.stop_flag.is_set():
94+
line = self.stdout.readline().decode("utf-8")
95+
self.queue.put(line.strip())
96+
time.sleep(0.1)
97+
98+
print("[Log thread] Kamel command logging terminated.")
99+
100+
101+
def print_log_from_queue(subprocess_name, queue, with_output):
102+
try:
103+
line = queue.get_nowait()
104+
except Empty:
105+
return None
106+
else:
107+
if line != "" and with_output:
108+
print(subprocess_tag(subprocess_name), line)
109+
return line
110+
111+
82112
def print_log(subprocess_name, message):
83113
print(subprocess_tag(subprocess_name), message)
84114

0 commit comments

Comments
 (0)