airbyte-integrations/connectors/source-facebook-marketing/metadata.yaml
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
- dockerImageTag: 4.0.1
+ dockerImageTag: 4.1.0-rc.1
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
@@ -30,6 +30,8 @@ data:
enabled: true
releaseStage: generally_available
releases:
+ rolloutConfiguration:
+   enableProgressiveRollout: true
breakingChanges:
2.0.0:
message: "All Ads-Insights-* streams now have updated schemas. Users will need to retest source configuration, refresh the source schema and reset affected streams after upgrading. Please pay attention that data older than 37 months will become unavailable due to FaceBook limitations. For more information [visit](https://docs.airbyte.com/integrations/sources/facebook-marketing-migrations)"
airbyte-integrations/connectors/source-facebook-marketing/poetry.lock
(2,223 changed lines: 1,447 additions, 776 deletions; large diff not rendered)

airbyte-integrations/connectors/source-facebook-marketing/pyproject.toml
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.0.1"
version = "4.1.0-rc.1"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -17,9 +17,9 @@ include = "source_facebook_marketing"

[tool.poetry.dependencies]
python = "^3.10,<3.12"
airbyte-cdk = "^5"
facebook-business = "23.0.0"
cached-property = "==1.5.2"
airbyte-cdk = "^7"
facebook-business = "^23.0.0"
cached-property = "^2"

[tool.poetry.scripts]
source-facebook-marketing = "source_facebook_marketing.run:run"
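
Alongside the CDK major-version bump, the two exact pins become caret ranges. A sketch of what those caret constraints translate to in PEP 440 terms (the translation is what Poetry performs internally; `packaging` is used here only to illustrate):

```python
from packaging.specifiers import SpecifierSet
from packaging.version import Version

# Poetry caret semantics, spelled out as PEP 440 ranges (illustrative only).
caret = {
    "airbyte-cdk ^7": SpecifierSet(">=7,<8"),
    "facebook-business ^23.0.0": SpecifierSet(">=23.0.0,<24.0.0"),
    "cached-property ^2": SpecifierSet(">=2,<3"),
}

assert Version("23.1.0") in caret["facebook-business ^23.0.0"]      # minor/patch bumps allowed
assert Version("24.0.0") not in caret["facebook-business ^23.0.0"]  # next major excluded
```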
airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
@@ -5,10 +5,10 @@
import json
import logging
from dataclasses import dataclass
+ from datetime import timedelta
from time import sleep

import backoff
- import pendulum
from facebook_business import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
@@ -30,8 +30,8 @@ class FacebookAPIException(Exception):
class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""

- MAX_RATE, MAX_PAUSE_INTERVAL = (95, pendulum.duration(minutes=10))
- MIN_RATE, MIN_PAUSE_INTERVAL = (85, pendulum.duration(minutes=2))
+ MAX_RATE, MAX_PAUSE_INTERVAL = (95, timedelta(minutes=10))
+ MIN_RATE, MIN_PAUSE_INTERVAL = (85, timedelta(minutes=2))

# see `_should_restore_page_size` method docstring for more info.
# attribute to handle the reduced request limit
@@ -56,7 +56,7 @@ def ads_insights_throttle(self) -> Throttle:
@staticmethod
def _parse_call_rate_header(headers):
usage = 0
- pause_interval = pendulum.duration()
+ pause_interval = timedelta()

usage_header_business = headers.get("x-business-use-case-usage")
usage_header_app = headers.get("x-app-usage")
@@ -87,7 +87,7 @@ def _parse_call_rate_header(headers):
)
pause_interval = max(
pause_interval,
- pendulum.duration(minutes=usage_limits.get("estimated_time_to_regain_access", 0)),
+ timedelta(minutes=usage_limits.get("estimated_time_to_regain_access", 0)),
)

return usage, pause_interval
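
The parsing logic keeps its shape; only the duration type changes, since `pendulum.duration()` and `timedelta()` are both zero-valued defaults that compare and `max()` cleanly. A reduced sketch of the pattern (a hypothetical simplification handling only one of the two usage headers, not the connector's exact logic):

```python
import json
from datetime import timedelta


def parse_usage(headers: dict) -> tuple[int, timedelta]:
    """Hypothetical, reduced analogue of _parse_call_rate_header."""
    usage = 0
    pause_interval = timedelta()  # zero duration, previously pendulum.duration()
    raw = headers.get("x-app-usage")
    if raw:
        limits = json.loads(raw)  # Facebook sends this header as a JSON string
        usage = max(usage, limits.get("call_count", 0))
        pause_interval = max(
            pause_interval,
            timedelta(minutes=limits.get("estimated_time_to_regain_access", 0)),
        )
    return usage, pause_interval


print(parse_usage({"x-app-usage": '{"call_count": 28}'}))  # (28, datetime.timedelta(0))
```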
airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py
@@ -3,10 +3,10 @@
#

import logging
+ from datetime import timedelta
from typing import Any, List, Mapping, Optional, Tuple, Type

import facebook_business
- import pendulum

from airbyte_cdk.models import (
AdvancedAuth,
@@ -19,6 +19,7 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.utils import AirbyteTracedException
+ from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
from source_facebook_marketing.api import API
from source_facebook_marketing.spec import ConnectorConfig, ValidAdStatuses
from source_facebook_marketing.streams import (
@@ -80,10 +81,10 @@ def _validate_and_transform(self, config: Mapping[str, Any]):
config.default_ads_insights_action_breakdowns = default_ads_insights_action_breakdowns

if config.start_date:
- config.start_date = pendulum.instance(config.start_date)
+ config.start_date = AirbyteDateTime.from_datetime(config.start_date)

if config.end_date:
- config.end_date = pendulum.instance(config.end_date)
+ config.end_date = AirbyteDateTime.from_datetime(config.end_date)

config.account_ids = list(config.account_ids)
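
`AirbyteDateTime.from_datetime` takes over from `pendulum.instance` as the normalization step for config dates. A usage sketch, assuming the CDK helper accepts a standard timezone-aware `datetime` the way these call sites imply:

```python
from datetime import datetime, timezone

from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now

# Normalize a parsed config date the way _validate_and_transform now does.
start_date = AirbyteDateTime.from_datetime(datetime(2023, 1, 1, tzinfo=timezone.utc))
assert start_date <= ab_datetime_now()  # comparable: both behave as aware datetimes
```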

@@ -99,7 +100,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
try:
config = self._validate_and_transform(config)

- if config.end_date > pendulum.now():
+ if config.end_date > ab_datetime_now():
return False, "Date range can not be in the future."
if config.start_date and config.end_date < config.start_date:
return False, "End date must be equal or after start date."
@@ -147,7 +148,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
api = API(access_token=config.access_token, page_size=config.page_size)

# if start_date not specified then set default start_date for report streams to 2 years ago
- report_start_date = config.start_date or pendulum.now().add(years=-2)
+ report_start_date = config.start_date or (ab_datetime_now() - timedelta(days=365 * 2))

insights_args = dict(
api=api,
@@ -325,7 +326,7 @@ def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List
action_breakdowns=list(set(insight.action_breakdowns)),
action_breakdowns_allow_empty=config.action_breakdowns_allow_empty,
time_increment=insight.time_increment,
- start_date=insight.start_date or config.start_date or pendulum.now().add(years=-2),
+ start_date=insight.start_date or config.start_date or (ab_datetime_now() - timedelta(days=365 * 2)),
end_date=insight.end_date or config.end_date,
insights_lookback_window=insight.insights_lookback_window or config.insights_lookback_window,
insights_job_timeout=insight.insights_job_timeout or config.insights_job_timeout,
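
One behavioral nuance in the default-start-date hunks above: `pendulum.now().add(years=-2)` was calendar-aware, while `timedelta(days=365 * 2)` is a fixed 730 days, so the computed default can drift by a day or two across leap years:

```python
from datetime import datetime, timedelta, timezone

now = datetime(2024, 3, 1, tzinfo=timezone.utc)
# 731 calendar days separate 2022-03-01 and 2024-03-01 (Feb 29, 2024 intervenes),
# so subtracting a flat 730 days lands on 2022-03-02 rather than 2022-03-01.
print((now - timedelta(days=365 * 2)).date())  # 2022-03-02
```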
airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py
@@ -5,23 +5,23 @@
import copy
import logging
from abc import ABC, abstractmethod
+ from datetime import date, datetime, timedelta
from enum import Enum
from typing import Any, Iterator, List, Mapping, Optional, Type, Union

import backoff
- import pendulum
from facebook_business.adobjects.ad import Ad
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.adobjects.adset import AdSet
from facebook_business.adobjects.campaign import Campaign
from facebook_business.adobjects.objectparser import ObjectParser
from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookBadObjectError, FacebookResponse
- from pendulum.duration import Duration

+ from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now
from source_facebook_marketing.streams.common import retry_pattern

- from ..utils import validate_start_date
+ from ..utils import DateInterval, validate_start_date


logger = logging.getLogger("airbyte")
@@ -73,7 +73,7 @@ class Status(str, Enum):
class AsyncJob(ABC):
"""Abstract AsyncJob base class"""

- def __init__(self, api: FacebookAdsApi, interval: pendulum.Period):
+ def __init__(self, api: FacebookAdsApi, interval: DateInterval):
"""Init generic async job

:param api: FB API instance (to create batch, etc)
@@ -84,7 +84,7 @@ def __init__(self, api: FacebookAdsApi, interval: pendulum.Period):
self._attempt_number = 0

@property
- def interval(self) -> pendulum.Period:
+ def interval(self) -> DateInterval:
"""Job identifier, in most cases start of the interval"""
return self._interval

@@ -198,7 +198,7 @@ def __init__(
self,
edge_object: Union[AdAccount, Campaign, AdSet, Ad],
params: Mapping[str, Any],
- job_timeout: Duration,
+ job_timeout: timedelta,
**kwargs,
):
"""Initialize
@@ -210,8 +210,8 @@ def __init__(
super().__init__(**kwargs)
self._params = dict(params)
self._params["time_range"] = {
"since": self._interval.start.to_date_string(),
"until": self._interval.end.to_date_string(),
"since": self._interval.to_date_string(self._interval.start),
"until": self._interval.to_date_string(self._interval.end),
}
self._job_timeout = job_timeout
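
`time_range` is now built through `DateInterval.to_date_string(...)` instead of pendulum's `Date.to_date_string()`. The class itself comes from `..utils` and is not shown in this diff; a minimal, hypothetical reconstruction based on how it is used here and in `_generate_async_jobs` further down:

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class DateInterval:
    """Guess at the helper's shape; the real class in utils.py may differ."""

    start: date
    end: date

    def to_date_string(self, value: date) -> str:
        return value.strftime("%Y-%m-%d")  # the "YYYY-MM-DD" form pendulum produced
```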

@@ -251,9 +251,9 @@ def _split_by_edge_class(self, edge_class: Union[Type[Campaign], Type[AdSet], Ty

params = dict(copy.deepcopy(self._params))
# get objects from attribution window as well (28 day + 1 current day)
- new_start = self._interval.start - pendulum.duration(days=28 + 1)
+ new_start = AirbyteDateTime.from_datetime(datetime.combine(self._interval.start - timedelta(days=28 + 1), datetime.min.time()))
new_start = validate_start_date(new_start)
params["time_range"].update(since=new_start.to_date_string())
params["time_range"].update(since=new_start.strftime("%Y-%m-%d"))
params.update(fields=[pk_name], level=level)
params.pop("time_increment") # query all days
result = self._edge_object.get_insights(params=params)
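
The new `new_start` expression packs three steps into one line: step back 29 days (the 28-day attribution window plus the current day), promote the resulting `date` to a midnight `datetime`, and wrap it as an `AirbyteDateTime`. Unrolled with hypothetical values:

```python
from datetime import date, datetime, timedelta

interval_start = date(2024, 6, 30)                 # hypothetical slice start
as_date = interval_start - timedelta(days=28 + 1)  # 2024-06-01
as_midnight = datetime.combine(as_date, datetime.min.time())
print(as_midnight)                       # 2024-06-01 00:00:00
print(as_midnight.strftime("%Y-%m-%d"))  # "2024-06-01", the new time_range "since"
```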
@@ -278,7 +278,7 @@ def start(self):
raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead")

self._job = self._edge_object.get_insights(params=self._params, is_async=True)
- self._start_time = pendulum.now()
+ self._start_time = ab_datetime_now()
self._attempt_number += 1
logger.info(f"{self}: created AdReportRun")

@@ -295,12 +295,12 @@ def restart(self):
logger.info(f"{self}: restarted.")

@property
- def elapsed_time(self) -> Optional[pendulum.duration]:
+ def elapsed_time(self) -> Optional[timedelta]:
"""Elapsed time since the job start"""
if not self._start_time:
return None

- end_time = self._finish_time or pendulum.now()
+ end_time = self._finish_time or ab_datetime_now()
return end_time - self._start_time

@property
Expand Down Expand Up @@ -359,16 +359,16 @@ def _check_status(self) -> bool:

if self.elapsed_time > self._job_timeout:
logger.info(f"{self}: run more than maximum allowed time {self._job_timeout}.")
- self._finish_time = pendulum.now()
+ self._finish_time = ab_datetime_now()
self._failed = True
return True
elif job_status == Status.COMPLETED:
- self._finish_time = pendulum.now() # TODO: is not actual running time, but interval between check_status calls
+ self._finish_time = ab_datetime_now() # TODO: is not actual running time, but interval between check_status calls
return True
elif job_status in [Status.FAILED, Status.SKIPPED]:
- self._finish_time = pendulum.now()
+ self._finish_time = ab_datetime_now()
self._failed = True
logger.info(f"{self}: has status {job_status} after {self.elapsed_time.in_seconds()} seconds.")
logger.info(f"{self}: has status {job_status} after {self.elapsed_time.total_seconds()} seconds.")
return True

return False
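
A small observable difference in the logging change above: pendulum's `Duration.in_seconds()` truncates to an integer, while `timedelta.total_seconds()` returns a float, so this log line can now carry fractional seconds:

```python
from datetime import timedelta

elapsed = timedelta(minutes=1, milliseconds=500)
print(elapsed.total_seconds())  # 60.5 (in_seconds() would have logged 60)
```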
airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
@@ -3,20 +3,22 @@
#

import logging
+ from datetime import date, timedelta
from functools import cache, cached_property
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

- import pendulum
from facebook_business.exceptions import FacebookBadObjectError, FacebookRequestError

import airbyte_cdk.sources.utils.casing as casing
from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.utils import AirbyteTracedException
+ from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager
from source_facebook_marketing.streams.common import traced_exception
+ from source_facebook_marketing.utils import DateInterval

from .base_streams import FBMarketingIncrementalStream

@@ -58,7 +60,8 @@ class AdsInsights(FBMarketingIncrementalStream):
# Facebook stores metrics for a maximum of 37 months. Any time range
# older than 37 months from the current date results in a 400 Bad Request HTTP response.
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
- INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)
+ # Min number of days that can occur in 37 months is 1123 days.
+ INSIGHTS_RETENTION_PERIOD = timedelta(days=1123)

action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS
time_increment = 1
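
`timedelta` has no notion of months, so the 37-month retention period becomes a fixed day count. The 1123 figure reads as a conservative lower bound on 37 calendar months, presumably chosen so the connector never requests a range Facebook has already expired:

```python
# Arithmetic behind the constant: three non-leap years plus a 28-day February.
print(3 * 365 + 28)  # 1123
```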
@@ -98,7 +101,7 @@ def __init__(
self.entity_prefix = level

# state
- self._cursor_values: Optional[Mapping[str, pendulum.Date]] = None # latest period that was read for each account
+ self._cursor_values: Optional[Mapping[str, date]] = None # latest period that was read for each account
self._next_cursor_values = self._get_start_date()
self._completed_slices = {account_id: set() for account_id in self._account_ids}

@@ -130,11 +133,11 @@ def insights_lookback_period(self):
But in some cases users may have defined their own lookback window; that's
why the value for `insights_lookback_window` is set through the config.
"""
- return pendulum.duration(days=self._insights_lookback_window)
+ return timedelta(days=self._insights_lookback_window)

@property
def insights_job_timeout(self):
- return pendulum.duration(minutes=self._insights_job_timeout)
+ return timedelta(minutes=self._insights_job_timeout)

def _transform_breakdown(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
for breakdown in self.breakdowns:
@@ -214,24 +217,28 @@ def state(self, value: Mapping[str, Any]):
return

self._cursor_values = {
- account_id: pendulum.parse(transformed_state[account_id][self.cursor_field]).date()
+ account_id: ab_datetime_parse(transformed_state[account_id][self.cursor_field]).date()
if transformed_state.get(account_id, {}).get(self.cursor_field)
else None
for account_id in self._account_ids
}
self._completed_slices = {
- account_id: set(pendulum.parse(v).date() for v in transformed_state.get(account_id, {}).get("slices", []))
+ account_id: set(ab_datetime_parse(v).date() for v in transformed_state.get(account_id, {}).get("slices", []))
for account_id in self._account_ids
}

self._next_cursor_values = self._get_start_date()

- def _date_intervals(self, account_id: str) -> Iterator[pendulum.Date]:
+ def _date_intervals(self, account_id: str) -> Iterator[date]:
"""Get date period to sync"""
if self._end_date < self._next_cursor_values[account_id]:
return
- date_range = self._end_date - self._next_cursor_values[account_id]
- yield from date_range.range("days", self.time_increment)
+
+ # Generate date intervals manually using standard datetime arithmetic
+ current_date = self._next_cursor_values[account_id]
+ while current_date <= self._end_date:
+     yield current_date
+     current_date += timedelta(days=self.time_increment)
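
The rewritten `_date_intervals` swaps pendulum's `Period.range("days", ...)` for a plain while-loop; note it is inclusive of `self._end_date`. With hypothetical values:

```python
from datetime import date, timedelta

start, end, time_increment = date(2024, 1, 1), date(2024, 1, 5), 2
current = start
while current <= end:  # inclusive upper bound, matching the old range()
    print(current)     # 2024-01-01, 2024-01-03, 2024-01-05
    current += timedelta(days=time_increment)
```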

def _advance_cursor(self, account_id: str):
"""Iterate over state, find continuing sequence of slices. Get last value, advance cursor there and remove slices from state"""
@@ -259,8 +266,8 @@ def _generate_async_jobs(self, params: Mapping, account_id: str) -> Iterator[Asy
and ts_start < self._next_cursor_values.get(account_id, self._start_date) - self.insights_lookback_period
):
continue
- ts_end = ts_start + pendulum.duration(days=self.time_increment - 1)
- interval = pendulum.Period(ts_start, ts_end)
+ ts_end = ts_start + timedelta(days=self.time_increment - 1)
+ interval = DateInterval(start=ts_start, end=ts_end)
yield InsightAsyncJob(
api=self._api.api,
edge_object=self._api.get_account(account_id=account_id),
@@ -320,7 +327,7 @@ def stream_slices(
except FacebookRequestError as exc:
raise traced_exception(exc)

- def _get_start_date(self) -> Mapping[str, pendulum.Date]:
+ def _get_start_date(self) -> Mapping[str, date]:
"""Get start date to begin sync with. It is not that trivial as it might seem.
There are few rules:
- don't read data older than start_date
@@ -332,15 +339,15 @@

:return: the first date to sync
"""
- today = pendulum.today(tz=pendulum.tz.UTC).date()
+ today = ab_datetime_now().date()
oldest_date = today - self.INSIGHTS_RETENTION_PERIOD

start_dates_for_account = {}
for account_id in self._account_ids:
cursor_value = self._cursor_values.get(account_id) if self._cursor_values else None
if cursor_value:
start_date = cursor_value
- refresh_date: pendulum.Date = cursor_value - self.insights_lookback_period
+ refresh_date: date = cursor_value - self.insights_lookback_period
if start_date > refresh_date:
logger.info(
f"The cursor value within refresh period ({self.insights_lookback_period}), start sync from {refresh_date} instead."