@@ -25,14 +25,18 @@ def mock_stac_api_job_database(mock_auth) -> STACAPIJobDatabase:
2525
2626
@pytest.fixture
def mock_pystac_client(dummy_stac_item):
    """Yield a mocked ``pystac_client.Client`` while ``Client.open`` is patched.

    The mock reports two collections (``collection-1`` and ``collection-2``)
    and its ``search()`` returns an ``ItemSearch`` mock whose ``items()``
    yields only ``dummy_stac_item``.
    """
    collections = [
        MagicMock(id=collection_id)
        for collection_id in ("collection-1", "collection-2")
    ]

    search_result = MagicMock(spec=pystac_client.ItemSearch)
    search_result.items.return_value = [dummy_stac_item]

    client = MagicMock(spec=pystac_client.Client)
    client.get_collections.return_value = collections
    client.search.return_value = search_result

    # Every ``pystac_client.Client.open(...)`` call made while the fixture is
    # active receives this same mock instance.
    with patch("pystac_client.Client.open", return_value=client):
        yield client
3842
@@ -195,6 +199,16 @@ def patch_datetime_now():
195199 yield mock_datetime
196200
197201
@pytest.fixture
def bulk_dataframe():
    """Return a ten-row frame with one ``some_property`` column, indexed 0..9."""
    row_ids = list(range(10))
    values = [f"value-{row_id}" for row_id in row_ids]
    return pd.DataFrame({"some_property": values}, index=row_ids)
210+
211+
198212class TestSTACAPIJobDatabase :
199213 def test_exists (self , job_db_exists , job_db_not_exists ):
200214
@@ -246,7 +260,6 @@ def test_initialize_from_df_with_geometry(
246260 assert job_db_not_exists .geometry_column == "geometry"
247261
def test_series_from(self, job_db_exists, dummy_series, dummy_stac_item):
    """``series_from`` must convert the dummy STAC item into ``dummy_series``."""
    actual = job_db_exists.series_from(dummy_stac_item)
    pdt.assert_series_equal(actual, dummy_series)
251264
252265 def test_item_from (self , patch_datetime_now , job_db_exists , dummy_series , dummy_stac_item ):
@@ -260,12 +273,119 @@ def test_item_from_geometry(
260273 item = job_db_exists .item_from (dummy_series_geometry )
261274 assert item .to_dict () == dummy_stac_item_geometry .to_dict ()
262275
263- def test_count_by_status (self ):
264- pass
265-
266- def test_get_by_status (self ):
267- pass
@patch("openeo.extra.stac_job_db.STACAPIJobDatabase.get_by_status")
def test_count_by_status(self, mock_get_by_status, normalized_dummy_dataframe, job_db_exists):
    """``count_by_status`` must report one ``not_started`` job for the dummy frame.

    ``get_by_status`` is patched to return ``normalized_dummy_dataframe``.
    """
    mock_get_by_status.return_value = normalized_dummy_dataframe

    counts = job_db_exists.count_by_status()

    assert counts == {"not_started": 1}
280+
def test_get_by_status_no_filter(self, job_db_exists):
    """An empty status selection must trigger one search with ``filter=None``."""
    expected_search_kwargs = {
        "method": "GET",
        "collections": ["collection-1"],
        "filter": None,
        "max_items": None,
    }

    job_db_exists.get_by_status(())

    job_db_exists.client.search.assert_called_once_with(**expected_search_kwargs)
287+
def test_get_by_status_with_filter(self, job_db_exists):
    """Selecting ``not_started`` must pass the matching status filter to search."""
    expected_filter = "\"properties.status\"='not_started'"

    job_db_exists.get_by_status(["not_started"])

    job_db_exists.client.search.assert_called_once_with(
        method="GET",
        collections=["collection-1"],
        filter=expected_filter,
        max_items=None,
    )
293+
def test_get_by_status_result(self, job_db_exists):
    """``get_by_status`` must return the expected one-row frame indexed by ``test``."""
    expected = pd.DataFrame(
        {
            "datetime": [pystac.utils.datetime_to_str(FAKE_NOW)],
            "some_property": ["value"],
        },
        index=["test"],
    )

    actual = job_db_exists.get_by_status(["not_started"])

    pdt.assert_frame_equal(actual, expected)
307+
@patch("requests.post")
def test_persist_single_chunk(self, mock_requests_post, bulk_dataframe, job_db_exists, patch_datetime_now):
    """``persist`` must upload the 10-row ``bulk_dataframe`` with exactly one POST.

    The expected payload is built from the same frame with ``item_from`` and
    ``_prepare_item``; ``requests.post`` is patched, so nothing leaves the test.
    """
    expected_items = []

    def collect_item(row):
        # Build each expected item via item_from + _prepare_item on the row.
        item = job_db_exists.item_from(row)
        job_db_exists._prepare_item(item, job_db_exists.collection_id)
        expected_items.append(item)

    bulk_dataframe.apply(collect_item, axis=1)

    # Configure the *response* object returned by the patched requests.post;
    # ``reason`` belongs on the response, not on the patched function itself.
    response = mock_requests_post.return_value
    response.status_code = 200
    response.reason = "OK"
    response.json.return_value = {"status": "success"}

    job_db_exists.persist(bulk_dataframe)

    mock_requests_post.assert_called_once_with(
        url=f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items",
        auth=None,
        json={
            "method": "upsert",
            "items": {item.id: item.to_dict() for item in expected_items},
        },
    )
341+
@patch("requests.post")
def test_persist_multiple_chunks(self, mock_requests_post, bulk_dataframe, job_db_exists):
    """``_upload_items_bulk`` must split the items into chunks of ``bulk_size``.

    With ``bulk_size`` set to 3, the items built from the 10-row
    ``bulk_dataframe`` are expected to go out in 4 POSTs (3 + 3 + 3 + 1),
    in order.
    """
    items = []

    def collect_item(row):
        # Build each item via item_from + _prepare_item on the row.
        item = job_db_exists.item_from(row)
        job_db_exists._prepare_item(item, job_db_exists.collection_id)
        items.append(item)

    bulk_dataframe.apply(collect_item, axis=1)

    # Configure the *response* object returned by the patched requests.post;
    # ``reason`` belongs on the response, not on the patched function itself.
    response = mock_requests_post.return_value
    response.status_code = 200
    response.reason = "OK"
    response.json.return_value = {"status": "success"}

    chunk_size = 3
    job_db_exists.bulk_size = chunk_size
    job_db_exists._upload_items_bulk(collection_id=job_db_exists.collection_id, items=items)

    bulk_url = f"http://fake-stac-api/collections/{job_db_exists.collection_id}/bulk_items"
    expected_calls = [
        {
            "url": bulk_url,
            "auth": None,
            "json": {
                "method": "upsert",
                "items": {item.id: item.to_dict() for item in items[start : start + chunk_size]},
            },
        }
        for start in range(0, len(items), chunk_size)
    ]

    # 10 items in total, 3 items per chunk, should result in 4 calls.
    # Checking both lengths also keeps zip() below from hiding a missing call.
    assert mock_requests_post.call_count == len(expected_calls) == 4

    for call, expected_kwargs in zip(mock_requests_post.call_args_list, expected_calls):
        # call[1] holds the keyword arguments of the recorded call.
        assert call[1] == expected_kwargs
0 commit comments