
Commit 042ad76

[Core] Fix itemsToParse not applied during webhook live event processing (#2868)
## Description

**What** - Fixed `itemsToParse` expansion being silently skipped during webhook/live event processing, which caused entity mapping failures for any integration using `itemsToParse` (e.g. the GitLab-v2 `file` kind with YAML arrays).

**Why** - The live events path in `_parse_raw_event_results_to_entities` passed raw items directly to `entity_processor.parse_items()` without expansion, and `handle_items_to_parse` had an implicit dependency on the `event` context, which is only populated during resync, making it unsafe to reuse in the live events path.

**How** - Refactored `handle_items_to_parse` to accept `items_to_parse_top_level_transform` as an explicit parameter (removing the event context dependency), then added an `_expand_raw_item` async generator to `LiveEventsMixin` that applies the expansion when `itemsToParse` is configured, before passing items to `entity_processor.parse_items()`.

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

#### All tests should be run against the Port production environment (using a testing org).

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector

### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Completed a full resync from a freshly installed integration and it completed successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If a new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory.
- [ ] If a resource kind is updated, run the integration with the example data and check if the expected result is achieved
- [ ] If a new resource kind is added or updated, validate that live events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support multi account

## Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
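To make the fix concrete, here is a minimal, self-contained sketch of the expansion semantics described above, using hypothetical example data. The real implementation applies jq expressions via `handle_items_to_parse`; this sketch uses plain dict operations purely for illustration:

```python
# Sketch of itemsToParse expansion (illustrative, not the real jq-based code).
# Given itemsToParse: ".file.content", one raw webhook item containing an
# array is fanned out into one mapping input per array element: the element
# is placed under the "item" key and the parsed array is deleted from the
# copy (the "top-level transform", i.e. del(.file.content) in jq terms).
import copy


def expand_items_to_parse(raw_item: dict, items_to_parse_name: str = "item") -> list[dict]:
    elements = raw_item["file"]["content"]
    expanded = []
    for element in elements:
        base = copy.deepcopy(raw_item)
        del base["file"]["content"]          # top-level transform: del(.file.content)
        base[items_to_parse_name] = element  # each array element becomes the "item" key
        expanded.append(base)
    return expanded


raw = {
    "file": {
        "path": "port.yml",
        "content": [
            {"identifier": "svc-one", "title": "Service One"},
            {"identifier": "svc-two", "title": "Service Two"},
        ],
    },
    "repo": {"name": "my-repo"},
}

for expanded_item in expand_items_to_parse(raw):
    print(expanded_item)
```

Before this fix, the resync path performed this fan-out but the live events path did not, so mappings like `.item.identifier` evaluated against the unexpanded raw item and failed.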
1 parent ec2697a commit 042ad76

File tree

7 files changed: +234 additions, -110 deletions

CHANGELOG.md (7 additions, 0 deletions)

```diff
@@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 <!-- towncrier release notes start -->
 
+## 0.38.11 (2026-03-11)
+
+### Improvements
+
+- Added support for items to parse in live events context
+
+
 ## 0.38.10 (2026-03-10)
 
 ### Improvements
```

port_ocean/core/integrations/mixins/live_events.py (31 additions, 10 deletions)

```diff
@@ -1,9 +1,13 @@
+from typing import AsyncGenerator
+
 from loguru import logger
 from port_ocean.clients.port.types import UserAgentType
+from port_ocean.core.handlers.port_app_config.models import ResourceConfig
 from port_ocean.core.handlers.webhook.webhook_event import WebhookEventRawResults
 from port_ocean.core.integrations.mixins.handler import HandlerMixin
-from port_ocean.core.integrations.mixins.utils import is_lakehouse_data_enabled
+from port_ocean.core.integrations.mixins.utils import handle_items_to_parse, is_lakehouse_data_enabled
 from port_ocean.core.models import Entity, LakehouseOperation
+from port_ocean.core.ocean_types import RAW_ITEM
 from port_ocean.context.ocean import ocean
 
 
@@ -28,6 +32,20 @@ async def sync_raw_results(self, webhook_events_raw_result: list[WebhookEventRaw
         await self._delete_entities(entities_to_delete)
 
 
+    async def _expand_raw_item(
+        self, raw_item: RAW_ITEM, resource: ResourceConfig
+    ) -> AsyncGenerator[list[RAW_ITEM], None]:
+        if resource.port.items_to_parse:
+            async for batch in handle_items_to_parse(
+                [raw_item],
+                resource.port.items_to_parse_name,
+                resource.port.items_to_parse,
+                resource.port.items_to_parse_top_level_transform,
+            ):
+                yield batch
+        else:
+            yield [raw_item]
+
     async def _parse_raw_event_results_to_entities(self, webhook_events_raw_result: list[WebhookEventRawResults]) -> tuple[list[Entity], list[Entity]]:
         """Parse the webhook event raw results and return a list of entities.
 
@@ -38,18 +56,21 @@ async def _parse_raw_event_results_to_entities(self, webhook_events_raw_result:
         entities_not_passed: list[Entity] = []
         entities_to_delete: list[Entity] = []
         for webhook_event_raw_result in webhook_events_raw_result:
+            resource = webhook_event_raw_result.resource
             for raw_item in webhook_event_raw_result.updated_raw_results:
-                calaculation_results = await self.entity_processor.parse_items(
-                    webhook_event_raw_result.resource, [raw_item], parse_all=True, send_raw_data_examples_amount=0
-                )
-                entities.extend(calaculation_results.entity_selector_diff.passed)
-                entities_not_passed.extend(calaculation_results.entity_selector_diff.failed)
+                async for batch in self._expand_raw_item(raw_item, resource):
+                    calculation_results = await self.entity_processor.parse_items(
+                        resource, batch, parse_all=True, send_raw_data_examples_amount=0
+                    )
+                    entities.extend(calculation_results.entity_selector_diff.passed)
+                    entities_not_passed.extend(calculation_results.entity_selector_diff.failed)
 
             for raw_item in webhook_event_raw_result.deleted_raw_results:
-                deletion_results = await self.entity_processor.parse_items(
-                    webhook_event_raw_result.resource, [raw_item], parse_all=True, send_raw_data_examples_amount=0
-                )
-                entities_to_delete.extend(deletion_results.entity_selector_diff.passed)
+                async for batch in self._expand_raw_item(raw_item, resource):
+                    deletion_results = await self.entity_processor.parse_items(
+                        resource, batch, parse_all=True, send_raw_data_examples_amount=0
+                    )
+                    entities_to_delete.extend(deletion_results.entity_selector_diff.passed)
 
         entities_to_remove = []
         for entity in entities_to_delete + entities_not_passed:
```

port_ocean/core/integrations/mixins/sync_raw.py (1 addition, 0 deletions)

```diff
@@ -127,6 +127,7 @@ async def _execute_resync_tasks(
                         resource_config.kind,
                         resource_config.port.items_to_parse_name,
                         resource_config.port.items_to_parse,
+                        resource_config.port.items_to_parse_top_level_transform,
                     )
                 )
             else:
```

port_ocean/core/integrations/mixins/utils.py (12 additions, 6 deletions)

```diff
@@ -7,7 +7,6 @@
 from loguru import logger
 
 from port_ocean.clients.port.utils import _http_client as _port_http_client
-from port_ocean.context.event import event
 from port_ocean.context.ocean import ocean
 from port_ocean.core.handlers import JQEntityProcessor
 from port_ocean.core.ocean_types import (
@@ -133,7 +132,7 @@ async def resync_function_wrapper(
     results = await fn(kind)
     return validate_result(results)
 
-async def handle_items_to_parse(result: RAW_RESULT, items_to_parse_name: str, items_to_parse: str | None = None) -> AsyncGenerator[list[dict[str, Any]], None]:
+async def handle_items_to_parse(result: RAW_RESULT, items_to_parse_name: str, items_to_parse: str | None = None, items_to_parse_top_level_transform: bool = True) -> AsyncGenerator[list[dict[str, Any]], None]:
     delete_target = extract_jq_deletion_path_revised(items_to_parse) or '.'
     jq_expression = f". | del({delete_target})"
     batch_size = ocean.config.yield_items_to_parse_batch_size
@@ -142,8 +141,14 @@ async def handle_items_to_parse(result: RAW_RESULT, items_to_parse_name: str, it
 
     for item in result:
         items_to_parse_data = await entity_processor._search(item, items_to_parse)
-        if event.resource_config.port.items_to_parse_top_level_transform:
-            item = await entity_processor._search(item, jq_expression)
+        if items_to_parse_top_level_transform:
+            transformed = await entity_processor._search(item, jq_expression)
+            if transformed is None:
+                logger.warning(
+                    f"Top-level transform '{jq_expression}' returned None for item, skipping..."
+                )
+                continue
+            item = transformed
         if not isinstance(items_to_parse_data, list):
             logger.warning(
                 f"Failed to parse items for JQ expression {items_to_parse}, Expected list but got {type(items_to_parse_data)}."
@@ -162,7 +167,7 @@ async def handle_items_to_parse(result: RAW_RESULT, items_to_parse_name: str, it
         yield batch
 
 async def resync_generator_wrapper(
-    fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str, items_to_parse_name: str, items_to_parse: str | None = None
+    fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str, items_to_parse_name: str, items_to_parse: str | None = None, items_to_parse_top_level_transform: bool = True
 ) -> ASYNC_GENERATOR_RESYNC_TYPE:
     generator = fn(kind)
     errors = []
@@ -173,7 +178,8 @@ async def resync_generator_wrapper(
             result = validate_result(await anext(generator))
 
             if items_to_parse:
-                items_to_parse_generator = handle_items_to_parse(result, items_to_parse_name, items_to_parse)
+                items_to_parse_generator = handle_items_to_parse(result, items_to_parse_name, items_to_parse, items_to_parse_top_level_transform)
+                del result
                 async for batch in items_to_parse_generator:
                     yield batch
             else:
```

port_ocean/tests/core/handlers/mixins/test_live_events.py (154 additions, 0 deletions)

```diff
@@ -424,6 +424,160 @@ async def test_sync_raw_results_one_raw_result_entity_upserted(
     mock_live_events_mixin.entities_state_applier.delete.assert_not_called()
 
 
+file_resource_config_with_items_to_parse = ResourceConfig(
+    kind="file",
+    selector=Selector(query="true"),
+    port=PortResourceConfig(
+        entity=MappingsConfig(
+            mappings=EntityMapping(
+                identifier=".item.identifier",
+                title=".item.title",
+                blueprint='"fileBlueprint"',
+                properties={"repoName": ".repo.name"},
+                relations={},
+            )
+        ),
+        itemsToParse=".file.content",
+    ),
+)
+
+file_raw_item_with_array = {
+    "file": {
+        "path": "port.yml",
+        "content": [
+            {"identifier": "svc-one", "title": "Service One"},
+            {"identifier": "svc-two", "title": "Service Two"},
+        ],
+    },
+    "repo": {"name": "my-repo"},
+}
+
+
+@pytest.mark.asyncio
+async def test_expand_raw_item_without_items_to_parse(
+    mock_live_events_mixin: LiveEventsMixin,
+) -> None:
+    """When itemsToParse is not configured, _expand_raw_item yields the original item unchanged"""
+    resource = ResourceConfig(
+        kind="repository",
+        selector=Selector(query="true"),
+        port=PortResourceConfig(
+            entity=MappingsConfig(
+                mappings=EntityMapping(
+                    identifier=".name",
+                    title=".name",
+                    blueprint='"service"',
+                    properties={},
+                    relations={},
+                )
+            )
+        ),
+    )
+    raw_item = {"name": "my-repo", "url": "https://example.com/my-repo"}
+
+    batches: list[Any] = []
+    async for batch in mock_live_events_mixin._expand_raw_item(raw_item, resource):
+        batches.extend(batch)
+
+    assert batches == [raw_item]
+
+
+@pytest.mark.asyncio
+async def test_expand_raw_item_with_items_to_parse(
+    mock_live_events_mixin: LiveEventsMixin,
+    mock_context: PortOceanContext,
+) -> None:
+    """When itemsToParse is configured, _expand_raw_item fans out array elements into separate items"""
+    mock_ocean_utils = MagicMock()
+    mock_ocean_utils.config.yield_items_to_parse_batch_size = 100
+    mock_ocean_utils.app.integration.entity_processor = JQEntityProcessor(mock_context)
+
+    with patch("port_ocean.core.integrations.mixins.utils.ocean", mock_ocean_utils):
+        batches: list[Any] = []
+        async for batch in mock_live_events_mixin._expand_raw_item(
+            file_raw_item_with_array, file_resource_config_with_items_to_parse
+        ):
+            batches.extend(batch)
+
+    assert len(batches) == 2
+    assert batches[0]["item"] == {"identifier": "svc-one", "title": "Service One"}
+    assert batches[1]["item"] == {"identifier": "svc-two", "title": "Service Two"}
+    assert "content" not in batches[0].get("file", {})
+    assert batches[0]["repo"] == {"name": "my-repo"}
+
+
+@pytest.mark.asyncio
+async def test_parse_raw_event_results_items_to_parse_expansion(
+    mock_live_events_mixin: LiveEventsMixin,
+) -> None:
+    """When itemsToParse is configured, _parse_raw_event_results_to_entities produces one entity per array element"""
+    expanded_item_one = {
+        "file": {"path": "port.yml"},
+        "repo": {"name": "my-repo"},
+        "item": {"identifier": "svc-one", "title": "Service One"},
+    }
+    expanded_item_two = {
+        "file": {"path": "port.yml"},
+        "repo": {"name": "my-repo"},
+        "item": {"identifier": "svc-two", "title": "Service Two"},
+    }
+
+    entity_one = Entity(
+        identifier="svc-one",
+        blueprint="fileBlueprint",
+        title="Service One",
+        team=[],
+        properties={"repoName": "my-repo"},
+        relations={},
+    )
+    entity_two = Entity(
+        identifier="svc-two",
+        blueprint="fileBlueprint",
+        title="Service Two",
+        team=[],
+        properties={"repoName": "my-repo"},
+        relations={},
+    )
+
+    async def mock_expand(raw_item: Any, resource: Any) -> Any:
+        yield [expanded_item_one]
+        yield [expanded_item_two]
+
+    mock_live_events_mixin._expand_raw_item = mock_expand  # type: ignore
+
+    def make_calculation_result(passed: list[Entity]) -> CalculationResult:
+        return CalculationResult(
+            entity_selector_diff=EntitySelectorDiff(passed=passed, failed=[]),
+            errors=[],
+            misconfigured_entity_keys={},
+        )
+
+    mock_live_events_mixin.entity_processor.parse_items = AsyncMock(  # type: ignore
+        side_effect=[
+            make_calculation_result([entity_one]),
+            make_calculation_result([entity_two]),
+        ]
+    )
+
+    webhook_result = WebhookEventRawResults(
+        updated_raw_results=[file_raw_item_with_array],
+        deleted_raw_results=[],
+    )
+    webhook_result.resource = file_resource_config_with_items_to_parse
+
+    entities_to_create, entities_to_delete = (
+        await mock_live_events_mixin._parse_raw_event_results_to_entities(
+            [webhook_result]
+        )
+    )
+
+    assert len(entities_to_create) == 2
+    assert entities_to_create[0].identifier == "svc-one"
+    assert entities_to_create[1].identifier == "svc-two"
+    assert entities_to_delete == []
+    assert mock_live_events_mixin.entity_processor.parse_items.call_count == 2
+
+
 @pytest.mark.asyncio
 async def test_sync_raw_results_entity_deleted(
     mock_live_events_mixin: LiveEventsMixin,
```
