diff --git a/tap_mambu/schema.py b/tap_mambu/schema.py index 49a04fb..d537bbd 100644 --- a/tap_mambu/schema.py +++ b/tap_mambu/schema.py @@ -43,11 +43,6 @@ 'replication_method': 'INCREMENTAL', 'replication_keys': ['last_modified_date'] }, - 'saving_accounts': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['last_modified_date'] - }, 'deposit_products': { 'key_properties': ['id'], 'replication_method': 'INCREMENTAL', diff --git a/tap_mambu/schemas/deposit_accounts.json b/tap_mambu/schemas/deposit_accounts.json index 8fb9d53..c65e54c 100644 --- a/tap_mambu/schemas/deposit_accounts.json +++ b/tap_mambu/schemas/deposit_accounts.json @@ -158,6 +158,14 @@ "available_balance": { "type": ["null", "number"], "multipleOf": 1e-10 + }, + "blocked_balance": { + "type": ["null", "number"], + "multipleOf": 1e-10 + }, + "forward_available_balance": { + "type": ["null", "number"], + "multipleOf": 1e-10 } } }, @@ -178,7 +186,7 @@ "type": ["null", "object"], "additionalProperties": false, "properties": { - "allowed_overdraft": { + "allow_overdraft": { "type": ["null", "boolean"] }, "overdraft_limit": { @@ -324,6 +332,11 @@ "type": ["null", "number"], "multipleOf": 1e-10 } + , + "negative_interest_accrued": { + "type": ["null", "number"], + "multipleOf": 1e-10 + } } }, "name": { diff --git a/tap_mambu/schemas/saving_accounts.json b/tap_mambu/schemas/saving_accounts.json deleted file mode 100644 index c1ee92a..0000000 --- a/tap_mambu/schemas/saving_accounts.json +++ /dev/null @@ -1,136 +0,0 @@ -{ - "type": "object", - "additionalProperties": false, - "properties": { - "encoded_key": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "account_state": { - "type": ["null", "string"] - }, - "account_holder_key": { - "type": ["null", "string"] - }, - "account_holder_type": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "creation_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "approved_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "activation_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "last_modified_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "last_account_appraisal_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "product_type_key": { - "type": ["null", "string"] - }, - "account_type": { - "type": ["null", "string"] - }, - "balance": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "accrued_interest": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "overdraft_interest_accrued": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "technical_overdraft_interest_accrued": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "technical_overdraft_amount": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "interest_due": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "technical_interest_due": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "fees_due": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "overdraft_limit": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "allow_overdraft": { - "type": ["null", "boolean"] - }, - "locked_balance": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "hold_balance": { - "type": ["null", "number"], - "multipleOf": 1e-10 - }, - "currency_code": { - "type": ["null", "string"] - }, - "currency": { - "type": ["null", "object"], - "additionalProperties": false, - "properties": { - "code": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "symbol": { - "type": ["null", "string"] - }, - "digits_after_decimal": { - "type": ["null", "integer"] - }, - "currency_symbol_position": { - "type": ["null", "string"] - }, - "is_base_currency": { - "type": ["null", "boolean"] - }, - "creation_date": { - "type": ["null", "string"], - "format": "date-time" - }, - "last_modified_date": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "available_balance": { - "type": ["null", "number"], - "multipleOf": 1e-10 - } - } -} diff --git a/tap_mambu/sync.py b/tap_mambu/sync.py index 7d8e008..667f266 100644 --- a/tap_mambu/sync.py +++ b/tap_mambu/sync.py @@ -1,14 +1,16 @@ -from _datetime import timedelta from datetime import datetime import singer +from _datetime import timedelta from singer import Transformer, metadata, metrics, utils from singer.utils import strftime, strptime_to_utc -from tap_mambu.transform import transform_json, transform_activities + +from tap_mambu.transform import transform_activities, transform_json LOGGER = singer.get_logger() LOOKBACK_DEFAULT = 14 + def write_schema(catalog, stream_name): stream = catalog.get_stream(stream_name) schema = stream.schema.to_dict() @@ -33,9 +35,9 @@ def get_bookmark(state, stream, sub_type, default): return default if sub_type == 'self': - return (state.get('bookmarks', {}).get(stream, default)) + return state.get('bookmarks', {}).get(stream, default) else: - return (state.get('bookmarks', {}).get(stream, {}).get(sub_type, default)) + return state.get('bookmarks', {}).get(stream, {}).get(sub_type, default) def write_bookmark(state, stream, sub_type, value): @@ -58,17 +60,19 @@ def transform_datetime(this_dttm): return new_dttm -def process_records(catalog, #pylint: disable=too-many-branches - stream_name, - records, - time_extracted, - bookmark_field=None, - bookmark_type=None, - max_bookmark_value=None, - last_datetime=None, - last_integer=None, - parent=None, - parent_id=None): +def process_records( + catalog, # pylint: disable=too-many-branches + stream_name, + records, + time_extracted, + bookmark_field=None, + bookmark_type=None, + max_bookmark_value=None, + last_datetime=None, + last_integer=None, + parent=None, + parent_id=None, +): stream = catalog.get_stream(stream_name) schema = stream.schema.to_dict() stream_metadata = metadata.to_map(stream.metadata) @@ -81,9 +85,9 @@ def process_records(catalog, #pylint: disable=too-many-branches # Transform record for Singer.io with Transformer() as transformer: - transformed_record = transformer.transform(record, - schema, - stream_metadata) + transformed_record = transformer.transform( + record, schema, stream_metadata + ) # Reset max_bookmark_value to new value if higher if bookmark_field and (bookmark_field in transformed_record): @@ -99,43 +103,56 @@ def process_records(catalog, #pylint: disable=too-many-branches if bookmark_type == 'integer': # Keep only records whose bookmark is after the last_integer if transformed_record[bookmark_field] >= last_integer: - write_record(stream_name, transformed_record, time_extracted=time_extracted) + write_record( + stream_name, + transformed_record, + time_extracted=time_extracted, + ) counter.increment() elif bookmark_type == 'datetime': last_dttm = transform_datetime(last_datetime) - bookmark_dttm = transform_datetime(transformed_record[bookmark_field]) + bookmark_dttm = transform_datetime( + transformed_record[bookmark_field] + ) # Keep only records whose bookmark is after the last_datetime if bookmark_dttm >= last_dttm: - write_record(stream_name, transformed_record, time_extracted=time_extracted) + write_record( + stream_name, + transformed_record, + time_extracted=time_extracted, + ) counter.increment() else: - write_record(stream_name, transformed_record, time_extracted=time_extracted) + write_record( + stream_name, transformed_record, time_extracted=time_extracted + ) counter.increment() return max_bookmark_value, len(records) # Sync a specific parent or child endpoint. -def sync_endpoint(client, #pylint: disable=too-many-branches - catalog, - state, - start_date, - stream_name, - path, - endpoint_config, - api_version, - api_method, - static_params, - sub_type, - bookmark_query_field=None, - bookmark_field=None, - bookmark_type=None, - data_key=None, - body=None, - id_fields=None, - parent=None, - parent_id=None): - +def sync_endpoint( + client, # pylint: disable=too-many-branches + catalog, + state, + start_date, + stream_name, + path, + endpoint_config, + api_version, + api_method, + static_params, + sub_type, + bookmark_query_field=None, + bookmark_field=None, + bookmark_type=None, + data_key=None, + body=None, + id_fields=None, + parent=None, + parent_id=None, +): # Get the latest bookmark for the stream and set the last_integer/datetime last_datetime = None @@ -155,20 +172,22 @@ def sync_endpoint(client, #pylint: disable=too-many-branches # Each page has an offset (starting value) and a limit (batch size, number of records) # Increase the "offset" by the "limit" for each batch. # Continue until the "record_count" returned < "limit" is null/zero or - offset = 0 # Starting offset value for each batch API call - limit = client.page_size # Batch size; Number of records per API call - total_records = 0 # Initialize total - record_count = limit # Initialize, reset for each API call + offset = 0 # Starting offset value for each batch API call + limit = client.page_size # Batch size; Number of records per API call + total_records = 0 # Initialize total + record_count = limit # Initialize, reset for each API call paginate = endpoint_config.get('paginate', True) - while record_count == limit: # break out of loop when record_count < limit (or not data returned) + while ( + record_count == limit + ): # break out of loop when record_count < limit (or not data returned) pagination_params = {} if paginate: pagination_params['offset'] = offset pagination_params['limit'] = limit params = { **pagination_params, - **static_params # adds in endpoint specific, sort, filter params + **static_params, # adds in endpoint specific, sort, filter params } if bookmark_query_field: @@ -177,14 +196,23 @@ def sync_endpoint(client, #pylint: disable=too-many-branches elif bookmark_type == 'integer': params[bookmark_query_field] = last_integer - LOGGER.info('Stream: {}, Type: {} - Sync start {}'.format( - stream_name, sub_type, - 'since: {}, '.format(last_datetime) if bookmark_query_field else '')) + LOGGER.info( + 'Stream: {}, Type: {} - Sync start {}'.format( + stream_name, + sub_type, + 'since: {}, '.format(last_datetime) if bookmark_query_field else '', + ) + ) # Squash params to query-string params - querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) - LOGGER.info('URL for {} ({}, {}): {}/{}?{}'\ - .format(stream_name, api_method, api_version, client.base_url, path, querystring)) + querystring = '&'.join( + ['%s=%s' % (key, value) for (key, value) in params.items()] + ) + LOGGER.info( + 'URL for {} ({}, {}): {}/{}?{}'.format( + stream_name, api_method, api_version, client.base_url, path, querystring + ) + ) if body is not None: LOGGER.info('body = {}'.format(body)) @@ -195,20 +223,21 @@ def sync_endpoint(client, #pylint: disable=too-many-branches version=api_version, params=querystring, endpoint=stream_name, - json=body) + json=body, + ) # time_extracted: datetime when the data was extracted from the API time_extracted = utils.now() if not data or data is None or data == []: record_count = 0 LOGGER.warning('Stream: {} - NO DATA RESULTS') - break # NO DATA + break # NO DATA # Transform data with transform_json from transform.py # This function converts camelCase to snake_case for fieldname keys. # The data_key may identify array/list of records below the element # LOGGER.info('data = {}'.format(data)) # TESTING, comment out - transformed_data = [] # initialize the record list + transformed_data = [] # initialize the record list data_list = [] # If a single record dictionary, append to a list[] if isinstance(data, dict): @@ -218,7 +247,7 @@ def sync_endpoint(client, #pylint: disable=too-many-branches transformed_data = transform_json(data, stream_name) elif data_key in data: transformed_data = transform_json(data, data_key)[data_key] - + if stream_name == 'activities': transformed_data = transform_activities(transformed_data) @@ -226,7 +255,7 @@ def sync_endpoint(client, #pylint: disable=too-many-branches if not transformed_data or transformed_data is None: record_count = 0 LOGGER.warning('Stream: {} - NO TRANSFORMED DATA RESULTS') - break # No data results + break # No data results # Process records and get the max_bookmark_value and record_count for the set of records max_bookmark_value, record_count = process_records( @@ -240,7 +269,8 @@ def sync_endpoint(client, #pylint: disable=too-many-branches last_datetime=last_datetime, last_integer=last_integer, parent=parent, - parent_id=parent_id) + parent_id=parent_id, + ) total_records = total_records + record_count @@ -248,9 +278,9 @@ def sync_endpoint(client, #pylint: disable=too-many-branches children = endpoint_config.get('children') if children: for child_stream_name, child_endpoint_config in children.items(): - should_stream, last_stream_child = should_sync_stream(get_selected_streams(catalog), - None, - child_stream_name) + should_stream, last_stream_child = should_sync_stream( + get_selected_streams(catalog), None, child_stream_name + ) if should_stream: # For each parent record for record in transformed_data: @@ -265,11 +295,14 @@ def sync_endpoint(client, #pylint: disable=too-many-branches parent_id = record.get(parent_id_field) # sync_endpoint for child - LOGGER.info('Syncing: {}, parent_stream: {}, parent_id: {}'.format( - child_stream_name, - stream_name, - parent_id)) - child_path = child_endpoint_config.get('path').format(str(parent_id)) + LOGGER.info( + 'Syncing: {}, parent_stream: {}, parent_id: {}'.format( + child_stream_name, stream_name, parent_id + ) + ) + child_path = child_endpoint_config.get('path').format( + str(parent_id) + ) child_total_records = sync_endpoint( client=client, catalog=catalog, @@ -282,25 +315,26 @@ def sync_endpoint(client, #pylint: disable=too-many-branches api_method=child_endpoint_config.get('api_method', 'GET'), static_params=child_endpoint_config.get('params', {}), sub_type=sub_type, - bookmark_query_field=child_endpoint_config.get('bookmark_query_field'), + bookmark_query_field=child_endpoint_config.get( + 'bookmark_query_field' + ), bookmark_field=child_endpoint_config.get('bookmark_field'), bookmark_type=child_endpoint_config.get('bookmark_type'), data_key=child_endpoint_config.get('data_key', None), body=child_endpoint_config.get('body', None), id_fields=child_endpoint_config.get('id_fields'), parent=child_endpoint_config.get('parent'), - parent_id=parent_id) - LOGGER.info('Synced: {}, parent_id: {}, total_records: {}'.format( - child_stream_name, - parent_id, - child_total_records)) + parent_id=parent_id, + ) + LOGGER.info( + 'Synced: {}, parent_id: {}, total_records: {}'.format( + child_stream_name, parent_id, child_total_records + ) + ) # Update the state with the max_bookmark_value for the stream if bookmark_field: - write_bookmark(state, - stream_name, - sub_type, - max_bookmark_value) + write_bookmark(state, stream_name, sub_type, max_bookmark_value) # to_rec: to record; ending record for the batch to_rec = offset + limit @@ -308,10 +342,9 @@ def sync_endpoint(client, #pylint: disable=too-many-branches to_rec = total_records if paginate: - LOGGER.info('{} - Synced records: {} to {}'.format( - stream_name, - offset, - to_rec)) + LOGGER.info( + '{} - Synced records: {} to {}'.format(stream_name, offset, to_rec) + ) # Pagination: increment the offset by the limit (batch-size) offset = offset + limit @@ -368,26 +401,30 @@ def sync(client, config, catalog, state): communications_dt_str = transform_datetime(communications_dttm_str) # LOGGER.info('communications bookmark_date = {}'.format(communications_dt_str)) - deposit_transactions_dttm_str = get_bookmark(state, 'deposit_transactions', 'self', start_date) + deposit_transactions_dttm_str = get_bookmark( + state, 'deposit_transactions', 'self', start_date + ) deposit_transactions_dt_str = transform_datetime(deposit_transactions_dttm_str) # LOGGER.info('deposit_transactions bookmark_date = {}'.format(deposit_transactions_dt_str)) - loan_transactions_dttm_str = get_bookmark(state, 'loan_transactions', 'self', start_date) + loan_transactions_dttm_str = get_bookmark( + state, 'loan_transactions', 'self', start_date + ) loan_transactions_dt_str = transform_datetime(loan_transactions_dttm_str) loan_transactions_dttm = strptime_to_utc(loan_transactions_dt_str) clients_dttm_str = get_bookmark(state, 'clients', 'self', start_date) clients_dt_str = transform_datetime(clients_dttm_str) - # TEMP: pre fetch of deposit accounts using search - # REVIEW: Can we remove `[:19].replace('T', ' ')` - saving_accounts_str = get_bookmark(state, 'saving_accounts', 'self', start_date) - saving_accounts_dt_str = transform_datetime(saving_accounts_str)[:19].replace('T', ' ') + deposit_accounts_str = get_bookmark(state, 'deposit_accounts', 'self', start_date) + deposit_accounts_dt_str = transform_datetime(deposit_accounts_str) groups_dttm_str = get_bookmark(state, 'groups', 'self', start_date) groups_dt_str = transform_datetime(groups_dttm_str) - gl_journal_entries_dttm_str = get_bookmark(state, 'gl_journal_entries', 'self', start_date) + gl_journal_entries_dttm_str = get_bookmark( + state, 'gl_journal_entries', 'self', start_date + ) gl_journal_entries_dt_str = transform_datetime(gl_journal_entries_dttm_str) lookback_days = int(config.get('lookback_window', LOOKBACK_DEFAULT)) @@ -421,34 +458,28 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'lastModifiedDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'communications': { 'path': 'communications/messages:search', 'api_version': 'v2', 'api_method': 'POST', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'body': [ - { - 'field': 'state', - 'operator': 'EQUALS', - 'value': 'SENT' - }, + {'field': 'state', 'operator': 'EQUALS', 'value': 'SENT'}, { 'field': 'creationDate', 'operator': 'AFTER', - 'value': communications_dt_str - } + 'value': communications_dt_str, + }, ], 'bookmark_field': 'creation_date', 'bookmark_type': 'datetime', - 'id_fields': ['encoded_key'] + 'id_fields': ['encoded_key'], }, 'centres': { 'path': 'centres', @@ -457,35 +488,30 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'lastModifiedDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'clients': { 'path': 'clients:search', 'api_version': 'v2', 'api_method': 'POST', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'body': { - "sortingCriteria": { - "field": "lastModifiedDate", - "order": "ASC" - }, + "sortingCriteria": {"field": "lastModifiedDate", "order": "ASC"}, "filterCriteria": [ { "field": "lastModifiedDate", "operator": "AFTER", - "value": clients_dt_str + "value": clients_dt_str, } - ] + ], }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'credit_arrangements': { 'path': 'creditarrangements', @@ -494,26 +520,33 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'creationDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'custom_field_sets': { 'path': 'customfieldsets', 'api_version': 'v1', 'api_method': 'GET', 'params': {}, - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'deposit_accounts': { - 'path': 'deposits', + 'path': 'deposits:search', 'api_version': 'v2', - 'api_method': 'GET', - 'params': { - 'sortBy': 'lastModifiedDate:ASC', - 'detailsLevel': 'FULL' + 'api_method': 'POST', + 'params': {'detailsLevel': 'FULL'}, + 'body': { + "sortingCriteria": {"field": "lastModifiedDate", "order": "DESC"}, + "filterCriteria": [ + { + "field": "lastModifiedDate", + "operator": "AFTER", + "value": deposit_accounts_dt_str, + } + ], }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', @@ -524,95 +557,58 @@ def sync(client, config, catalog, state): 'path': 'deposits/{}/cards', 'api_version': 'v2', 'api_method': 'GET', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'id_fields': ['deposit_id', 'reference_token'], - 'parent': 'deposit' + 'parent': 'deposit', } - } - }, - # TEMP: pre fetch of deposit accounts using search - 'saving_accounts': { - 'path': 'savings/search', - 'api_version': 'v1', - 'api_method': 'POST', - 'params': { - 'sortBy': 'lastModifiedDate:ASC', - 'fullDetails': True - }, - 'body': { - "filterConstraints": [ - { - "filterSelection": "LAST_MODIFIED_DATE", - "filterElement": "AFTER", - "value": saving_accounts_dt_str - } - ] }, - 'bookmark_field': 'last_modified_date', - 'bookmark_type': 'datetime', - 'id_fields': ['id'], - 'store_ids': True, }, 'deposit_products': { 'path': 'savingsproducts', 'api_version': 'v1', 'api_method': 'GET', - 'params': { - "fullDetails": True - }, + 'params': {"fullDetails": True}, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'deposit_transactions': { 'path': 'deposits/transactions:search', 'api_version': 'v2', 'api_method': 'POST', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'body': { - "sortingCriteria": { - "field": "creationDate", - "order": "ASC" - }, + "sortingCriteria": {"field": "creationDate", "order": "ASC"}, "filterCriteria": [ { "field": "creationDate", "operator": "AFTER", - "value": deposit_transactions_dt_str + "value": deposit_transactions_dt_str, } - ] + ], }, 'bookmark_field': 'creation_date', 'bookmark_type': 'datetime', - 'id_fields': ['encoded_key'] + 'id_fields': ['encoded_key'], }, 'groups': { 'path': 'groups:search', 'api_version': 'v2', 'api_method': 'POST', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'body': { - "sortingCriteria": { - "field": "lastModifiedDate", - "order": "ASC" - }, + "sortingCriteria": {"field": "lastModifiedDate", "order": "ASC"}, "filterCriteria": [ { "field": "lastModifiedDate", "operator": "AFTER", - "value": groups_dt_str + "value": groups_dt_str, } - ] + ], }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'loan_accounts': { 'path': 'loans', @@ -621,7 +617,7 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'lastModifiedDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', @@ -631,49 +627,39 @@ def sync(client, config, catalog, state): 'path': 'loans/{}/repayments', 'api_version': 'v1', 'api_method': 'GET', - 'params': { - 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' - }, + 'params': {'detailsLevel': 'FULL', 'paginationDetails': 'ON'}, 'id_fields': ['encoded_key'], - 'parent': 'loan_accounts' + 'parent': 'loan_accounts', } - } + }, }, 'loan_products': { 'path': 'loanproducts', 'api_version': 'v1', 'api_method': 'GET', - 'params': { - "fullDetails": True - }, + 'params': {"fullDetails": True}, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'loan_transactions': { 'path': 'loans/transactions:search', 'api_version': 'v2', 'api_method': 'POST', - 'params': { - 'detailsLevel': 'FULL' - }, + 'params': {'detailsLevel': 'FULL'}, 'body': { - "sortingCriteria": { - "field": "creationDate", - "order": "ASC" - }, + "sortingCriteria": {"field": "creationDate", "order": "ASC"}, "filterCriteria": [ { "field": "creationDate", "operator": "AFTER", - "value": loan_transactions_dt_str + "value": loan_transactions_dt_str, } - ] + ], }, 'bookmark_field': 'creation_date', 'bookmark_type': 'datetime', - 'id_fields': ['encoded_key'] + 'id_fields': ['encoded_key'], }, 'tasks': { 'path': 'tasks', @@ -682,11 +668,11 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'lastModifiedDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'transaction_channels': { 'path': 'organization/transactionChannels', @@ -696,7 +682,7 @@ def sync(client, config, catalog, state): 'detailsLevel': 'FULL', }, 'id_fields': ['id'], - 'paginate': False + 'paginate': False, }, 'users': { 'path': 'users', @@ -705,63 +691,55 @@ def sync(client, config, catalog, state): 'params': { 'sortBy': 'lastModifiedDate:ASC', 'detailsLevel': 'FULL', - 'paginationDetails': 'ON' + 'paginationDetails': 'ON', }, 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'id_fields': ['id'] + 'id_fields': ['id'], }, 'gl_accounts': { 'path': 'glaccounts', 'api_version': 'v1', 'api_method': 'GET', - 'params': { - 'type': '{sub_type}' - }, + 'params': {'type': '{sub_type}'}, 'id_fields': ['gl_code'], 'bookmark_field': 'last_modified_date', 'bookmark_type': 'datetime', - 'sub_types': ['ASSET', 'LIABILITY', 'EQUITY', 'INCOME', 'EXPENSE'] + 'sub_types': ['ASSET', 'LIABILITY', 'EQUITY', 'INCOME', 'EXPENSE'], }, 'gl_journal_entries': { 'path': 'gljournalentries:search', 'api_version': 'v2', 'api_method': 'POST', 'body': { - "sortingCriteria": { - "field": "bookingDate", - "order": "ASC" - }, + "sortingCriteria": {"field": "bookingDate", "order": "ASC"}, "filterCriteria": [ { "field": "bookingDate", "operator": "AFTER", - "value": gl_journal_entries_dt_str + "value": gl_journal_entries_dt_str, } - ] + ], }, 'id_fields': ['entry_id'], 'bookmark_field': 'booking_date', - 'bookmark_type': 'datetime' + 'bookmark_type': 'datetime', }, 'activities': { 'path': 'activities', 'api_version': 'v1', 'api_method': 'GET', - 'params' : { - 'from': '{activities_from_dt_str}', - 'to': '{now_date_str}' - }, + 'params': {'from': '{activities_from_dt_str}', 'to': '{now_date_str}'}, 'id_fields': ['encoded_key'], 'bookmark_field': 'timestamp', - 'bookmark_type': 'datetime' + 'bookmark_type': 'datetime', }, 'index_rate_sources': { 'path': 'indexratesources', 'api_version': 'v2', 'api_method': 'GET', 'id_fields': ['encoded_key'], - 'params': {} + 'params': {}, }, 'installments': { 'path': 'installments', @@ -770,11 +748,11 @@ def sync(client, config, catalog, state): 'id_fields': ['encoded_key'], 'params': { 'dueFrom': '{installments_from_dt_str}', - 'dueTo': '{now_date_str}' + 'dueTo': '{now_date_str}', }, 'bookmark_field': 'last_paid_date', - 'bookmark_type': 'datetime' - } + 'bookmark_type': 'datetime', + }, } selected_streams = get_selected_streams(catalog) @@ -790,9 +768,9 @@ def sync(client, config, catalog, state): # For each endpoint (above), determine if the stream should be streamed # (based on the catalog and last_stream), then sync those streams. for stream_name, endpoint_config in endpoints.items(): - should_stream, last_stream = should_sync_stream(selected_streams, - last_stream, - stream_name) + should_stream, last_stream = should_sync_stream( + selected_streams, last_stream, stream_name + ) if should_stream: # loop through each sub type @@ -803,41 +781,45 @@ def sync(client, config, catalog, state): if stream_name == 'activities': now_date_str = strftime(utils.now())[:10] activities_from_dttm_str = get_bookmark( - state, 'activities', sub_type, start_date) + state, 'activities', sub_type, start_date + ) activities_from_dt_str = transform_datetime( - activities_from_dttm_str)[:10] - activities_from_param = endpoint_config.get( - 'params', {}).get('from') + activities_from_dttm_str + )[:10] + activities_from_param = endpoint_config.get('params', {}).get( + 'from' + ) if activities_from_param: endpoint_config['params']['from'] = activities_from_dt_str - activities_to_param = endpoint_config.get( - 'params', {}).get('to') + activities_to_param = endpoint_config.get('params', {}).get('to') if activities_to_param: endpoint_config['params']['to'] = now_date_str if stream_name == 'installments': now_date_str = strftime(utils.now())[:10] installments_from_dttm_str = get_bookmark( - state, 'installments', sub_type, start_date) + state, 'installments', sub_type, start_date + ) installments_from_dt_str = transform_datetime( - installments_from_dttm_str)[:10] - installments_from_param = endpoint_config.get( - 'params', {}).get('dueFrom') + installments_from_dttm_str + )[:10] + installments_from_param = endpoint_config.get('params', {}).get( + 'dueFrom' + ) if installments_from_param: endpoint_config['params']['dueFrom'] = installments_from_dt_str - installments_to_param = endpoint_config.get( - 'params', {}).get('dueTo') + installments_to_param = endpoint_config.get('params', {}).get( + 'dueTo' + ) if installments_to_param: endpoint_config['params']['dueTo'] = now_date_str - update_currently_syncing(state, stream_name) path = endpoint_config.get('path') sub_type_param = endpoint_config.get('params', {}).get('type') if sub_type_param: endpoint_config['params']['type'] = sub_type - total_records = sync_endpoint( client=client, catalog=catalog, @@ -855,10 +837,11 @@ def sync(client, config, catalog, state): bookmark_type=endpoint_config.get('bookmark_type'), data_key=endpoint_config.get('data_key', None), body=endpoint_config.get('body', None), - id_fields=endpoint_config.get('id_fields')) + id_fields=endpoint_config.get('id_fields'), + ) update_currently_syncing(state, None) - LOGGER.info('Synced: {}, total_records: {}'.format( - stream_name, - total_records)) + LOGGER.info( + 'Synced: {}, total_records: {}'.format(stream_name, total_records) + ) LOGGER.info('FINISHED Syncing: {}'.format(stream_name))