1+ # Copyright 2026 The Kubernetes Authors.
2+ #
3+ # Licensed under the Apache License, Version 2.0 (the "License");
4+ # you may not use this file except in compliance with the License.
5+ # You may obtain a copy of the License at
6+ #
7+ # http://www.apache.org/licenses/LICENSE-2.0
8+ #
9+ # Unless required by applicable law or agreed to in writing, software
10+ # distributed under the License is distributed on an "AS IS" BASIS,
11+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+ # See the License for the specific language governing permissions and
13+ # limitations under the License.
14+
15+ import logging
16+ import socket
17+ import subprocess
18+ import time
19+ import requests
20+ from abc import ABC , abstractmethod
21+ from requests .adapters import HTTPAdapter
22+ from urllib3 .util .retry import Retry
23+ from .models import (
24+ SandboxConnectionConfig ,
25+ SandboxDirectConnectionConfig ,
26+ SandboxGatewayConnectionConfig ,
27+ SandboxLocalTunnelConnectionConfig
28+ )
29+ from .k8s_helper import K8sHelper
30+
31+ ROUTER_SERVICE_NAME = "svc/sandbox-router-svc"
32+
33+ class ConnectionStrategy (ABC ):
34+ """Abstract base class for connection strategies."""
35+
36+ @abstractmethod
37+ def connect (self ) -> str :
38+ """Establishes the connection and returns the base URL."""
39+ pass
40+
41+ @abstractmethod
42+ def close (self ):
43+ """Cleans up any resources associated with the connection."""
44+ pass
45+
46+ @abstractmethod
47+ def verify_connection (self ):
48+ """Checks if the connection is healthy. Raises RuntimeError if not."""
49+ pass
50+
51+ class DirectConnectionStrategy (ConnectionStrategy ):
52+ def __init__ (self , config : SandboxDirectConnectionConfig ):
53+ self .config = config
54+
55+ def connect (self ) -> str :
56+ return self .config .api_url
57+
58+ def close (self ):
59+ pass
60+
61+ def verify_connection (self ):
62+ pass
63+
64+ class GatewayConnectionStrategy (ConnectionStrategy ):
65+ def __init__ (self , config : SandboxGatewayConnectionConfig , k8s_helper : K8sHelper ):
66+ self .config = config
67+ self .k8s_helper = k8s_helper
68+ self .base_url = None
69+
70+ def connect (self ) -> str :
71+ if self .base_url :
72+ return self .base_url
73+
74+ ip_address = self .k8s_helper .wait_for_gateway_ip (
75+ self .config .gateway_name ,
76+ self .config .gateway_namespace ,
77+ self .config .gateway_ready_timeout
78+ )
79+ self .base_url = f"http://{ ip_address } "
80+ return self .base_url
81+
82+ def close (self ):
83+ self .base_url = None
84+
85+ def verify_connection (self ):
86+ pass
87+
88+ class LocalTunnelConnectionStrategy (ConnectionStrategy ):
89+ def __init__ (self , sandbox_id : str , namespace : str , config : SandboxLocalTunnelConnectionConfig ):
90+ self .sandbox_id = sandbox_id
91+ self .namespace = namespace
92+ self .config = config
93+ self .port_forward_process : subprocess .Popen | None = None
94+ self .base_url = None
95+
96+ def _get_free_port (self ):
97+ """Finds a free port on localhost."""
98+ with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as s :
99+ s .bind (('127.0.0.1' , 0 ))
100+ return s .getsockname ()[1 ]
101+
102+ def connect (self ) -> str :
103+ if self .base_url and self .port_forward_process and self .port_forward_process .poll () is None :
104+ return self .base_url
105+
106+ if self .port_forward_process :
107+ self .close ()
108+
109+ local_port = self ._get_free_port ()
110+
111+ logging .info (
112+ f"Starting tunnel for Sandbox { self .sandbox_id } : localhost:{ local_port } -> { ROUTER_SERVICE_NAME } :8080..." )
113+ self .port_forward_process = subprocess .Popen (
114+ [
115+ "kubectl" , "port-forward" ,
116+ ROUTER_SERVICE_NAME ,
117+ f"{ local_port } :8080" ,
118+ "-n" , self .namespace
119+ ],
120+ stdout = subprocess .PIPE ,
121+ stderr = subprocess .PIPE
122+ )
123+
124+ logging .info ("Waiting for port-forwarding to be ready..." )
125+ start_time = time .monotonic ()
126+ while time .monotonic () - start_time < self .config .port_forward_ready_timeout :
127+ if self .port_forward_process .poll () is not None :
128+ _ , stderr = self .port_forward_process .communicate ()
129+ raise RuntimeError (
130+ f"Tunnel crashed: { stderr .decode (errors = 'ignore' )} " )
131+
132+ try :
133+ with socket .create_connection (("127.0.0.1" , local_port ), timeout = 0.1 ):
134+ self .base_url = f"http://127.0.0.1:{ local_port } "
135+ logging .info (f"Tunnel ready at { self .base_url } " )
136+ time .sleep (0.5 )
137+ return self .base_url
138+ except (socket .timeout , ConnectionRefusedError ):
139+ time .sleep (0.5 )
140+
141+ self .close ()
142+ raise TimeoutError ("Failed to establish tunnel to Router Service." )
143+
144+ def close (self ):
145+ if self .port_forward_process :
146+ try :
147+ logging .info (f"Stopping port-forwarding for Sandbox { self .sandbox_id } ..." )
148+ self .port_forward_process .terminate ()
149+ try :
150+ self .port_forward_process .wait (timeout = 2 )
151+ except subprocess .TimeoutExpired :
152+ self .port_forward_process .kill ()
153+ except Exception as e :
154+ logging .error (f"Failed to stop port-forwarding: { e } " )
155+ finally :
156+ self .port_forward_process = None
157+ self .base_url = None
158+
159+ def verify_connection (self ):
160+ if self .port_forward_process and self .port_forward_process .poll () is not None :
161+ _ , stderr = self .port_forward_process .communicate ()
162+ raise RuntimeError (
163+ f"Kubectl Port-Forward crashed!\n "
164+ f"Stderr: { stderr .decode (errors = 'ignore' )} "
165+ )
166+
167+ class SandboxConnector :
168+ """
169+ Manages the connection to the Sandbox, including auto-discovery and port-forwarding.
170+ """
171+ def __init__ (
172+ self ,
173+ sandbox_id : str ,
174+ namespace : str ,
175+ connection_config : SandboxConnectionConfig ,
176+ k8s_helper : K8sHelper ,
177+ ):
178+ # Parameter initialization
179+ self .id = sandbox_id
180+ self .namespace = namespace
181+ self .connection_config = connection_config
182+ self .k8s_helper = k8s_helper
183+
184+ # Connection strategy initialization
185+ self .strategy = self ._connection_strategy ()
186+
187+ # HTTP Session setup
188+ self .session = requests .Session ()
189+ retries = Retry (
190+ total = 5 ,
191+ backoff_factor = 0.5 ,
192+ status_forcelist = [500 , 502 , 503 , 504 ],
193+ allowed_methods = ["GET" , "POST" , "PUT" , "DELETE" ]
194+ )
195+ self .session .mount ("http://" , HTTPAdapter (max_retries = retries ))
196+ self .session .mount ("https://" , HTTPAdapter (max_retries = retries ))
197+
198+
199+ def _connection_strategy (self ):
200+ if isinstance (self .connection_config , SandboxDirectConnectionConfig ):
201+ return DirectConnectionStrategy (self .connection_config )
202+ elif isinstance (self .connection_config , SandboxGatewayConnectionConfig ):
203+ return GatewayConnectionStrategy (self .connection_config , self .k8s_helper )
204+ elif isinstance (self .connection_config , SandboxLocalTunnelConnectionConfig ):
205+ return LocalTunnelConnectionStrategy (self .id , self .namespace , self .connection_config )
206+ else :
207+ raise ValueError ("Unknown connection configuration type" )
208+
209+ def get_conn_strategy (self ):
210+ return self .strategy
211+
212+ def connect (self ) -> str :
213+ return self .strategy .connect ()
214+
215+ def close (self ):
216+ self .strategy .close ()
217+
218+ def send_request (self , method : str , endpoint : str , ** kwargs ) -> requests .Response :
219+ try :
220+ # Establish connection (re-establishes if closed/dead)
221+ base_url = self .connect ()
222+
223+ # Verify if the connection is active before sending the request
224+ self .strategy .verify_connection ()
225+
226+ # Prepare the request
227+ url = f"{ base_url .rstrip ('/' )} /{ endpoint .lstrip ('/' )} "
228+
229+ headers = kwargs .get ("headers" , {}).copy ()
230+ headers ["X-Sandbox-ID" ] = self .id
231+ headers ["X-Sandbox-Namespace" ] = self .namespace
232+ headers ["X-Sandbox-Port" ] = str (self .connection_config .server_port )
233+ kwargs ["headers" ] = headers
234+
235+ # Send the request
236+ response = self .session .request (method , url , ** kwargs )
237+ response .raise_for_status ()
238+ return response
239+ except (requests .exceptions .ConnectionError , requests .exceptions .ChunkedEncodingError , RuntimeError ) as e :
240+ logging .error (f"Connection failed: { e } " )
241+ self .close ()
242+ raise e
0 commit comments