Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
586 changes: 586 additions & 0 deletions airbyte-integrations/connectors/source-marketo/components.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerImageTag: 1.6.1
dockerImageTag: 1.7.0
dockerRepository: airbyte/source-marketo
documentationUrl: https://docs.airbyte.com/integrations/sources/marketo
githubIssueLabel: source-marketo
Expand Down
2,227 changes: 1,533 additions & 694 deletions airbyte-integrations/connectors/source-marketo/poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions airbyte-integrations/connectors/source-marketo/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.6.1"
version = "1.7.0"
name = "source-marketo"
description = "Source implementation for Marketo."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_marketo"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "2.3.0" # Newer CDK versions cause breaking changes. This is the latest version that we are sure can pass tests.
python = "^3.10,<3.12"
airbyte-cdk = "6.48.0"

[tool.poetry.scripts]
source-marketo = "source_marketo.run:run"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,190 @@ definitions:
path: ["program_id"]
value: "{{ stream_partition['program_id'] }}"

# -----------------------------------------------------------------------
# Bulk export streams (Leads + Activities) using AsyncRetriever
# -----------------------------------------------------------------------

# Leads bulk export stream
# API Docs: https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/
leads_stream:
type: DeclarativeStream
name: "leads"
primary_key: "id"
schema_loader:
type: CustomSchemaLoader
class_name: components.MarketoLeadsSchemaLoader
retriever:
type: AsyncRetriever
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: polling_job_timeout: 60 is 60 minutes — verify this is sufficient

The polling_job_timeout in AsyncRetriever is specified in minutes. The original implementation had no timeout — it polled indefinitely until the job completed or failed.

Marketo bulk exports for large datasets can take well over an hour. A 60-minute timeout may cause legitimate long-running exports to be treated as timeouts, failing the sync.

Consider:

  1. Increasing to 180 or 240 minutes, or
  2. Adding a config option for timeout, or
  3. Documenting this new behavioral difference (the original had no timeout)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5f6419d. Increased polling_job_timeout from 60 to 240 minutes for both the leads and activities streams in manifest.yaml. This provides a generous buffer for large bulk exports while still having a finite timeout (the original had no timeout and polled indefinitely).

field_path: []
status_mapping:
type: AsyncJobStatusMap
running:
- "Queued"
- "Processing"
completed:
- "Completed"
failed:
- "Cancelled"
- "Failed"
timeout: []
status_extractor:
type: DpathExtractor
field_path:
- "result"
- "0"
- "status"
download_target_extractor:
type: DpathExtractor
field_path:
- "result"
- "0"
- "exportId"
creation_requester:
type: CustomRequester
class_name: components.MarketoBulkExportCreationRequester
create_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Activity stream naming diverges from clean_string() — will cause stream name mismatches

The components_mapping uses Jinja filters for stream naming:

value: "activities_{{ components_values['name'] | lower | replace(' ', '_') }}"

