Skip to content

Commit 342375c

Browse files
committed
Refactor ConcurrentPerPartitionCursor to not use ConcurrentCursor with _use_global_cursor
1 parent b3f9897 commit 342375c

File tree

7 files changed

+59
-646
lines changed

7 files changed

+59
-646
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 207 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,6 @@ properties:
4040
"$ref": "#/definitions/Spec"
4141
concurrency_level:
4242
"$ref": "#/definitions/ConcurrencyLevel"
43-
api_budget:
44-
title: API Budget
45-
description: Defines how many requests can be made to the API in a given time frame. This field accepts either a generic APIBudget or an HTTP-specific configuration (HTTPAPIBudget) to be applied across all streams.
46-
anyOf:
47-
- "$ref": "#/definitions/APIBudget"
48-
- "$ref": "#/definitions/HTTPAPIBudget"
4943
metadata:
5044
type: object
5145
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -1371,207 +1365,6 @@ definitions:
13711365
$parameters:
13721366
type: object
13731367
additional_properties: true
1374-
APIBudget:
1375-
title: API Budget
1376-
description: >
1377-
A generic API budget configuration that defines the policies (rate limiting rules)
1378-
and the maximum number of attempts to acquire a call credit. This budget does not automatically
1379-
update itself based on HTTP response headers.
1380-
type: object
1381-
required:
1382-
- type
1383-
- policies
1384-
properties:
1385-
type:
1386-
type: string
1387-
enum: [APIBudget]
1388-
policies:
1389-
title: Policies
1390-
description: List of call rate policies that define how many calls are allowed.
1391-
type: array
1392-
items:
1393-
anyOf:
1394-
- "$ref": "#/definitions/FixedWindowCallRatePolicy"
1395-
- "$ref": "#/definitions/MovingWindowCallRatePolicy"
1396-
- "$ref": "#/definitions/UnlimitedCallRatePolicy"
1397-
maximum_attempts_to_acquire:
1398-
title: Maximum Attempts to Acquire
1399-
description: The maximum number of attempts to acquire a call before giving up.
1400-
type: integer
1401-
default: 100000
1402-
additionalProperties: true
1403-
HTTPAPIBudget:
1404-
title: HTTP API Budget
1405-
description: >
1406-
An HTTP-specific API budget that extends APIBudget by updating rate limiting information based
1407-
on HTTP response headers. It extracts available calls and the next reset timestamp from the HTTP responses.
1408-
type: object
1409-
required:
1410-
- type
1411-
- policies
1412-
properties:
1413-
type:
1414-
type: string
1415-
enum: [HTTPAPIBudget]
1416-
policies:
1417-
title: Policies
1418-
description: List of call rate policies that define how many calls are allowed.
1419-
type: array
1420-
items:
1421-
anyOf:
1422-
- "$ref": "#/definitions/FixedWindowCallRatePolicy"
1423-
- "$ref": "#/definitions/MovingWindowCallRatePolicy"
1424-
- "$ref": "#/definitions/UnlimitedCallRatePolicy"
1425-
ratelimit_reset_header:
1426-
title: Rate Limit Reset Header
1427-
description: The HTTP response header name that indicates when the rate limit resets.
1428-
type: string
1429-
default: "ratelimit-reset"
1430-
ratelimit_remaining_header:
1431-
title: Rate Limit Remaining Header
1432-
description: The HTTP response header name that indicates the number of remaining allowed calls.
1433-
type: string
1434-
default: "ratelimit-remaining"
1435-
status_codes_for_ratelimit_hit:
1436-
title: Status Codes for Rate Limit Hit
1437-
description: List of HTTP status codes that indicate a rate limit has been hit.
1438-
type: array
1439-
items:
1440-
type: integer
1441-
default: [429]
1442-
maximum_attempts_to_acquire:
1443-
title: Maximum Attempts to Acquire
1444-
description: The maximum number of attempts to acquire a call before giving up.
1445-
type: integer
1446-
default: 100000
1447-
additionalProperties: true
1448-
FixedWindowCallRatePolicy:
1449-
title: Fixed Window Call Rate Policy
1450-
description: A policy that allows a fixed number of calls within a specific time window.
1451-
type: object
1452-
required:
1453-
- type
1454-
- next_reset_ts
1455-
- period
1456-
- call_limit
1457-
- matchers
1458-
properties:
1459-
type:
1460-
type: string
1461-
enum: [FixedWindowCallRatePolicy]
1462-
next_reset_ts:
1463-
title: Next Reset Timestamp
1464-
description: The timestamp when the rate limit will reset.
1465-
type: string
1466-
format: date-time
1467-
period:
1468-
title: Period
1469-
description: The time interval for the rate limit window.
1470-
type: string
1471-
format: duration
1472-
call_limit:
1473-
title: Call Limit
1474-
description: The maximum number of calls allowed within the period.
1475-
type: integer
1476-
matchers:
1477-
title: Matchers
1478-
description: List of matchers that define which requests this policy applies to.
1479-
type: array
1480-
items:
1481-
"$ref": "#/definitions/HttpRequestRegexMatcher"
1482-
additionalProperties: true
1483-
MovingWindowCallRatePolicy:
1484-
title: Moving Window Call Rate Policy
1485-
description: A policy that allows a fixed number of calls within a moving time window.
1486-
type: object
1487-
required:
1488-
- type
1489-
- rates
1490-
- matchers
1491-
properties:
1492-
type:
1493-
type: string
1494-
enum: [MovingWindowCallRatePolicy]
1495-
rates:
1496-
title: Rates
1497-
description: List of rates that define the call limits for different time intervals.
1498-
type: array
1499-
items:
1500-
"$ref": "#/definitions/Rate"
1501-
matchers:
1502-
title: Matchers
1503-
description: List of matchers that define which requests this policy applies to.
1504-
type: array
1505-
items:
1506-
"$ref": "#/definitions/HttpRequestRegexMatcher"
1507-
additionalProperties: true
1508-
UnlimitedCallRatePolicy:
1509-
title: Unlimited Call Rate Policy
1510-
description: A policy that allows unlimited calls for specific requests.
1511-
type: object
1512-
required:
1513-
- type
1514-
- matchers
1515-
properties:
1516-
type:
1517-
type: string
1518-
enum: [UnlimitedCallRatePolicy]
1519-
matchers:
1520-
title: Matchers
1521-
description: List of matchers that define which requests this policy applies to.
1522-
type: array
1523-
items:
1524-
"$ref": "#/definitions/HttpRequestRegexMatcher"
1525-
additionalProperties: true
1526-
Rate:
1527-
title: Rate
1528-
description: Defines a rate limit with a specific number of calls allowed within a time interval.
1529-
type: object
1530-
required:
1531-
- limit
1532-
- interval
1533-
properties:
1534-
limit:
1535-
title: Limit
1536-
description: The maximum number of calls allowed within the interval.
1537-
type: integer
1538-
interval:
1539-
title: Interval
1540-
description: The time interval for the rate limit.
1541-
type: string
1542-
format: duration
1543-
additionalProperties: true
1544-
HttpRequestRegexMatcher:
1545-
title: HTTP Request Matcher
1546-
description: >
1547-
Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers.
1548-
Use `url_base` to specify the scheme and host (without trailing slash) and
1549-
`url_path_pattern` to apply a regex to the request path.
1550-
type: object
1551-
properties:
1552-
method:
1553-
title: Method
1554-
description: The HTTP method to match (e.g., GET, POST).
1555-
type: string
1556-
url_base:
1557-
title: URL Base
1558-
description: The base URL (scheme and host, e.g. "https://api.example.com") to match.
1559-
type: string
1560-
url_path_pattern:
1561-
title: URL Path Pattern
1562-
description: A regular expression pattern to match the URL path.
1563-
type: string
1564-
params:
1565-
title: Parameters
1566-
description: The query parameters to match.
1567-
type: object
1568-
additionalProperties: true
1569-
headers:
1570-
title: Headers
1571-
description: The headers to match.
1572-
type: object
1573-
additionalProperties: true
1574-
additionalProperties: true
15751368
DefaultErrorHandler:
15761369
title: Default Error Handler
15771370
description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff.

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,16 @@ def close_partition(self, partition: Partition) -> None:
142142
raise ValueError("stream_slice cannot be None")
143143

