|
6 | 6 | import threading
|
7 | 7 | import time
|
8 | 8 | import traceback
|
| 9 | +from collections import deque |
9 | 10 | from datetime import datetime
|
10 | 11 | from typing import Any, Dict, List, Optional, Union
|
11 | 12 |
|
@@ -44,6 +45,7 @@ def __init__(self, *args, **kwargs):
|
44 | 45 | self.respond_thread = None
|
45 | 46 | self.stop_event = threading.Event()
|
46 | 47 | self.output_queue = None
|
| 48 | + self.unsent_messages = deque() |
47 | 49 | self.id = os.getenv("INTERPRETER_ID", datetime.now().timestamp())
|
48 | 50 | self.print = True # Will print output
|
49 | 51 |
|
@@ -441,91 +443,101 @@ async def receive_input():
|
441 | 443 | async def send_output():
|
442 | 444 | while True:
|
443 | 445 | try:
|
444 |
| - output = await async_interpreter.output() |
445 |
| - # print("Attempting to send the following output:", output) |
| 446 | + # First, try to send any unsent messages |
| 447 | + while async_interpreter.unsent_messages: |
| 448 | + output = async_interpreter.unsent_messages[0] |
| 449 | + try: |
| 450 | + await send_message(output) |
| 451 | + async_interpreter.unsent_messages.popleft() |
| 452 | + except Exception: |
| 453 | + # If we can't send, break and try again later |
| 454 | + break |
446 | 455 |
|
447 |
| - id = shortuuid.uuid() |
| 456 | + # If we've sent all unsent messages, get a new output |
| 457 | + if not async_interpreter.unsent_messages: |
| 458 | + output = await async_interpreter.output() |
| 459 | + await send_message(output) |
448 | 460 |
|
449 |
| - for attempt in range(100): |
450 |
| - try: |
451 |
| - if isinstance(output, bytes): |
452 |
| - await websocket.send_bytes(output) |
453 |
| - else: |
454 |
| - if async_interpreter.require_acknowledge: |
455 |
| - output["id"] = id |
456 |
| - |
457 |
| - await websocket.send_text(json.dumps(output)) |
458 |
| - |
459 |
| - if async_interpreter.require_acknowledge: |
460 |
| - acknowledged = False |
461 |
| - for _ in range(1000): |
462 |
| - # print(async_interpreter.acknowledged_outputs) |
463 |
| - if ( |
464 |
| - id |
465 |
| - in async_interpreter.acknowledged_outputs |
466 |
| - ): |
467 |
| - async_interpreter.acknowledged_outputs.remove( |
468 |
| - id |
469 |
| - ) |
470 |
| - acknowledged = True |
471 |
| - break |
472 |
| - await asyncio.sleep(0.0001) |
473 |
| - |
474 |
| - if acknowledged: |
475 |
| - break |
476 |
| - else: |
477 |
| - raise Exception( |
478 |
| - "Acknowledgement not received." |
479 |
| - ) |
480 |
| - else: |
481 |
| - break |
482 |
| - |
483 |
| - except Exception as e: |
484 |
| - print( |
485 |
| - "Failed to send output on attempt number:", |
486 |
| - attempt + 1, |
487 |
| - ". Output was:", |
488 |
| - output, |
489 |
| - ) |
490 |
| - print("Error:", str(e)) |
491 |
| - await asyncio.sleep(0.05) |
492 |
| - else: |
493 |
| - raise Exception( |
494 |
| - "Failed to send after 100 attempts. Output was:", |
495 |
| - str(output), |
496 |
| - ) |
497 | 461 | except Exception as e:
|
498 | 462 | error = traceback.format_exc() + "\n" + str(e)
|
499 | 463 | error_message = {
|
500 | 464 | "role": "server",
|
501 | 465 | "type": "error",
|
502 |
| - "content": traceback.format_exc() + "\n" + str(e), |
| 466 | + "content": error, |
503 | 467 | }
|
504 |
| - await websocket.send_text(json.dumps(error_message)) |
505 |
| - await websocket.send_text(json.dumps(complete_message)) |
506 |
| - print("\n\n--- SENT ERROR: ---\n\n") |
| 468 | + async_interpreter.unsent_messages.append(error_message) |
| 469 | + async_interpreter.unsent_messages.append(complete_message) |
| 470 | + print("\n\n--- ERROR (will be sent when possible): ---\n\n") |
507 | 471 | print(error)
|
508 |
| - print("\n\n--- (ERROR ABOVE WAS SENT) ---\n\n") |
| 472 | + print( |
| 473 | + "\n\n--- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n\n" |
| 474 | + ) |
| 475 | + |
| 476 | + async def send_message(output): |
| 477 | + if isinstance(output, dict) and "id" in output: |
| 478 | + id = output["id"] |
| 479 | + else: |
| 480 | + id = shortuuid.uuid() |
| 481 | + if ( |
| 482 | + isinstance(output, dict) |
| 483 | + and async_interpreter.require_acknowledge |
| 484 | + ): |
| 485 | + output["id"] = id |
| 486 | + |
| 487 | + for attempt in range(100): |
| 488 | + if websocket.client_state == 3: # 3 represents 'CLOSED' state |
| 489 | + break |
| 490 | + try: |
| 491 | + if isinstance(output, bytes): |
| 492 | + await websocket.send_bytes(output) |
| 493 | + else: |
| 494 | + if async_interpreter.require_acknowledge: |
| 495 | + output["id"] = id |
| 496 | + await websocket.send_text(json.dumps(output)) |
| 497 | + |
| 498 | + if async_interpreter.require_acknowledge: |
| 499 | + acknowledged = False |
| 500 | + for _ in range(1000): |
| 501 | + if id in async_interpreter.acknowledged_outputs: |
| 502 | + async_interpreter.acknowledged_outputs.remove(id) |
| 503 | + acknowledged = True |
| 504 | + break |
| 505 | + await asyncio.sleep(0.0001) |
| 506 | + |
| 507 | + if acknowledged: |
| 508 | + return |
| 509 | + else: |
| 510 | + raise Exception("Acknowledgement not received.") |
| 511 | + else: |
| 512 | + return |
| 513 | + |
| 514 | + except Exception as e: |
| 515 | + print( |
| 516 | + f"Failed to send output on attempt number: {attempt + 1}. Output was: {output}" |
| 517 | + ) |
| 518 | + print(f"Error: {str(e)}") |
| 519 | + await asyncio.sleep(0.05) |
| 520 | + |
| 521 | + # If we've reached this point, we've failed to send after 100 attempts |
| 522 | + async_interpreter.unsent_messages.append(output) |
| 523 | + print( |
| 524 | + f"Added message to unsent_messages queue after failed attempts: {output}" |
| 525 | + ) |
509 | 526 |
|
510 | 527 | await asyncio.gather(receive_input(), send_output())
|
| 528 | + |
511 | 529 | except Exception as e:
|
512 |
| - try: |
513 |
| - error = traceback.format_exc() + "\n" + str(e) |
514 |
| - error_message = { |
515 |
| - "role": "server", |
516 |
| - "type": "error", |
517 |
| - "content": traceback.format_exc() + "\n" + str(e), |
518 |
| - } |
519 |
| - await websocket.send_text(json.dumps(error_message)) |
520 |
| - await websocket.send_text(json.dumps(complete_message)) |
521 |
| - print("\n\n--- SENT ERROR: ---\n\n") |
522 |
| - print(error) |
523 |
| - print("\n\n--- (ERROR ABOVE WAS SENT) ---\n\n") |
524 |
| - except: |
525 |
| - # If we can't send it, that's fine. |
526 |
| - pass |
527 |
| - finally: |
528 |
| - await websocket.close() |
| 530 | + error = traceback.format_exc() + "\n" + str(e) |
| 531 | + error_message = { |
| 532 | + "role": "server", |
| 533 | + "type": "error", |
| 534 | + "content": error, |
| 535 | + } |
| 536 | + async_interpreter.unsent_messages.append(error_message) |
| 537 | + async_interpreter.unsent_messages.append(complete_message) |
| 538 | + print("\n\n--- ERROR (will be sent when possible): ---\n\n") |
| 539 | + print(error) |
| 540 | + print("\n\n--- (ERROR ABOVE WILL BE SENT WHEN POSSIBLE) ---\n\n") |
529 | 541 |
|
530 | 542 | # TODO
|
531 | 543 | @router.post("/")
|
@@ -767,18 +779,20 @@ def run(self, host=None, port=None, retries=5):
|
767 | 779 | else:
|
768 | 780 | print(f"Server will run at http://{self.host}:{self.port}")
|
769 | 781 |
|
770 |
| - for _ in range(retries): |
771 |
| - try: |
772 |
| - self.uvicorn_server.run() |
773 |
| - break |
774 |
| - except KeyboardInterrupt: |
775 |
| - break |
776 |
| - except ImportError as e: |
777 |
| - if _ == 4: # If this is the last attempt |
778 |
| - raise ImportError( |
779 |
| - str(e) |
780 |
| - + """\n\nPlease ensure you have run `pip install "open-interpreter[server]"` to install server dependencies.""" |
781 |
| - ) |
782 |
| - except: |
783 |
| - print("An unexpected error occurred:", traceback.format_exc()) |
784 |
| - print("Server restarting.") |
| 782 | + self.uvicorn_server.run() |
| 783 | + |
| 784 | + # for _ in range(retries): |
| 785 | + # try: |
| 786 | + # self.uvicorn_server.run() |
| 787 | + # break |
| 788 | + # except KeyboardInterrupt: |
| 789 | + # break |
| 790 | + # except ImportError as e: |
| 791 | + # if _ == 4: # If this is the last attempt |
| 792 | + # raise ImportError( |
| 793 | + # str(e) |
| 794 | + # + """\n\nPlease ensure you have run `pip install "open-interpreter[server]"` to install server dependencies.""" |
| 795 | + # ) |
| 796 | + # except: |
| 797 | + # print("An unexpected error occurred:", traceback.format_exc()) |
| 798 | + # print("Server restarting.") |
0 commit comments