Skip to content

Commit d87d7e3

Browse files
committed
Fix a number of bugs in the streaming logic
1 parent fdbbec3 commit d87d7e3

File tree

2 files changed

+74
-69
lines changed

2 files changed

+74
-69
lines changed

chipflow_lib/steps/silicon.py

Lines changed: 73 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
import json
88
import logging
99
import os
10-
import re
1110
import requests
1211
import shutil
1312
import subprocess
1413
import sys
15-
import time
1614
import urllib3
1715
from pprint import pformat
1816

@@ -31,6 +29,18 @@
3129
logger = logging.getLogger(__name__)
3230

3331

32+
def halo_logging(closure):
33+
class ClosureStreamHandler(logging.StreamHandler):
34+
def emit(self, record):
35+
# Call the closure with the log message
36+
closure(self.format(record))
37+
38+
handler = ClosureStreamHandler()
39+
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
40+
handler.setFormatter(formatter)
41+
logger.addHandler(handler)
42+
43+
3444
class SiliconTop(StepBase, Elaboratable):
3545
def __init__(self, config):
3646
self._config = config
@@ -122,6 +132,7 @@ def submit(self, rtlil_path, args):
122132
else:
123133
interval = -1
124134
with Halo(text="Submitting...", spinner="dots", interval=interval) as sp:
135+
125136
fh = None
126137
submission_name = self.determine_submission_name()
127138
data = {
@@ -234,7 +245,6 @@ def network_err(e):
234245
headers["Authorization"] = "REDACTED"
235246
logger.debug(f"Request headers: {headers}")
236247

237-
logger.debug(f"Request data: {data}")
238248
logger.debug(f"Response headers: {dict(resp.headers)}")
239249
logger.debug(f"Response body: {resp_data}")
240250
sp.text = ""
@@ -248,60 +258,56 @@ def network_err(e):
248258
exit(2)
249259

250260
def _long_poll_stream(self, sp, network_err):
251-
steps = self._last_log_steps
252-
stream_event_counter = 0
253261
assert self._chipflow_api_key
254262
# after 4 errors, return to _stream_logs loop and query the build status again
255-
while (stream_event_counter < 4):
256-
sp.text = "Build running... " + ' -> '.join(steps)
257-
try:
258-
log_resp = requests.get(
259-
self._log_stream_url,
260-
auth=("", self._chipflow_api_key),
261-
stream=True,
262-
timeout=(2.0, 60.0) # fail if connect takes >2s, long poll for 60s at a time
263-
)
264-
if log_resp.status_code == 200:
265-
logger.debug(f"response from {self._log_stream_url}:\n{log_resp}")
266-
for line in log_resp.iter_lines():
267-
line_str = line.decode("utf-8") if line else ""
268-
logger.debug(line_str)
269-
match line_str[0:8]:
270-
case "DEBUG ":
271-
sp.info(line_str) if log_level <= logging.DEBUG else None
272-
case "INFO ":
273-
sp.info(line_str) if log_level <= logging.INFO else None
274-
# Some special handling for more user feedback
275-
if line_str.endswith("started"):
276-
steps = re.findall(r"([0-9a-z_.]+)\:+?", line_str[18:])[0:2]
277-
sp.text = "Build running... " + ' -> '.join(steps)
278-
case "WARNING ":
279-
sp.info(line_str) if log_level <= logging.WARNING else None
280-
case "ERROR ":
281-
sp.info(line_str) if log_level <= logging.ERROR else None
282-
sp.start()
283-
else:
284-
stream_event_counter +=1
285-
logger.debug(f"Failed to stream logs: {log_resp.text}")
286-
sp.text = "💥 Failed streaming build logs. Trying again!"
287-
break
288-
except requests.ConnectionError as e:
289-
if type(e.__context__) is urllib3.exceptions.ReadTimeoutError:
290-
continue #just timed out, continue long poll
291-
sp.text = "💥 Failed connecting to ChipFlow Cloud."
292-
logger.debug(f"Error while streaming logs: {e}")
293-
break
294-
except (requests.RequestException, requests.exceptions.ReadTimeout) as e:
295-
if type(e.__context__) is urllib3.exceptions.ReadTimeoutError:
296-
continue #just timed out, continue long poll
263+
logger.debug("Long poll start")
264+
try:
265+
log_resp = requests.get(
266+
self._log_stream_url,
267+
auth=("", self._chipflow_api_key),
268+
stream=True,
269+
timeout=(2.0, 60.0) # fail if connect takes >2s, long poll for 60s at a time
270+
)
271+
if log_resp.status_code == 200:
272+
logger.debug(f"response from {self._log_stream_url}:\n{log_resp}")
273+
for line in log_resp.iter_lines():
274+
message = line.decode("utf-8") if line else ""
275+
try:
276+
level, time, step = message.split(maxsplit=2)
277+
except ValueError:
278+
continue
279+
280+
match level:
281+
case "DEBUG":
282+
sp.info(message) if log_level <= logging.DEBUG else None
283+
case "INFO" | "INFO+":
284+
sp.info(message) if log_level <= logging.INFO else None
285+
case "WARNING":
286+
sp.info(message) if log_level <= logging.WARNING else None
287+
case "ERROR":
288+
sp.info(message) if log_level <= logging.ERROR else None
289+
290+
if step != self._last_log_step:
291+
sp.text = f"Build running: {self._last_log_step}"
292+
self._last_log_step = step
293+
else:
294+
logger.debug(f"Failed to stream logs: {log_resp.text}")
297295
sp.text = "💥 Failed streaming build logs. Trying again!"
298-
logger.debug(f"Error while streaming logs: {e}")
299-
stream_event_counter +=1
300-
continue
301-
302-
# save steps so we coninue where we left off if we manage to reconnect
303-
self._last_log_steps = steps
304-
return stream_event_counter
296+
return True
297+
except requests.ConnectionError as e:
298+
if type(e.__context__) is urllib3.exceptions.ReadTimeoutError:
299+
return True
300+
sp.text = "💥 Failed connecting to ChipFlow Cloud."
301+
logger.debug(f"Error while streaming logs: {e}")
302+
return False
303+
except (requests.RequestException, requests.exceptions.ReadTimeout) as e:
304+
if type(e.__context__) is urllib3.exceptions.ReadTimeoutError:
305+
return True
306+
sp.text = "💥 Failed streaming build logs. Trying again!"
307+
logger.debug(f"Error while streaming logs: {e}")
308+
return False
309+
310+
return True
305311

306312
def _stream_logs(self, sp, network_err):
307313
sp.start("Streaming the logs...")
@@ -310,18 +316,19 @@ def _stream_logs(self, sp, network_err):
310316
timeout = 10.0
311317
build_status = "pending"
312318
stream_event_counter = 0
313-
self._last_log_steps = []
319+
self._last_log_step = ""
314320
assert self._chipflow_api_key is not None
315-
while fail_counter < 10 and stream_event_counter < 10:
316-
sp.text = f"Waiting for build to run... {build_status}"
317-
time.sleep(timeout) # Wait before polling
321+
sp.text = f"Waiting for build to run... {build_status}"
322+
323+
while fail_counter < 5:
318324
try:
325+
logger.debug(f"Checking build status, iteration {fail_counter}")
319326
status_resp = requests.get(
320327
self._build_status_url,
321328
auth=("", self._chipflow_api_key),
322329
timeout=timeout
323330
)
324-
except requests.exceptions.ReadTimeout as e:
331+
except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError) as e:
325332
sp.text = "💥 Error connecting to ChipFlow Cloud. Trying again! "
326333
fail_counter += 1
327334
logger.debug(f"Failed to fetch build status{fail_counter} times: {e}")
@@ -337,22 +344,20 @@ def _stream_logs(self, sp, network_err):
337344
build_status = status_data.get("status")
338345
logger.debug(f"Build status: {build_status}")
339346

