Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: b117307c-14b6-41aa-9422-947e34922962
dockerImageTag: 2.7.0
dockerImageTag: 2.7.1
releases:
rolloutConfiguration:
enableProgressiveRollout: false
Expand All @@ -28,7 +28,6 @@ data:
registryOverrides:
cloud:
enabled: true
dockerImageTag: 2.6.5
oss:
enabled: true
releaseStage: generally_available
Expand Down
6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-salesforce/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.7.0"
version = "2.7.1"
name = "source-salesforce"
description = "Source implementation for Salesforce."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from source_salesforce.api import UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS
from source_salesforce.streams import LOOKBACK_SECONDS

from airbyte_cdk.models import AirbyteStateBlob, SyncMode
from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.test.state_builder import StateBuilder
from integration.utils import create_base_url, given_authentication, given_stream, read
Expand All @@ -37,14 +37,13 @@ def create_http_request(stream_name: str, field_names: List[str], access_token:
)


def create_http_response(field_names: List[str], record_count: int = 1) -> HttpResponse:
def create_http_response(field_names: List[str], record_count: int = 1, cursor_value: str = "2021-01-18T21:18:20.000Z") -> HttpResponse:
"""
This method does not handle field types for now which may cause some test failures on change if we start considering using some
fields for calculation. One example of that would be cursor field parsing to datetime.
"""
records = [
{field: "2021-01-18T21:18:20.000Z" if field in {"SystemModstamp"} else f"{field}_value" for field in field_names}
for i in range(record_count)
{field: cursor_value if field in {"SystemModstamp"} else f"{field}_value" for field in field_names} for i in range(record_count)
]
return HttpResponse(json.dumps({"records": records}))

Expand Down Expand Up @@ -156,15 +155,24 @@ def test_given_sequential_state_when_read_then_migrate_to_partitioned_state(self

def test_given_partitioned_state_when_read_then_sync_missing_partitions_and_update_state(self) -> None:
missing_chunk = (_NOW - timedelta(days=5), _NOW - timedelta(days=3))
cursor_value = _to_partitioned_datetime(_NOW - timedelta(days=2))
most_recent_state_value = _NOW - timedelta(days=1)
start = _calculate_start_time(_NOW - timedelta(days=10))
state = StateBuilder().with_stream_state(
_STREAM_NAME,
{
"state_type": "date-range",
"slices": [
{"start": start.strftime("%Y-%m-%dT%H:%M:%S.000") + "Z", "end": _to_partitioned_datetime(missing_chunk[0])},
{"start": _to_partitioned_datetime(missing_chunk[1]), "end": _to_partitioned_datetime(most_recent_state_value)},
{
"start": start.strftime("%Y-%m-%dT%H:%M:%S.000") + "Z",
"end": _to_partitioned_datetime(missing_chunk[0]),
"most_recent_cursor_value": _to_partitioned_datetime(missing_chunk[0]),
},
{
"start": _to_partitioned_datetime(missing_chunk[1]),
"end": _to_partitioned_datetime(most_recent_state_value),
"most_recent_cursor_value": _to_partitioned_datetime(missing_chunk[1]),
},
],
},
)
Expand All @@ -180,7 +188,7 @@ def test_given_partitioned_state_when_read_then_sync_missing_partitions_and_upda
HttpRequest(
f"{_BASE_URL}/queryAll?q=SELECT+{_A_FIELD_NAME},{_CURSOR_FIELD}+FROM+{_STREAM_NAME}+WHERE+SystemModstamp+%3E%3D+{_to_url(most_recent_state_value - _LOOKBACK_WINDOW)}+AND+SystemModstamp+%3C+{_to_url(_NOW)}"
),
create_http_response([_A_FIELD_NAME, _CURSOR_FIELD], record_count=1),
create_http_response([_A_FIELD_NAME, _CURSOR_FIELD], record_count=1, cursor_value=cursor_value),
)

output = read(_STREAM_NAME, SyncMode.incremental, self._config, state)
Expand All @@ -192,7 +200,7 @@ def test_given_partitioned_state_when_read_then_sync_missing_partitions_and_upda
{
"start": _to_partitioned_datetime(start),
"end": _to_partitioned_datetime(_NOW),
"most_recent_cursor_value": "2021-01-18T21:18:20.000Z",
"most_recent_cursor_value": cursor_value,
}
],
}
Loading
Loading