33from pathlib import Path
44import json
55from datetime import datetime
6+ import time
7+ import threading
8+ import signal
9+ from typing import Dict , Any , Tuple
610
711parent_dir = str (Path (__file__ ).parent .parent )
812sys .path .append (parent_dir )
913
1014from game_sdk .game .agent import Agent , WorkerConfig
11- from game_sdk .game .custom_types import FunctionResult
15+ from game_sdk .game .custom_types import FunctionResult , FunctionResultStatus , Function , Argument
1216from dpsn_plugin_gamesdk .dpsn_plugin import plugin
1317
14- # --- Add Message Handler ---
18+ # Global message counter and timestamp
19+ message_count = 0
20+ start_time = None
21+ collected_messages = []
22+ task_completed = False # Flag to track if the main task has been completed
23+
24+ # Update message handler to track count and time
1525def handle_incoming_message (message_data : dict ):
1626 """Callback function to process messages received via the plugin."""
17- try :
18- topic = message_data .get ('topic' , 'N/A' )
19- payload = message_data .get ('payload' , '{}' )
20- timestamp = datetime .now ().strftime ('%Y-%m-%d %H:%M:%S' )
21- print (f"\n --- Message Received ({ timestamp } ) ---" )
22- print (f"Topic: { topic } " )
23- # Pretty print payload if it's likely JSON/dict
24- if isinstance (payload , (dict , list )):
25- print (f"Payload:\n { json .dumps (payload , indent = 2 )} " )
26- return json .dumps (payload )
27- else :
28- print (f"Payload: { payload } " )
29- return str (payload )
30- except Exception as e :
31- print (f"Error in message handler: { e } " )
32- return str (e )
27+ global message_count , start_time , collected_messages
28+
29+ # Don't process messages if task is already completed
30+ if task_completed :
31+ return "Task already completed"
32+
33+ # Initialize start time on first message
34+ if start_time is None :
35+ start_time = time .time ()
36+
37+ message_count += 1
38+ collected_messages .append (message_data )
39+
40+ topic = message_data .get ('topic' , 'N/A' )
41+ payload = message_data .get ('payload' , '{}' )
42+ timestamp = datetime .now ().strftime ('%Y-%m-%d %H:%M:%S' )
43+
44+ print (f"\n --- Message Received ({ timestamp } ) ---" )
45+ print (f"Topic: { topic } " )
46+ print (f"Message count: { message_count } " )
47+ print (f"Time elapsed: { time .time () - start_time :.1f} seconds" )
48+
49+ # Pretty print payload
50+ if isinstance (payload , (dict , list )):
51+ print (f"Payload:\n { json .dumps (payload , indent = 2 )} " )
52+ return json .dumps (payload )
53+ else :
54+ print (f"Payload: { payload } " )
55+ return str (payload )
56+
57+ # Add a new function for the agent to check if collection is complete
58+ def check_collection_status () -> tuple [FunctionResultStatus , str , dict ]:
59+ """Check if we have collected enough data (2 minutes or 3+ messages)"""
60+ global message_count , start_time , collected_messages
61+
62+ elapsed = 0
63+ if start_time :
64+ elapsed = time .time () - start_time
65+
66+ # Check conditions
67+ time_condition = elapsed >= 120 # 2 minutes
68+ count_condition = message_count >= 3
69+
70+ if time_condition or count_condition :
71+ reason = "time limit reached" if time_condition else "message count reached"
72+ summary = {
73+ "messages_received" : message_count ,
74+ "time_elapsed_seconds" : elapsed ,
75+ "collection_complete" : True ,
76+ "reason" : reason ,
77+ "sample_messages" : collected_messages [:3 ] # First 3 messages as samples
78+ }
79+ return (FunctionResultStatus .DONE , f"Data collection complete: { reason } " , summary )
80+ else :
81+ summary = {
82+ "messages_received" : message_count ,
83+ "time_elapsed_seconds" : elapsed ,
84+ "collection_complete" : False ,
85+ "remaining_time" : 120 - elapsed ,
86+ "remaining_messages" : max (0 , 3 - message_count )
87+ }
88+ return (FunctionResultStatus .DONE , "Still collecting data..." , summary )
89+
90+ # Function to mark a task completed after dpsn shutdown
91+ def mark_complete_after_shutdown () -> tuple [FunctionResultStatus , str , dict ]:
92+ """Mark the task as complete after DPSN client shutdown"""
93+ global task_completed
94+
95+ # Only allow this to be called if DPSN was already shut down
96+ if task_completed :
97+ return (FunctionResultStatus .DONE , "Task already marked as complete." , {"status" : "already_completed" })
98+
99+ # Explain what's happening
100+ print ("\n === FINALIZING TASK ===" )
101+ print ("1. DPSN Client has been shut down" )
102+ print ("2. Marking task as complete" )
103+ print ("3. Program will exit shortly" )
104+ task_completed = True
105+
106+ # Schedule a delayed exit to allow time for the agent to report completion
107+ def exit_program ():
108+ print ("\n === TASK COMPLETE ===" )
109+ print ("Exiting program as all tasks are complete..." )
110+ os ._exit (0 ) # Force exit the program
111+
112+ # Schedule exit after 5 seconds
113+ timer = threading .Timer (5.0 , exit_program )
114+ timer .daemon = True
115+ timer .start ()
116+
117+ return (FunctionResultStatus .DONE , "Task complete! Agent is now finished." , {"status" : "success" })
118+
119+ # Create a function objects for the functions
120+ check_status_function = Function (
121+ fn_name = "check_collection_status" ,
122+ fn_description = "Check if enough data has been collected (2 minutes or 3+ messages)" ,
123+ args = [],
124+ hint = "Use this to check if it's time to unsubscribe (returns collection status)" ,
125+ executable = check_collection_status
126+ )
127+
128+ complete_task_function = Function (
129+ fn_name = "mark_task_complete" ,
130+ fn_description = "Mark the agent task as complete and exit the program" ,
131+ args = [],
132+ hint = "Use this as the VERY LAST step after unsubscribing and shutting down DPSN" ,
133+ executable = mark_complete_after_shutdown
134+ )
33135
34136# Set the callback in the plugin instance *before* running the agent
35137plugin .set_message_callback (handle_incoming_message )
@@ -40,11 +142,32 @@ def get_agent_state_fn(function_result: FunctionResult, current_state: dict) ->
40142 init_state = {}
41143
42144 if current_state is None :
43- return init_state
145+ current_state = init_state # Initialize if None
146+
147+ # Check if function_result is None (initial call)
148+ if function_result is None :
149+ return current_state # Return current (likely initial) state
44150
45151 if function_result .info is not None :
46152 current_state .update (function_result .info )
47153
154+ # Check if we have completion info
155+ if function_result .info and 'status' in function_result .info and function_result .info ['status' ] == 'success' :
156+ current_state ['task_completed' ] = True
157+ # If we're marking task as complete, set state to indicate we're done
158+ print ("Agent state updated: Task marked as complete." )
159+
160+ # Add a delay if we just checked status and collection is NOT complete
161+ # Check if the 'collection_complete' key exists in the info dict
162+ # to infer that check_collection_status was likely the last function run.
163+ if function_result .info and \
164+ "collection_complete" in function_result .info and \
165+ not function_result .info .get ("collection_complete" , True ):
166+
167+ wait_time = 5 # Wait for 5 seconds before next check
168+ print (f"Collection not complete. Waiting { wait_time } seconds before next action..." )
169+ time .sleep (wait_time )
170+
48171 return current_state
49172
50173def get_worker_state (function_result : FunctionResult , current_state : dict ) -> dict :
@@ -59,23 +182,24 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di
59182
60183 return current_state
61184
62-
63185subscription_worker = WorkerConfig (
64186 id = "subscription_worker" ,
65187 worker_description = "Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown." ,
66188 get_state_fn = get_worker_state ,
67189 action_space = [
68190 plugin .get_function ("subscribe" ),
69191 plugin .get_function ("unsubscribe" ),
70- plugin .get_function ("shutdown" )
192+ plugin .get_function ("shutdown" ),
193+ check_status_function ,
194+ complete_task_function # Add the task completion function
71195 ],
72196)
73197
74198# Initialize the agent
75199agent = Agent (
76200 api_key = os .environ .get ("GAME_API_KEY" ),
77201 name = "DPSN Market Data Agent" ,
78- agent_goal = "Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up." ,
202+ agent_goal = "Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up and exit ." ,
79203 agent_description = (
80204 "You are an AI agent specialized in DPSN market data processing. Follow these steps in order:\n \n "
81205
@@ -86,18 +210,26 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di
86210 "3. When you receive messages on this topic:\n "
87211 " - Report that you received a message\n "
88212
89- "4. After receiving messages for 2 minutes (or at least 3 messages, whichever comes first) :\n "
90- " - Unsubscribe from the topic using the unsubscribe function \n "
91- " - Report that you have unsubscribed \n "
213+ "4. After subscribing, periodically use the check_collection_status function to check if :\n "
214+ " - 2 minutes have passed, OR \n "
215+ " - At least 3 messages have been received \n "
92216
93- "5. After unsubscribing:\n "
217+ "5. Only when check_collection_status indicates collection is complete:\n "
218+ " - Unsubscribe from the topic using the unsubscribe function\n "
94219 " - Shut down the DPSN client connection using the shutdown function\n "
95- " - Report that the connection has been closed\n "
220+
221+ "6. FINAL STEP: After shutting down the DPSN connection, call the mark_task_complete function.\n "
222+ " This will finish your task and automatically exit the program after 5 seconds.\n "
223+ " This must be the VERY LAST function you call.\n "
224+
225+ "IMPORTANT: The program will exit automatically after you mark the task complete.\n "
96226
97227 "Available functions:\n "
98228 "- subscribe: Subscribe to a DPSN topic to receive data\n "
99229 "- unsubscribe: Unsubscribe from a DPSN topic\n "
100- "- shutdown: Close the DPSN client connection\n \n "
230+ "- shutdown: Close the DPSN client connection\n "
231+ "- check_collection_status: Check if collection is complete\n "
232+ "- mark_task_complete: Mark the agent's task as complete and exit the program\n \n "
101233
102234 "Available topic:\n "
103235 "- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc"
@@ -109,6 +241,13 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di
109241)
110242
111243try :
244+ print ("\n === DPSN AGENT STARTING ===" )
245+ print ("This agent will:" )
246+ print ("1. Subscribe to the SOLUSDT/ohlc topic" )
247+ print ("2. Collect messages for 2 minutes or at least 3 messages" )
248+ print ("3. Unsubscribe and clean up" )
249+ print ("4. Exit automatically\n " )
250+
112251 agent .compile ()
113252 agent .run ()
114253except Exception as e :
0 commit comments