Skip to content

Commit cd718ba

Browse files
doks5dakodakovduyguHsnHsn
authored
vdk-core: Adopt 'method' argument in pre-process plugins (#3074)
As part of 03dde45, the `pre_ingest_process` and `post_ingest_process` hooks of the IIngesterPlugin were updated to pass the ingest method to user-implemented ingestion plugins. This change updates the IngesterBase to pass the `method` argument from the ingested payloads to the `pre_ingest_process` and `post_ingest_process` hooks of the registered ingester plugins. Testing Done: Updated tests Signed-off-by: Andon Andonov <[email protected]> Co-authored-by: dakodakov <[email protected]> Co-authored-by: Duygu Hasan <[email protected]>
1 parent 78e0df8 commit cd718ba

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

projects/vdk-core/src/vdk/internal/builtin_plugins/ingestion/ingester_base.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,16 @@ def _payload_poster_thread(self):
437437
destination_table: Optional[str] = None
438438
target: Optional[str] = None
439439
collection_id: Optional[str] = None
440+
method: Optional[str] = None
440441
try:
441442
payload = self._payloads_queue.get()
442-
payload_obj, destination_table, _, target, collection_id = payload
443+
(
444+
payload_obj,
445+
destination_table,
446+
method,
447+
target,
448+
collection_id,
449+
) = payload
443450

444451
# If there are any pre-processors set, pass the payload object
445452
# through them.
@@ -450,6 +457,7 @@ def _payload_poster_thread(self):
450457
target=target,
451458
collection_id=collection_id,
452459
metadata=ingestion_metadata,
460+
method=method,
453461
)
454462

455463
# Verify payload after pre-processing it, since this preprocessing might be responsible for
@@ -512,6 +520,7 @@ def _payload_poster_thread(self):
512520
collection_id=collection_id,
513521
metadata=ingestion_metadata,
514522
exception=exception,
523+
method=method,
515524
)
516525
except Exception as e:
517526
resolvable_by = errors.get_exception_resolvable_by(e)
@@ -575,6 +584,7 @@ def _pre_process_payload(
575584
target: Optional[str],
576585
collection_id: Optional[str],
577586
metadata: Optional[IIngesterPlugin.IngestionMetadata],
587+
method: Optional[str],
578588
) -> Tuple[List[dict], Optional[IIngesterPlugin.IngestionMetadata]]:
579589
for plugin in self._pre_processors:
580590
try:
@@ -584,6 +594,7 @@ def _pre_process_payload(
584594
target=target,
585595
collection_id=collection_id,
586596
metadata=metadata,
597+
method=method,
587598
)
588599
except Exception as e:
589600
raise PreProcessPayloadIngestionException(
@@ -611,6 +622,7 @@ def _execute_post_process_operations(
611622
collection_id: Optional[str],
612623
metadata: Optional[IIngesterPlugin.IngestionMetadata],
613624
exception: Optional[Exception],
625+
method: Optional[str],
614626
):
615627
for plugin in self._post_processors:
616628
try:
@@ -621,6 +633,7 @@ def _execute_post_process_operations(
621633
collection_id=collection_id,
622634
metadata=metadata,
623635
exception=exception,
636+
method=method,
624637
)
625638
except Exception as e:
626639
raise PostProcessPayloadIngestionException(

projects/vdk-core/tests/vdk/internal/builtin_plugins/ingestion/test_ingester_base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ def test_pre_ingestion_operation():
245245
target=shared_test_values.get("target"),
246246
collection_id=shared_test_values.get("collection_id"),
247247
metadata=None,
248+
method=shared_test_values.get("method"),
248249
)
249250
ingester_base._ingester.ingest_payload.assert_called_once()
250251

@@ -276,6 +277,7 @@ def test_pre_ingestion_updated_dynamic_params():
276277
target=shared_test_values.get("target"),
277278
collection_id=shared_test_values.get("collection_id"),
278279
metadata=None,
280+
method=shared_test_values.get("method"),
279281
)
280282
ingester_base._ingester.ingest_payload.assert_called_with(
281283
collection_id=shared_test_values.get("collection_id"),
@@ -335,6 +337,7 @@ def test_ingest_payload_and_post_ingestion_operation():
335337
collection_id=shared_test_values.get("collection_id"),
336338
metadata=test_ingestion_metadata,
337339
exception=None,
340+
method=shared_test_values.get("method"),
338341
)
339342

340343

@@ -361,4 +364,5 @@ def test_post_ingestion_operation_with_exceptions():
361364
collection_id=shared_test_values.get("collection_id"),
362365
metadata=None,
363366
exception=test_exception,
367+
method=shared_test_values.get("method"),
364368
)

0 commit comments

Comments
 (0)