1
+ import asyncio
1
2
import httpx
2
3
from typing import AsyncIterator
3
4
8
9
def run_service (
9
10
client : Gitpod ,
10
11
environment_id : str ,
11
- metadata : service_create_params .Metadata ,
12
- spec : service_create_params .Spec
12
+ metadata : service_create_params .ServiceMetadataParam ,
13
+ spec : service_create_params .ServiceSpecParam
13
14
) -> AsyncIterator [str ]:
14
15
reference = metadata ["reference" ]
15
16
if not reference :
@@ -36,7 +37,7 @@ def run_service(
36
37
log_url = wait_for_service_log_url (client , environment_id , service_id )
37
38
return stream_logs (client , environment_id , log_url )
38
39
39
- async def run_command (client : Gitpod , environment_id : str , command : str ) -> AsyncIterator [str ]:
40
+ def run_command (client : Gitpod , environment_id : str , command : str ) -> AsyncIterator [str ]:
40
41
tasks = client .environments .automations .tasks .list (
41
42
filter = {
42
43
"references" : [TASK_REFERENCE ],
@@ -70,20 +71,6 @@ async def run_command(client: Gitpod, environment_id: str, command: str) -> Asyn
70
71
log_url = wait_for_task_log_url (client , environment_id , task_execution_id )
71
72
return stream_logs (client , environment_id , log_url )
72
73
73
- async def stream_logs (client : Gitpod , environment_id : str , log_url : str ) -> AsyncIterator [str ]:
74
- logs_access_token = client .environments .create_logs_token (environment_id = environment_id ).access_token
75
- async with httpx .AsyncClient () as http_client :
76
- async with http_client .stream ("GET" , log_url , headers = {"Authorization" : f"Bearer { logs_access_token } " }, timeout = None ) as response :
77
- buffer = ""
78
- async for chunk in response .aiter_text ():
79
- buffer += chunk
80
- while "\n " in buffer :
81
- line , buffer = buffer .split ("\n " , 1 )
82
- if line :
83
- yield line
84
- if buffer :
85
- yield buffer
86
-
87
74
def wait_for_task_log_url (client : Gitpod , environment_id : str , task_execution_id : str ) -> str :
88
75
def get_log_url ():
89
76
execution = client .environments .automations .tasks .executions .retrieve (id = task_execution_id ).task_execution
@@ -118,3 +105,35 @@ def wait_for_log_url(client: Gitpod, environment_id: str, resource_id: str, get_
118
105
return log_url
119
106
finally :
120
107
event_stream .http_response .close ()
108
+
109
+ async def stream_logs (client : Gitpod , environment_id : str , log_url : str ) -> AsyncIterator [str ]:
110
+ logs_access_token = client .environments .create_logs_token (environment_id = environment_id ).access_token
111
+ async with httpx .AsyncClient () as http_client :
112
+ retries = 3
113
+ while retries > 0 :
114
+ try :
115
+ async with http_client .stream ("GET" , log_url , headers = {"Authorization" : f"Bearer { logs_access_token } " }, timeout = None ) as response :
116
+ if response .status_code == 502 : # Bad Gateway
117
+ retries -= 1
118
+ if retries == 0 :
119
+ raise Exception ("Failed to stream logs after 3 retries" )
120
+ await asyncio .sleep (1 ) # Wait before retrying
121
+ continue
122
+
123
+ buffer = ""
124
+ async for chunk in response .aiter_text ():
125
+ buffer += chunk
126
+ while "\n " in buffer :
127
+ line , buffer = buffer .split ("\n " , 1 )
128
+ if line :
129
+ yield line
130
+ if buffer :
131
+ yield buffer
132
+ break # Success - exit retry loop
133
+
134
+ except httpx .HTTPError as e :
135
+ if retries > 0 and (isinstance (e , httpx .HTTPStatusError ) and e .response .status_code == 502 ):
136
+ retries -= 1
137
+ await asyncio .sleep (1 ) # Wait before retrying
138
+ continue
139
+ raise # Re-raise if not a 502 or out of retries
0 commit comments