1- import pytest
1+ import json
22import os
3+ import subprocess
4+ import time
5+ from typing import Any , Dict , List , Optional , cast
6+
37import psycopg2
48import psycopg2 .extensions
9+ import pytest
10+ import requests
511
612
7- @pytest .fixture (scope = ' session' )
13+ @pytest .fixture (scope = " session" )
814def raster_endpoint ():
9- return os .getenv (' RASTER_ENDPOINT' , "http://127.0.0.1/raster" )
15+ return os .getenv (" RASTER_ENDPOINT" , "http://127.0.0.1/raster" )
1016
1117
12- @pytest .fixture (scope = ' session' )
18+ @pytest .fixture (scope = " session" )
1319def vector_endpoint ():
14- return os .getenv (' VECTOR_ENDPOINT' , "http://127.0.0.1/vector" )
20+ return os .getenv (" VECTOR_ENDPOINT" , "http://127.0.0.1/vector" )
1521
1622
17- @pytest .fixture (scope = ' session' )
23+ @pytest .fixture (scope = " session" )
1824def stac_endpoint ():
19- return os .getenv (' STAC_ENDPOINT' , "http://127.0.0.1/stac" )
25+ return os .getenv (" STAC_ENDPOINT" , "http://127.0.0.1/stac" )
2026
2127
22- @pytest .fixture (scope = ' session' )
28+ @pytest .fixture (scope = " session" )
2329def db_connection ():
24- """Create database connection for testing."""
2530 # Require all database connection parameters to be explicitly set
26- required_vars = [' PGHOST' , ' PGPORT' , ' PGDATABASE' , ' PGUSER' , ' PGPASSWORD' ]
31+ required_vars = [" PGHOST" , " PGPORT" , " PGDATABASE" , " PGUSER" , " PGPASSWORD" ]
2732 missing_vars = [var for var in required_vars if not os .getenv (var )]
2833
2934 if missing_vars :
30- pytest .fail (f"Required environment variables not set: { ', ' .join (missing_vars )} " )
35+ pytest .fail (
36+ f"Required environment variables not set: { ', ' .join (missing_vars )} "
37+ )
3138
3239 connection_params = {
33- ' host' : os .getenv (' PGHOST' ),
34- ' port' : int (os .getenv (' PGPORT' )),
35- ' database' : os .getenv (' PGDATABASE' ),
36- ' user' : os .getenv (' PGUSER' ),
37- ' password' : os .getenv (' PGPASSWORD' )
40+ " host" : os .getenv (" PGHOST" ),
41+ " port" : int (os .getenv (" PGPORT" )),
42+ " database" : os .getenv (" PGDATABASE" ),
43+ " user" : os .getenv (" PGUSER" ),
44+ " password" : os .getenv (" PGPASSWORD" ),
3845 }
3946
4047 try :
@@ -44,3 +51,187 @@ def db_connection():
4451 conn .close ()
4552 except psycopg2 .Error as e :
4653 pytest .fail (f"Cannot connect to database: { e } " )
54+
55+
56+ def get_namespace () -> str :
57+ return os .environ .get ("NAMESPACE" , "eoapi" )
58+
59+
60+ def get_release_name () -> str :
61+ return os .environ .get ("RELEASE_NAME" , "eoapi" )
62+
63+
64+ def kubectl_get (
65+ resource : str ,
66+ namespace : Optional [str ] = None ,
67+ label_selector : Optional [str ] = None ,
68+ output : str = "json" ,
69+ ) -> subprocess .CompletedProcess [str ]:
70+ cmd : List [str ] = ["kubectl" , "get" , resource ]
71+
72+ if namespace :
73+ cmd .extend (["-n" , namespace ])
74+
75+ if label_selector :
76+ cmd .extend (["-l" , label_selector ])
77+
78+ if output :
79+ cmd .extend (["-o" , output ])
80+
81+ result = subprocess .run (cmd , capture_output = True , text = True )
82+ return result
83+
84+
85+ def kubectl_port_forward (
86+ service : str , local_port : int , remote_port : int , namespace : str
87+ ) -> subprocess .Popen [str ]:
88+ cmd = [
89+ "kubectl" ,
90+ "port-forward" ,
91+ f"svc/{ service } " ,
92+ f"{ local_port } :{ remote_port } " ,
93+ "-n" ,
94+ namespace ,
95+ ]
96+
97+ process = subprocess .Popen (
98+ cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE , text = True
99+ )
100+
101+ # Give it time to establish connection
102+ time .sleep (3 )
103+ return process
104+
105+
106+ def wait_for_url (url : str , timeout : int = 30 , interval : int = 2 ) -> bool :
107+ start_time = time .time ()
108+ while time .time () - start_time < timeout :
109+ try :
110+ response = requests .get (url , timeout = 5 )
111+ if response .status_code == 200 :
112+ return True
113+ except (requests .RequestException , requests .ConnectionError ):
114+ pass
115+ time .sleep (interval )
116+ return False
117+
118+
119+ def make_request (url : str , timeout : int = 10 ) -> bool :
120+ try :
121+ response = requests .get (url , timeout = timeout )
122+ return response .status_code == 200
123+ except requests .RequestException :
124+ return False
125+
126+
127+ def get_base_url () -> str :
128+ namespace = get_namespace ()
129+
130+ # Check if we have an ingress
131+ result = subprocess .run (
132+ ["kubectl" , "get" , "ingress" , "-n" , namespace , "-o" , "json" ],
133+ capture_output = True ,
134+ text = True ,
135+ )
136+
137+ if result .returncode == 0 :
138+ ingress_data = json .loads (result .stdout )
139+ if ingress_data ["items" ]:
140+ ingress = ingress_data ["items" ][0 ]
141+ rules = ingress .get ("spec" , {}).get ("rules" , [])
142+ if rules :
143+ host = rules [0 ].get ("host" , "localhost" )
144+ # Check if host is accessible
145+ try :
146+ response = requests .get (
147+ f"http://{ host } /stac/collections" , timeout = 5
148+ )
149+ if response .status_code == 200 :
150+ return f"http://{ host } "
151+ except requests .RequestException :
152+ pass
153+
154+ return "http://localhost:8080"
155+
156+
157+ def get_pod_metrics (namespace : str , service_name : str ) -> List [Dict [str , str ]]:
158+ """Get CPU and memory metrics for pods of a specific service."""
159+ release_name = get_release_name ()
160+ result = subprocess .run (
161+ [
162+ "kubectl" ,
163+ "top" ,
164+ "pods" ,
165+ "-n" ,
166+ namespace ,
167+ "-l" ,
168+ f"app={ release_name } -{ service_name } " ,
169+ "--no-headers" ,
170+ ],
171+ capture_output = True ,
172+ text = True ,
173+ )
174+
175+ if result .returncode != 0 :
176+ return []
177+
178+ metrics : List [Dict [str , str ]] = []
179+ for line in result .stdout .strip ().split ("\n " ):
180+ if line .strip ():
181+ parts = line .split ()
182+ if len (parts ) >= 3 :
183+ pod_name = parts [0 ]
184+ cpu = parts [1 ] # e.g., "25m"
185+ memory = parts [2 ] # e.g., "128Mi"
186+ metrics .append ({"pod" : pod_name , "cpu" : cpu , "memory" : memory })
187+
188+ return metrics
189+
190+
191+ def get_hpa_status (namespace : str , hpa_name : str ) -> Optional [Dict [str , Any ]]:
192+ """Get HPA status for a specific HPA."""
193+ result = kubectl_get ("hpa" , namespace = namespace , output = "json" )
194+ if result .returncode != 0 :
195+ return None
196+
197+ hpas = json .loads (result .stdout )
198+ for hpa in hpas ["items" ]:
199+ if hpa ["metadata" ]["name" ] == hpa_name :
200+ return cast (Dict [str , Any ], hpa )
201+
202+ return None
203+
204+
205+ def get_pod_count (namespace : str , service_name : str ) -> int :
206+ """Get the count of running pods for a specific service."""
207+ release_name = get_release_name ()
208+ result = kubectl_get (
209+ "pods" ,
210+ namespace = namespace ,
211+ label_selector = f"app={ release_name } -{ service_name } " ,
212+ )
213+
214+ if result .returncode != 0 :
215+ return 0
216+
217+ pods = json .loads (result .stdout )
218+ running_pods = [
219+ pod for pod in pods ["items" ] if pod ["status" ]["phase" ] == "Running"
220+ ]
221+
222+ return len (running_pods )
223+
224+
225+ @pytest .fixture (scope = "session" )
226+ def namespace ():
227+ return get_namespace ()
228+
229+
230+ @pytest .fixture (scope = "session" )
231+ def release_name ():
232+ return get_release_name ()
233+
234+
235+ @pytest .fixture (scope = "session" )
236+ def base_url ():
237+ return get_base_url ()
0 commit comments