Skip to content

Commit 7fa9b60

Browse files
Pathways-on-Cloud Teamcopybara-github
authored andcommitted
Add "Shared Pathways Service" for Pathways-on-Cloud
This change introduces `Shared Pathways Service` for GKE. It includes: * `isc_pathways.py` uses `kubectl` to deploy a Pathways proxy JobSet on a GKE cluster, sets up port forwarding, and configures Pathways JAX backend. * `run_connect_example.py` is an example script to start the proxy. Prerequisite: A GKE cluster is up and running with Pathways pods: Resource Manager and Worker pods, e.g., using `pw-service-example.yaml`. PiperOrigin-RevId: 840371516
1 parent e228057 commit 7fa9b60

File tree

7 files changed

+778
-0
lines changed

7 files changed

+778
-0
lines changed

pathwaysutils/experimental/shared_pathways_service/__init__.py

Whitespace-only changes.
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
"""GKE utils for deploying and managing the Pathways proxy."""
2+
3+
import logging
4+
import socket
5+
import subprocess
6+
import urllib.parse
7+
8+
import portpicker
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
# TODO(b/456189271): Evaluate and replace the subprocess calls with Kubernetes
13+
# Python API for kubectl calls.
14+
15+
16+
def fetch_cluster_credentials(
17+
*, cluster_name: str, project_id: str, location: str
18+
) -> None:
19+
"""Fetches credentials for the GKE cluster."""
20+
_logger.info("Fetching credentials for '%s'.", cluster_name)
21+
get_credentials_command = [
22+
"gcloud",
23+
"container",
24+
"clusters",
25+
"get-credentials",
26+
cluster_name,
27+
f"--location={location}",
28+
f"--project={project_id}",
29+
]
30+
try:
31+
subprocess.run(
32+
get_credentials_command,
33+
check=True,
34+
capture_output=True,
35+
text=True,
36+
)
37+
except subprocess.CalledProcessError as e:
38+
_logger.exception(
39+
r"Failed to get cluster credentials. gcloud output:\n%r", e.stderr
40+
)
41+
raise
42+
43+
44+
def deploy_gke_yaml(yaml: str) -> None:
45+
"""Deploys the given YAML to the GKE cluster.
46+
47+
Args:
48+
yaml: The GKE YAML to deploy.
49+
50+
Raises:
51+
subprocess.CalledProcessError: If the kubectl command fails.
52+
"""
53+
_logger.info("Deploying GKE YAML: %s", yaml)
54+
kubectl_apply_command = ["kubectl", "apply", "-f", "-"]
55+
try:
56+
proxy_result = subprocess.run(
57+
kubectl_apply_command,
58+
input=yaml,
59+
check=True,
60+
capture_output=True,
61+
text=True,
62+
)
63+
except subprocess.CalledProcessError as e:
64+
_logger.exception(
65+
r"Failed to deploy the GKE YAML. kubectl output:\n%r", e.stderr
66+
)
67+
raise
68+
69+
_logger.info(
70+
"Successfully deployed the GKE YAML. %s", proxy_result.stdout
71+
)
72+
73+
74+
def get_pod_from_job(job_name: str) -> str:
75+
"""Returns the pod name for the given job.
76+
77+
Args:
78+
job_name: The name of the job.
79+
80+
Returns:
81+
The name of the pod.
82+
83+
Raises:
84+
subprocess.CalledProcessError: If the kubectl command fails.
85+
RuntimeError: If the pod is missing or the pod name is not in the expected
86+
format.
87+
"""
88+
get_pod_command = [
89+
"kubectl",
90+
"get",
91+
"pods",
92+
"-l",
93+
f"job-name={job_name}",
94+
"-o",
95+
"name",
96+
]
97+
try:
98+
pod_result = subprocess.run(
99+
get_pod_command,
100+
check=True,
101+
capture_output=True,
102+
text=True,
103+
)
104+
except subprocess.CalledProcessError as e:
105+
_logger.exception(
106+
r"Failed to get pod name. kubectl output:\n%r", e.stderr
107+
)
108+
raise
109+
110+
pod_name = pod_result.stdout.strip()
111+
_logger.info("Pod name: %s", pod_name)
112+
113+
if (
114+
not pod_name
115+
or not pod_name.startswith("pod/")
116+
or len(pod_name.split("/")) != 2
117+
):
118+
raise RuntimeError(
119+
"Failed to get pod name. Expected format: pod/<pod_name>. Got:"
120+
f" {pod_name}"
121+
)
122+
123+
# pod_name is in the format of "pod/<pod_name>". We only need the pod name.
124+
_, pod_name = pod_name.split("/")
125+
return pod_name
126+
127+
128+
def check_pod_ready(pod_name: str, timeout: int = 30) -> str:
129+
"""Checks if the given pod is ready.
130+
131+
Args:
132+
pod_name: The name of the pod.
133+
timeout: The maximum time in seconds to wait for the pod to be ready.
134+
135+
Returns:
136+
The name of the pod.
137+
138+
Raises:
139+
RuntimeError: If the pod fails to become ready within the timeout.
140+
"""
141+
wait_command = [
142+
"kubectl",
143+
"wait",
144+
"--for=condition=Ready",
145+
f"pod/{pod_name}",
146+
f"--timeout={timeout}s",
147+
]
148+
try:
149+
subprocess.run(wait_command, check=True, capture_output=True, text=True)
150+
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
151+
_logger.exception("Pod failed to become ready: %r", e)
152+
153+
raise RuntimeError(
154+
f"Pod did not become ready: {e.stderr}."
155+
) from e
156+
except Exception as e:
157+
_logger.exception("Error setting up the pod: %r", e)
158+
raise
159+
160+
_logger.info("Pod is ready: %s.", pod_name)
161+
return pod_name
162+
163+
164+
def get_log_link(*, cluster: str, project: str, job_name: str) -> str:
165+
"""Returns a link to Cloud Logging for the given cluster and job name."""
166+
log_filter = (
167+
'resource.type="k8s_container"\n'
168+
f'resource.labels.cluster_name="{cluster}"\n'
169+
'resource.labels.namespace_name="default"\n'
170+
f'labels.k8s-pod/job-name="{job_name}"'
171+
)
172+
encoded_filter = urllib.parse.quote(log_filter, safe="")
173+
174+
return (
175+
"https://console.cloud.google.com/logs/query;"
176+
f"query={encoded_filter};duration=PT1H"
177+
f"?project={project}"
178+
)
179+
180+
181+
def wait_for_pod(job_name: str) -> str:
182+
"""Waits for the given job's pod to be ready.
183+
184+
Args:
185+
job_name: The name of the job.
186+
Returns:
187+
The name of the pod.
188+
Raises:
189+
RuntimeError: If the pod is not ready.
190+
"""
191+
_logger.info("Waiting for pod to be created...")
192+
pod_name = get_pod_from_job(job_name)
193+
194+
_logger.info(
195+
"Pod created: %s. Waiting for it to be ready...", pod_name
196+
)
197+
198+
return check_pod_ready(pod_name)
199+
200+
201+
def __test_pod_connection(port: int) -> None:
202+
"""Tests the connection to the pod.
203+
204+
Args:
205+
port: The port of the pod to connect to.
206+
"""
207+
_logger.info("Connecting to localhost:%d", port)
208+
try:
209+
with socket.create_connection(("localhost", port), timeout=30):
210+
_logger.info("Pod is ready.")
211+
except (socket.timeout, ConnectionRefusedError) as exc:
212+
raise RuntimeError("Could not connect to the pod.") from exc
213+
214+
215+
def enable_port_forwarding(
216+
pod_name: str,
217+
server_port: int,
218+
) -> tuple[int, subprocess.Popen[str]]:
219+
"""Enables port forwarding for the given pod.
220+
221+
Args:
222+
pod_name: The name of the pod.
223+
server_port: The port of the server to forward to.
224+
225+
Returns:
226+
A tuple containing the pod port and the port forwarding process.
227+
Raises:
228+
RuntimeError: If port forwarding fails to start or the pod connection
229+
cannot be established.
230+
"""
231+
try:
232+
port_available = portpicker.pick_unused_port()
233+
except Exception as e:
234+
_logger.exception("Error finding free local port: %r", e)
235+
raise
236+
237+
_logger.info("Found free local port: %d", port_available)
238+
_logger.info(
239+
"Starting port forwarding from local port %d to %s:%d",
240+
port_available,
241+
pod_name,
242+
server_port,
243+
)
244+
245+
port_forward_command = [
246+
"kubectl",
247+
"port-forward",
248+
"--address",
249+
"localhost",
250+
pod_name,
251+
f"{port_available}:{server_port}",
252+
]
253+
try:
254+
# Start port forwarding in the background.
255+
port_forward_process = subprocess.Popen(
256+
port_forward_command,
257+
stdout=subprocess.PIPE,
258+
stderr=subprocess.PIPE,
259+
text=True,
260+
)
261+
except Exception as e:
262+
_logger.exception("Error enabling port forwarding for the pod: %r", e)
263+
raise
264+
265+
# Check that the port forwarding is ready.
266+
if port_forward_process.stdout is None:
267+
_logger.error("Port-forward process stdout is None. Terminating.")
268+
port_forward_process.terminate()
269+
_, stderr = port_forward_process.communicate()
270+
raise RuntimeError(
271+
"Failed to start port forwarding: stdout not available.\n"
272+
f"STDERR: {stderr}"
273+
)
274+
275+
ready_line = port_forward_process.stdout.readline()
276+
if "Forwarding from" in ready_line:
277+
_logger.info("Port-forward is ready: %s", ready_line.strip())
278+
else:
279+
# If the ready line is not found, the process might have exited with an
280+
# error. We terminate it and raise an error with the stderr.
281+
_logger.error("Port-forward process exited with error. Terminating.")
282+
port_forward_process.terminate()
283+
_, stderr = port_forward_process.communicate()
284+
raise RuntimeError(
285+
"Failed to start port forwarding.\n"
286+
f"STDOUT: {port_forward_process.stdout}\n"
287+
f"STDERR: {stderr}"
288+
)
289+
290+
try:
291+
__test_pod_connection(port_available)
292+
except Exception:
293+
port_forward_process.terminate()
294+
raise
295+
296+
return (port_available, port_forward_process)
297+
298+
299+
def delete_gke_job(job_name: str) -> None:
300+
"""Deletes the given job from the GKE cluster.
301+
302+
Args:
303+
job_name: The name of the job.
304+
"""
305+
_logger.info("Deleting job: %s", job_name)
306+
delete_job_command = [
307+
"kubectl",
308+
"delete",
309+
"job",
310+
job_name,
311+
"--ignore-not-found",
312+
]
313+
try:
314+
result = subprocess.run(
315+
delete_job_command,
316+
check=True,
317+
capture_output=True,
318+
text=True,
319+
)
320+
except subprocess.CalledProcessError as e:
321+
_logger.exception("Failed to delete job. kubectl output:\\n%r", e.stderr)
322+
raise
323+
_logger.info("Successfully deleted job. %s", result.stdout)

0 commit comments

Comments
 (0)