144144
partition_key = self._to_partition_key(stream_slice.partition)
145-
self._cursor_per_partition[partition_key].close_partition(partition=partition)
145+
if not self._use_global_cursor:
146+
self._cursor_per_partition[partition_key].close_partition(partition=partition)
146147
with self._lock:
147148
self._semaphore_per_partition[partition_key].acquire()
148149
cursor = self._cursor_per_partition[partition_key]
149150
if (
150151
partition_key in self._finished_partitions
151152
and self._semaphore_per_partition[partition_key]._value == 0
152153
):
153-
if (
154-
self._new_global_cursor is None
155-
or self._new_global_cursor[self.cursor_field.cursor_field_key]
156-
< cursor.state[self.cursor_field.cursor_field_key]
157-
):
158-
self._new_global_cursor = copy.deepcopy(cursor.state)
154+
self._update_global_cursor(cursor.state[self.cursor_field.cursor_field_key])
159155
if not self._use_global_cursor:
160156
self._emit_state_message()
161157

@@ -366,9 +362,22 @@ def observe(self, record: Record) -> None:
366362
raise ValueError(
367363
"Invalid state as stream slices that are emitted should refer to an existing cursor"
368364
)
369-
self._cursor_per_partition[
370-
self._to_partition_key(record.associated_slice.partition)
371-
].observe(record)
365+
366+
record_cursor = self._connector_state_converter.parse_value(
367+
self._cursor_field.extract_value(record)
368+
)
369+
self._update_global_cursor(record_cursor)
370+
if not self._use_global_cursor:
371+
self._cursor_per_partition[
372+
self._to_partition_key(record.associated_slice.partition)
373+
].observe(record)
374+
375+
def _update_global_cursor(self, value: Mapping[str, Any]) -> None:
376+
if (
377+
self._new_global_cursor is None
378+
or self._new_global_cursor[self.cursor_field.cursor_field_key] < value
379+
):
380+
self._new_global_cursor = {self.cursor_field.cursor_field_key: copy.deepcopy(value)}
372381

373382
def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
374383
return self._partition_serializer.to_partition_key(partition)

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
137137
self._source_config, config
138138
)
139139

140-
api_budget_model = self._source_config.get("api_budget")
141-
if api_budget_model:
142-
self._constructor.set_api_budget(api_budget_model, config)
143-
144140
source_streams = [
145141
self._constructor.create_component(
146142
DeclarativeStreamModel,

0 commit comments

Comments
 (0)