1- import pytest
1+ import json
22import os
3- import psycopg2
4- import psycopg2 .extensions
3+ import subprocess
4+ import time
5+ from typing import Any , Dict , Generator , List , Optional , cast
6+
7+ import psycopg2 # type: ignore[import-untyped]
8+ import psycopg2 .extensions # type: ignore[import-untyped]
9+ import pytest
10+ import requests
511
612
7- @pytest .fixture (scope = ' session' )
8- def raster_endpoint ():
9- return os .getenv (' RASTER_ENDPOINT' , "http://127.0.0.1/raster" )
13+ @pytest .fixture (scope = " session" )
14+ def raster_endpoint () -> str :
15+ return os .getenv (" RASTER_ENDPOINT" , "http://127.0.0.1/raster" )
1016
1117
12- @pytest .fixture (scope = ' session' )
13- def vector_endpoint ():
14- return os .getenv (' VECTOR_ENDPOINT' , "http://127.0.0.1/vector" )
18+ @pytest .fixture (scope = " session" )
19+ def vector_endpoint () -> str :
20+ return os .getenv (" VECTOR_ENDPOINT" , "http://127.0.0.1/vector" )
1521
1622
17- @pytest .fixture (scope = ' session' )
18- def stac_endpoint ():
19- return os .getenv (' STAC_ENDPOINT' , "http://127.0.0.1/stac" )
23+ @pytest .fixture (scope = " session" )
24+ def stac_endpoint () -> str :
25+ return os .getenv (" STAC_ENDPOINT" , "http://127.0.0.1/stac" )
2026
2127
22- @pytest .fixture (scope = 'session' )
23- def db_connection ():
24- """Create database connection for testing."""
28+ @pytest .fixture (scope = "session" )
29+ def db_connection () -> Generator [psycopg2 .extensions .connection , None , None ]:
2530 # Require all database connection parameters to be explicitly set
26- required_vars = [' PGHOST' , ' PGPORT' , ' PGDATABASE' , ' PGUSER' , ' PGPASSWORD' ]
31+ required_vars = [" PGHOST" , " PGPORT" , " PGDATABASE" , " PGUSER" , " PGPASSWORD" ]
2732 missing_vars = [var for var in required_vars if not os .getenv (var )]
2833
2934 if missing_vars :
30- pytest .fail (f"Required environment variables not set: { ', ' .join (missing_vars )} " )
35+ pytest .fail (
36+ f"Required environment variables not set: { ', ' .join (missing_vars )} "
37+ )
38+
39+ port_str = os .getenv ("PGPORT" )
40+ if not port_str :
41+ pytest .fail ("PGPORT environment variable not set" )
3142
3243 connection_params = {
33- ' host' : os .getenv (' PGHOST' ),
34- ' port' : int (os . getenv ( 'PGPORT' ) ),
35- ' database' : os .getenv (' PGDATABASE' ),
36- ' user' : os .getenv (' PGUSER' ),
37- ' password' : os .getenv (' PGPASSWORD' )
44+ " host" : os .getenv (" PGHOST" ),
45+ " port" : int (port_str ),
46+ " database" : os .getenv (" PGDATABASE" ),
47+ " user" : os .getenv (" PGUSER" ),
48+ " password" : os .getenv (" PGPASSWORD" ),
3849 }
3950
4051 try :
@@ -44,3 +55,186 @@ def db_connection():
4455 conn .close ()
4556 except psycopg2 .Error as e :
4657 pytest .fail (f"Cannot connect to database: { e } " )
58+
59+
60+ def get_namespace () -> str :
61+ return os .environ .get ("NAMESPACE" , "eoapi" )
62+
63+
64+ def get_release_name () -> str :
65+ return os .environ .get ("RELEASE_NAME" , "eoapi" )
66+
67+
68+ def kubectl_get (
69+ resource : str ,
70+ namespace : Optional [str ] = None ,
71+ label_selector : Optional [str ] = None ,
72+ output : str = "json" ,
73+ ) -> subprocess .CompletedProcess [str ]:
74+ cmd : List [str ] = ["kubectl" , "get" , resource ]
75+
76+ if namespace :
77+ cmd .extend (["-n" , namespace ])
78+
79+ if label_selector :
80+ cmd .extend (["-l" , label_selector ])
81+
82+ if output :
83+ cmd .extend (["-o" , output ])
84+
85+ result = subprocess .run (cmd , capture_output = True , text = True )
86+ return result
87+
88+
89+ def kubectl_port_forward (
90+ service : str , local_port : int , remote_port : int , namespace : str
91+ ) -> subprocess .Popen [str ]:
92+ cmd = [
93+ "kubectl" ,
94+ "port-forward" ,
95+ f"svc/{ service } " ,
96+ f"{ local_port } :{ remote_port } " ,
97+ "-n" ,
98+ namespace ,
99+ ]
100+
101+ process = subprocess .Popen (
102+ cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE , text = True
103+ )
104+
105+ time .sleep (3 )
106+ return process
107+
108+
109+ def wait_for_url (url : str , timeout : int = 30 , interval : int = 2 ) -> bool :
110+ start_time = time .time ()
111+ while time .time () - start_time < timeout :
112+ try :
113+ response = requests .get (url , timeout = 5 )
114+ if response .status_code == 200 :
115+ return True
116+ except (requests .RequestException , requests .ConnectionError ):
117+ pass
118+ time .sleep (interval )
119+ return False
120+
121+
122+ def make_request (url : str , timeout : int = 10 ) -> bool :
123+ try :
124+ response = requests .get (url , timeout = timeout )
125+ return response .status_code == 200
126+ except requests .RequestException :
127+ return False
128+
129+
130+ def get_base_url () -> str :
131+ namespace = get_namespace ()
132+
133+ # Check if we have an ingress
134+ result = subprocess .run (
135+ ["kubectl" , "get" , "ingress" , "-n" , namespace , "-o" , "json" ],
136+ capture_output = True ,
137+ text = True ,
138+ )
139+
140+ if result .returncode == 0 :
141+ ingress_data = json .loads (result .stdout )
142+ if ingress_data ["items" ]:
143+ ingress = ingress_data ["items" ][0 ]
144+ rules = ingress .get ("spec" , {}).get ("rules" , [])
145+ if rules :
146+ host = rules [0 ].get ("host" , "localhost" )
147+ # Check if host is accessible
148+ try :
149+ response = requests .get (
150+ f"http://{ host } /stac/collections" , timeout = 5
151+ )
152+ if response .status_code == 200 :
153+ return f"http://{ host } "
154+ except requests .RequestException :
155+ pass
156+
157+ return "http://localhost:8080"
158+
159+
160+ def get_pod_metrics (namespace : str , service_name : str ) -> List [Dict [str , str ]]:
161+ """Get CPU and memory metrics for pods of a specific service."""
162+ release_name = get_release_name ()
163+ result = subprocess .run (
164+ [
165+ "kubectl" ,
166+ "top" ,
167+ "pods" ,
168+ "-n" ,
169+ namespace ,
170+ "-l" ,
171+ f"app={ release_name } -{ service_name } " ,
172+ "--no-headers" ,
173+ ],
174+ capture_output = True ,
175+ text = True ,
176+ )
177+
178+ if result .returncode != 0 :
179+ return []
180+
181+ metrics : List [Dict [str , str ]] = []
182+ for line in result .stdout .strip ().split ("\n " ):
183+ if line .strip ():
184+ parts = line .split ()
185+ if len (parts ) >= 3 :
186+ pod_name = parts [0 ]
187+ cpu = parts [1 ] # e.g., "25m"
188+ memory = parts [2 ] # e.g., "128Mi"
189+ metrics .append ({"pod" : pod_name , "cpu" : cpu , "memory" : memory })
190+
191+ return metrics
192+
193+
194+ def get_hpa_status (namespace : str , hpa_name : str ) -> Optional [Dict [str , Any ]]:
195+ """Get HPA status for a specific HPA."""
196+ result = kubectl_get ("hpa" , namespace = namespace , output = "json" )
197+ if result .returncode != 0 :
198+ return None
199+
200+ hpas = json .loads (result .stdout )
201+ for hpa in hpas ["items" ]:
202+ if hpa ["metadata" ]["name" ] == hpa_name :
203+ return cast (Dict [str , Any ], hpa )
204+
205+ return None
206+
207+
208+ def get_pod_count (namespace : str , service_name : str ) -> int :
209+ """Get the count of running pods for a specific service."""
210+ release_name = get_release_name ()
211+ result = kubectl_get (
212+ "pods" ,
213+ namespace = namespace ,
214+ label_selector = f"app={ release_name } -{ service_name } " ,
215+ )
216+
217+ if result .returncode != 0 :
218+ return 0
219+
220+ pods = json .loads (result .stdout )
221+ running_pods = [
222+ pod for pod in pods ["items" ] if pod ["status" ]["phase" ] == "Running"
223+ ]
224+
225+ return len (running_pods )
226+
227+
228+ @pytest .fixture (scope = "session" )
229+ def namespace () -> str :
230+ return get_namespace ()
231+
232+
233+ @pytest .fixture (scope = "session" )
234+ def release_name () -> str :
235+ return get_release_name ()
236+
237+
238+ @pytest .fixture (scope = "session" )
239+ def base_url () -> str :
240+ return get_base_url ()
0 commit comments