But the original Python code used clean_string() which has special handling for:

  • CamelCase splitting: "visitWebPage""visit_web_page" (Jinja would give "visitwebpage")
  • Abbreviation handling: "Change Data Value""change_data_value" but "Update SLA""update_sla" (Jinja lower doesn't split on case boundaries)
  • Special-case fixes: "api method name""api_method_name"

This means existing connections that have state saved under the clean_string()-derived names will see different stream names after this migration, causing:

  1. State loss (incremental syncs restart from the beginning)
  2. Possible duplicate data in destinations
  3. Broken configured catalogs

This is a breaking change despite the PR claiming non-breaking. You need to either:

  1. Use a custom ComponentsResolver that applies clean_string() to the name, or
  2. Call clean_string() from within the Jinja template (register it as a custom Jinja function), or
  3. Use the MarketoActivitiesComponentsResolver (which does use clean_string()) instead of HttpComponentsResolver

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the naming concern. I investigated this carefully — Marketo activity type names from the /rest/v1/activities/types.json endpoint are always in "Title Case With Spaces" format (e.g. "Send Email", "Visit Web Page", "Change Data Value"). For these space-separated names, | lower | replace(' ', '_') produces identical output to clean_string():

  • "Send Email" | lower | replace(' ', '_')"send_email"
  • clean_string("Send Email")"send_email"

The clean_string() function's camelCase splitting and abbreviation handling only diverge for camelCase or abbreviation-containing inputs. Since Marketo activity type names don't use camelCase (they're "Title Case With Spaces"), the Jinja filters are equivalent.

Added a code comment in the manifest explaining this equivalence and noting where it would diverge (if Marketo ever returned camelCase names).

path: "bulk/v1/leads/export/create.json"
http_method: POST
authenticator: "#/definitions/authenticator"
error_handler:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: activity_attributes passed as Jinja string, not JSON — verify runtime behavior

The components_mapping passes:

value: "{{ components_values.get('attributes', []) }}"

This will render the Python list as a string representation (e.g., "[{'name': 'Campaign Run ID', 'dataType': 'integer'}]") — note single quotes, which is not valid JSON.

The MarketoActivitySchemaLoader handles this by checking isinstance(self.activity_attributes, str) and calling json.loads(), but json.loads will fail on Python's string representation (single quotes aren't valid JSON).

Either:

  1. Use | tojson Jinja filter: "{{ components_values.get('attributes', []) | tojson }}"
  2. Or verify that the CDK's ComponentMappingDefinition passes the value as a native Python object rather than rendering it through Jinja string interpolation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5f6419d. Added | tojson Jinja filter to ensure valid JSON serialization:

value: "{{ components_values.get('attributes', []) | tojson }}"

This ensures the Python list is serialized as proper JSON (double quotes) rather than Python repr (single quotes), so json.loads() in MarketoActivitySchemaLoader.__post_init__ will parse it correctly.

type: DefaultErrorHandler
max_retries: 3
$parameters:
name: "leads_create"
enqueue_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/leads/export/{{ stream_slice.extra_fields['export_id'] }}/enqueue.json"
http_method: POST
authenticator: "#/definitions/authenticator"
$parameters:
name: "leads_enqueue"
include_fields_from_describe: "true"
polling_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/leads/export/{{ creation_response['result'][0]['exportId'] }}/status.json"
http_method: GET
authenticator: "#/definitions/authenticator"
download_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/leads/export/{{ creation_response['result'][0]['exportId'] }}/file.json"
http_method: GET
authenticator: "#/definitions/authenticator"
download_decoder:
type: CustomDecoder
class_name: components.MarketoCsvDecoder
polling_job_timeout: 60
incremental_sync:
type: DatetimeBasedCursor
cursor_field: "updatedAt"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
start_datetime: "{{ config['start_date'] }}"
end_datetime: "{{ config.get('end_date', now_utc().strftime('%Y-%m-%dT%H:%M:%SZ')) }}"
cursor_granularity: "PT1S"
step: "P{{ config.get('window_in_days', 30) }}D"
transformations:
- type: CustomTransformation
class_name: components.MarketoRecordTransformation

# Activities bulk export stream template (used by DynamicDeclarativeStream)
# API Docs: https://developers.marketo.com/rest-api/bulk-extract/bulk-activity-extract/
activities_stream_template:
type: DeclarativeStream
name: "placeholder_activity_stream"
primary_key: "marketoGUID"
schema_loader:
type: CustomSchemaLoader
class_name: components.MarketoActivitySchemaLoader
activity_attributes: "placeholder_attributes"
retriever:
type: AsyncRetriever
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
status_mapping:
type: AsyncJobStatusMap
running:
- "Queued"
- "Processing"
completed:
- "Completed"
failed:
- "Cancelled"
- "Failed"
timeout: []
status_extractor:
type: DpathExtractor
field_path:
- "result"
- "0"
- "status"
download_target_extractor:
type: DpathExtractor
field_path:
- "result"
- "0"
- "exportId"
creation_requester:
type: CustomRequester
class_name: components.MarketoBulkExportCreationRequester
create_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/activities/export/create.json"
http_method: POST
authenticator: "#/definitions/authenticator"
error_handler:
type: DefaultErrorHandler
max_retries: 3
$parameters:
name: "activities_create"
enqueue_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/activities/export/{{ stream_slice.extra_fields['export_id'] }}/enqueue.json"
http_method: POST
authenticator: "#/definitions/authenticator"
$parameters:
name: "activities_enqueue"
polling_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/activities/export/{{ creation_response['result'][0]['exportId'] }}/status.json"
http_method: GET
authenticator: "#/definitions/authenticator"
download_requester:
type: HttpRequester
url_base: "{{ config['domain_url'].rstrip('/') }}/"
path: "bulk/v1/activities/export/{{ creation_response['result'][0]['exportId'] }}/file.json"
http_method: GET
authenticator: "#/definitions/authenticator"
download_decoder:
type: CustomDecoder
class_name: components.MarketoCsvDecoder
polling_job_timeout: 60
incremental_sync:
type: DatetimeBasedCursor
cursor_field: "activityDate"
datetime_format: "%Y-%m-%dT%H:%M:%SZ"
start_datetime: "{{ config['start_date'] }}"
end_datetime: "{{ config.get('end_date', now_utc().strftime('%Y-%m-%dT%H:%M:%SZ')) }}"
cursor_granularity: "PT1S"
step: "P{{ config.get('window_in_days', 30) }}D"
transformations:
- type: CustomTransformation
class_name: components.MarketoRecordTransformation

streams:
# Full refresh streams
- "#/definitions/activity_types_stream"
Expand All @@ -244,6 +428,36 @@ streams:
# Substreams
- "#/definitions/program_tokens_stream"

# Bulk export streams
- "#/definitions/leads_stream"

dynamic_streams:
- type: DynamicDeclarativeStream
stream_template:
$ref: "#/definitions/activities_stream_template"
components_resolver:
type: HttpComponentsResolver
retriever:
type: SimpleRetriever
requester:
$ref: "#/definitions/requester"
path: "rest/v1/activities/types.json"
http_method: GET
record_selector:
$ref: "#/definitions/selector"
paginator:
$ref: "#/definitions/cursor_paginator"
components_mapping:
- type: ComponentMappingDefinition
field_path:
- name
value: "activities_{{ components_values['name'] | lower | replace(' ', '_') }}"
- type: ComponentMappingDefinition
field_path:
- schema_loader
- activity_attributes
value: "{{ components_values.get('attributes', []) }}"

check:
type: CheckStream
stream_names:
Expand Down
Loading
Loading