6
6
import threading
7
7
import time
8
8
import traceback
9
+ from collections import deque
9
10
from datetime import datetime
10
11
from typing import Any , Dict , List , Optional , Union
11
12
36
37
37
38
complete_message = {"role" : "server" , "type" : "status" , "content" : "complete" }
38
39
40
+ os .environ ["INTERPRETER_REQUIRE_ACKNOWLEDGE" ] = "True"
41
+
39
42
40
43
class AsyncInterpreter (OpenInterpreter ):
41
44
def __init__ (self , * args , ** kwargs ):
@@ -44,6 +47,7 @@ def __init__(self, *args, **kwargs):
44
47
self .respond_thread = None
45
48
self .stop_event = threading .Event ()
46
49
self .output_queue = None
50
+ self .unsent_messages = deque ()
47
51
self .id = os .getenv ("INTERPRETER_ID" , datetime .now ().timestamp ())
48
52
self .print = True # Will print output
49
53
@@ -441,91 +445,101 @@ async def receive_input():
441
445
async def send_output ():
442
446
while True :
443
447
try :
444
- output = await async_interpreter .output ()
445
- # print("Attempting to send the following output:", output)
448
+ # First, try to send any unsent messages
449
+ while async_interpreter .unsent_messages :
450
+ output = async_interpreter .unsent_messages [0 ]
451
+ try :
452
+ await send_message (output )
453
+ async_interpreter .unsent_messages .popleft ()
454
+ except Exception :
455
+ # If we can't send, break and try again later
456
+ break
446
457
447
- id = shortuuid .uuid ()
458
+ # If we've sent all unsent messages, get a new output
459
+ if not async_interpreter .unsent_messages :
460
+ output = await async_interpreter .output ()
461
+ await send_message (output )
448
462
449
- for attempt in range (100 ):
450
- try :
451
- if isinstance (output , bytes ):
452
- await websocket .send_bytes (output )
453
- else :
454
- if async_interpreter .require_acknowledge :
455
- output ["id" ] = id
456
-
457
- await websocket .send_text (json .dumps (output ))
458
-
459
- if async_interpreter .require_acknowledge :
460
- acknowledged = False
461
- for _ in range (1000 ):
462
- # print(async_interpreter.acknowledged_outputs)
463
- if (
464
- id
465
- in async_interpreter .acknowledged_outputs
466
- ):
467
- async_interpreter .acknowledged_outputs .remove (
468
- id
469
- )
470
- acknowledged = True
471
- break
472
- await asyncio .sleep (0.0001 )
473
-
474
- if acknowledged :
475
- break
476
- else :
477
- raise Exception (
478
- "Acknowledgement not received."
479
- )
480
- else :
481
- break
482
-
483
- except Exception as e :
484
- print (
485
- "Failed to send output on attempt number:" ,
486
- attempt + 1 ,
487
- ". Output was:" ,
488
- output ,
489
- )
490
- print ("Error:" , str (e ))
491
- await asyncio .sleep (0.05 )
492
- else :
493
- raise Exception (
494
- "Failed to send after 100 attempts. Output was:" ,
495
- str (output ),
496
- )
497
463
except Exception as e :
498
464
error = traceback .format_exc () + "\n " + str (e )
499
465
error_message = {
500
466
"role" : "server" ,
501
467
"type" : "error" ,
502
- "content" : traceback . format_exc () + " \n " + str ( e ) ,
468
+ "content" : error ,
503
469
}
504
- await websocket . send_text ( json . dumps (error_message ) )
505
- await websocket . send_text ( json . dumps (complete_message ) )
506
- print ("\n \n --- SENT ERROR: ---\n \n " )
470
+ async_interpreter . unsent_messages . append (error_message )
471
+ async_interpreter . unsent_messages . append (complete_message )
472
+ print ("\n \n --- ERROR (will be sent when possible) : ---\n \n " )
507
473
print (error )
508
- print ("\n \n --- (ERROR ABOVE WAS SENT) ---\n \n " )
474
+ print (
475
+ "\n \n --- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n \n "
476
+ )
477
+
478
+ async def send_message (output ):
479
+ if isinstance (output , dict ) and "id" in output :
480
+ id = output ["id" ]
481
+ else :
482
+ id = shortuuid .uuid ()
483
+ if (
484
+ isinstance (output , dict )
485
+ and async_interpreter .require_acknowledge
486
+ ):
487
+ output ["id" ] = id
488
+
489
+ for attempt in range (100 ):
490
+ if websocket .client_state == 3 : # 3 represents 'CLOSED' state
491
+ break
492
+ try :
493
+ if isinstance (output , bytes ):
494
+ await websocket .send_bytes (output )
495
+ else :
496
+ if async_interpreter .require_acknowledge :
497
+ output ["id" ] = id
498
+ await websocket .send_text (json .dumps (output ))
499
+
500
+ if async_interpreter .require_acknowledge :
501
+ acknowledged = False
502
+ for _ in range (1000 ):
503
+ if id in async_interpreter .acknowledged_outputs :
504
+ async_interpreter .acknowledged_outputs .remove (id )
505
+ acknowledged = True
506
+ break
507
+ await asyncio .sleep (0.0001 )
508
+
509
+ if acknowledged :
510
+ return
511
+ else :
512
+ raise Exception ("Acknowledgement not received." )
513
+ else :
514
+ return
515
+
516
+ except Exception as e :
517
+ print (
518
+ f"Failed to send output on attempt number: { attempt + 1 } . Output was: { output } "
519
+ )
520
+ print (f"Error: { str (e )} " )
521
+ await asyncio .sleep (0.05 )
522
+
523
+ # If we've reached this point, we've failed to send after 100 attempts
524
+ async_interpreter .unsent_messages .append (output )
525
+ print (
526
+ f"Added message to unsent_messages queue after failed attempts: { output } "
527
+ )
509
528
510
529
await asyncio .gather (receive_input (), send_output ())
530
+
511
531
except Exception as e :
512
- try :
513
- error = traceback .format_exc () + "\n " + str (e )
514
- error_message = {
515
- "role" : "server" ,
516
- "type" : "error" ,
517
- "content" : traceback .format_exc () + "\n " + str (e ),
518
- }
519
- await websocket .send_text (json .dumps (error_message ))
520
- await websocket .send_text (json .dumps (complete_message ))
521
- print ("\n \n --- SENT ERROR: ---\n \n " )
522
- print (error )
523
- print ("\n \n --- (ERROR ABOVE WAS SENT) ---\n \n " )
524
- except :
525
- # If we can't send it, that's fine.
526
- pass
527
- finally :
528
- await websocket .close ()
532
+ error = traceback .format_exc () + "\n " + str (e )
533
+ error_message = {
534
+ "role" : "server" ,
535
+ "type" : "error" ,
536
+ "content" : error ,
537
+ }
538
+ async_interpreter .unsent_messages .append (error_message )
539
+ async_interpreter .unsent_messages .append (complete_message )
540
+ print ("\n \n --- ERROR (will be sent when possible): ---\n \n " )
541
+ print (error )
542
+ print ("\n \n --- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n \n " )
529
543
530
544
# TODO
531
545
@router .post ("/" )
0 commit comments