340-
sp.text = f"Polling build status... {build_status}"
341-
342347
if build_status == "completed":
343348
sp.succeed("✅ Build completed successfully!")
344349
return 0
345350
elif build_status == "failed":
346351
sp.succeed("❌ Build failed.")
347352
return 1
348353
elif build_status == "running":
349-
stream_event_counter += self._long_poll_stream(sp, network_err)
350-
351-
if fail_counter >=10 or stream_event_counter >= 10:
352-
sp.text = ""
353-
sp.fail("💥 Failed fetching build status. Perhaps you hit a network error?")
354-
logger.debug(f"Failed to fetch build status {fail_counter} times and failed streaming {stream_event_counter} times. Exiting.")
355-
return 2
354+
sp.text = f"Build status: {build_status}"
355+
if not self._long_poll_stream(sp, network_err):
356+
sp.text = ""
357+
sp.fail("💥 Failed fetching build status. Perhaps you hit a network error?")
358+
logger.debug(f"Failed to fetch build status {fail_counter} times and failed streaming {stream_event_counter} times. Exiting.")
359+
return 2
360+
# check status and go again
356361

357362
def determine_submission_name(self):
358363
if "CHIPFLOW_SUBMISSION_NAME" in os.environ:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ test.cmd = "pytest"
6666
test-cov.cmd = "pytest --cov=chipflow_lib --cov-report=term"
6767
test-cov-html.cmd = "pytest --cov=chipflow_lib --cov-report=html"
6868
test-docs.cmd = "sphinx-build -b doctest docs/ docs/_build"
69-
lint.composite = [ "./tools/license_check.sh", "ruff check", "pyright chipflow_lib"]
69+
lint.composite = [ "./tools/license_check.sh", "ruff check {args}", "pyright chipflow_lib"]
7070
docs.cmd = "sphinx-build docs/ docs/_build/ -W --keep-going"
7171
test-silicon.cmd = "pytest tests/test_silicon_platform.py tests/test_silicon_platform_additional.py tests/test_silicon_platform_amaranth.py tests/test_silicon_platform_build.py tests/test_silicon_platform_port.py --cov=chipflow_lib.platforms.silicon --cov-report=term"
7272
_check-project.call = "tools.check_project:main"

0 commit comments

Comments
 (0)