88import urllib .parse
99from datetime import datetime , timezone
1010
11- from websockets .asyncio .client import ClientConnection , connect
11+ from websockets .asyncio .client import connect
1212
1313from .models import GatewayAction , GatewayActionStatus , Relay
1414
@@ -80,9 +80,6 @@ def confirmed(self, msg: str):
8080
8181
8282class RelayService :
83- def __init__ (self ):
84- self ._connections : dict [str , object ] = {}
85-
8683 def send_action (self , relay : Relay , action : GatewayAction ):
8784 """
8885 Synchronous wrapper around async_send_action.
@@ -97,48 +94,34 @@ async def async_send_action(
9794 result = SendActionResult ()
9895
9996 try :
100- conn = await self ._get_connection (relay )
101- if not conn :
102- result .failed (f"No connection available for relay { relay .id } " )
103- return result
104-
105- await conn .send (json .dumps (action .payload ))
106- result .sent (f"Sent action { action .id } to relay { relay .id } " )
107-
108- response = await asyncio .wait_for (
109- conn .recv (), timeout = RECEIVE_TIMEOUT_SECONDS
110- )
111- response_data = json .loads (response )
112-
113- if response_data .get ("status" ) in ("created" , "processed" ):
114- result .confirmed (f"Action { action .id } confirmed by gateway" )
115- else :
116- result .failed (
117- f"Unexpected response status from gateway: { response_data } "
97+ relay_uri = RelayURI (relay )
98+ url = relay_uri .connection_url ()
99+ async with connect (
100+ url , compression = None , open_timeout = OPEN_CONNECTION_TIMEOUT_SECONDS
101+ ) as conn :
102+ await conn .send (json .dumps (action .payload ))
103+ result .sent (f"Sent action { action .id } to relay { relay .id } " )
104+
105+ response = await asyncio .wait_for (
106+ conn .recv (), timeout = RECEIVE_TIMEOUT_SECONDS
118107 )
108+ response_data = json .loads (response )
109+
110+ if response_data .get ("status" ) in ("created" , "processed" ):
111+ result .confirmed (f"Action { action .id } confirmed by gateway" )
112+ else :
113+ result .failed (
114+ f"Unexpected response status from gateway: { response_data } "
115+ )
119116
120117 except asyncio .TimeoutError :
121118 result .failed (f"Timeout waiting for response from gateway { relay .id } " )
122119
123120 except Exception as e :
124121 result .failed (f"Error sending action to gateway { relay .id } : { e } " )
125- self ._connections .pop (relay .id , None )
126122
127123 return result
128124
129- async def _get_connection (self , relay : Relay ) -> ClientConnection :
130- if relay .id not in self ._connections :
131- self ._connections [relay .id ] = await self ._create_connection (RelayURI (relay ))
132- return self ._connections [relay .id ]
133-
134- async def _create_connection (self , relay_uri : RelayURI ) -> ClientConnection :
135- url = relay_uri .connection_url ()
136- websocket = await connect (
137- url , compression = None , open_timeout = OPEN_CONNECTION_TIMEOUT_SECONDS
138- )
139- logger .info (f"Created relay connection for relay { relay_uri .relay .id } " )
140- return websocket
141-
142125 def update_gateway_action (self , action : GatewayAction , result : SendActionResult ):
143126 """Update the GatewayAction based on the SendActionResult."""
144127 action .status = result .status
0 commit comments