1+ import json
12import os
2- from typing import Any , Generator
3+ import subprocess
4+ import time
5+ from typing import Any , Dict , Generator , List , Optional , cast
36
47import psycopg2
58import psycopg2 .extensions
69import pytest
10+ import requests
711
812
913@pytest .fixture (scope = "session" )
@@ -22,17 +26,22 @@ def stac_endpoint() -> str:
2226
2327
2428@pytest .fixture (scope = "session" )
25- def db_connection () -> Generator [Any , None , None ]:
26- """Create database connection for testing."""
27- # Require all database connection parameters to be explicitly set
29+ def db_connection () -> Generator [psycopg2 .extensions .connection , None , None ]:
2830 required_vars = ["PGHOST" , "PGPORT" , "PGDATABASE" , "PGUSER" , "PGPASSWORD" ]
2931 missing_vars = [var for var in required_vars if not os .getenv (var )]
30-
3132 if missing_vars :
3233 pytest .fail (
3334 f"Required environment variables not set: { ', ' .join (missing_vars )} "
3435 )
3536
37+ connection_params = {
38+ "host" : os .getenv ("PGHOST" ),
39+ "port" : os .getenv ("PGPORT" ),
40+ "database" : os .getenv ("PGDATABASE" ),
41+ "user" : os .getenv ("PGUSER" ),
42+ "password" : os .getenv ("PGPASSWORD" ),
43+ }
44+
3645 # All required vars are guaranteed to exist due to check above
3746 try :
3847 conn = psycopg2 .connect (
@@ -47,3 +56,174 @@ def db_connection() -> Generator[Any, None, None]:
4756 conn .close ()
4857 except psycopg2 .Error as e :
4958 pytest .fail (f"Cannot connect to database: { e } " )
59+
60+
61+ def get_namespace () -> str :
62+ """Get the namespace from environment variable."""
63+ return os .environ .get ("NAMESPACE" , "eoapi" )
64+
65+
66+ def get_release_name () -> str :
67+ """Get the release name from environment variable."""
68+ return os .environ .get ("RELEASE_NAME" , "eoapi" )
69+
70+
71+ def kubectl_get (
72+ resource : str ,
73+ namespace : Optional [str ] = None ,
74+ label_selector : Optional [str ] = None ,
75+ output : str = "json" ,
76+ ) -> subprocess .CompletedProcess [str ]:
77+ cmd : List [str ] = ["kubectl" , "get" , resource ]
78+
79+ if namespace :
80+ cmd .extend (["-n" , namespace ])
81+
82+ if label_selector :
83+ cmd .extend (["-l" , label_selector ])
84+
85+ if output :
86+ cmd .extend (["-o" , output ])
87+
88+ result = subprocess .run (cmd , capture_output = True , text = True )
89+ return result
90+
91+
92+ def kubectl_port_forward (
93+ service : str , local_port : int , remote_port : int , namespace : str
94+ ) -> subprocess .Popen [str ]:
95+ cmd = [
96+ "kubectl" ,
97+ "port-forward" ,
98+ f"svc/{ service } " ,
99+ f"{ local_port } :{ remote_port } " ,
100+ "-n" ,
101+ namespace ,
102+ ]
103+
104+ process = subprocess .Popen (
105+ cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE , text = True
106+ )
107+
108+ time .sleep (3 )
109+ return process
110+
111+
112+ def wait_for_url (url : str , timeout : int = 30 , interval : int = 2 ) -> bool :
113+ start_time = time .time ()
114+ while time .time () - start_time < timeout :
115+ try :
116+ response = requests .get (url , timeout = 5 )
117+ if response .status_code == 200 :
118+ return True
119+ except (requests .RequestException , requests .ConnectionError ):
120+ pass
121+ time .sleep (interval )
122+ return False
123+
124+
125+ def make_request (url : str , timeout : int = 10 ) -> bool :
126+ try :
127+ response = requests .get (url , timeout = timeout )
128+ return response .status_code == 200
129+ except requests .RequestException :
130+ return False
131+
132+
133+ def get_base_url () -> str :
134+ """Get the base URL for API access."""
135+ namespace = get_namespace ()
136+
137+ # Check if we have an ingress
138+ result = subprocess .run (
139+ ["kubectl" , "get" , "ingress" , "-n" , namespace , "-o" , "json" ],
140+ capture_output = True ,
141+ text = True ,
142+ )
143+
144+ if result .returncode == 0 :
145+ ingress_data = json .loads (result .stdout )
146+ if ingress_data ["items" ]:
147+ ingress = ingress_data ["items" ][0 ]
148+ rules = ingress .get ("spec" , {}).get ("rules" , [])
149+ if rules :
150+ host = rules [0 ].get ("host" , "localhost" )
151+ # Check if host is accessible
152+ try :
153+ response = requests .get (
154+ f"http://{ host } /stac/collections" , timeout = 5
155+ )
156+ if response .status_code == 200 :
157+ return f"http://{ host } "
158+ except requests .RequestException :
159+ pass
160+
161+ return "http://localhost:8080"
162+
163+
164+ def get_pod_metrics (namespace : str , service_name : str ) -> List [Dict [str , str ]]:
165+ """Get CPU and memory metrics for pods of a specific service."""
166+ release_name_val = get_release_name ()
167+ result = subprocess .run (
168+ [
169+ "kubectl" ,
170+ "top" ,
171+ "pods" ,
172+ "-n" ,
173+ namespace ,
174+ "-l" ,
175+ f"app={ release_name_val } -{ service_name } " ,
176+ "--no-headers" ,
177+ ],
178+ capture_output = True ,
179+ text = True ,
180+ )
181+
182+ if result .returncode != 0 :
183+ return []
184+
185+ metrics : List [Dict [str , str ]] = []
186+ for line in result .stdout .strip ().split ("\n " ):
187+ if line .strip ():
188+ parts = line .split ()
189+ if len (parts ) >= 3 :
190+ pod_name = parts [0 ]
191+ cpu = parts [1 ] # e.g., "25m"
192+ memory = parts [2 ] # e.g., "128Mi"
193+ metrics .append ({"pod" : pod_name , "cpu" : cpu , "memory" : memory })
194+
195+ return metrics
196+
197+
198+ def get_hpa_status (namespace : str , hpa_name : str ) -> Optional [Dict [str , Any ]]:
199+ """Get HPA status for a specific HPA."""
200+ result = kubectl_get ("hpa" , namespace = namespace , output = "json" )
201+ if result .returncode != 0 :
202+ return None
203+
204+ hpas = json .loads (result .stdout )
205+ for hpa in hpas ["items" ]:
206+ if hpa ["metadata" ]["name" ] == hpa_name :
207+ return cast (Dict [str , Any ], hpa )
208+
209+ return None
210+
211+
212+ def get_pod_count (namespace : str , service_name : str ) -> int :
213+ """Get the count of running pods for a specific service."""
214+ release_name_val = get_release_name ()
215+ result = kubectl_get (
216+ "pods" ,
217+ namespace = namespace ,
218+ label_selector = f"app={ release_name_val } -{ service_name } " ,
219+ )
220+
221+ if result .returncode != 0 :
222+ return 0
223+
224+ pods = json .loads (result .stdout )
225+ running_pods = [
226+ pod for pod in pods ["items" ] if pod ["status" ]["phase" ] == "Running"
227+ ]
228+
229+ return len (running_pods )
0 commit comments