1+ import json
12import os
2- from typing import Any , Generator
3+ import subprocess
4+ import time
5+ from typing import Any , Dict , Generator , List , Optional , cast
36
47import psycopg2
58import psycopg2 .extensions
69import pytest
10+ import requests
711
812
913@pytest .fixture (scope = "session" )
@@ -22,8 +26,7 @@ def stac_endpoint() -> str:
2226
2327
2428@pytest .fixture (scope = "session" )
25- def db_connection () -> Generator [Any , None , None ]:
26- """Create database connection for testing."""
29+ def db_connection () -> Generator [psycopg2 .extensions .connection , None , None ]:
2730 # Require all database connection parameters to be explicitly set
2831 required_vars = ["PGHOST" , "PGPORT" , "PGDATABASE" , "PGUSER" , "PGPASSWORD" ]
2932 missing_vars = [var for var in required_vars if not os .getenv (var )]
@@ -33,6 +36,18 @@ def db_connection() -> Generator[Any, None, None]:
3336 f"Required environment variables not set: { ', ' .join (missing_vars )} "
3437 )
3538
39+ port_str = os .getenv ("PGPORT" )
40+ if not port_str :
41+ pytest .fail ("PGPORT environment variable not set" )
42+
43+ connection_params = {
44+ "host" : os .getenv ("PGHOST" ),
45+ "port" : int (port_str ),
46+ "database" : os .getenv ("PGDATABASE" ),
47+ "user" : os .getenv ("PGUSER" ),
48+ "password" : os .getenv ("PGPASSWORD" ),
49+ }
50+
3651 # All required vars are guaranteed to exist due to check above
3752 try :
3853 conn = psycopg2 .connect (
@@ -47,3 +62,186 @@ def db_connection() -> Generator[Any, None, None]:
4762 conn .close ()
4863 except psycopg2 .Error as e :
4964 pytest .fail (f"Cannot connect to database: { e } " )
65+
66+
67+ def get_namespace () -> str :
68+ return os .environ .get ("NAMESPACE" , "eoapi" )
69+
70+
71+ def get_release_name () -> str :
72+ return os .environ .get ("RELEASE_NAME" , "eoapi" )
73+
74+
75+ def kubectl_get (
76+ resource : str ,
77+ namespace : Optional [str ] = None ,
78+ label_selector : Optional [str ] = None ,
79+ output : str = "json" ,
80+ ) -> subprocess .CompletedProcess [str ]:
81+ cmd : List [str ] = ["kubectl" , "get" , resource ]
82+
83+ if namespace :
84+ cmd .extend (["-n" , namespace ])
85+
86+ if label_selector :
87+ cmd .extend (["-l" , label_selector ])
88+
89+ if output :
90+ cmd .extend (["-o" , output ])
91+
92+ result = subprocess .run (cmd , capture_output = True , text = True )
93+ return result
94+
95+
96+ def kubectl_port_forward (
97+ service : str , local_port : int , remote_port : int , namespace : str
98+ ) -> subprocess .Popen [str ]:
99+ cmd = [
100+ "kubectl" ,
101+ "port-forward" ,
102+ f"svc/{ service } " ,
103+ f"{ local_port } :{ remote_port } " ,
104+ "-n" ,
105+ namespace ,
106+ ]
107+
108+ process = subprocess .Popen (
109+ cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE , text = True
110+ )
111+
112+ time .sleep (3 )
113+ return process
114+
115+
116+ def wait_for_url (url : str , timeout : int = 30 , interval : int = 2 ) -> bool :
117+ start_time = time .time ()
118+ while time .time () - start_time < timeout :
119+ try :
120+ response = requests .get (url , timeout = 5 )
121+ if response .status_code == 200 :
122+ return True
123+ except (requests .RequestException , requests .ConnectionError ):
124+ pass
125+ time .sleep (interval )
126+ return False
127+
128+
129+ def make_request (url : str , timeout : int = 10 ) -> bool :
130+ try :
131+ response = requests .get (url , timeout = timeout )
132+ return response .status_code == 200
133+ except requests .RequestException :
134+ return False
135+
136+
137+ def get_base_url () -> str :
138+ namespace = get_namespace ()
139+
140+ # Check if we have an ingress
141+ result = subprocess .run (
142+ ["kubectl" , "get" , "ingress" , "-n" , namespace , "-o" , "json" ],
143+ capture_output = True ,
144+ text = True ,
145+ )
146+
147+ if result .returncode == 0 :
148+ ingress_data = json .loads (result .stdout )
149+ if ingress_data ["items" ]:
150+ ingress = ingress_data ["items" ][0 ]
151+ rules = ingress .get ("spec" , {}).get ("rules" , [])
152+ if rules :
153+ host = rules [0 ].get ("host" , "localhost" )
154+ # Check if host is accessible
155+ try :
156+ response = requests .get (
157+ f"http://{ host } /stac/collections" , timeout = 5
158+ )
159+ if response .status_code == 200 :
160+ return f"http://{ host } "
161+ except requests .RequestException :
162+ pass
163+
164+ return "http://localhost:8080"
165+
166+
167+ def get_pod_metrics (namespace : str , service_name : str ) -> List [Dict [str , str ]]:
168+ """Get CPU and memory metrics for pods of a specific service."""
169+ release_name = get_release_name ()
170+ result = subprocess .run (
171+ [
172+ "kubectl" ,
173+ "top" ,
174+ "pods" ,
175+ "-n" ,
176+ namespace ,
177+ "-l" ,
178+ f"app={ release_name } -{ service_name } " ,
179+ "--no-headers" ,
180+ ],
181+ capture_output = True ,
182+ text = True ,
183+ )
184+
185+ if result .returncode != 0 :
186+ return []
187+
188+ metrics : List [Dict [str , str ]] = []
189+ for line in result .stdout .strip ().split ("\n " ):
190+ if line .strip ():
191+ parts = line .split ()
192+ if len (parts ) >= 3 :
193+ pod_name = parts [0 ]
194+ cpu = parts [1 ] # e.g., "25m"
195+ memory = parts [2 ] # e.g., "128Mi"
196+ metrics .append ({"pod" : pod_name , "cpu" : cpu , "memory" : memory })
197+
198+ return metrics
199+
200+
201+ def get_hpa_status (namespace : str , hpa_name : str ) -> Optional [Dict [str , Any ]]:
202+ """Get HPA status for a specific HPA."""
203+ result = kubectl_get ("hpa" , namespace = namespace , output = "json" )
204+ if result .returncode != 0 :
205+ return None
206+
207+ hpas = json .loads (result .stdout )
208+ for hpa in hpas ["items" ]:
209+ if hpa ["metadata" ]["name" ] == hpa_name :
210+ return cast (Dict [str , Any ], hpa )
211+
212+ return None
213+
214+
215+ def get_pod_count (namespace : str , service_name : str ) -> int :
216+ """Get the count of running pods for a specific service."""
217+ release_name = get_release_name ()
218+ result = kubectl_get (
219+ "pods" ,
220+ namespace = namespace ,
221+ label_selector = f"app={ release_name } -{ service_name } " ,
222+ )
223+
224+ if result .returncode != 0 :
225+ return 0
226+
227+ pods = json .loads (result .stdout )
228+ running_pods = [
229+ pod for pod in pods ["items" ] if pod ["status" ]["phase" ] == "Running"
230+ ]
231+
232+ return len (running_pods )
233+
234+
235+ @pytest .fixture (scope = "session" )
236+ def namespace () -> str :
237+ return get_namespace ()
238+
239+
240+ @pytest .fixture (scope = "session" )
241+ def release_name () -> str :
242+ return get_release_name ()
243+
244+
245+ @pytest .fixture (scope = "session" )
246+ def base_url () -> str :
247+ return get_base_url ()
0 commit comments