diff --git a/client/globus_cw_client/client.py b/client/globus_cw_client/client.py index 415d488..07f7ef8 100644 --- a/client/globus_cw_client/client.py +++ b/client/globus_cw_client/client.py @@ -2,20 +2,18 @@ Python client API for cwlogs daemon """ -from __future__ import annotations - +import asyncio +import json import socket import time -import orjson - def _checktype(value, types, message): if not isinstance(value, types): raise TypeError(message) -def log_event(message: str | bytes, retries=10, wait=0.1): +def log_event(message, retries=10, wait=0.1): """ Log the @message string to cloudwatch logs, using the current time. message: bytes (valid utf8 required) or str. @@ -26,7 +24,7 @@ def log_event(message: str | bytes, retries=10, wait=0.1): Returns when the message was queued to the daemon's memory queue. (Does not mean the message is safe in cloudwatch) """ - + # python3 json library can't handle bytes, so preemptively decode utf-8 if isinstance(message, bytes): message = message.decode("utf-8") _checktype(message, str, "message type must be bytes or str") @@ -39,24 +37,42 @@ def log_event(message: str | bytes, retries=10, wait=0.1): if wait < 0: raise ValueError("wait must be non-negative") - req = { - "message": message, - "timestamp": int(time.time() * 1000), - } + req = {"message": message, "timestamp": int(time.time() * 1000)} return _request(req, retries, wait) +async def log_event_async(message, retries=10, wait=0.1): + if isinstance(message, bytes): + message = message.decode("utf-8") + _checktype(message, str, "message type must be bytes or unicode") + + _checktype(retries, int, "retries must be an int") + if retries < 0: + raise ValueError("retries must be non-negative") + + _checktype(wait, (int, float), "wait must be an int or float") + if wait < 0: + raise ValueError("wait must be non-negative") + + req = {} + req["message"] = message + req["timestamp"] = int(time.time() * 1000) + return await _request_async(req, retries, wait) + + +socket_path = "/tmp/org.globus.cwlogs" + + def _connect(retries, wait): """ Try to connect to the daemon @retries + 1 times, waiting @wait seconds between tries Raise: Exception if max attempts exceeded """ - addr = "\0org.globus.cwlogs" for _ in range(retries + 1): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) try: - sock.connect(addr) + sock.connect(socket_path) except Exception as err: sock.close() error = err @@ -67,22 +83,65 @@ def _connect(retries, wait): raise CWLoggerConnectionError("couldn't connect to cw", error) +async def _connect_async(retries, wait): + for _ in range(retries + 1): + writer = None + try: + reader, writer = await asyncio.open_unix_connection(path=socket_path) + except Exception as err: + if writer: + writer.close() + error = err + else: + return reader, writer + await asyncio.sleep(wait) + raise CWLoggerConnectionError("couldn't connect to cw", error) + + def _request(req, retries, wait): - buf = orjson.dumps(req) + b"\n" + buf = json.dumps(req, indent=None) + "\n" + buf = buf.encode("utf-8") sock = _connect(retries, wait) sock.sendall(buf) - resp = b"" + resp = "" while True: chunk = sock.recv(4000) if not chunk: + sock.close() raise Exception("no data") - resp += chunk - if chunk.endswith(b"\n"): + resp += chunk.decode("utf-8") + if resp.endswith("\n"): break - d = orjson.loads(resp[:-1]) + d = json.loads(resp[:-1]) + sock.close() + if isinstance(d, dict): + status = d["status"] + if status == "ok": + return d + else: + raise CWLoggerDaemonError("forwarded error", d["message"]) + else: + raise CWLoggerDaemonError("unknown response type", d) + + +async def _request_async(req, retries, wait): + buf = json.dumps(req, indent=None) + "\n" + buf = buf.encode("utf-8") + + reader, writer = await _connect_async(retries, wait) + writer.write(buf) + await writer.drain() + + resp = await reader.readline() + writer.close() + if not resp.endswith(b"\n"): + raise Exception("no data") + resp = resp.decode("utf-8") + + d = json.loads(resp[:-1]) if isinstance(d, dict): status = d["status"] if status == "ok": diff --git a/client/setup.py b/client/setup.py index 7ec4f27..e79cdbb 100644 --- a/client/setup.py +++ b/client/setup.py @@ -7,7 +7,4 @@ # descriptive info, non-critical description="Client for Globus CloudWatch Logger", url="https://github.com/globus/globus-cwlogger", - install_requires=[ - "orjson", - ], ) diff --git a/daemon/globus_cw_daemon/daemon.py b/daemon/globus_cw_daemon/daemon.py index f128638..2c7970e 100644 --- a/daemon/globus_cw_daemon/daemon.py +++ b/daemon/globus_cw_daemon/daemon.py @@ -208,8 +208,16 @@ def main(): _print("cwlogs: starting...") _log.info("starting") + addr = "/tmp/org.globus.cwlogs" + + # clean up previous socket if exists + try: + os.remove(addr) + except FileNotFoundError: + pass + + os.umask(0) listen_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) - addr = "\0org.globus.cwlogs" try: listen_sock.bind(addr) except OSError as e: