Commit 2628f96

[Core] feat: add live_event kafka metadata (#2936)
# Description

**What:** Added `type: "live-event"` metadata when sending live events (webhooks) to the lakehouse, allowing differentiation between sync data and live event data in Snowflake.

**Why:** To let downstream consumers distinguish data ingested via a full sync from real-time webhook events, enabling different processing logic or analytics.

**How:**

- Added a `type` field to the lakehouse request body for Ocean live events
- Propagated `type` through the lakehouse-writer pipeline into `kafka_metadata` in Snowflake

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)

#### All tests should be run against the Port production environment (using a testing org).

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector

### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Completed a full resync from a freshly installed integration and it completed successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If a new resource kind is added or updated in the integration, add example raw data, mapping, and expected result to the `examples` folder in the integration directory
- [ ] If a resource kind is updated, run the integration with the example data and check that the expected result is achieved
- [ ] If a new resource kind is added or updated, validate that live events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support multi-account

## Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
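The shape of the lakehouse request body before and after this change can be sketched as follows (a minimal illustration; the entity values are hypothetical, and only the `type` key differs between the two ingestion paths):

```python
from datetime import datetime

# Illustrative raw items; real payloads carry the integration's raw entity data.
raw_data = [{"name": "example-entity"}]

# Resync batches are now tagged explicitly with type="resync"...
resync_body = {
    "items": raw_data,
    "extractionTimestamp": int(datetime.now().timestamp() * 1000),
    "operation": "upsert",
    "type": "resync",
}

# ...while webhook-driven live events carry type="live-event", which the
# lakehouse-writer pipeline propagates into kafka_metadata in Snowflake.
live_event_body = {**resync_body, "type": "live-event"}

print(resync_body["type"], live_event_body["type"])
```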
1 parent 9d7a888 commit 2628f96

File tree

7 files changed (+77, −8 lines changed)

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
```diff
@@ -7,6 +7,12 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 <!-- towncrier release notes start -->
 
+## 0.38.15 (2026-03-15)
+
+### Improvements
+
+- Add type:live_event in body to signify live_event ingested data in lakehouse
+
 ## 0.38.14 (2026-03-15)
 
 ### Improvements
```

port_ocean/clients/port/mixins/integrations.py

Lines changed: 10 additions & 5 deletions
```diff
@@ -283,20 +283,25 @@ async def post_integration_raw_data(
         sync_id: str,
         kind: str,
         operation: LakehouseOperation = LakehouseOperation.UPSERT,
+        data_type: str | None = None,
     ) -> None:
         logger.debug(
             "starting POST raw data request", raw_data=raw_data, operation=operation
         )
         headers = await self.auth.headers()
 
+        body: dict[str, Any] = {
+            "items": raw_data,
+            "extractionTimestamp": int(datetime.now().timestamp() * 1000),
+            "operation": operation.value,
+        }
+        if data_type is not None:
+            body["type"] = data_type
+
         response = await self.client.post(
             f"{self.auth.ingest_url}/lake/write/integration-type/{quote_plus(self.auth.integration_type)}/integration/{quote_plus(self.integration_identifier)}/sync/{quote_plus(sync_id)}/kind/{quote_plus(kind)}",
             headers=headers,
-            json={
-                "items": raw_data,
-                "extractionTimestamp": int(datetime.now().timestamp() * 1000),
-                "operation": operation.value,
-            },
+            json=body,
         )
         handle_port_status_code(response, should_raise=False, should_log=True)
         logger.debug("Finished POST raw data request")
```

port_ocean/core/integrations/mixins/live_events.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -118,6 +118,7 @@ async def _send_webhook_raw_data_to_lakehouse(
                 event_id,
                 kind,
                 operation=LakehouseOperation.UPSERT,
+                data_type="live-event",
             )
         except Exception as e:
             logger.warning(
@@ -138,6 +139,7 @@ async def _send_webhook_raw_data_to_lakehouse(
                 event_id,
                 kind,
                 operation=LakehouseOperation.DELETE,
+                data_type="live-event",
             )
         except Exception as e:
             logger.warning(
```

port_ocean/core/integrations/mixins/sync_raw.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -424,7 +424,7 @@ async def _register_in_batches(
 
         if lakehouse_data_enabled and raw_results:
             await ocean.port_client.post_integration_raw_data(
-                raw_results, event.id, resource_config.kind
+                raw_results, event.id, resource_config.kind, data_type="resync"
             )
 
         logger.info(
@@ -467,7 +467,7 @@ async def _register_in_batches(
             batch_index += 1
             if lakehouse_data_enabled:
                 await ocean.port_client.post_integration_raw_data(
-                    items, event.id, resource_config.kind
+                    items, event.id, resource_config.kind, data_type="resync"
                 )
             number_of_raw_results += len(items)
             if send_raw_data_examples_amount > 0:
```

port_ocean/tests/clients/port/mixins/test_integrations_lakehouse.py

Lines changed: 52 additions & 0 deletions
```diff
@@ -171,3 +171,55 @@ async def test_post_integration_raw_data_extraction_timestamp(
     assert "extractionTimestamp" in expected_json
     assert isinstance(expected_json["extractionTimestamp"], int)
     assert expected_json["extractionTimestamp"] > 0
+
+
+async def test_post_integration_raw_data_with_data_type(
+    lakehouse_integration_client: IntegrationClientMixin,
+) -> None:
+    """Test post_integration_raw_data with data_type parameter for live events."""
+    raw_data = [{"name": "test-entity"}]
+    sync_id = "webhook-event-123"
+    kind = "repository"
+
+    with patch("port_ocean.clients.port.mixins.integrations.handle_port_status_code"):
+        await lakehouse_integration_client.post_integration_raw_data(
+            raw_data,
+            sync_id,
+            kind,
+            operation=LakehouseOperation.UPSERT,
+            data_type="live-event",
+        )
+
+    lakehouse_integration_client.client.post.assert_called_once()
+    call_args = lakehouse_integration_client.client.post.call_args
+
+    expected_json = call_args[1]["json"]
+    assert expected_json["items"] == raw_data
+    assert expected_json["operation"] == "upsert"
+    assert expected_json["type"] == "live-event"
+
+
+async def test_post_integration_raw_data_with_resync_data_type(
+    lakehouse_integration_client: IntegrationClientMixin,
+) -> None:
+    """Test post_integration_raw_data with data_type parameter for resync."""
+    raw_data = [{"name": "test-entity"}]
+    sync_id = "resync-123"
+    kind = "repository"
+
+    with patch("port_ocean.clients.port.mixins.integrations.handle_port_status_code"):
+        await lakehouse_integration_client.post_integration_raw_data(
+            raw_data,
+            sync_id,
+            kind,
+            operation=LakehouseOperation.UPSERT,
+            data_type="resync",
+        )
+
+    lakehouse_integration_client.client.post.assert_called_once()
+    call_args = lakehouse_integration_client.client.post.call_args
+
+    expected_json = call_args[1]["json"]
+    assert expected_json["items"] == raw_data
+    assert expected_json["operation"] == "upsert"
+    assert expected_json["type"] == "resync"
```

port_ocean/tests/core/handlers/mixins/test_live_events_lakehouse.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -211,6 +211,7 @@ async def test_send_webhook_raw_data_to_lakehouse_enabled_upsert(
         "test-event-id",
         "repository",
         operation=LakehouseOperation.UPSERT,
+        data_type="live-event",
     )
 
 
@@ -251,6 +252,7 @@ async def test_send_webhook_raw_data_to_lakehouse_enabled_delete(
         "test-event-id",
         "repository",
         operation=LakehouseOperation.DELETE,
+        data_type="live-event",
     )
 
 
@@ -356,12 +358,14 @@ async def test_send_webhook_raw_data_to_lakehouse_both_operations(
                 "test-event-id",
                 "repository",
                 operation=LakehouseOperation.UPSERT,
+                data_type="live-event",
             ),
             call(
                 delete_data,
                 "test-event-id",
                 "repository",
                 operation=LakehouseOperation.DELETE,
+                data_type="live-event",
             ),
         ]
     )
```

pyproject.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "port-ocean"
-version = "0.38.14"
+version = "0.38.15"
 description = "Port Ocean is a CLI tool for managing your Port projects."
 readme = "README.md"
 homepage = "https://app.getport.io"
```
