Skip to content

Commit 07f4ff7

Browse files
author
Jack Kordas
committed
async log event
1 parent 33120aa commit 07f4ff7

File tree

1 file changed

+63
-3
lines changed

1 file changed

+63
-3
lines changed

client/globus_cw_client/client.py

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Python client API for cwlogs daemon
33
"""
44

5+
import asyncio
56
import json
67
import socket
78
import time
@@ -42,6 +43,25 @@ def log_event(message, retries=10, wait=0.1):
4243
return _request(req, retries, wait)
4344

4445

46+
async def log_event_async(message, retries=10, wait=0.1):
47+
if isinstance(message, bytes):
48+
message = message.decode("utf-8")
49+
_checktype(message, str, "message type must be bytes or unicode")
50+
51+
_checktype(retries, int, "retries must be an int")
52+
if retries < 0:
53+
raise ValueError("retries must be non-negative")
54+
55+
_checktype(wait, (int, float), "wait must be an int or float")
56+
if wait < 0:
57+
raise ValueError("wait must be non-negative")
58+
59+
req = {}
60+
req["message"] = message
61+
req["timestamp"] = int(time.time() * 1000)
62+
return await _request_async(req, retries, wait)
63+
64+
4565
def _connect(retries, wait):
4666
"""
4767
Try to connect to the daemon @retries + 1 times,
@@ -63,11 +83,24 @@ def _connect(retries, wait):
6383
raise CWLoggerConnectionError("couldn't connect to cw", error)
6484

6585

86+
async def _connect_async(retries, wait):
87+
addr = "\0org.globus.cwlogs"
88+
for _ in range(retries + 1):
89+
try:
90+
reader, writer = await asyncio.open_unix_connection(path=addr)
91+
except Exception as err:
92+
if writer:
93+
writer.close()
94+
error = err
95+
else:
96+
return reader, writer
97+
await asyncio.sleep(wait)
98+
raise CWLoggerConnectionError("couldn't connect to cw", error)
99+
100+
66101
def _request(req, retries, wait):
67102
buf = json.dumps(req, indent=None) + "\n"
68-
# dumps returns unicode with python3, but sock requires bytes
69-
if isinstance(buf, str):
70-
buf = buf.encode("utf-8")
103+
buf = buf.encode("utf-8")
71104

72105
sock = _connect(retries, wait)
73106
sock.sendall(buf)
@@ -76,11 +109,38 @@ def _request(req, retries, wait):
76109
while True:
77110
chunk = sock.recv(4000)
78111
if not chunk:
112+
sock.close()
79113
raise Exception("no data")
80114
resp += chunk.decode("utf-8")
81115
if resp.endswith("\n"):
82116
break
83117

118+
d = json.loads(resp[:-1])
119+
sock.close()
120+
if isinstance(d, dict):
121+
status = d["status"]
122+
if status == "ok":
123+
return d
124+
else:
125+
raise CWLoggerDaemonError("forwarded error", d["message"])
126+
else:
127+
raise CWLoggerDaemonError("unknown response type", d)
128+
129+
130+
async def _request_async(req, retries, wait):
131+
buf = json.dumps(req, indent=None) + "\n"
132+
buf = buf.encode("utf-8")
133+
134+
reader, writer = await _connect_async(retries, wait)
135+
writer.write(buf)
136+
await writer.drain()
137+
138+
resp = await reader.readline()
139+
writer.close()
140+
if not resp.endswith(b"\n"):
141+
raise Exception("no data")
142+
resp = resp.decode("utf-8")
143+
84144
d = json.loads(resp[:-1])
85145
if isinstance(d, dict):
86146
status = d["status"]

0 commit comments

Comments
 (0)