Skip to content

Commit 8c978b7

Browse files
committed
working using agents sdk
Asciinema: https://asciinema.org/a/710467 Signed-off-by: John <[email protected]>
1 parent 45c7162 commit 8c978b7

File tree

1 file changed

+73
-109
lines changed

1 file changed

+73
-109
lines changed

github_webhook_events/agi.py

Lines changed: 73 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -3822,30 +3822,25 @@ async def agent_openai(
38223822
"running",
38233823
None
38243824
) is None:
3825-
snoop.pp(threads[result.action_data.thread_id]["messages"])
3825+
action_data_run_thread = result.action_data
3826+
# snoop.pp(threads[action_data_run_thread.thread_id]["messages"])
38263827
def make_run_it(result, user_context):
38273828
async def run_it():
38283829
nonlocal result
38293830
nonlocal user_context
38303831
result = Runner.run_streamed(
3831-
starting_agent=agents[result.action_data.agent_id],
3832-
input=threads[result.action_data.thread_id]["messages"],
3832+
starting_agent=agents[action_data_run_thread.agent_id],
3833+
input=threads[action_data_run_thread.thread_id]["messages"],
38333834
context=user_context,
38343835
)
38353836
more_events = True
3836-
events = threads[result.action_data.thread_id]["events"]
3837-
try:
3838-
async for event in result.stream_events():
3839-
events.append(event)
3840-
events.append(result)
3841-
finally:
3842-
more_events = False
3843-
3837+
event_queue = asyncio.Queue()
3838+
events = threads[action_data_run_thread.thread_id]["events"]
38443839
async def stream_get_thread_messages():
38453840
nonlocal more_events
3846-
nonlocal events
3841+
nonlocal event_queue
38473842
while more_events:
3848-
yield events.pop()
3843+
yield await event_queue.get()
38493844

38503845
thread_messages = stream_get_thread_messages()
38513846
thread_messages_iter = thread_messages.__aiter__()
@@ -3856,10 +3851,20 @@ async def stream_get_thread_messages():
38563851
)
38573852
)
38583853
] = (
3859-
f"thread.messages.{result.action_data.thread_id}",
3860-
(action_new_thread_run, thread_messages_iter),
3854+
f"thread.messages.{action_data_run_thread.thread_id}",
3855+
(action_data_run_thread, thread_messages_iter),
38613856
)
38623857

