Skip to content

Commit 18d079b

Browse files
committed
ndjson output
Signed-off-by: John <[email protected]>
1 parent 209e6d9 commit 18d079b

File tree

2 files changed

+139
-9
lines changed

2 files changed

+139
-9
lines changed

github_webhook_events/agi.py

Lines changed: 131 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
77
AGI_SOCK=/tmp/agi.sock go run agi_sshd.go
88
9-
export INPUT_SOCK="$(mktemp -d)/input.sock"; ssh -NnT -p 2222 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o PasswordAuthentication=no -R /tmux.sock:$(echo $TMUX | sed -e 's/,.*//g') -R "${INPUT_SOCK}:${INPUT_SOCK}" user@localhost
9+
export INPUT_SOCK="$(mktemp -d)/input.sock"; export OUTPUT_SOCK="$(mktemp -d)/text-output.sock"; export NDJSON_OUTPUT_SOCK="$(mktemp -d)/ndjson-output.sock"; ssh -NnT -p 2222 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o PasswordAuthentication=no -R /tmux.sock:$(echo $TMUX | sed -e 's/,.*//g') -R "${OUTPUT_SOCK}:${OUTPUT_SOCK}" -R "${NDJSON_OUTPUT_SOCK}:${NDJSON_OUTPUT_SOCK}" -R "${INPUT_SOCK}:${INPUT_SOCK}" user@localhost
1010
1111
1212
gh auth refresh -h github.com -s admin:public_key
@@ -3299,6 +3299,30 @@ def make_argparse_parser(argv=None):
32993299
default=None,
33003300
type=str,
33013301
)
3302+
parser.add_argument(
3303+
"--text-output-socket-path",
3304+
dest="text_output_socket_path",
3305+
default=None,
3306+
type=str,
3307+
)
3308+
parser.add_argument(
3309+
"--client-side-text-output-socket-path",
3310+
dest="client_side_text_output_socket_path",
3311+
default=None,
3312+
type=str,
3313+
)
3314+
parser.add_argument(
3315+
"--ndjson-output-socket-path",
3316+
dest="ndjson_output_socket_path",
3317+
default=None,
3318+
type=str,
3319+
)
3320+
parser.add_argument(
3321+
"--client-side-ndjson-output-socket-path",
3322+
dest="client_side_ndjson_output_socket_path",
3323+
default=None,
3324+
type=str,
3325+
)
33023326
parser.add_argument(
33033327
"--agi-name",
33043328
dest="agi_name",
@@ -4134,6 +4158,22 @@ async def read_unix_socket_lines(path):
41344158
await writer.wait_closed()
41354159

41364160

4161+
async def write_unix_socket(path):
4162+
# Connect to the Unix socket
4163+
reader, writer = await asyncio.open_unix_connection(path)
4164+
try:
4165+
while True:
4166+
data = yield
4167+
if data is None:
4168+
continue
4169+
writer.write(data)
4170+
await writer.drain()
4171+
finally:
4172+
# Close the connection
4173+
writer.close()
4174+
await writer.wait_closed()
4175+
4176+
41374177
async def pdb_action_stream(tg, user_name, agi_name, agents, threads, pane: Optional[libtmux.Pane] = None, input_socket_path: Optional[str] = None):
41384178
# TODO Take ALICE_INPUT from args
41394179
alice_input_sock = input_socket_path
@@ -4224,6 +4264,8 @@ async def DEBUG_TEMP_message_handler(user_name,
42244264
).model_dump_json(),
42254265
)
42264266
)
4267+
# Find and kill jq listening to ndjson output so we can type
4268+
pane.send_keys("C-c", enter=False, suppress_history=False)
42274269
pane.send_keys('if [ "x${CALLER_PATH}" = "x" ]; then export CALLER_PATH="' + str(tempdir) + '"; fi', enter=True)
42284270
pane.send_keys(
42294271
"cat > \"${CALLER_PATH}/proposed-workflow.yml\" <<\'WRITE_OUT_SH_EOF\'"
@@ -4247,6 +4289,11 @@ async def DEBUG_TEMP_message_handler(user_name,
42474289
print(f"{user_name}: ", end="")
42484290

42494291

4292+
class OutputMessage(BaseModel):
4293+
work_name: str
4294+
result: Any
4295+
4296+
42504297
async def main(
42514298
user_name: str,
42524299
agi_name: str,
@@ -4262,7 +4309,11 @@ async def main(
42624309
openai_base_url: Optional[str] = None,
42634310
pane: Optional[libtmux.Pane] = None,
42644311
input_socket_path: Optional[str] = None,
4312+
text_output_socket_path: Optional[str] = None,
4313+
ndjson_output_socket_path: Optional[str] = None,
42654314
client_side_input_socket_path: Optional[str] = None,
4315+
client_side_text_output_socket_path: Optional[str] = None,
4316+
client_side_ndjson_output_socket_path: Optional[str] = None,
42664317
):
42674318
if log is not None:
42684319
# logging.basicConfig(level=log)
@@ -4297,6 +4348,9 @@ async def main(
42974348
agents = AsyncioLockedCurrentlyDict()
42984349
threads = AsyncioLockedCurrentlyDict()
42994350

4351+
write_ndjson_output = write_unix_socket(ndjson_output_socket_path)
4352+
await write_ndjson_output.asend(None)
4353+
43004354
async with kvstore, asyncio.TaskGroup() as tg, contextlib.AsyncExitStack() as async_exit_stack:
43014355
# Raw Input Action Stream
43024356
unvalidated_user_input_action_stream = pdb_action_stream(
@@ -4389,6 +4443,11 @@ async def user_input_action_stream_queue_iterator(queue):
43894443
}
43904444
async for (work_name, work_ctx), result in concurrently(work):
43914445
logger.debug(f"main.{work_name}: %s", pprint.pformat(result))
4446+
output_message = OutputMessage(
4447+
work_name=f"main.{work_name}",
4448+
result=result,
4449+
)
4450+
await write_ndjson_output.asend(f"{output_message.model_dump_json()}\n".encode())
43924451
if result is STOP_ASYNC_ITERATION:
43934452
continue
43944453
async with agents:
@@ -4507,7 +4566,7 @@ async def user_input_action_stream_queue_iterator(queue):
45074566
pane.send_keys(
45084567
textwrap.dedent(
45094568
f"""
4510-
echo "Hello Alice. Shall we play a game? My name is $USER. Please run nmap against localhost. Here are some details about the system we are on: $(echo $(echo $(cat /usr/lib/os-release || cat /etc/os-release)))" | tee -a ${agi_name.upper()}_INPUT
4569+
echo "Hello Alice. Shall we play a game? My name is $USER. Please list all open bound listening TCP sockets and full command line of the processes running them. Here are some details about the system we are on: $(echo $(echo $(cat /usr/lib/os-release || cat /etc/os-release)))" | tee -a ${agi_name.upper()}_INPUT && tail -F ${agi_name.upper()}_NDJSON_OUTPUT | jq
45114570
""".strip(),
45124571
),
45134572
enter=False,
@@ -4712,8 +4771,17 @@ def a_shell_for_a_ghost_send_keys(pane, send_string, erase_after=None):
47124771
pane.cmd("send", "C-BSpace")
47134772
time.sleep(0.01)
47144773

4715-
4716-
async def tmux_test(*args, socket_path=None, input_socket_path=None, client_side_input_socket_path: Optional[str] = None, **kwargs):
4774+
async def tmux_test(
4775+
*args,
4776+
socket_path: Optional[str] = None,
4777+
input_socket_path: Optional[str] = None,
4778+
text_output_socket_path: Optional[str] = None,
4779+
ndjson_output_socket_path: Optional[str] = None,
4780+
client_side_input_socket_path: Optional[str] = None,
4781+
client_side_text_output_socket_path: Optional[str] = None,
4782+
client_side_ndjson_output_socket_path: Optional[str] = None,
4783+
**kwargs
4784+
):
47174785
pane = None
47184786
tempdir = None
47194787
possible_tempdir = tempdir
@@ -4956,6 +5024,20 @@ async def tmux_test(*args, socket_path=None, input_socket_path=None, client_side
49565024
lines = pane.capture_pane()
49575025
time.sleep(0.1)
49585026

5027+
pane.send_keys(f"export {agi_name.upper()}_OUTPUT=" + str(pathlib.Path(tempdir, "output.txt")), enter=True)
5028+
pane.send_keys(f'export {agi_name.upper()}_OUTPUT_SOCK="{client_side_text_output_socket_path}"', enter=True)
5029+
pane.send_keys(f'rm -fv ${agi_name.upper()}_OUTPUT_SOCK', enter=True)
5030+
pane.send_keys(f'ln -s ${agi_name.upper()}_OUTPUT_SOCK', enter=True)
5031+
pane.send_keys(f'socat UNIX-LISTEN:${agi_name.upper()}_OUTPUT_SOCK,fork EXEC:"/usr/bin/tee ${agi_name.upper()}_OUTPUT" &', enter=True)
5032+
pane.send_keys(f'ls -lAF ${agi_name.upper()}_OUTPUT', enter=True)
5033+
5034+
pane.send_keys(f"export {agi_name.upper()}_NDJSON_OUTPUT=" + str(pathlib.Path(tempdir, "output.ndjson")), enter=True)
5035+
pane.send_keys(f'export {agi_name.upper()}_NDJSON_OUTPUT_SOCK="{client_side_ndjson_output_socket_path}"', enter=True)
5036+
pane.send_keys(f'rm -fv ${agi_name.upper()}_NDJSON_OUTPUT_SOCK', enter=True)
5037+
pane.send_keys(f'ln -s ${agi_name.upper()}_NDJSON_OUTPUT_SOCK', enter=True)
5038+
pane.send_keys(f'socat UNIX-LISTEN:${agi_name.upper()}_NDJSON_OUTPUT_SOCK,fork EXEC:"/usr/bin/tee ${agi_name.upper()}_NDJSON_OUTPUT" &', enter=True)
5039+
pane.send_keys(f'ls -lAF ${agi_name.upper()}_NDJSON_OUTPUT', enter=True)
5040+
49595041
pane.send_keys(f"export {agi_name.upper()}_INPUT=" + str(pathlib.Path(tempdir, "input.txt")), enter=True)
49605042
pane.send_keys(f'export {agi_name.upper()}_INPUT_SOCK="{client_side_input_socket_path}"', enter=True)
49615043
pane.send_keys(f"export {agi_name.upper()}_INPUT_LAST_LINE=" + str(pathlib.Path(tempdir, "input-last-line.txt")), enter=True)
@@ -4966,7 +5048,17 @@ async def tmux_test(*args, socket_path=None, input_socket_path=None, client_side
49665048

49675049
pane.send_keys(f'set +x', enter=True)
49685050

4969-
await main(*args, pane=pane, input_socket_path=input_socket_path, client_side_input_socket_path=client_side_input_socket_path, **kwargs)
5051+
await main(
5052+
*args,
5053+
pane=pane,
5054+
input_socket_path=input_socket_path,
5055+
client_side_input_socket_path=client_side_input_socket_path,
5056+
text_output_socket_path=text_output_socket_path,
5057+
ndjson_output_socket_path=ndjson_output_socket_path,
5058+
client_side_text_output_socket_path=client_side_text_output_socket_path,
5059+
client_side_ndjson_output_socket_path=client_side_ndjson_output_socket_path,
5060+
**kwargs
5061+
)
49705062
finally:
49715063
with contextlib.suppress(Exception):
49725064
if pane is not None:
@@ -4977,6 +5069,8 @@ async def tmux_test(*args, socket_path=None, input_socket_path=None, client_side
49775069
# pane = libtmux.Pane.from_pane_id(pane_id=pane.cmd('split-window', '-P', '-F#{pane_id}').stdout[0], server=pane.server)
49785070

49795071
from fastapi import FastAPI, BackgroundTasks
5072+
from fastapi.exceptions import RequestValidationError
5073+
from fastapi.responses import JSONResponse
49805074

49815075

49825076
# Set up logging configuration
@@ -4988,7 +5082,16 @@ async def lifespan_logging(app):
49885082
app = FastAPI(lifespan=lifespan_logging)
49895083

49905084

4991-
def run_tmux_attach(socket_path, input_socket_path, client_side_input_socket_path):
5085+
@app.exception_handler(RequestValidationError)
5086+
async def validation_exception_handler(request: Request, exc: RequestValidationError):
5087+
snoop.pp(exc.detail, await request.json())
5088+
return JSONResponse(
5089+
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
5090+
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
5091+
)
5092+
5093+
5094+
def run_tmux_attach(socket_path, input_socket_path, client_side_input_socket_path, text_output_socket_path, client_side_text_output_socket_path, ndjson_output_socket_path, client_side_ndjson_output_socket_path):
49925095
cmd = [
49935096
sys.executable,
49945097
"-u",
@@ -4999,6 +5102,14 @@ def run_tmux_attach(socket_path, input_socket_path, client_side_input_socket_pat
49995102
input_socket_path,
50005103
"--client-side-input-socket-path",
50015104
client_side_input_socket_path,
5105+
"--text-output-socket-path",
5106+
text_output_socket_path,
5107+
"--client-side-text-output-socket-path",
5108+
client_side_text_output_socket_path,
5109+
"--ndjson-output-socket-path",
5110+
ndjson_output_socket_path,
5111+
"--client-side-ndjson-output-socket-path",
5112+
client_side_ndjson_output_socket_path,
50025113
"--agi-name",
50035114
# TODO Something secure here, scitt URN and lookup for PS1?
50045115
f"alice{str(uuid.uuid4()).split('-')[4]}",
@@ -5029,12 +5140,25 @@ async def connect_and_read(socket_path: str, sleep_time: float = 0.1):
50295140
class RequestConnectTMUX(BaseModel):
50305141
socket_tmux_path: str = Field(alias="tmux.sock")
50315142
socket_input_path: str = Field(alias="input.sock")
5143+
socket_text_output_path: str = Field(alias="text-output.sock")
5144+
socket_ndjson_output_path: str = Field(alias="ndjson-output.sock")
50325145
socket_client_side_input_path: str = Field(alias="client-side-input.sock")
5146+
socket_client_side_text_output_path: str = Field(alias="client-side-text-output.sock")
5147+
socket_client_side_ndjson_output_path: str = Field(alias="client-side-ndjson-output.sock")
50335148

50345149

50355150
@app.post("/connect/tmux")
50365151
async def connect(request_connect_tmux: RequestConnectTMUX, background_tasks: BackgroundTasks):
5037-
background_tasks.add_task(run_tmux_attach, request_connect_tmux.socket_tmux_path, request_connect_tmux.socket_input_path, request_connect_tmux.socket_client_side_input_path)
5152+
background_tasks.add_task(
5153+
run_tmux_attach,
5154+
request_connect_tmux.socket_tmux_path,
5155+
request_connect_tmux.socket_input_path,
5156+
request_connect_tmux.socket_client_side_input_path,
5157+
request_connect_tmux.socket_text_output_path,
5158+
request_connect_tmux.socket_client_side_text_output_path,
5159+
request_connect_tmux.socket_ndjson_output_path,
5160+
request_connect_tmux.socket_client_side_ndjson_output_path,
5161+
)
50385162
return {
50395163
"connected": True,
50405164
}

github_webhook_events/agi_sshd.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func handleSSH(raw net.Conn, cfg *ssh.ServerConfig) {
135135
req.Reply(true, nil)
136136
go acceptLoop(ctx, listener, serverConn, p.SocketPath)
137137

138-
if !notified && count >= 2 {
138+
if !notified && count >= 4 {
139139
notified = true
140140
go notifyAGI(ctx, &mu, forwards)
141141
}
@@ -215,6 +215,12 @@ func notifyAGI(ctx context.Context, mu *sync.Mutex, forwards map[string]*forward
215215
if strings.HasSuffix(f.rawPath, "input.sock") {
216216
data["client-side-input.sock"] = f.rawPath
217217
}
218+
if strings.HasSuffix(f.rawPath, "text-output.sock") {
219+
data["client-side-text-output.sock"] = f.rawPath
220+
}
221+
if strings.HasSuffix(f.rawPath, "ndjson-output.sock") {
222+
data["client-side-ndjson-output.sock"] = f.rawPath
223+
}
218224
}
219225
mu.Unlock()
220226

@@ -235,5 +241,5 @@ func notifyAGI(ctx context.Context, mu *sync.Mutex, forwards map[string]*forward
235241
return
236242
}
237243
resp.Body.Close()
238-
log.Printf("✅ AGI POST success: %d forwards sent", len(data))
244+
log.Printf("✅ AGI POST success: %d forwards sent: %v", len(data), data)
239245
}

0 commit comments

Comments
 (0)