Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 76 additions & 17 deletions client/globus_cw_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@
Python client API for cwlogs daemon
"""

from __future__ import annotations

import asyncio
import json
import socket
import time

import orjson


def _checktype(value, types, message):
if not isinstance(value, types):
raise TypeError(message)


def log_event(message: str | bytes, retries=10, wait=0.1):
def log_event(message, retries=10, wait=0.1):
"""
Log the @message string to cloudwatch logs, using the current time.
message: bytes (valid utf8 required) or str.
Expand All @@ -26,7 +24,7 @@ def log_event(message: str | bytes, retries=10, wait=0.1):
Returns when the message was queued to the daemon's memory queue.
(Does not mean the message is safe in cloudwatch)
"""

# python3 json library can't handle bytes, so preemptively decode utf-8
if isinstance(message, bytes):
message = message.decode("utf-8")
_checktype(message, str, "message type must be bytes or str")
Expand All @@ -39,24 +37,42 @@ def log_event(message: str | bytes, retries=10, wait=0.1):
if wait < 0:
raise ValueError("wait must be non-negative")

req = {
"message": message,
"timestamp": int(time.time() * 1000),
}
req = {"message": message, "timestamp": int(time.time() * 1000)}
return _request(req, retries, wait)


async def log_event_async(message, retries=10, wait=0.1):
if isinstance(message, bytes):
message = message.decode("utf-8")
_checktype(message, str, "message type must be bytes or unicode")

_checktype(retries, int, "retries must be an int")
if retries < 0:
raise ValueError("retries must be non-negative")

_checktype(wait, (int, float), "wait must be an int or float")
if wait < 0:
raise ValueError("wait must be non-negative")

req = {}
req["message"] = message
req["timestamp"] = int(time.time() * 1000)
return await _request_async(req, retries, wait)


socket_path = "/tmp/org.globus.cwlogs"


def _connect(retries, wait):
"""
Try to connect to the daemon @retries + 1 times,
waiting @wait seconds between tries
Raise: Exception if max attempts exceeded
"""
addr = "\0org.globus.cwlogs"
for _ in range(retries + 1):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
try:
sock.connect(addr)
sock.connect(socket_path)
except Exception as err:
sock.close()
error = err
Expand All @@ -67,22 +83,65 @@ def _connect(retries, wait):
raise CWLoggerConnectionError("couldn't connect to cw", error)


async def _connect_async(retries, wait):
for _ in range(retries + 1):
writer = None
try:
reader, writer = await asyncio.open_unix_connection(path=socket_path)
except Exception as err:
if writer:
writer.close()
error = err
else:
return reader, writer
await asyncio.sleep(wait)
raise CWLoggerConnectionError("couldn't connect to cw", error)


def _request(req, retries, wait):
buf = orjson.dumps(req) + b"\n"
buf = json.dumps(req, indent=None) + "\n"
buf = buf.encode("utf-8")

sock = _connect(retries, wait)
sock.sendall(buf)

resp = b""
resp = ""
while True:
chunk = sock.recv(4000)
if not chunk:
sock.close()
raise Exception("no data")
resp += chunk
if chunk.endswith(b"\n"):
resp += chunk.decode("utf-8")
if resp.endswith("\n"):
break

d = orjson.loads(resp[:-1])
d = json.loads(resp[:-1])
sock.close()
if isinstance(d, dict):
status = d["status"]
if status == "ok":
return d
else:
raise CWLoggerDaemonError("forwarded error", d["message"])
else:
raise CWLoggerDaemonError("unknown response type", d)


async def _request_async(req, retries, wait):
buf = json.dumps(req, indent=None) + "\n"
buf = buf.encode("utf-8")

reader, writer = await _connect_async(retries, wait)
writer.write(buf)
await writer.drain()

resp = await reader.readline()
writer.close()
if not resp.endswith(b"\n"):
raise Exception("no data")
resp = resp.decode("utf-8")

d = json.loads(resp[:-1])
if isinstance(d, dict):
status = d["status"]
if status == "ok":
Expand Down
3 changes: 0 additions & 3 deletions client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,4 @@
# descriptive info, non-critical
description="Client for Globus CloudWatch Logger",
url="https://github.com/globus/globus-cwlogger",
install_requires=[
"orjson",
],
)
10 changes: 9 additions & 1 deletion daemon/globus_cw_daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,16 @@ def main():
_print("cwlogs: starting...")
_log.info("starting")

addr = "/tmp/org.globus.cwlogs"

# clean up previous socket if exists
try:
os.remove(addr)
except FileNotFoundError:
pass

os.umask(0)
listen_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
addr = "\0org.globus.cwlogs"
try:
listen_sock.bind(addr)
except OSError as e:
Expand Down