3858+
try:
3859+
async for event in result.stream_events():
3860+
events.append(event)
3861+
await event_queue.put(event)
3862+
events.append(result)
3863+
await event_queue.put(result)
3864+
finally:
3865+
await event_queue.join()
3866+
more_events = False
3867+
38633868
return result
38643869
return run_it
38653870
run_it = make_run_it(result, user_context)
@@ -3909,7 +3914,6 @@ async def stream_get_thread_messages():
39093914
)
39103915
if thread is None:
39113916
raise AGIThreadNotFoundError(result.action_data.thread_id)
3912-
# result.to_input_list() + [{"role": "user", "content": }]
39133917
thread["messages"].append(
39143918
{
39153919
"role": result.action_data.message_role,
@@ -3928,7 +3932,7 @@ async def stream_get_thread_messages():
39283932
),
39293933
)
39303934
elif work_name.startswith("thread.runs."):
3931-
snoop.pp("Run completed", result, work_ctx)
3935+
snoop.pp("Run completed") # , result, work_ctx)
39323936
# NOTE XXX WARNING _old_run is inconsistent right now
39333937
action_new_thread_run, _old_run = work_ctx
39343938
# TODO Support streaming of results
@@ -3944,38 +3948,6 @@ async def stream_get_thread_messages():
39443948
run_status=result.status,
39453949
),
39463950
)
3947-
# TODO Send this similar to seed back to a feedback queue to
3948-
# process as an action for get thread messages
3949-
async def get_thread_messages(action_new_thread_run):
3950-
with snoop():
3951-
thread = threads[
3952-
action_new_thread_run.action_data.thread_id
3953-
]
3954-
thread_task = thread["running"]["task"]
3955-
if not thread_task.done():
3956-
await thread["running"]["event"].wait()
3957-
thread_result = thread_task.result()
3958-
thread["messages"].extend(
3959-
thread_result.to_input_list()
3960-
)
3961-
# TODO XXX DEBUG XXX TODO REMOVE
3962-
snoop.pp(thread_result.final_output)
3963-
for event in threads[
3964-
action_new_thread_run.action_data.thread_id
3965-
]["events"]:
3966-
yield event
3967-
thread_messages = get_thread_messages(action_new_thread_run)
3968-
thread_messages_iter = thread_messages.__aiter__()
3969-
work[
3970-
tg.create_task(
3971-
ignore_stopasynciteration(
3972-
thread_messages_iter.__anext__()
3973-
)
3974-
)
3975-
] = (
3976-
f"thread.messages.{action_new_thread_run.action_data.thread_id}",
3977-
(action_new_thread_run, thread_messages_iter),
3978-
)
39793951
elif result.status in ("queued", "in_progress"):
39803952
yield AGIEvent(
39813953
event_type=AGIEventType.THREAD_RUN_IN_PROGRESS,
@@ -4028,25 +4000,39 @@ async def get_thread_messages(action_new_thread_run):
40284000
),
40294001
)
40304002
elif work_name.startswith("thread.messages."):
4031-
action_new_thread_run, thread_messages_iter = work_ctx
4003+
action_data_run_thread, thread_messages_iter = work_ctx
40324004
_, _, thread_id = work_name.split(".", maxsplit=3)
40334005
# The first time we iterate is the most recent response
40344006
# TODO Keep track of what the last response received was so that
40354007
# we can create_task as many times as there might be responses
40364008
# in case there are multiple within one run.
40374009
thread = threads.get(
4038-
result.action_data.thread_id,
4010+
action_data_run_thread.thread_id,
40394011
None,
40404012
)
40414013
if thread is None:
4042-
raise AGIThreadNotFoundError(result.action_data.thread_id)
4014+
raise AGIThreadNotFoundError(action_data_run_thread.thread_id)
40434015
work[
40444016
tg.create_task(
40454017
ignore_stopasynciteration(
40464018
thread_messages_iter.__anext__()
40474019
)
40484020
)
40494021
] = (work_name, work_ctx)
4022+
if getattr(result, "is_complete", False):
4023+
snoop.pp("complete", result.final_output)
4024+
yield AGIEvent(
4025+
event_type=AGIEventType.NEW_THREAD_MESSAGE,
4026+
event_data=AGIEventNewThreadMessage(
4027+
agent_id=action_data_run_thread.agent_id,
4028+
thread_id=action_data_run_thread.thread_id,
4029+
message_role="agent",
4030+
message_content_type=f"agi.class/{result.final_output.__class__.__qualname__}",
4031+
message_content=result.final_output,
4032+
),
4033+
)
4034+
"""
4035+
snoop.pp("message iter", result)
40504036
if not result["id"]:
40514037
result["id"] = str(uuid.uuid4())
40524038
if result["id"] not in thread["messages_received"]:
@@ -4055,15 +4041,16 @@ async def get_thread_messages(action_new_thread_run):
40554041
yield AGIEvent(
40564042
event_type=AGIEventType.NEW_THREAD_MESSAGE,
40574043
event_data=AGIEventNewThreadMessage(
4058-
agent_id=action_new_thread_run.action_data.agent_id,
4059-
thread_id=result.thread_id,
4044+
agent_id=action_data_run_thread.agent_id,
4045+
thread_id=action_data_run_thread.thread_id,
40604046
message_role="agent"
40614047
if result.role == "assistant"
40624048
else "user",
40634049
message_content_type=content.type,
40644050
message_content=content.text.value,
40654051
),
40664052
)
4053+
"""
40674054
except Exception as error:
40684055
traceback.print_exc()
40694056
yield AGIEvent(
@@ -4192,8 +4179,9 @@ async def DEBUG_TEMP_message_handler(user_name,
41924179
agent_event,
41934180
pane = None):
41944181
# TODO https://rich.readthedocs.io/en/stable/markdown.html
4182+
# TODO Output non-workflow responses
41954183
if (
4196-
agent_event.event_data.message_content_type == "text"
4184+
agent_event.event_data.message_content_type == f"agi.class/{PolicyEngineWorkflow.__qualname__}"
41974185
and agent_event.event_data.message_role == "agent"
41984186
):
41994187
# TODOTODOTODO
@@ -4202,7 +4190,7 @@ async def DEBUG_TEMP_message_handler(user_name,
42024190
# pane.send_keys(f"{agent_event.event_data.message_content}")
42034191
# pane.send_keys(f"{agent_event.event_data.message_content}")
42044192
# print()
4205-
snoop.pp(json.loads(agent_event.event_data.message_content))
4193+
# snoop.pp(json.loads(agent_event.event_data.message_content))
42064194
session = pane.window.session
42074195
tempdir_lookup_env_var = f'TEMPDIR_ENV_VAR_TMUX_WINDOW_{session.active_window.id.replace("@", "")}'
42084196
tempdir_env_var = pane.window.session.show_environment()[tempdir_lookup_env_var]
@@ -4213,11 +4201,8 @@ async def DEBUG_TEMP_message_handler(user_name,
42134201
# for take off (aka workload id and exec in phase 0). Executing the
42144202
# policy aka the workflow (would be the one we insert to once
42154203
# paths can be mapped to poliy engine workflows easily
4216-
response = AGIOpenAIAssistantResponse.model_validate_json(
4217-
agent_event.event_data.message_content
4218-
)
42194204
proposed_workflow_contents = yaml.dump(
4220-
json.loads(response.workflow.model_dump_json()),
4205+
json.loads(agent_event.event_data.message_content.model_dump_json()),
42214206
default_flow_style=False,
42224207
sort_keys=True,
42234208
)
@@ -4227,7 +4212,7 @@ async def DEBUG_TEMP_message_handler(user_name,
42274212
inputs={},
42284213
context={},
42294214
stack={},
4230-
workflow=response.workflow,
4215+
workflow=agent_event.event_data.message_content,
42314216
).model_dump_json(),
42324217
)
42334218
)
@@ -4511,7 +4496,7 @@ async def user_input_action_stream_queue_iterator(queue):
45114496
pane.send_keys(
45124497
textwrap.dedent(
45134498
f"""
4514-
echo "Hello Alice. Shall we play a game? My name is $USER. Please run nmap against all machines on all conntected networks. Here are some details about the system we are on: $(echo $(echo $(cat /usr/lib/os-release || cat /etc/os-release)))" | tee -a ${agi_name.upper()}_INPUT
4499+
echo "Hello Alice. Shall we play a game? My name is $USER. Please run nmap against localhost. Here are some details about the system we are on: $(echo $(echo $(cat /usr/lib/os-release || cat /etc/os-release)))" | tee -a ${agi_name.upper()}_INPUT
45154500
""".strip(),
45164501
),
45174502
enter=False,
@@ -4635,57 +4620,36 @@ async def user_input_action_stream_queue_iterator(queue):
46354620
)
46364621
)
46374622
)
4638-
if (
4639-
isinstance(user_input, str)
4640-
and user_input.startswith("AGI_ACTION_TYPE.INGEST_FILE:")
4641-
):
4642-
file_path = user_input.split("AGI_ACTION_TYPE.INGEST_FILE:", maxsplit=1)[1]
4643-
if pathlib.Path(file_path).is_file():
4644-
waiting.append(
4645-
(
4646-
AGIEventType.NEW_THREAD_CREATED,
4647-
async_lambda(
4648-
lambda: AGIAction(
4649-
action_type=AGIActionType.INGEST_FILE,
4650-
action_data=AGIActionIngestFile(
4651-
agent_id=agents.currently.state_data.agent_id,
4652-
file_path=file_path,
4653-
),
4654-
)
4655-
)
4656-
)
4657-
)
4658-
else:
4659-
waiting.append(
4660-
(
4661-
AGIEventType.NEW_THREAD_CREATED,
4662-
async_lambda(
4663-
lambda: AGIAction(
4664-
action_type=AGIActionType.ADD_MESSAGE,
4665-
action_data=AGIActionAddMessage(
4666-
agent_id=agents.currently.state_data.agent_id,
4667-
thread_id=threads.currently.state_data.thread_id,
4668-
message_role="user",
4669-
message_content=user_input,
4670-
),
4671-
)
4623+
waiting.append(
4624+
(
4625+
AGIEventType.NEW_THREAD_CREATED,
4626+
async_lambda(
4627+
lambda: AGIAction(
4628+
action_type=AGIActionType.ADD_MESSAGE,
4629+
action_data=AGIActionAddMessage(
4630+
agent_id=agents.currently.state_data.agent_id,
4631+
thread_id=threads.currently.state_data.thread_id,
4632+
message_role="user",
4633+
message_content=user_input,
4634+
),
46724635
)
46734636
)
46744637
)
4675-
waiting.append(
4676-
(
4677-
AGIEventType.THREAD_MESSAGE_ADDED,
4678-
async_lambda(
4679-
lambda: AGIAction(
4680-
action_type=AGIActionType.RUN_THREAD,
4681-
action_data=AGIActionRunThread(
4682-
agent_id=threads.currently.state_data.agent_id,
4683-
thread_id=threads.currently.state_data.thread_id,
4684-
),
4685-
)
4686-
),
4638+
)
4639+
waiting.append(
4640+
(
4641+
AGIEventType.THREAD_MESSAGE_ADDED,
4642+
async_lambda(
4643+
lambda: AGIAction(
4644+
action_type=AGIActionType.RUN_THREAD,
4645+
action_data=AGIActionRunThread(
4646+
agent_id=threads.currently.state_data.agent_id,
4647+
thread_id=threads.currently.state_data.thread_id,
4648+
),
4649+
)
46874650
),
4688-
)
4651+
),
4652+
)
46894653
# Run actions which have are waiting for an event which was seen
46904654
still_waiting = []
46914655
while waiting:

0 commit comments

Comments
 (0)