Summary / Motivation
We're proposing extending the Python CDK declarative framework beyond list-only reads to support fetch (GET single record), create (POST), update (PATCH/POST), replace (PUT), and delete operations. The goal is to enable bidirectional sync capabilities while maintaining the declarative, low-code nature of the CDK.
The new SimpleBidirectionalRetriever component will provide a DRY configuration that shares authentication, base URL, error handling, headers, and response parsing across all operations. This allows connector builders to define write operations declaratively without custom Python code.
We'll ship incrementally across phases, starting with read-only fetch operations and progressing to write operations. The design leverages standard operation profiles (e.g., REST_CRUDL_POST_1999, REST_CRUDL_PATCH_2010) that provide sensible defaults for conventional REST APIs, requiring minimal configuration for most connectors.
Key Design Win: Apart from selecting the profile, connectors need zero additional config to add read_one (Phase 1), create_one (Phase 2), and update_one/delete_one (Phase 3) functionality. Only deviations from the profile require explicit configuration, such as disabling replace_one for Stripe, which doesn't support PUT replacement.
Scope
In Scope
- New retriever type: `SimpleBidirectionalRetriever`
- Standard operation profiles: `REST_CRUDL_POST_1999` (POST for updates), `REST_CRUDL_PATCH_2010` (PATCH for updates)
- Operations: `read_many`, `read_one`, `create_one`, `update_one`, `replace_one`, `delete_one`
- Bulk operation fallbacks: `create_many`, `update_many`, `delete_many`, `replace_many` (fall back to sequential single operations)
- CLI subcommands: `source-declarative-manifest fetch` (Phase 1), write operations via stdin (Phase 2+)
- Catalog `record_filters` with list-of-lists-of-maps semantics (outer list = OR slices, inner list = AND conditions)
- Per-operation configuration overrides (path, method, body, extractor)
- Optional response parsing for write operations (default: parse and verify)
Out of Scope (for now)
- Search/filter as distinct operation (will be integrated as list operation capabilities)
- Stripe `/customers/search` endpoint (reserved config space but deferred)
- Airbyte protocol changes (Phase 2+ uses a CLI-specific stdin JSONL envelope instead)
- Optimistic concurrency (ETag/If-Match headers) - planned for future enhancement
Phases and Acceptance Criteria
Phase 1a: Fetch (No Credentials)
Target APIs: JSONPlaceholder, Pokémon TCG
Goal: Prove fetch operations work with auth-less APIs
Configuration needed:
```yaml
standard_operations_profile: REST_CRUDL_POST_1999 # or REST_CRUDL_PATCH_2010
# That's it! read_one inherits GET /collection/{id} from profile
```
CLI:
```bash
source-declarative-manifest fetch --config config.json --catalog configured_catalog.json
```
Catalog `record_filters` format: see Catalog record_filters Semantics below.
Acceptance Criteria:
- Supports single and multiple IDs via `record_filters`
- Handles non-array responses via `field_path: []` (single object)
- Works with composite primary keys (AND within inner list)
- Unit and integration tests on JSONPlaceholder and Pokémon TCG
- Error handling for 404s and invalid IDs
Phase 1b: Fetch (With Credentials)
Target APIs: Stripe customers, GitHub issues
Goal: Prove fetch works with authenticated APIs using existing declarative auth
Configuration needed:
```yaml
standard_operations_profile: REST_CRUDL_POST_1999
# Zero additional config needed - auth inherited from base_requester
```
Acceptance Criteria:
- Auth via existing declarative authenticator blocks (Bearer, OAuth, API Key)
- Stripe: Verify DRY sharing of `base_requester` config with list operation
- GitHub: Demonstrate parity (note: GitHub connector is Python-based, not manifest-based)
- Integration tests with test credentials
Phase 2: Create (POST)
Input format: stdin JSONL envelope
```json
{"stream": "customers", "disposition": "create", "record": {"email": "user@example.com", "name": "Test User"}}
```
Configuration needed:
```yaml
standard_operations_profile: REST_CRUDL_POST_1999
# Zero additional config needed - POST /collection inherited from profile
```
CLI:
```bash
cat records.jsonl | source-declarative-manifest write --config config.json --catalog catalog.json
```
Acceptance Criteria:
- Send full record body by default (configurable JSON vs form-encoded)
- 2xx status codes treated as success
- Response parsed by default to retrieve generated ID and confirm field values
- Stripe: Support both JSON (`request_body_json`) and form-encoded (`request_body_data`)
- Error handling for validation failures (4xx responses)
- Dry-run mode (optional `--dry-run` flag)
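To make the JSON vs. form-encoded choice concrete, here is a minimal sketch of how a record could be serialized either way before a `create_one`/`update_one` request. The bracket notation for nested keys (e.g. `metadata[plan]=pro`) follows the style of Stripe's form-encoded API. `encode_body` and `_flatten` are hypothetical helpers, not CDK API; in a manifest the two modes would correspond to `request_body_json` and `request_body_data`.

```python
import json
from typing import Any, Dict, List, Tuple
from urllib.parse import urlencode


def _flatten(prefix: str, value: Any, out: List[Tuple[str, str]]) -> None:
    """Flatten nested dicts/lists into bracket-notation form fields."""
    if isinstance(value, dict):
        for key, child in value.items():
            _flatten(f"{prefix}[{key}]" if prefix else str(key), child, out)
    elif isinstance(value, list):
        for index, child in enumerate(value):
            _flatten(f"{prefix}[{index}]", child, out)
    elif isinstance(value, bool):
        out.append((prefix, "true" if value else "false"))
    elif value is not None:  # None values are simply omitted in this sketch
        out.append((prefix, str(value)))


def encode_body(record: Dict[str, Any], encoding: str = "json") -> Tuple[str, str]:
    """Return (Content-Type, body) for a create_one/update_one request."""
    if encoding == "json":
        return "application/json", json.dumps(record)
    if encoding == "form":
        pairs: List[Tuple[str, str]] = []
        _flatten("", record, pairs)
        return "application/x-www-form-urlencoded", urlencode(pairs)
    raise ValueError(f"unsupported body encoding: {encoding!r}")
```

Defaulting to JSON matches the "Default to JSON; document form-encoded as override" position in the Risks section; form encoding stays an explicit opt-in.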
Phase 3: Update (PATCH/POST) and Delete
Input format: stdin JSONL envelope
```json
{"stream": "customers", "disposition": "update", "record": {"id": "cus_123", "email": "user@example.com"}}
{"stream": "customers", "disposition": "delete", "record": {"id": "cus_123"}}
```
Configuration needed:
```yaml
standard_operations_profile: REST_CRUDL_POST_1999 # Uses POST for updates
# or
standard_operations_profile: REST_CRUDL_PATCH_2010 # Uses PATCH for updates
# Zero additional config needed - methods inherited from profile
```
Acceptance Criteria:
- Default PATCH method works for `REST_CRUDL_PATCH_2010` profile
- Default POST method works for `REST_CRUDL_POST_1999` profile (Stripe, older APIs)
- Per-stream override of method for non-standard APIs
- Response parsed by default to confirm update
- Handles partial updates (only changed fields in body)
- DELETE operations return success on 204 No Content
- Error handling for 404s (record not found) and 409s (conflicts)
Phase 4+: Replace and Bulk Operations
Configuration needed:
```yaml
standard_operations_profile: REST_CRUDL_POST_1999
standard_operations_subset: [read, create, update, delete] # Omit 'replace'
# Stripe doesn't support PUT full replacement
# CDK will fall back to read_one+update_one when replace is requested
```
Acceptance Criteria:
- `replace_one` uses PUT by default (from profile)
- Fallback to `read_one` + `update_one` when replace is disabled (e.g., `replace` omitted from `standard_operations_subset`)
- Bulk operations (`create_many`, `update_many`, `delete_many`) fall back to sequential single operations
- Document race condition risks for emulated replace operations
Design: SimpleBidirectionalRetriever
Final YAML Design (from PR)
See the complete, finalized YAML example in PR: airbytehq/airbyte#69247
The PR demonstrates the final design applied to Stripe's base_retriever with:
- `standard_operations_profile: REST_CRUDL_POST_1999`
- `standard_operations_subset: [read, create, update, delete]` (simple list filter)
- Commented-out defaults for clarity
- Complete documentation of both profile variants
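The subset filter, together with the `_many` fallbacks and the `enabled: false` escape hatch described in the component structure and implementation notes, can be resolved into a concrete operation table. A minimal sketch, assuming operations are plain strings and `operation_overrides` is a dict parsed from the manifest; `resolve_operations` is a hypothetical helper, not CDK API.

```python
from typing import Dict, Iterable, Mapping, Optional

# Subset entry -> operations it enables ('read' is the special case that enables both).
SUBSET_TO_OPERATIONS: Dict[str, tuple] = {
    "read": ("read_many", "read_one"),
    "create": ("create_one",),
    "update": ("update_one",),
    "replace": ("replace_one",),
    "delete": ("delete_one",),
}

# Bulk operations and the single-record operation they loop over by default.
MANY_FALLBACKS: Dict[str, str] = {
    "create_many": "create_one",
    "update_many": "update_one",
    "replace_many": "replace_one",
    "delete_many": "delete_one",
}


def resolve_operations(
    subset: Iterable[str], overrides: Optional[Mapping[str, Mapping]] = None
) -> Dict[str, str]:
    """Map each available operation to 'native' or 'loop:<op>' (sequential fallback)."""
    overrides = overrides or {}
    disabled = {op for op, cfg in overrides.items() if cfg.get("enabled", True) is False}
    ops: Dict[str, str] = {}
    for kind in subset:
        for op in SUBSET_TO_OPERATIONS[kind]:
            ops[op] = "native"
    # A bulk operation defined explicitly in operation_overrides is used as-is.
    for op in overrides:
        if op in MANY_FALLBACKS and op not in disabled:
            ops[op] = "native"
    # Remove disabled operations before computing fallbacks, so no loop targets them.
    for op in disabled:
        ops.pop(op, None)
    for many, one in MANY_FALLBACKS.items():
        if many not in ops and many not in disabled and one in ops:
            ops[many] = f"loop:{one}"
    return ops
```

For Stripe's `[read, create, update, delete]` subset, this yields native `read_many`/`read_one`/`create_one`/`update_one`/`delete_one`, looped `create_many`/`update_many`/`delete_many`, and no replace operations at all.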
Standard Operation Profiles
REST_CRUDL_POST_1999 (RFC 2616 HTTP/1.1 circa 1999):
- read_many: `GET /collection`
- read_one: `GET /collection/{id}`
- create_one: `POST /collection`
- update_one: `POST /collection/{id}` (partial update)
- replace_one: `PUT /collection/{id}` (full replacement)
- delete_one: `DELETE /collection/{id}`
REST_CRUDL_PATCH_2010 (RFC 5789 circa 2010):
- Same as above, except:
  - update_one: `PATCH /collection/{id}` (partial update)
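A minimal sketch of how the two profiles could be represented and resolved into a method and concrete path. It assumes a plain dict lookup, paths relative to the stream's `path`, and a hypothetical `join_path` helper standing in for the planned `path_suffix` slash normalization; none of these names exist in the CDK today, and URL-escaping of IDs is left out.

```python
from typing import Dict, Tuple

# Hypothetical profile table: operation -> (HTTP method, suffix appended to the stream path).
_BASE_PROFILE: Dict[str, Tuple[str, str]] = {
    "read_many": ("GET", ""),
    "read_one": ("GET", "/{id}"),
    "create_one": ("POST", ""),
    "update_one": ("POST", "/{id}"),
    "replace_one": ("PUT", "/{id}"),
    "delete_one": ("DELETE", "/{id}"),
}

PROFILES: Dict[str, Dict[str, Tuple[str, str]]] = {
    "REST_CRUDL_POST_1999": dict(_BASE_PROFILE),
    # Identical to the 1999 profile, except partial updates use PATCH (RFC 5789).
    "REST_CRUDL_PATCH_2010": {**_BASE_PROFILE, "update_one": ("PATCH", "/{id}")},
}


def join_path(base: str, suffix: str) -> str:
    """Append a suffix to a path, collapsing duplicate slashes at the seam."""
    if not suffix:
        return base
    return base.rstrip("/") + "/" + suffix.lstrip("/")


def resolve(profile: str, operation: str, path: str, record_id: str = "") -> Tuple[str, str]:
    """Return (method, concrete path) for an operation under a profile."""
    method, suffix = PROFILES[profile][operation]
    return method, join_path(path, suffix.format(id=record_id))
```

Deriving the 2010 profile from the 1999 one keeps the "same as above, except update_one" relationship explicit in code.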
Component Structure
```yaml
base_retriever:
  type: SimpleBidirectionalRetriever
  base_requester:
    $ref: "#/definitions/base_requester"

  ## Opt into standard REST API conventions:
  standard_operations_profile: REST_CRUDL_POST_1999

  ## Filter which operation types to enable (applies on top of profile):
  standard_operations_subset: [read, create, update, delete]
  ## Subset options: read, create, update, replace, delete
  ##   - read: Enables read_many and read_one
  ##   - create: Enables create_one (create_many falls back to create_one loop)
  ##   - update: Enables update_one (update_many falls back to update_one loop)
  ##   - replace: Enables replace_one (replace_many falls back to replace_one loop)
  ##   - delete: Enables delete_one (delete_many falls back to delete_one loop)
  ## Note: 'replace' omitted for Stripe since it doesn't support PUT replacement

  ## Renamed from 'record_selector' for clarity:
  multi_record_selector:
    type: RecordSelector
    extractor:
      type: DpathExtractor
      field_path: [data] # Array response for read_many
  paginator:
    $ref: "#/definitions/base_paginator"

  ## Default single_record_selector (commented out - matches defaults):
  # single_record_selector:
  #   type: RecordSelector
  #   extractor:
  #     type: DpathExtractor
  #     field_path: [] # Single object response for read_one, create_one, update_one

  ## Operation overrides (commented out - inherited from profile):
  # operation_overrides:
  #   read_many: {} ## Use default base_requester, multi_record_selector, paginator
  #   read_one: {} ## Use default base_requester, single_record_selector
  #   create_one:
  #     requester:
  #       $ref: "#/definitions/base_requester"
  #       http_method: POST
  #       request_body:
  #         type: RequestBodyJsonObject
  #         value: "{{ record }}"
  #   update_one:
  #     requester:
  #       $ref: "#/definitions/base_requester"
  #       path_suffix: "/{{ record.id }}" ## Planned CDK feature
  #       request_body:
  #         type: RequestBodyJsonObject
  #         value: "{{ record }}"
  #   delete_one:
  #     requester:
  #       $ref: "#/definitions/base_requester"
  #       http_method: DELETE
  #       path_suffix: "/{{ record.id }}"
```
Key Design Principles
- Profile-based defaults: Standard REST conventions baked into profiles
- Opt-out vs opt-in: Only specify what's different from the profile
- DRY configuration: `base_requester` shared across all operations
- `$ref` with inline overrides: Shallow merge semantics for requester customization
- Sensible fallbacks: Unsupported operations have documented emulation strategies
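"Shallow merge" is worth pinning down, since it decides whether an override like `authenticator:` replaces or patches the inherited block. A minimal sketch of the semantics described above, assuming only local `#/definitions/...` refs; `resolve_ref_with_overrides` is hypothetical and is not the CDK's actual manifest reference resolver.

```python
import copy
from typing import Any, Dict


def resolve_ref_with_overrides(definitions: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve a '$ref' and shallow-merge the sibling keys on top of it."""
    ref = node.get("$ref")
    if ref is None:
        return dict(node)
    prefix = "#/definitions/"
    if not ref.startswith(prefix):
        raise ValueError(f"unsupported $ref in this sketch: {ref}")
    # Deep-copy so per-operation overrides never mutate the shared definition.
    base = copy.deepcopy(definitions[ref[len(prefix):]])
    overrides = {key: value for key, value in node.items() if key != "$ref"}
    # Shallow merge: each top-level override replaces the base value wholesale;
    # nested mappings are NOT merged key by key.
    base.update(overrides)
    return base
```

Under these semantics, overriding `http_method` leaves `url_base`, `authenticator`, and `error_handler` inherited, while overriding `authenticator` replaces the whole authenticator block rather than individual keys inside it.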
Defaults and Overrides
| Operation | Default Path | Default Method | Default Extractor | When to Override |
|---|---|---|---|---|
| read_many | {path} | GET | field_path: [data] | Non-standard array wrapper |
| read_one | {path}/{id} | GET | field_path: [] | Non-standard path pattern |
| create_one | {path} | POST | field_path: [] | Form-encoded body |
| update_one | {path}/{id} | POST or PATCH | field_path: [] | Profile determines method |
| replace_one | {path}/{id} | PUT | field_path: [] | API doesn't support PUT |
| delete_one | {path}/{id} | DELETE | (none) | API returns object on delete |
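The extractor column boils down to one rule: walk `field_path` into the parsed response, then normalize to a list so single-object and array responses flow through the same record pipeline. A minimal sketch of that rule, assuming the body is already parsed JSON; `extract_records` is hypothetical, and `delete_one` (no extractor by default) would skip it entirely.

```python
from typing import Any, Dict, List, Sequence


def extract_records(body: Any, field_path: Sequence[str]) -> List[Dict[str, Any]]:
    """Walk field_path into a parsed JSON body and always return a list of records."""
    node = body
    for key in field_path:
        node = node[key]
    # field_path: [data] on a list envelope yields the array itself;
    # field_path: [] on a single-object response yields a one-record list.
    if isinstance(node, list):
        return node
    return [node]
```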
Catalog record_filters Semantics
Format: List-of-lists-of-maps
- Outer list: OR across slices (fetch any of these records)
- Inner list: AND within slice (all conditions must match)
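The OR-of-ANDs rule can be written as a small predicate. This only illustrates the boolean semantics; the fetch command itself is expected to turn each slice into a `read_one` request rather than filter a full listing. The sketch assumes values are compared as strings, so `"1"` in the catalog matches an integer `1` from JSONPlaceholder; `record_selected` is a hypothetical name.

```python
from typing import Any, Dict, List

Condition = Dict[str, Any]


def slice_matches(record: Dict[str, Any], conditions: List[Condition]) -> bool:
    """Inner list: AND across all maps (and across all keys within each map)."""
    return all(
        field in record and str(record[field]) == str(value)
        for condition in conditions
        for field, value in condition.items()
    )


def record_selected(record: Dict[str, Any], record_filters: List[List[Condition]]) -> bool:
    """Outer list: OR across slices."""
    return any(slice_matches(record, conditions) for conditions in record_filters)
```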
Examples:
Single-field primary key (fetch 3 records):
```json
"record_filters": [
  [{"id": "1"}],
  [{"id": "2"}],
  [{"id": "3"}]
]
```
Composite primary key (fetch 2 records):
```json
"record_filters": [
  [{"company_id": "123", "property": "status"}],
  [{"company_id": "456", "property": "size"}]
]
```
Disposition Semantics (Phase 2+)
Input envelope format:
```
{"stream": "customers", "disposition": "create|update|replace|delete", "record": {...}, "identifiers": [...]}
```
Disposition values:
- `create`: Insert new record (fails if exists, unless upsert strategy specified)
- `update`: Partial update (merge provided fields, preserve unmentioned fields)
- `replace`: Full replacement (unmentioned fields cleared per API rules)
- `delete`: Remove record
Mapping to operations:
- `disposition=create` → `create_one` operation
- `disposition=update` → `update_one` operation (PATCH or POST per profile)
- `disposition=replace` → `replace_one` operation (PUT, or fallback to read+update)
- `disposition=delete` → `delete_one` operation (DELETE)
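The mapping above, including the replace fallback, can be sketched as a dispatch step that plans which operations to run for one envelope. This assumes the set of enabled operations has already been resolved from the profile and subset; `plan_operations` is a hypothetical helper.

```python
from typing import Dict, FrozenSet, List

DISPOSITION_TO_OPERATION: Dict[str, str] = {
    "create": "create_one",
    "update": "update_one",
    "replace": "replace_one",
    "delete": "delete_one",
}


def plan_operations(disposition: str, enabled: FrozenSet[str]) -> List[str]:
    """Return the operation(s) to run for one envelope, applying the replace fallback."""
    try:
        operation = DISPOSITION_TO_OPERATION[disposition]
    except KeyError:
        raise ValueError(f"unknown disposition: {disposition!r}") from None
    if operation in enabled:
        return [operation]
    # Emulated replace: read the current record, then send an update.
    # This is not atomic; see the race-condition risk noted in this proposal.
    if operation == "replace_one" and {"read_one", "update_one"} <= enabled:
        return ["read_one", "update_one"]
    raise ValueError(f"{operation} is not enabled for this stream")
```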
CLI/UX
Phase 1: Fetch
```bash
# Fetch specific records via catalog record_filters
source-declarative-manifest fetch \
  --config config.json \
  --catalog configured_catalog.json \
  --manifest manifest.yaml

# Output: JSONL to stdout
{"id": "cus_123", "email": "user1@example.com", ...}
{"id": "cus_456", "email": "user2@example.com", ...}
```
Phase 2+: Create/Update/Replace/Delete
```bash
# Write records via stdin JSONL envelope
cat records.jsonl | source-declarative-manifest write \
  --config config.json \
  --catalog catalog.json \
  --manifest manifest.yaml

# Input format (records.jsonl):
{"stream": "customers", "disposition": "create", "record": {"email": "user@example.com"}}
{"stream": "customers", "disposition": "update", "record": {"id": "cus_123", "email": "user@example.com"}}
{"stream": "customers", "disposition": "replace", "record": {"id": "cus_123", "email": "user@example.com", "name": "Full Object"}}
{"stream": "customers", "disposition": "delete", "record": {"id": "cus_789"}}

# Optional dry-run mode
cat records.jsonl | source-declarative-manifest write --dry-run ...
```
Exit Codes
- 0: All operations succeeded
- 1: One or more operations failed (details in stderr)
- 2: Configuration error
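One way the `write` loop could arrive at these exit codes is sketched below. It assumes (not stated in this proposal) that processing continues past failed records, that an `apply` callback raises on a failed operation, and that configuration problems surface as a hypothetical `ConfigError`. Unparseable input lines are counted as failed operations here, which is a design choice, not a settled decision.

```python
import json
from typing import Callable, Iterable, List, Tuple


class ConfigError(Exception):
    """Hypothetical: raised when config, catalog, or manifest cannot be used."""


def run_write(lines: Iterable[str], apply: Callable[[dict], None]) -> Tuple[int, List[str]]:
    """Apply each JSONL envelope; return (exit_code, error messages for stderr)."""
    errors: List[str] = []
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        try:
            apply(json.loads(line))  # apply() raises on a failed operation
        except ConfigError as exc:
            return 2, [f"configuration error: {exc}"]
        except Exception as exc:  # keep going: one bad record should not abort the batch
            errors.append(f"line {lineno}: {exc}")
    return (1 if errors else 0), errors


# A CLI entry point might then do:
#   code, errors = run_write(sys.stdin, apply)
#   for message in errors:
#       print(message, file=sys.stderr)
#   sys.exit(code)
```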
Risks & Open Questions
Risks
- Stripe body encoding: Stripe supports both JSON and form-encoded. Default to JSON; document form-encoded as override.
- Composite/nested primary keys: Use dot notation (`{{ record.user.id }}`) for nested keys.
- Partial updates: Default to full record body; add a `partial: true` option later if needed.
- Race conditions: Emulated replace (read+update) is subject to race conditions. Document the risks; plan ETag/If-Match support for the future.
Open Questions
- Primary key injection: Should we add a `pk()` Jinja macro to extract PK values without hardcoding field names? Proposal: Yes, plan as a future enhancement.
- `path_suffix`: Should we implement ergonomic path appending? Proposal: Yes, implement `path_suffix` to avoid full path overrides.
- `replace_unset_strategy`: Should we support explicit null clearing for replace operations? Proposal: Add a `replace_unset_strategy: omit|explicit_null|custom` option.
- Optimistic concurrency: Should we support ETag/If-Match headers? Proposal: Plan as a future enhancement, not MVP.
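To show what `replace_unset_strategy` would change for an emulated replace, here is a sketch that builds the `update_one` body from the record returned by `read_one` (`current`) and the incoming replacement (`desired`). The `protected` parameter, which keeps primary-key and other read-only fields from being nulled, is a hypothetical addition; `custom` is not modeled. The race window noted in the Risks section sits between the read that produces `current` and the update that sends this body.

```python
from typing import Any, Dict, Iterable


def emulated_replace_body(
    current: Dict[str, Any],
    desired: Dict[str, Any],
    strategy: str = "omit",
    protected: Iterable[str] = ("id",),
) -> Dict[str, Any]:
    """Build the update_one body used to emulate replace_one (read_one + update_one)."""
    body = dict(desired)
    if strategy == "omit":
        # Fields missing from `desired` are left untouched on the server,
        # so the "replace" degrades to a partial update.
        return body
    if strategy == "explicit_null":
        skip = set(protected)
        for field in current:
            if field not in desired and field not in skip:
                body[field] = None  # ask the API to clear the field
        return body
    raise ValueError(f"unsupported replace_unset_strategy: {strategy!r}")
```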
References
- Design PR (Final YAML): dummy-pr (do not merge): Explore `SimpleBidirectionalRetriever` as `base_retriever` for Stripe API, airbyte#69247
- Epic Issue: [Epic] Declarative CDK: Add Fetch/Create/Update via SimpleBidirectionalRetriever #833
- Devin session: https://app.devin.ai/sessions/dc10fcdabf624323a9bef22bae444da8
- Stripe API docs:
- List customers: https://docs.stripe.com/api/customers/list
- Create customer: https://docs.stripe.com/api/customers/create
- Update customer: https://docs.stripe.com/api/customers/update
- Current Stripe manifest: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/manifest.yaml
- HTTP RFCs:
- RFC 2616 (HTTP/1.1, 1999): https://www.rfc-editor.org/rfc/rfc2616
- RFC 5789 (PATCH, 2010): https://www.rfc-editor.org/rfc/rfc5789
Implementation Notes
- Schema changes: Add `SimpleBidirectionalRetriever` to `declarative_component_schema.yaml`
- New fields:
  - `standard_operations_profile`: Enum (`REST_CRUDL_POST_1999`, `REST_CRUDL_PATCH_2010`)
  - `standard_operations_subset`: List of operation types (`read`, `create`, `update`, `replace`, `delete`)
  - `single_record_selector`: RecordSelector for single-object responses
  - `multi_record_selector`: RecordSelector for array responses (renamed from `record_selector`)
- Operation semantics:
  - `read` in the subset enables both `read_many` and `read_one` (special case)
  - Other operations (`create`, `update`, `replace`, `delete`) enable only their `_one` variants
  - `_many` variants fall back to loops over `_one` unless explicitly defined in `operation_overrides`
- Edge case handling: For asymmetric support (e.g., list-only or fetch-only resources), use `enabled: true/false` in `operation_overrides`:
```yaml
operation_overrides:
  read_many:
    enabled: false # Disable for fetch-only/singleton resources (e.g., /v1/balance)
  read_one:
    enabled: false # Disable for list-only resources
```
- HttpMethod enum: Verify PATCH and PUT are included, since `update_one` (PATCH profile) and `replace_one` need them (currently has GET, POST, DELETE)
- Interpolation context:
  - Add `record` variable to Jinja context for operation paths
  - Plan `pk()` macro for primary key extraction
- `path_suffix`: Implement ergonomic path appending with slash normalization
- Error handling: Reuse existing `error_handler` from `base_requester` for all operations
- `http_deep_link_pattern`: Optional field for advertising deep links to web dashboards/UI pages
  - Uses hybrid templating: Jinja for config/static parts + simple field interpolation for record fields
  - Field interpolation syntax: `{field.name}`, using dot notation for nested access
  - Two-phase evaluation: Jinja at catalog generation time, field interpolation at record emission time
  - Advertised in catalog via JSON Schema extension: `x-airbyte-deep_link_url`
Pattern 1 - Static URL root (Stripe):
```yaml
retriever:
  $ref: "#/definitions/base_retriever"
  $parameters:
    path: customers
  http_deep_link_pattern: "https://dashboard.stripe.com/customers/{id}"
```
Use when the dashboard URL base is static and doesn't depend on config.
Pattern 2 - Record-provided deep URL (GitHub):
```yaml
retriever:
  $ref: "#/definitions/base_retriever"
  $parameters:
    path: issues
  http_deep_link_pattern: "{html_url}"
```
Use when the API already provides the full web URL in the record. No concatenation needed - just reference the field.
Pattern 3 - Nulled out on condition (GitHub Enterprise):
```yaml
retriever:
  $ref: "#/definitions/base_retriever"
  $parameters:
    path: issues
  http_deep_link_pattern: "{{ None if config.get('api_url') != 'https://api.github.com' else 'https://github.com/issues/{number}' }}"
```
Use when URL translation is unreliable for custom API roots. Set to null when a custom `api_url` is detected and can't be safely translated to a web URL.
- Testing strategy:
  - Unit tests: Mock HTTP responses for each operation
  - Integration tests: Use real APIs (JSONPlaceholder, Stripe test mode)
  - Regression tests: Ensure existing list-only connectors still work
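The second evaluation phase of `http_deep_link_pattern` (filling `{field.name}` placeholders per record after Jinja has already run) can be sketched as below. Single braces pass through Jinja untouched, which is what makes the hybrid scheme workable. Two behaviors here are assumptions, not part of the proposal: a pattern that rendered to null yields no link, and a placeholder naming a missing field also yields no link rather than a broken URL. `render_deep_link` is a hypothetical name.

```python
import re
from typing import Any, Mapping, Optional

# Matches {name} or {dotted.name}; Jinja's {{ ... }} has already been rendered away.
_FIELD = re.compile(r"\{([A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)*)\}")


def _lookup(record: Mapping[str, Any], dotted: str) -> Any:
    node: Any = record
    for part in dotted.split("."):
        if not isinstance(node, Mapping) or part not in node:
            return None
        node = node[part]
    return node


def render_deep_link(pattern: Optional[str], record: Mapping[str, Any]) -> Optional[str]:
    """Fill {field.name} placeholders from one record; None means 'no link'."""
    if not pattern:
        return None  # e.g. Pattern 3 evaluated to null for a custom api_url
    missing = False

    def substitute(match: "re.Match[str]") -> str:
        nonlocal missing
        value = _lookup(record, match.group(1))
        if value is None:
            missing = True
            return ""
        return str(value)

    url = _FIELD.sub(substitute, pattern)
    return None if missing else url
```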
Requested by: AJ Steers (@aaronsteers)
Last updated: November 9, 2025
Example Phase 1a configured catalog (JSONPlaceholder `posts` stream, fetching the posts with id 1, 2, and 100):
```json
{
  "streams": [
    {
      "stream": {"name": "posts", "primary_key": ["id"]},
      "record_filters": [
        [{"id": "1"}],
        [{"id": "2"}],
        [{"id": "100"}]
      ]
    }
  ]
}
```