44from fastapi import (
55 APIRouter ,
66 Depends ,
7+ HTTPException ,
78 Response ,
9+ status ,
810)
911from fastapi import (
1012 Request as FastApiRequest ,
1618from horizon .facts .client import FactsClient , FactsClientDependency
1719from horizon .facts .dependencies import (
1820 DataUpdateSubscriberDependency ,
21+ TimeoutPolicyDependency ,
1922 WaitTimeoutDependency ,
2023)
2124from horizon .facts .opal_forwarder import (
2225 create_data_source_entry ,
2326 create_data_update_entry ,
2427)
28+ from horizon .facts .timeout_policy import TimeoutPolicy
2529from horizon .facts .update_subscriber import DataUpdateSubscriber
2630
2731facts_router = APIRouter (dependencies = [Depends (enforce_pdp_token )])
@@ -33,6 +37,7 @@ async def create_user(
3337 client : FactsClientDependency ,
3438 update_subscriber : DataUpdateSubscriberDependency ,
3539 wait_timeout : WaitTimeoutDependency ,
40+ timeout_policy : TimeoutPolicyDependency ,
3641):
3742 return await forward_request_then_wait_for_update (
3843 client ,
@@ -48,6 +53,7 @@ async def create_user(
4853 authorization_header = r .headers .get ("Authorization" ),
4954 )
5055 ],
56+ timeout_policy = timeout_policy ,
5157 )
5258
5359
@@ -57,6 +63,7 @@ async def create_tenant(
5763 client : FactsClientDependency ,
5864 update_subscriber : DataUpdateSubscriberDependency ,
5965 wait_timeout : WaitTimeoutDependency ,
66+ timeout_policy : TimeoutPolicyDependency ,
6067):
6168 return await forward_request_then_wait_for_update (
6269 client ,
@@ -72,6 +79,7 @@ async def create_tenant(
7279 authorization_header = r .headers .get ("Authorization" ),
7380 )
7481 ],
82+ timeout_policy = timeout_policy ,
7583 )
7684
7785
@@ -81,6 +89,7 @@ async def sync_user(
8189 client : FactsClientDependency ,
8290 update_subscriber : DataUpdateSubscriberDependency ,
8391 wait_timeout : WaitTimeoutDependency ,
92+ timeout_policy : TimeoutPolicyDependency ,
8493 user_id : str ,
8594):
8695 return await forward_request_then_wait_for_update (
@@ -97,6 +106,7 @@ async def sync_user(
97106 authorization_header = r .headers .get ("Authorization" ),
98107 )
99108 ],
109+ timeout_policy = timeout_policy ,
100110 )
101111
102112
@@ -106,6 +116,7 @@ async def update_user(
106116 client : FactsClientDependency ,
107117 update_subscriber : DataUpdateSubscriberDependency ,
108118 wait_timeout : WaitTimeoutDependency ,
119+ timeout_policy : TimeoutPolicyDependency ,
109120 user_id : str ,
110121):
111122 return await forward_request_then_wait_for_update (
@@ -122,6 +133,7 @@ async def update_user(
122133 authorization_header = r .headers .get ("Authorization" ),
123134 )
124135 ],
136+ timeout_policy = timeout_policy ,
125137 )
126138
127139
@@ -156,6 +168,7 @@ async def assign_user_role(
156168 client : FactsClientDependency ,
157169 update_subscriber : DataUpdateSubscriberDependency ,
158170 wait_timeout : WaitTimeoutDependency ,
171+ timeout_policy : TimeoutPolicyDependency ,
159172 user_id : str ,
160173):
161174 return await forward_request_then_wait_for_update (
@@ -165,6 +178,7 @@ async def assign_user_role(
165178 wait_timeout ,
166179 path = f"/users/{ user_id } /roles" ,
167180 entries_callback = create_role_assignment_data_entries ,
181+ timeout_policy = timeout_policy ,
168182 )
169183
170184
@@ -174,6 +188,7 @@ async def create_role_assignment(
174188 client : FactsClientDependency ,
175189 update_subscriber : DataUpdateSubscriberDependency ,
176190 wait_timeout : WaitTimeoutDependency ,
191+ timeout_policy : TimeoutPolicyDependency ,
177192):
178193 return await forward_request_then_wait_for_update (
179194 client ,
@@ -182,6 +197,7 @@ async def create_role_assignment(
182197 wait_timeout ,
183198 path = "/role_assignments" ,
184199 entries_callback = create_role_assignment_data_entries ,
200+ timeout_policy = timeout_policy ,
185201 )
186202
187203
@@ -191,6 +207,7 @@ async def create_resource_instance(
191207 client : FactsClientDependency ,
192208 update_subscriber : DataUpdateSubscriberDependency ,
193209 wait_timeout : WaitTimeoutDependency ,
210+ timeout_policy : TimeoutPolicyDependency ,
194211):
195212 return await forward_request_then_wait_for_update (
196213 client ,
@@ -206,6 +223,7 @@ async def create_resource_instance(
206223 authorization_header = r .headers .get ("Authorization" ),
207224 ),
208225 ],
226+ timeout_policy = timeout_policy ,
209227 )
210228
211229
@@ -215,6 +233,7 @@ async def update_resource_instance(
215233 client : FactsClientDependency ,
216234 update_subscriber : DataUpdateSubscriberDependency ,
217235 wait_timeout : WaitTimeoutDependency ,
236+ timeout_policy : TimeoutPolicyDependency ,
218237 instance_id : str ,
219238):
220239 return await forward_request_then_wait_for_update (
@@ -231,6 +250,7 @@ async def update_resource_instance(
231250 authorization_header = r .headers .get ("Authorization" ),
232251 ),
233252 ],
253+ timeout_policy = timeout_policy ,
234254 )
235255
236256
@@ -240,6 +260,7 @@ async def create_relationship_tuple(
240260 client : FactsClientDependency ,
241261 update_subscriber : DataUpdateSubscriberDependency ,
242262 wait_timeout : WaitTimeoutDependency ,
263+ timeout_policy : TimeoutPolicyDependency ,
243264):
244265 return await forward_request_then_wait_for_update (
245266 client ,
@@ -255,6 +276,7 @@ async def create_relationship_tuple(
255276 authorization_header = r .headers .get ("Authorization" ),
256277 ),
257278 ],
279+ timeout_policy = timeout_policy ,
258280 )
259281
260282
@@ -266,6 +288,7 @@ async def forward_request_then_wait_for_update(
266288 * ,
267289 path : str ,
268290 entries_callback : Callable [[FastApiRequest , dict [str , Any ]], Iterable [DataSourceEntry ]],
291+ timeout_policy : TimeoutPolicy = TimeoutPolicy .IGNORE ,
269292) -> Response :
270293 response = await client .send_forward_request (request , path )
271294 body = client .extract_body (response )
@@ -278,11 +301,20 @@ async def forward_request_then_wait_for_update(
278301 logger .warning (f"Missing required field { e .args [0 ]} in the response body, skipping wait for update." )
279302 return client .convert_response (response )
280303
281- await update_subscriber .publish_and_wait (
304+ wait_result = await update_subscriber .publish_and_wait (
282305 data_update_entry ,
283306 timeout = wait_timeout ,
284307 )
285- return client .convert_response (response )
308+ if wait_result :
309+ return client .convert_response (response )
310+ elif timeout_policy == TimeoutPolicy .FAIL :
311+ logger .error ("Timeout waiting for update and policy is set to fail" )
312+ raise HTTPException (
313+ status_code = status .HTTP_424_FAILED_DEPENDENCY ,
314+ detail = "Timeout waiting for update to be received" ,
315+ )
316+ else :
317+ logger .warning ("Timeout waiting for update and policy is set to ignore" )
286318
287319
288320@facts_router .api_route (
0 commit comments