diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..84fff8f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,47 @@ +default_stages: [commit] +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-merge-conflict + - id: check-docstring-first + - id: debug-statements + - id: trailing-whitespace + - id: check-toml + - id: end-of-file-fixer + - id: check-yaml + - id: sort-simple-yaml + - id: check-json + - id: pretty-format-json + args: ['--autofix','--no-sort-keys'] + + - repo: https://github.com/psf/black + rev: 23.12.0 + hooks: + - id: black + + - repo: https://github.com/pycqa/flake8 + rev: 7.1.2 + hooks: + - id: flake8 + args: ["--ignore=W503,E501,C901"] + additional_dependencies: [ + 'flake8-print', + 'flake8-debugger', + ] + + - repo: https://github.com/PyCQA/bandit + rev: '1.7.10' + hooks: + - id: bandit + + - repo: https://github.com/PyCQA/docformatter + rev: v1.7.5 + hooks: + - id: docformatter + args: [--in-place] + + - repo: https://github.com/codespell-project/codespell + rev: v2.4.1 + hooks: + - id: codespell diff --git a/CHANGELOG.md b/CHANGELOG.md index 8016173..81486c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,25 +1,2 @@ -# Changelog -## 2.0.3 - * Adds a proper circleci config; makes pylint happy; bump library versions [#27](https://github.com/singer-io/tap-quickbase/pull/27) - -## 2.0.2 - * Update version of `requests` to `2.20.0` in response to CVE 2018-18074 - -## 2.0.1 - * Detect out of range timestamps before emitting records and provide context to help identify the faulty record [#19](https://github.com/singer-io/tap-quickbase/pull/19) - -## 2.0.0 - * Replace spaces and hyphens in field names with underscores, remove all other non-alphanumeric characters - -## 1.0.3 - * Ensures that the base url has a trailing slash and uses the HTTPS protocol. - -## 1.0.2 - * Use metadata in all situations instead of custom schema extensions. 
[#13](https://github.com/singer-io/tap-quickbase/pull/13) - -## 1.0.1 - * Now throws an informative error when the app being connected contains no tables. - -## 1.0.0 - * Initial release. + * Initial Commit diff --git a/README.md b/README.md index 37b6d90..9cbf27b 100644 --- a/README.md +++ b/README.md @@ -1,293 +1,202 @@ # tap-quickbase -This is a [Singer](https://singer.io) tap that produces JSON-formatted data -following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md). +This is a [Singer](https://singer.io) tap that produces JSON-formatted data +following the [Singer +spec](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md). This tap: -- Pulls raw data from Quickbase's [API](http://help.quickbase.com/api-guide/index.html) -- Extracts data based on table/column specifications in properties.json + +- Pulls raw data from the [Quickbase API]. +- Extracts the following resources: + - [Apps](https://developer.quickbase.com/operation/getApp) + + - [Events](https://developer.quickbase.com/operation/getAppEvents) + + - [Roles](https://developer.quickbase.com/operation/getRoles) + + - [AppTables](https://developer.quickbase.com/operation/getAppTables) + + - [Tables](https://developer.quickbase.com/operation/getTable) + + - [TableRelationships](https://developer.quickbase.com/operation/getRelationships) + + - [TableReports](https://developer.quickbase.com/operation/getTableReports) + + - [GetReports](https://developer.quickbase.com/operation/getReport) + + - [Fields](https://developer.quickbase.com/operation/getFields) + + - [GetFields](https://developer.quickbase.com/operation/getField) + + - [FieldsUsage](https://developer.quickbase.com/operation/getFieldsUsage) + + - [GetFieldUsage](https://developer.quickbase.com/operation/getFieldUsage) + - Outputs the schema for each resource - Incrementally pulls data based on the input state - ```bash - mkvirtualenv -p python3 tap-quickbase - pip install 
git+https://github.com/flash716/tap-quickbase.git - tap-quickbase --config config.json --discover - tap-quickbase --config config.json --properties properties.json --state state.json - ``` -## Usage +## Streams -**Install** -```bash -$ mkvirtualenv -p python3 tap-quickbase -$ pip install tap-quickbase -``` -or -```bash -$ git clone git@github.com:flash716/tap-quickbase.git -$ cd tap-quickbase -$ mkvirtualenv -p python3 tap-quickbase -$ cd tap-quickbase -$ pip install . -``` +**[apps](https://developer.quickbase.com/operation/getApp)** +- Primary keys: ['id'] +- Replication strategy: INCREMENTAL +**[events](https://developer.quickbase.com/operation/getAppEvents)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -**Find your Quickbase Authentication Information** +**[roles](https://developer.quickbase.com/operation/getRoles)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -- Quickbase URL -- AppID -- [User Token](http://help.quickbase.com/api-guide/index.html#create_user_tokens.html) +**[app_tables](https://developer.quickbase.com/operation/getAppTables)** +- Primary keys: ['id'] +- Replication strategy: INCREMENTAL +**[tables](https://developer.quickbase.com/operation/getTable)** +- Primary keys: ['id'] +- Replication strategy: INCREMENTAL -**Create a configuration file** +**[table_relationships](https://developer.quickbase.com/operation/getRelationships)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -Create a JSON file called `config.tap.json` containing the information you just -found as well as a default start_date to begin pulling data from. 
+**[table_reports](https://developer.quickbase.com/operation/getTableReports)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -```json -{ - "qb_url": "https://yoursubdomain.quickbase.com/db/", - "qb_appid": "your_appid", - "qb_user_token": "your_user_token", - "start_date": "1970-01-01T00:00:01Z" -} -``` +**[get_reports](https://developer.quickbase.com/operation/getReport)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE +**[fields](https://developer.quickbase.com/operation/getFields)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -**Discovery mode** +**[get_fields](https://developer.quickbase.com/operation/getField)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -The tap can be invoked in discovery mode to find the available tables and columns -in the app's data. +**[fields_usage](https://developer.quickbase.com/operation/getFieldsUsage)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -```bash -$ tap-quickbase --config config.tap.json --discover > properties.json -``` +**[get_field_usage](https://developer.quickbase.com/operation/getFieldUsage)** +- Primary keys: ['id'] +- Replication strategy: FULL_TABLE -A discovered catalog is output via stdout to `properties.json`, with a JSON-schema -description of each table. A source table directly corresponds to a Singer stream. 
-```json -{ - "streams": [ - { - "type": "object", - "key_properties": [ - "rid" - ], - "stream_alias": "table_name", - "table_name": "table_id", - "tap_stream_id": "app_name__table_name", - "stream": "app_name__table_name", - "schema": { - "properties": { - "rid": { - "type": [ - "null", - "string" - ], - "inclusion": "automatic" - }, - "datecreated": { - "type": [ - "null", - "string" - ], - "format": "date-time", - "inclusion": "automatic" - }, - "datemodified": { - "type": [ - "null", - "string" - ], - "format": "date-time", - "inclusion": "automatic" - }, - "companyid": { - "type": [ - "null", - "string" - ], - "inclusion": "available" - } - } - }, - "metadata": [ - { - "metadata": { - "tap-quickbase.app_id": "app_id" - }, - "breadcrumb": [] - }, - { - "metadata": { - "tap-quickbase.id": "1" - }, - "breadcrumb": [ - "properties", - "datecreated" - ] - }, - { - "metadata": { - "tap-quickbase.id": "2" - }, - "breadcrumb": [ - "properties", - "datemodified" - ] - }, - { - "metadata": { - "tap-quickbase.id": "6" - }, - "breadcrumb": [ - "properties", - "companyid" - ] - } - ] - } - ] -} -``` -**Field selection** +## Authentication -In sync mode, `tap-quickbase` consumes a modified version of the catalog where -tables and fields have been marked as _selected_. +## Quick Start -Redirect output from the tap's discovery mode to a file so that it can be -modified: +1. Install -```bash -$ tap-quickbase -c config.tap.json --discover > properties.json -``` + Clone this repository, and then install using setup.py. We recommend using a virtualenv: -Then edit `properties.json` to make selections. -In this example we want the `table_name` table. -The stream's schema gets a top-level `selected` flag, as does its columns' schemas: + ```bash + > virtualenv -p python3 venv + > source venv/bin/activate + > python setup.py install + OR + > cd .../tap-quickbase + > pip install -e . + ``` +2. Dependent libraries. The following dependent libraries were installed. 
+ ```bash + > pip install singer-python + > pip install target-stitch + > pip install target-json -```json -{ - "streams": [ - { - "type": "object", - "selected": "true", - "key_properties": [ - "rid" - ], - "stream_alias": "table_name", - "table_name": "table_id", - "tap_stream_id": "app_name__table_name", - "stream": "app_name__table_name", - "schema": { - "properties": { - "rid": { - "selected": "true", - "type": [ - "null", - "string" - ], - "inclusion": "automatic" - }, - "datecreated": { - "selected": "true", - "type": [ - "null", - "string" - ], - "format": "date-time", - "inclusion": "automatic" - }, - "datemodified": { - "selected": "true", - "type": [ - "null", - "string" - ], - "format": "date-time", - "inclusion": "automatic" - }, - "companyid": { - "selected": "true", - "type": [ - "null", - "string" - ], - "inclusion": "available" - } - } - }, - "metadata": [ - { - "metadata": { - "tap-quickbase.id": "1" - }, - "breadcrumb": [ - "properties", - "datecreated" - ] - }, - { - "metadata": { - "tap-quickbase.id": "2" - }, - "breadcrumb": [ - "properties", - "datemodified" - ] - }, - { - "metadata": { - "tap-quickbase.id": "6" - }, - "breadcrumb": [ - "properties", - "companyid" - ] - } - ] - } - ] -} -``` + ``` + - [singer-tools](https://github.com/singer-io/singer-tools) + - [target-stitch](https://github.com/singer-io/target-stitch) -**Sync mode** +3. Create your tap's `config.json` file. The tap config file for this tap should include these entries: + - `start_date` - the default value to use if no bookmark exists for an endpoint (rfc3339 date string) + - `user_agent` (string, optional): Process and email for API logging purposes. Example: `tap-quickbase ` + - `request_timeout` (integer, `300`): Max time for which request should wait to get a response. Default request_timeout is 300 seconds. 
-With an annotated properties catalog, the tap can be invoked in sync mode: + ```json + { + "start_date": "2019-01-01T00:00:00Z", + "user_agent": "tap-quickbase ", + "request_timeout": 300 + } + ``` + + Optionally, also create a `state.json` file. `currently_syncing` is an optional attribute used for identifying the last object to be synced in case the job is interrupted mid-stream. The next run would begin where the last job left off. -```bash -$ tap-quickbase -c config.tap.json --properties properties.json -``` + ```json + { + "currently_syncing": "engage", + "bookmarks": { + "export": "2019-09-27T22:34:39.000000Z", + "funnels": "2019-09-28T15:30:26.000000Z", + "revenue": "2019-09-28T18:23:53Z" + } + } + ``` -Messages are written to standard output following the Singer specification. -The resultant stream of JSON data can be consumed by a Singer target: +4. Run the Tap in Discovery Mode + This creates a catalog.json for selecting objects/fields to integrate: + ```bash + tap-quickbase --config config.json --discover > catalog.json + ``` + See the Singer docs on discovery mode + [here](https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#discovery-mode). -```bash -$ tap-quickbase -c config.tap.json --properties properties.json | target-stitch --config config.target.json -``` +5. 
Run the Tap in Sync Mode (with catalog) and [write out to state file](https://github.com/singer-io/getting-started/blob/master/docs/RUNNING_AND_DEVELOPING.md#running-a-singer-tap-with-a-singer-target) -## Replication methods and state file + For Sync mode: + ```bash + > tap-quickbase --config tap_config.json --catalog catalog.json > state.json + > tail -1 state.json > state.json.tmp && mv state.json.tmp state.json + ``` + To load to json files to verify outputs: + ```bash + > tap-quickbase --config tap_config.json --catalog catalog.json | target-json > state.json + > tail -1 state.json > state.json.tmp && mv state.json.tmp state.json + ``` + To pseudo-load to [Stitch Import API](https://github.com/singer-io/target-stitch) with dry run: + ```bash + > tap-quickbase --config tap_config.json --catalog catalog.json | target-stitch --config target_config.json --dry-run > state.json + > tail -1 state.json > state.json.tmp && mv state.json.tmp state.json + ``` -In the above example, we invoked `tap-quickbase` without providing a _state_ file -and without specifying a replication method. The two ways to replicate a given -table are `FULL_TABLE` and `INCREMENTAL`. `FULL_TABLE` replication is used by -default. +6. 
Test the Tap + While developing the quickbase tap, the following utilities were run in accordance with Singer.io best practices: + Pylint to improve [code quality](https://github.com/singer-io/getting-started/blob/master/docs/BEST_PRACTICES.md#code-quality): + ```bash + > pylint tap_quickbase -d missing-docstring -d logging-format-interpolation -d too-many-locals -d too-many-arguments + ``` + Pylint test resulted in the following score: + ```bash + Your code has been rated at 9.67/10 + ``` -### Full Table + To [check the tap](https://github.com/singer-io/singer-tools#singer-check-tap) and verify working: + ```bash + > tap_quickbase --config tap_config.json --catalog catalog.json | singer-check-tap > state.json + > tail -1 state.json > state.json.tmp && mv state.json.tmp state.json + ``` -Full-table replication extracts all data from the source table each time the tap -is invoked. + #### Unit Tests -### Incremental + Unit tests may be run with the following. -Incremental replication works in conjunction with a state file to only extract -new records each time the tap is invoked. + ``` + python -m pytest --verbose + ``` + Note, you may need to install test dependencies. 
+ ``` + pip install -e .'[dev]' + ``` --- -Copyright © 2018 Stitch +Copyright © 2019 Stitch diff --git a/setup.py b/setup.py index 5cf2290..f01192c 100644 --- a/setup.py +++ b/setup.py @@ -1,23 +1,28 @@ -#!/usr/bin/env python -from setuptools import setup -setup(name='tap-quickbase', - version='2.0.3', - description='Singer.io tap for extracting data from QuickBase', - author='Stitch', - url='https://singer.io', - classifiers=['Programming Language :: Python :: 3 :: Only'], - py_modules=['tap_quickbase'], +from setuptools import setup, find_packages + + +setup(name="tap-quickbase", + version="0.0.1", + description="Singer.io tap for extracting data from Quickbase API", + author="Stitch", + url="http://singer.io", + classifiers=["Programming Language :: Python :: 3 :: Only"], + py_modules=["tap_quickbase"], install_requires=[ - 'singer-python==5.13.2', - 'requests==2.32.4', - 'python-dateutil==2.9.0', - 'pytz==2018.9', + "singer-python==6.3.0", + "requests==2.32.4", + "backoff==2.2.1", + "parameterized" ], - entry_points=''' - [console_scripts] - tap-quickbase=tap_quickbase:main - ''', - packages=['tap_quickbase'], + entry_points=""" + [console_scripts] + tap-quickbase=tap_quickbase:main + """, + packages=find_packages(), + package_data = { + "tap_quickbase": ["schemas/*.json"], + }, + include_package_data=True, ) diff --git a/tap_quickbase/__init__.py b/tap_quickbase/__init__.py index a32e364..e869094 100644 --- a/tap_quickbase/__init__.py +++ b/tap_quickbase/__init__.py @@ -1,491 +1,45 @@ -#!/usr/bin/env python3 -# pylint: disable=missing-docstring,not-an-iterable,too-many-locals,too-many-arguments,invalid-name -import copy -import datetime -import time -import os -import re -import pytz - - -import dateutil.parser +import sys +import json import singer -from singer.catalog import Catalog, CatalogEntry -import singer.utils as singer_utils -import singer.metadata as singer_metadata -import singer.metrics as metrics -from singer.schema import Schema - -from 
tap_quickbase import qbconn +from tap_quickbase.client import Client +from tap_quickbase.discover import discover +from tap_quickbase.sync import sync -REQUIRED_CONFIG_KEYS = ['qb_url', 'qb_appid', 'qb_user_token', 'start_date'] -DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" -CONFIG = {} -STATE = {} -NUM_RECORDS = 100 LOGGER = singer.get_logger() -REPLICATION_KEY = qbconn.sanitize_field_name('date modified') - -DEBUG_FLAG = False - -class TimestampOutOfRangeException(Exception): - pass - -def format_child_field_name(parent_name, child_name): - return "{}.{}".format(parent_name, child_name) - -def format_epoch_milliseconds(epoch_timestamp): - # NB: Quick Base allows year values greater than 9999 - # datetime.fromutctimestamp() only supports up to year 2038, so create one directly from struct_time - # This will throw a ValueError with year values > 9999 - epoch_sec = int(epoch_timestamp) / 1000.0 - epoch_time = time.gmtime(epoch_sec) - return datetime.datetime(*epoch_time[:6]).replace(tzinfo=pytz.UTC).strftime(DATETIME_FMT) - -def convert_to_epoch_milliseconds(dt_string): - dt = datetime.datetime.strptime(dt_string, DATETIME_FMT) - epoch = datetime.datetime.utcfromtimestamp(0) - return int((dt-epoch).total_seconds() * 1000.0) - -def build_state(raw_state, catalog): - LOGGER.info('Building State from raw state %s', raw_state) - - state = {} - - for catalog_entry in catalog.streams: - start = singer.get_bookmark(raw_state, catalog_entry.tap_stream_id, REPLICATION_KEY) - if not start: - start = CONFIG.get( - 'start_date', - datetime.datetime.utcfromtimestamp(0).strftime(DATETIME_FMT) - ) - state = singer.write_bookmark(state, catalog_entry.tap_stream_id, REPLICATION_KEY, start) - - return state - -def populate_schema_leaf(schema, field_info, id_num, breadcrumb, metadata): - """ - Populates a leaf in the schema. 
A leaf corresponds to a JSON boolean, - number, or string field - """ - #add metadata - inclusion = 'available' if id_num != '2' else 'automatic' - metadata.append( - { - 'metadata': { - 'tap-quickbase.id': id_num, - 'inclusion': inclusion - }, - 'breadcrumb': list(breadcrumb) - } - ) - - #populate schema - field_type = ['null'] - field_format = None - # https://help.quickbase.com/user-assistance/field_types.html - if field_info.get('base_type') == 'bool': - field_type.append('boolean') - elif field_info.get('base_type') == 'float': - field_type.append('number') - elif field_info.get('base_type') == 'int64': - if field_info.get('type') in ('timestamp', 'date'): - field_type.append('string') - field_format = 'date-time' - else: - # `timeofday` comes out of the API as an integer for how many milliseconds - # through the day, 900000 would be 12:15am - # `duration` comes out as an integer for how many milliseconds the duration is, - # 1000 would be 1 second - # let's just pass these as an integer - field_type.append('integer') - else: - field_type.append('string') - - schema.type = field_type - if field_format is not None: - schema.format = field_format - -def populate_schema_node(schema, field_info, id_field_map, breadcrumb, metadata): - """ - Populates a node in the schema. 
A node corresponds to a JSON object, which has - properties (children) - """ - # add metadata - metadata.append( - { - 'metadata': { - 'inclusion': 'available' - }, - 'breadcrumb': list(breadcrumb) - } - ) - - #populate schema - schema.type = ['null','object'] - schema.properties = {} - for id_num in field_info.get('composite_fields'): - child_field_info = id_field_map[id_num] - breadcrumb.extend(['properties',child_field_info.get('name')]) - - child_schema = Schema() - if child_field_info.get('composite_fields'): - populate_schema_node(child_schema, child_field_info, id_field_map, breadcrumb, metadata) - else: - populate_schema_leaf(child_schema, child_field_info, id_num, breadcrumb, metadata) - - schema.properties[child_field_info.get('name')] = child_schema - - # remove 'properties' and 'child_field_name' from breadcrumb - breadcrumb.pop() - breadcrumb.pop() - -def discover_catalog(conn): - """Returns a Catalog describing the table structure of the target application""" - entries = [] - - for table in conn.get_tables(): - # the stream is in format app_name__table_name with all non alphanumeric - # and `_` characters replaced with an `_`. - stream = re.sub( - '[^0-9a-z_]+', - '_', - "{}__{}".format(table.get('app_name').lower(), table.get('name')).lower() - ) - - # by default we will ALWAYS have 'rid' as an automatically included primary key field. 
- schema = Schema( - type=['null','object'], - additionalProperties=False - ) - schema.properties = { - 'rid': Schema( - type=['string'] - ) - } - metadata = [ - { - 'metadata': { - 'inclusion': 'automatic' - }, - 'breadcrumb': ['properties','rid'] - }, - { - 'metadata': { - 'tap-quickbase.app_id': conn.appid - }, - 'breadcrumb': [] - } - ] - - # build hierarchial schema - id_to_fields = conn.get_fields(table.get('id')) - for id_num,field_info in id_to_fields.items(): - - breadcrumb = ['properties', field_info.get('name')] - - # if this field has a parent, it will be added by the parent - if field_info.get('parent_field_id'): - continue - - # if this field has children, add them - if field_info.get('composite_fields'): - node_schema = Schema() - populate_schema_node(node_schema, field_info, id_to_fields, breadcrumb, metadata) - schema.properties[field_info.get('name')] = node_schema - - #otherwise, add field - else: - leaf_schema = Schema() - populate_schema_leaf(leaf_schema, field_info, id_num, breadcrumb, metadata) - schema.properties[field_info.get('name')] = leaf_schema - - entry = CatalogEntry( - table=table.get('id'), - stream_alias=table.get('name'), - stream=stream, - tap_stream_id=stream, - key_properties=['rid'], - replication_key=REPLICATION_KEY, - replication_method = 'INCREMENTAL', - schema=schema, - metadata=metadata - ) - - entries.append(entry) - - return Catalog(entries) - - -def do_discover(conn): - discover_catalog(conn).dump() - -@singer.utils.ratelimit(2, 1) -def request(conn, table_id, query_params): - headers = {} - if 'user_agent' in CONFIG: - headers['User-Agent'] = CONFIG['user_agent'] - return conn.query(table_id, query_params, headers=headers) - -def build_field_lists(schema, metadata, breadcrumb): - """ - Use the schema to build a field list for the query and a translation table for the returned data - :return: - """ - field_list = [] - ids_to_breadcrumbs = {} - for name, sub_schema in schema.properties.items(): - 
breadcrumb.extend(['properties', name]) - field_id = singer_metadata.get(metadata, tuple(breadcrumb), 'tap-quickbase.id') - selected = singer_metadata.get(metadata, tuple(breadcrumb), 'selected') - inclusion = singer_metadata.get(metadata, tuple(breadcrumb), 'inclusion') - if field_id and (selected or inclusion == 'automatic'): - field_list.append(field_id) - ids_to_breadcrumbs[field_id] = list(breadcrumb) - elif sub_schema.properties and (selected or inclusion == 'automatic'): - for name, _child_schema in sub_schema.properties.items(): - breadcrumb.extend(['properties', name]) # Select children of objects - metadata = singer_metadata.write(metadata, tuple(breadcrumb), 'selected', True) - breadcrumb.pop() - breadcrumb.pop() - sub_field_list, sub_ids_to_breadcrumbs = build_field_lists(sub_schema, metadata, breadcrumb) - field_list.extend(sub_field_list) - ids_to_breadcrumbs.update(sub_ids_to_breadcrumbs) +REQUIRED_CONFIG_KEYS = ['access_token', 'start_date'] - breadcrumb.pop() - breadcrumb.pop() - - return (field_list, ids_to_breadcrumbs) - -def transform_bools(record, schema): - for field_prop, sub_schema in schema['properties'].items(): - field_type = sub_schema.get('type') - if not field_type: - continue - if not record.get(field_prop, None): - continue - if 'boolean' in field_type: - record[field_prop] = 'false' if record.get(field_prop)=='0' else 'true' - if 'object' in field_type: - record[field_prop] = transform_bools(record[field_prop], sub_schema) - return record - -def transform_datetimes(record, schema, stream_name): - for field_prop, sub_schema in schema['properties'].items(): - field_type = sub_schema.get('type') - field_format = sub_schema.get('format') - if not field_format: - continue - if not record.get(field_prop, None): - continue - if 'date-time' == field_format: - try: - record[field_prop] = format_epoch_milliseconds(record[field_prop]) - except ValueError as ex: - LOGGER.error("Record containing out of range timestamp: %s", record) - raise 
TimestampOutOfRangeException(('Error syncing stream "{}" - ' + - 'Found out of range timestamp: {} for field: "{}"') - .format(stream_name, - time.gmtime(int(record[field_prop]) / 1000.0)[:6], - field_prop)) from ex - if 'object' in field_type: - record[field_prop] = transform_datetimes(record[field_prop], sub_schema, stream_name) - return record - - -def build_record(row, ids_to_breadcrumbs): - record = {} - for field_id, field_value in row.items(): - if field_id=='rid': - record['rid'] = field_value - else: - breadcrumb = ids_to_breadcrumbs[field_id] - insert_value_at_breadcrumb(breadcrumb, field_value, record) - return record - -def insert_value_at_breadcrumb(breadcrumb, value, record): - if len(breadcrumb) == 2: - record[breadcrumb[1]] = value - else: - if record.get(breadcrumb[1]): - insert_value_at_breadcrumb(breadcrumb[2:], value, record[breadcrumb[1]]) - else: - record[breadcrumb[1]] = {} - insert_value_at_breadcrumb(breadcrumb[2:], value, record[breadcrumb[1]]) - -def gen_request(conn, stream, params=None): +def do_discover(): """ - Fetch the data we need from Quickbase. Uses a modified version of the Quickbase API SDK. - This will page through data num_records at a time and transform and then yield each result. + Discover and emit the catalog to stdout """ - params = params or {} - table_id = stream.table - properties = stream.schema.properties - metadata = singer_metadata.to_map(stream.metadata) - - if not properties: - return - - field_list, ids_to_breadcrumbs = build_field_lists(stream.schema, metadata, []) - if not field_list: - return - - # we always want the Date Modified field - if '2' not in field_list: - LOGGER.warning("Date Modified field not included for %s. 
Skipping.", stream.tap_stream_id) - - query_params = { - 'clist': '.'.join(field_list), - 'slist': '2', # 2 is always the modified date column we are keying off of - 'options': "num-{}".format(NUM_RECORDS), - } + LOGGER.info("Starting discover") + catalog = discover() + json.dump(catalog.to_dict(), sys.stdout, indent=2) + LOGGER.info("Finished discover") - start = None - if 'start' in params: - start = params['start'] - while True: - if start: - start_millis = str(convert_to_epoch_milliseconds(start)) - query_params['query'] = "{2.AF.%s}" % start_millis - - results = request(conn, table_id, query_params) - for res in results: - start = format_epoch_milliseconds(res['2']) # update start to this record's updatedate for next page of query - # translate column ids to column names - new_res = build_record(res, ids_to_breadcrumbs) - yield new_res - - # if we got less than the max number of records then we're at the end and can break - if len(results) < NUM_RECORDS: - break - - -def get_start(table_id, state): +@singer.utils.handle_top_exception(LOGGER) +def main(): """ - default to the CONFIG's start_date if the table does not have an entry in STATE. 
+ Run the tap """ - start = singer.get_bookmark(state, table_id, REPLICATION_KEY) - if not start: - start = CONFIG.get( - 'start_date', - datetime.datetime.utcfromtimestamp(0).strftime(DATETIME_FMT) - ) - singer.write_bookmark(state, table_id, REPLICATION_KEY, start) - return start - -def sync_table(conn, catalog_entry, state): - metadata = singer_metadata.to_map(catalog_entry.metadata) - LOGGER.info("Beginning sync for %s.", catalog_entry.stream) - - entity = catalog_entry.tap_stream_id - if not entity: - return - - start = get_start(entity, state) - formatted_start = dateutil.parser.parse(start).strftime(DATETIME_FMT) - params = { - 'start': formatted_start, - } - - with metrics.record_counter(None) as counter: - counter.tags['app'] = singer_metadata.get(metadata, tuple(), "tap-quickbase.app_id") - counter.tags['table'] = catalog_entry.table - - extraction_time = singer_utils.now() - for rows_saved, row in enumerate(gen_request(conn, catalog_entry, params)): - counter.increment() - schema_dict = catalog_entry.schema.to_dict() - rec = transform_bools(row, schema_dict) - rec = transform_datetimes(rec, schema_dict, catalog_entry.stream) - rec = singer.transform(rec, schema_dict, singer.UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) - - yield singer.RecordMessage( - stream=catalog_entry.stream, - record=rec, - time_extracted=extraction_time - ) - - state = singer.write_bookmark( - state, - catalog_entry.tap_stream_id, - REPLICATION_KEY, - rec[REPLICATION_KEY] - ) - if (rows_saved+1) % 1000 == 0: - yield singer.StateMessage(value=copy.deepcopy(state)) - - -def generate_messages(conn, catalog, state): - for catalog_entry in catalog.streams: - - if not catalog_entry.is_selected(): - continue - - # Emit a SCHEMA message before we sync any records - yield singer.SchemaMessage( - stream=catalog_entry.stream, - schema=catalog_entry.schema.to_dict(), - key_properties=catalog_entry.key_properties, - bookmark_properties=[REPLICATION_KEY] - ) - - metadata = 
singer_metadata.to_map(catalog_entry.metadata) - # Emit a RECORD message for each record in the result set - with metrics.job_timer('sync_table') as timer: - timer.tags['app'] = singer_metadata.get(metadata, tuple(), "tap-quickbase.app_id") - timer.tags['table'] = catalog_entry.table - yield from sync_table(conn, catalog_entry, state) - - # Emit a state message - yield singer.StateMessage(value=copy.deepcopy(state)) - - -def do_sync(conn, catalog, state): - LOGGER.info("Starting QuickBase sync") - - for message in generate_messages(conn, catalog, state): - singer.write_message(message) - -def correct_base_url(url): - result = url - if url.startswith('http:'): - LOGGER.warning("Replacing 'http' with 'https' for 'qb_url' configuration option. Quick Base requires https connections.") - result = 'https:' + url[5:] - - if not url.endswith('/'): - result = result + '/' - - return result - -def main_impl(): - args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) - CONFIG.update(args.config) - base_url = correct_base_url(CONFIG['qb_url']) - conn = qbconn.QBConn( - base_url, - CONFIG['qb_appid'], - user_token=CONFIG['qb_user_token'], - logger=LOGGER - ) - - if args.discover: - do_discover(conn) - - elif args.properties: - catalog = Catalog.from_dict(args.properties) - state = build_state(args.state, catalog) - do_sync(conn, catalog, state) + parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) + state = {} + if parsed_args.state: + state = parsed_args.state + with Client(parsed_args.config) as client: + if parsed_args.discover: + do_discover() + elif parsed_args.catalog: + sync( + client=client, + config=parsed_args.config, + catalog=parsed_args.catalog, + state=state) -def main(): - try: - main_impl() - except Exception as exc: - LOGGER.critical(exc) - raise exc - -if __name__ == '__main__': +if __name__ == "__main__": main() + diff --git a/tap_quickbase/client.py b/tap_quickbase/client.py new file mode 100644 index 0000000..9f78b86 --- /dev/null +++ 
b/tap_quickbase/client.py @@ -0,0 +1,122 @@ +from typing import Any, Dict, Mapping, Optional, Tuple + +import backoff +import requests +from requests import session +from requests.exceptions import Timeout, ConnectionError, ChunkedEncodingError +from singer import get_logger, metrics + +from tap_quickbase.exceptions import ERROR_CODE_EXCEPTION_MAPPING, QuickbaseError, QuickbaseBackoffError + +LOGGER = get_logger() +REQUEST_TIMEOUT = 300 + +def raise_for_error(response: requests.Response) -> None: + """Raises the associated response exception. Takes in a response object, + checks the status code, and throws the associated exception based on the + status code. + + :param resp: requests.Response object + """ + try: + response_json = response.json() + except Exception: + response_json = {} + if response.status_code not in [200, 201, 204]: + if response_json.get("error"): + message = f"HTTP-error-code: {response.status_code}, Error: {response_json.get('error')}" + else: + error_message = ERROR_CODE_EXCEPTION_MAPPING.get( + response.status_code, {} + ).get("message", "Unknown Error") + message = f"HTTP-error-code: {response.status_code}, Error: {response_json.get('message', error_message)}" + exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status_code, {}).get( + "raise_exception", QuickbaseError + ) + raise exc(message, response) from None + +class Client: + """ + A Wrapper class. 
+ ~~~ + Performs: + - Authentication + - Response parsing + - HTTP Error handling and retry + """ + + def __init__(self, config: Mapping[str, Any]) -> None: + self.config = config + self._session = session() + self.base_url = "https://api.quickbase.com/" + config_request_timeout = config.get("request_timeout") + self.request_timeout = float(config_request_timeout) if config_request_timeout else REQUEST_TIMEOUT + + def __enter__(self): + self.check_api_credentials() + return self + + def __exit__(self, exception_type, exception_value, traceback): + self._session.close() + + def check_api_credentials(self) -> None: + pass + + def authenticate(self, headers: Dict, params: Dict) -> Tuple[Dict, Dict]: + """Authenticates the request with the token""" + headers["Authorization"] = self.config["access_token"] + return headers, params + + def make_request( + self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, Any]] = None, + body: Optional[Dict[str, Any]] = None, + path: Optional[str] = None + ) -> Any: + """ + Sends an HTTP request to the specified API endpoint. 
+ """ + params = params or {} + headers = headers or {} + body = body or {} + endpoint = endpoint or f"{self.base_url}/{path}" + headers, params = self.authenticate(headers, params) + return self.__make_request( + method, endpoint, + headers=headers, + params=params, + data=body, + timeout=self.request_timeout + ) + + @backoff.on_exception( + wait_gen=backoff.expo, + exception=( + ConnectionResetError, + ConnectionError, + ChunkedEncodingError, + Timeout, + QuickbaseBackoffError + ), + max_tries=5, + factor=2, + ) + def __make_request( + self, method: str, endpoint: str, **kwargs + ) -> Optional[Mapping[Any, Any]]: + """Performs HTTP Operations.""" + method = method.upper() + with metrics.http_request_timer(endpoint): + if method in ("GET", "POST"): + if method == "GET": + kwargs.pop("data", None) + response = self._session.request(method, endpoint, **kwargs) + raise_for_error(response) + else: + raise ValueError(f"Unsupported method: {method}") + + return response.json() + diff --git a/tap_quickbase/discover.py b/tap_quickbase/discover.py new file mode 100644 index 0000000..3276639 --- /dev/null +++ b/tap_quickbase/discover.py @@ -0,0 +1,39 @@ +import singer +from singer import metadata +from singer.catalog import Catalog, CatalogEntry, Schema +from tap_quickbase.schema import get_schemas + +LOGGER = singer.get_logger() + + +def discover() -> Catalog: + """ + Run the discovery mode, prepare the catalog file and return the catalog. 
+ """ + schemas, field_metadata = get_schemas() + catalog = Catalog([]) + + for stream_name, schema_dict in schemas.items(): + try: + schema = Schema.from_dict(schema_dict) + mdata = field_metadata[stream_name] + except Exception as err: + LOGGER.error(err) + LOGGER.error("stream_name: {}".format(stream_name)) + LOGGER.error("type schema_dict: {}".format(type(schema_dict))) + raise err + + key_properties = metadata.to_map(mdata).get((), {}).get("table-key-properties") + + catalog.streams.append( + CatalogEntry( + stream=stream_name, + tap_stream_id=stream_name, + key_properties=key_properties, + schema=schema, + metadata=mdata, + ) + ) + + return catalog + diff --git a/tap_quickbase/exceptions.py b/tap_quickbase/exceptions.py new file mode 100644 index 0000000..f1e7480 --- /dev/null +++ b/tap_quickbase/exceptions.py @@ -0,0 +1,106 @@ +class QuickbaseError(Exception): + """class representing Generic Http error.""" + + def __init__(self, message=None, response=None): + super().__init__(message) + self.message = message + self.response = response + + +class QuickbaseBackoffError(QuickbaseError): + """class representing backoff error handling.""" + pass + +class QuickbaseBadRequestError(QuickbaseError): + """class representing 400 status code.""" + pass + +class QuickbaseUnauthorizedError(QuickbaseError): + """class representing 401 status code.""" + pass + + +class QuickbaseForbiddenError(QuickbaseError): + """class representing 403 status code.""" + pass + +class QuickbaseNotFoundError(QuickbaseError): + """class representing 404 status code.""" + pass + +class QuickbaseConflictError(QuickbaseError): + """class representing 409 status code.""" + pass + +class QuickbaseUnprocessableEntityError(QuickbaseBackoffError): + """class representing 422 status code.""" + pass + +class QuickbaseRateLimitError(QuickbaseBackoffError): + """class representing 429 status code.""" + pass + +class QuickbaseInternalServerError(QuickbaseBackoffError): + """class representing 500 status 
code.""" + pass + +class QuickbaseNotImplementedError(QuickbaseBackoffError): + """class representing 501 status code.""" + pass + +class QuickbaseBadGatewayError(QuickbaseBackoffError): + """class representing 502 status code.""" + pass + +class QuickbaseServiceUnavailableError(QuickbaseBackoffError): + """class representing 503 status code.""" + pass + +ERROR_CODE_EXCEPTION_MAPPING = { + 400: { + "raise_exception": QuickbaseBadRequestError, + "message": "A validation exception has occurred." + }, + 401: { + "raise_exception": QuickbaseUnauthorizedError, + "message": "The access token provided is expired, revoked, malformed or invalid for other reasons." + }, + 403: { + "raise_exception": QuickbaseForbiddenError, + "message": "You are missing the following required scopes: read" + }, + 404: { + "raise_exception": QuickbaseNotFoundError, + "message": "The resource you have specified cannot be found." + }, + 409: { + "raise_exception": QuickbaseConflictError, + "message": "The API request cannot be completed because the requested operation would conflict with an existing item." + }, + 422: { + "raise_exception": QuickbaseUnprocessableEntityError, + "message": "The request content itself is not processable by the server." + }, + 429: { + "raise_exception": QuickbaseRateLimitError, + "message": "The API rate limit for your organisation/application pairing has been exceeded." + }, + 500: { + "raise_exception": QuickbaseInternalServerError, + "message": "The server encountered an unexpected condition which prevented" \ + " it from fulfilling the request." + }, + 501: { + "raise_exception": QuickbaseNotImplementedError, + "message": "The server does not support the functionality required to fulfill the request." + }, + 502: { + "raise_exception": QuickbaseBadGatewayError, + "message": "Server received an invalid response." + }, + 503: { + "raise_exception": QuickbaseServiceUnavailableError, + "message": "API service is currently unavailable." 
+ } +} + diff --git a/tap_quickbase/qbconn.py b/tap_quickbase/qbconn.py deleted file mode 100644 index 565b1cb..0000000 --- a/tap_quickbase/qbconn.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/python3 -from xml.etree import ElementTree -import logging -import re -import requests - -# This regex is used to transform the column name in `get_fields` -SEPARATORS_TRANSLATION = re.compile(r"[-\s]") -COLUMN_NAME_TRANSLATION = re.compile(r"[^a-zA-Z0-9_]") -UNDERSCORE_CONSOLIDATION = re.compile(r"_+") - -def sanitize_field_name(name): - result = name.lower() - result = SEPARATORS_TRANSLATION.sub('_', result) # Replace separator characters with underscores - result = COLUMN_NAME_TRANSLATION.sub('', result) # Remove all other non-alphanumeric characters - return UNDERSCORE_CONSOLIDATION.sub('_', result) # Consolidate consecutive underscores - -class QBConn: - """ - QBConn was borrowed heavily from pybase - https://github.com/QuickbaseAdmirer/Quickbase-Python-SDK - """ - def __init__(self, url, appid, user_token=None, realm="", logger=None): - - self.url = url - self.user_token = user_token - self.appid = appid - self.realm = realm # This allows one QuickBase realm to proxy for another - # Set after every API call. - # A non-zero value indicates an error. 
A negative value indicates an error with this lib - self.error = 0 - self.error_code = None - self.logger = logger or logging.getLogger(__name__) - - def request(self, params, url_ext, headers=None): - """ - Adds the appropriate fields to the request and sends it to QB - Takes a dict of parameter:value pairs and the url extension (main or your table ID, mostly) - """ - headers = headers or {} - url = self.url - url += url_ext - - # log the API request before adding sensitive info to the request - self.logger.info("API GET {}, {}".format(url, params)) - params['usertoken'] = self.user_token - params['realmhost'] = self.realm - - resp = requests.get(url, params, headers=headers) - - if re.match(r'^<\?xml version=', resp.content.decode("utf-8")) is None: - print("No useful data received") - self.error = -1 - return None - - tree = ElementTree.fromstring(resp.content) - self.error_code = int(tree.find('errcode').text) - if self.error_code != 0: - error = tree.find('errdetail') - error = tree.find('errtext') if error is None else error # XML nodes are falsy, so must explicitly check for None - self.error = error.text if error is not None else "No error description provided by Quick Base." - raise Exception("Error response from Quick Base (Code {}): {}".format(self.error_code, self.error)) - return tree - - def query(self, table_id, query, headers=None): - """ - Executes a query on tableID - Returns a list of dicts containing fieldid:value pairs. 
- record ID will always be specified by the "rid" key - """ - headers = headers or {} - params = dict(query) - params['act'] = "API_DoQuery" - params['includeRids'] = '1' - params['fmt'] = "structured" - records = self.request(params, table_id, headers=headers).find('table').find('records') - data = [] - for record in records: - temp = {} - temp['rid'] = record.attrib['rid'] - for field in record: - if field.tag == "f": - temp[field.attrib['id']] = field.text - data.append(temp) - return data - - def get_tables(self): - if not self.appid: - return {} - - params = {'act': 'API_GetSchema'} - schema = self.request(params, self.appid) - remote_tables = schema.find('table').find('chdbids') - app_name = schema.find('table').find('name').text - tables = [] - if remote_tables is None: - raise Exception("Error discovering streams: The specified application contains no tables.") - for remote_table in remote_tables: - tables.append({ - 'id': remote_table.text, - 'name': remote_table.attrib['name'][6:], - 'app_name': app_name, - 'app_id': self.appid - }) - return tables - - def get_fields(self, table_id): - params = {'act': 'API_GetSchema'} - schema = self.request(params, table_id) - remote_fields = schema.find('table').find('fields') - - id_to_field = {} - field_to_ids = {} - for remote_field in remote_fields: - name = sanitize_field_name(remote_field.find('label').text.lower().replace('"', "'")) - id_num = remote_field.attrib['id'] - if field_to_ids.get(name): - field_to_ids[name].append(id_num) - else: - field_to_ids[name] = [id_num] - - # pull out composite field info (child fields) - composite_fields = [] - composite_fields_element = remote_field.find('compositeFields') - if composite_fields_element: - for composite_field_element in composite_fields_element: - composite_fields.append(composite_field_element.attrib['id']) - - # pull out parent field info (useful to know if field is a child) - parent_field_id_element = remote_field.find('parentFieldID') - if 
parent_field_id_element is not None: - parent_field_id = parent_field_id_element.text - else: - parent_field_id = "" - - field_info = { - 'id': id_num, - 'name': name, - 'type': remote_field.attrib['field_type'], - 'base_type': remote_field.attrib['base_type'], - 'parent_field_id': parent_field_id, - 'composite_fields': composite_fields - } - - id_to_field[id_num] = field_info - - # handle duplicate field names by appending id num to end of name - for _field_name, field_id_list in field_to_ids.items(): - field_id_list = [i for i in field_id_list if not id_to_field[i].get('parent_field_id')] - if len(field_id_list) > 1: - for dup_id in field_id_list: - dup_field_info = id_to_field[dup_id] - dup_field_info['name'] = dup_field_info['name'] + '_' + dup_id - - return id_to_field diff --git a/tap_quickbase/schema.py b/tap_quickbase/schema.py new file mode 100644 index 0000000..64d085a --- /dev/null +++ b/tap_quickbase/schema.py @@ -0,0 +1,80 @@ +import os +import json +import singer +from typing import Dict, Tuple +from singer import metadata +from tap_quickbase.streams import STREAMS + +LOGGER = singer.get_logger() + + +def get_abs_path(path: str) -> str: + """ + Get the absolute path for the schema files. + """ + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + + +def load_schema_references() -> Dict: + """ + Load the schema files from the schema folder and return the schema references. 
+ """ + shared_schema_path = get_abs_path("schemas/shared") + + shared_file_names = [] + if os.path.exists(shared_schema_path): + shared_file_names = [ + f + for f in os.listdir(shared_schema_path) + if os.path.isfile(os.path.join(shared_schema_path, f)) + ] + + refs = {} + for shared_schema_file in shared_file_names: + with open(os.path.join(shared_schema_path, shared_schema_file)) as data_file: + refs["shared/" + shared_schema_file] = json.load(data_file) + + return refs + + +def get_schemas() -> Tuple[Dict, Dict]: + """ + Load the schema references, prepare metadata for each streams and return schema and metadata for the catalog. + """ + schemas = {} + field_metadata = {} + + refs = load_schema_references() + for stream_name, stream_obj in STREAMS.items(): + schema_path = get_abs_path("schemas/{}.json".format(stream_name)) + with open(schema_path) as file: + schema = json.load(file) + + schemas[stream_name] = schema + schema = singer.resolve_schema_references(schema, refs) + + mdata = metadata.new() + mdata = metadata.get_standard_metadata( + schema=schema, + key_properties=getattr(stream_obj, "key_properties"), + valid_replication_keys=(getattr(stream_obj, "replication_keys") or []), + replication_method=getattr(stream_obj, "replication_method"), + ) + mdata = metadata.to_map(mdata) + + automatic_keys = getattr(stream_obj, "replication_keys") or [] + for field_name in schema.get("properties", {}).keys(): + if field_name in automatic_keys: + mdata = metadata.write( + mdata, ("properties", field_name), "inclusion", "automatic" + ) + + parent_tap_stream_id = getattr(stream_obj, "parent", None) + if parent_tap_stream_id: + mdata = metadata.write(mdata, (), 'parent-tap-stream-id', parent_tap_stream_id) + + mdata = metadata.to_list(mdata) + field_metadata[stream_name] = mdata + + return schemas, field_metadata + diff --git a/tap_quickbase/schemas/app_tables.json b/tap_quickbase/schemas/app_tables.json new file mode 100644 index 0000000..cd2e5c9 --- /dev/null +++ 
b/tap_quickbase/schemas/app_tables.json @@ -0,0 +1,103 @@ +{ + "type": "object", + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "created": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "alias": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "nextRecordId": { + "type": [ + "null", + "integer" + ] + }, + "nextFieldId": { + "type": [ + "null", + "integer" + ] + }, + "defaultSortFieldId": { + "type": [ + "null", + "integer" + ] + }, + "defaultSortOrder": { + "type": [ + "null", + "string" + ] + }, + "keyFieldId": { + "type": [ + "null", + "integer" + ] + }, + "singleRecordName": { + "type": [ + "null", + "string" + ] + }, + "pluralRecordName": { + "type": [ + "null", + "string" + ] + }, + "sizeLimit": { + "type": [ + "null", + "string" + ] + }, + "spaceUsed": { + "type": [ + "null", + "string" + ] + }, + "spaceRemaining": { + "type": [ + "null", + "string" + ] + } + } +} diff --git a/tap_quickbase/schemas/apps.json b/tap_quickbase/schemas/apps.json new file mode 100644 index 0000000..4f0eb31 --- /dev/null +++ b/tap_quickbase/schemas/apps.json @@ -0,0 +1,125 @@ +{ + "type": "object", + "properties": { + "ancestorId": { + "type": [ + "null", + "string" + ] + }, + "created": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "dateFormat": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "hasEveryoneOnTheInternet": { + "type": [ + "null", + "boolean" + ] + }, + "memoryInfo": { + "type": "object", + "properties": { + "estMemory": { + "type": [ + "null", + "integer" + ] + }, + "estMemoryInclDependentApps": { + "type": [ + "null", + "integer" + ] + } + } + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] 
+ }, + "securityProperties": { + "type": "object", + "properties": { + "allowClone": { + "type": [ + "null", + "boolean" + ] + }, + "allowExport": { + "type": [ + "null", + "boolean" + ] + }, + "enableAppTokens": { + "type": [ + "null", + "boolean" + ] + }, + "hideFromPublic": { + "type": [ + "null", + "boolean" + ] + }, + "mustBeRealmApproved": { + "type": [ + "null", + "boolean" + ] + }, + "useIPFilter": { + "type": [ + "null", + "boolean" + ] + } + } + }, + "timeZone": { + "type": [ + "null", + "string" + ] + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "dataClassification": { + "type": [ + "null", + "string" + ] + } + } +} diff --git a/tap_quickbase/schemas/events.json b/tap_quickbase/schemas/events.json new file mode 100644 index 0000000..ccedd84 --- /dev/null +++ b/tap_quickbase/schemas/events.json @@ -0,0 +1,58 @@ +{ + "type": "object", + "properties": { + "type": { + "type": [ + "null", + "string" + ] + }, + "owner": { + "type": "object", + "properties": { + "email": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "userName": { + "type": [ + "null", + "string" + ] + } + } + }, + "isActive": { + "type": [ + "null", + "boolean" + ] + }, + "tableId": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + } + } +} diff --git a/tap_quickbase/schemas/fields.json b/tap_quickbase/schemas/fields.json new file mode 100644 index 0000000..d9d7369 --- /dev/null +++ b/tap_quickbase/schemas/fields.json @@ -0,0 +1,189 @@ +{ + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "fieldType": { + "type": [ + "null", + "string" + ] + }, + "mode": { + "type": [ + "null", + "string" + ] + }, + "noWrap": { + "type": [ + "null", + "boolean" + ] + }, + "bold": { + "type": [ + "null", + "boolean" + ] + }, + 
"required": { + "type": [ + "null", + "boolean" + ] + }, + "appearsByDefault": { + "type": [ + "null", + "boolean" + ] + }, + "findEnabled": { + "type": [ + "null", + "boolean" + ] + }, + "unique": { + "type": [ + "null", + "boolean" + ] + }, + "doesDataCopy": { + "type": [ + "null", + "boolean" + ] + }, + "fieldHelp": { + "type": [ + "null", + "string" + ] + }, + "audited": { + "type": [ + "null", + "boolean" + ] + }, + "properties": { + "type": "object", + "properties": { + "primaryKey": { + "type": [ + "null", + "boolean" + ] + }, + "foreignKey": { + "type": [ + "null", + "boolean" + ] + }, + "numLines": { + "type": [ + "null", + "integer" + ] + }, + "maxLength": { + "type": [ + "null", + "integer" + ] + }, + "appendOnly": { + "type": [ + "null", + "boolean" + ] + }, + "allowHTML": { + "type": [ + "null", + "boolean" + ] + }, + "allowMentions": { + "type": [ + "null", + "boolean" + ] + }, + "sortAsGiven": { + "type": [ + "null", + "boolean" + ] + }, + "carryChoices": { + "type": [ + "null", + "boolean" + ] + }, + "allowNewChoices": { + "type": [ + "null", + "boolean" + ] + }, + "formula": { + "type": [ + "null", + "string" + ] + }, + "defaultValue": { + "type": [ + "null", + "string" + ] + } + } + }, + "permissions": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "permissionType": { + "type": [ + "null", + "string" + ] + }, + "role": { + "type": [ + "null", + "string" + ] + }, + "roleId": { + "type": [ + "null", + "integer" + ] + } + } + } + } + } +} diff --git a/tap_quickbase/schemas/fields_usage.json b/tap_quickbase/schemas/fields_usage.json new file mode 100644 index 0000000..e8561f8 --- /dev/null +++ b/tap_quickbase/schemas/fields_usage.json @@ -0,0 +1,192 @@ +{ + "type": "object", + "properties": { + "actions": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "appHomePages": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + 
"integer" + ] + } + } + }, + "dashboards": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "defaultReports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "exactForms": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "fields": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "forms": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "notifications": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "personalReports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "pipelines": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "relationships": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "reminders": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "reports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "roles": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "tableImports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "tableRules": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "webhooks": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + } + } +} diff --git a/tap_quickbase/schemas/get_field_usage.json b/tap_quickbase/schemas/get_field_usage.json new file mode 100644 index 0000000..e8561f8 --- /dev/null +++ 
b/tap_quickbase/schemas/get_field_usage.json @@ -0,0 +1,192 @@ +{ + "type": "object", + "properties": { + "actions": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "appHomePages": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "dashboards": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "defaultReports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "exactForms": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "fields": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "forms": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "notifications": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "personalReports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "pipelines": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "relationships": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "reminders": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "reports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "roles": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "tableImports": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + }, + "tableRules": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + 
"integer" + ] + } + } + }, + "webhooks": { + "type": "object", + "properties": { + "count": { + "type": [ + "null", + "integer" + ] + } + } + } + } +} diff --git a/tap_quickbase/schemas/get_fields.json b/tap_quickbase/schemas/get_fields.json new file mode 100644 index 0000000..d9d7369 --- /dev/null +++ b/tap_quickbase/schemas/get_fields.json @@ -0,0 +1,189 @@ +{ + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "fieldType": { + "type": [ + "null", + "string" + ] + }, + "mode": { + "type": [ + "null", + "string" + ] + }, + "noWrap": { + "type": [ + "null", + "boolean" + ] + }, + "bold": { + "type": [ + "null", + "boolean" + ] + }, + "required": { + "type": [ + "null", + "boolean" + ] + }, + "appearsByDefault": { + "type": [ + "null", + "boolean" + ] + }, + "findEnabled": { + "type": [ + "null", + "boolean" + ] + }, + "unique": { + "type": [ + "null", + "boolean" + ] + }, + "doesDataCopy": { + "type": [ + "null", + "boolean" + ] + }, + "fieldHelp": { + "type": [ + "null", + "string" + ] + }, + "audited": { + "type": [ + "null", + "boolean" + ] + }, + "properties": { + "type": "object", + "properties": { + "primaryKey": { + "type": [ + "null", + "boolean" + ] + }, + "foreignKey": { + "type": [ + "null", + "boolean" + ] + }, + "numLines": { + "type": [ + "null", + "integer" + ] + }, + "maxLength": { + "type": [ + "null", + "integer" + ] + }, + "appendOnly": { + "type": [ + "null", + "boolean" + ] + }, + "allowHTML": { + "type": [ + "null", + "boolean" + ] + }, + "allowMentions": { + "type": [ + "null", + "boolean" + ] + }, + "sortAsGiven": { + "type": [ + "null", + "boolean" + ] + }, + "carryChoices": { + "type": [ + "null", + "boolean" + ] + }, + "allowNewChoices": { + "type": [ + "null", + "boolean" + ] + }, + "formula": { + "type": [ + "null", + "string" + ] + }, + "defaultValue": { + "type": [ + "null", + "string" + ] + } + } + }, + "permissions": { + "type": [ + 
"null", + "array" + ], + "items": { + "type": "object", + "properties": { + "permissionType": { + "type": [ + "null", + "string" + ] + }, + "role": { + "type": [ + "null", + "string" + ] + }, + "roleId": { + "type": [ + "null", + "integer" + ] + } + } + } + } + } +} diff --git a/tap_quickbase/schemas/get_reports.json b/tap_quickbase/schemas/get_reports.json new file mode 100644 index 0000000..89591a5 --- /dev/null +++ b/tap_quickbase/schemas/get_reports.json @@ -0,0 +1,192 @@ +{ + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "string" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "query": { + "type": "object", + "properties": { + "tableId": { + "type": [ + "null", + "string" + ] + }, + "filter": { + "type": [ + "null", + "string" + ] + }, + "formulaFields": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "fieldType": { + "type": [ + "null", + "string" + ] + }, + "formula": { + "type": [ + "null", + "string" + ] + }, + "decimalPrecision": { + "type": [ + "null", + "integer" + ] + } + } + } + }, + "fields": { + "type": [ + "null", + "array" + ], + "items": { + "type": [ + "null", + "integer" + ] + } + }, + "sortBy": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "order": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "groupBy": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "grouping": { + "type": [ + "null", + "string" + ] + } + } + } + } + } + }, + "properties": { + "type": "object", + "properties": { + "displayOnlyNewOrChangedRecords": { + "type": 
[ + "null", + "boolean" + ] + }, + "columnProperties": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "labelOverride": { + "type": [ + "null", + "string" + ] + } + } + } + } + } + }, + "usedLast": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "usedCount": { + "type": [ + "null", + "integer" + ] + } + } +} diff --git a/tap_quickbase/schemas/roles.json b/tap_quickbase/schemas/roles.json new file mode 100644 index 0000000..4846345 --- /dev/null +++ b/tap_quickbase/schemas/roles.json @@ -0,0 +1,34 @@ +{ + "type": "object", + "properties": { + "access": { + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + } + } +} diff --git a/tap_quickbase/schemas/table_relationships.json b/tap_quickbase/schemas/table_relationships.json new file mode 100644 index 0000000..56854ff --- /dev/null +++ b/tap_quickbase/schemas/table_relationships.json @@ -0,0 +1,110 @@ +{ + "type": "object", + "properties": { + "childTableId": { + "type": [ + "null", + "string" + ] + }, + "foreignKeyField": { + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + }, + "id": { + "type": [ + "null", + "integer" + ] + }, + "isCrossApp": { + "type": [ + "null", + "boolean" + ] + }, + "lookupFields": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "parentTableId": { + "type": [ + "null", + "string" + ] + }, + 
"summaryFields": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "id": { + "type": [ + "null", + "integer" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + } + } + } +} diff --git a/tap_quickbase/schemas/table_reports.json b/tap_quickbase/schemas/table_reports.json new file mode 100644 index 0000000..943b4ab --- /dev/null +++ b/tap_quickbase/schemas/table_reports.json @@ -0,0 +1,207 @@ +{ + "type": "object", + "properties": { + "description": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "incomplete": { + "type": [ + "null", + "boolean" + ] + }, + "name": { + "type": [ + "null", + "string" + ] + }, + "properties": { + "type": "object", + "properties": { + "barDataSources": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "aggregation": { + "type": [ + "null", + "string" + ] + }, + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "barLabel": { + "type": [ + "null", + "string" + ] + }, + "categories": { + "type": "object", + "properties": { + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "grouping": { + "type": [ + "null", + "string" + ] + }, + "label": { + "type": [ + "null", + "string" + ] + }, + "order": { + "type": [ + "null", + "string" + ] + } + } + }, + "chartType": { + "type": [ + "null", + "string" + ] + }, + "lineDataSources": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "aggregation": { + "type": [ + "null", + "string" + ] + }, + "fieldId": { + "type": [ + "null", + "integer" + ] + }, + "type": { + "type": [ + "null", + "string" + ] + } + } + } + }, + "lineLabel": { + "type": [ + "null", + "string" + ] + }, + "sortBy": { + "type": [ + "null", + "array" + ], + "items": { + "type": "object", + "properties": { + "by": 
{ + "type": [ + "null", + "string" + ] + }, + "order": { + "type": [ + "null", + "string" + ] + } + } + } + } + } + }, + "query": { + "type": "object", + "properties": { + "filter": { + "type": [ + "null", + "string" + ] + }, + "formulaFields": { + "type": [ + "null", + "array" + ], + "items": { + "type": "string" + } + }, + "tableId": { + "type": [ + "null", + "string" + ] + } + } + }, + "type": { + "type": [ + "null", + "string" + ] + }, + "usedCount": { + "type": [ + "null", + "integer" + ] + }, + "usedLast": { + "type": [ + "null", + "string" + ], + "format": "date-time" + } + } +} diff --git a/tap_quickbase/schemas/tables.json b/tap_quickbase/schemas/tables.json new file mode 100644 index 0000000..cd2e5c9 --- /dev/null +++ b/tap_quickbase/schemas/tables.json @@ -0,0 +1,103 @@ +{ + "type": "object", + "properties": { + "name": { + "type": [ + "null", + "string" + ] + }, + "created": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "alias": { + "type": [ + "null", + "string" + ] + }, + "description": { + "type": [ + "null", + "string" + ] + }, + "id": { + "type": [ + "null", + "string" + ] + }, + "nextRecordId": { + "type": [ + "null", + "integer" + ] + }, + "nextFieldId": { + "type": [ + "null", + "integer" + ] + }, + "defaultSortFieldId": { + "type": [ + "null", + "integer" + ] + }, + "defaultSortOrder": { + "type": [ + "null", + "string" + ] + }, + "keyFieldId": { + "type": [ + "null", + "integer" + ] + }, + "singleRecordName": { + "type": [ + "null", + "string" + ] + }, + "pluralRecordName": { + "type": [ + "null", + "string" + ] + }, + "sizeLimit": { + "type": [ + "null", + "string" + ] + }, + "spaceUsed": { + "type": [ + "null", + "string" + ] + }, + "spaceRemaining": { + "type": [ + "null", + "string" + ] + } + } +} diff --git a/tap_quickbase/streams/__init__.py b/tap_quickbase/streams/__init__.py new file mode 100644 index 0000000..e56d8cc --- 
/dev/null +++ b/tap_quickbase/streams/__init__.py @@ -0,0 +1,28 @@ +from tap_quickbase.streams.apps import Apps +from tap_quickbase.streams.events import Events +from tap_quickbase.streams.roles import Roles +from tap_quickbase.streams.app_tables import AppTables +from tap_quickbase.streams.tables import Tables +from tap_quickbase.streams.table_relationships import TableRelationships +from tap_quickbase.streams.table_reports import TableReports +from tap_quickbase.streams.get_reports import GetReports +from tap_quickbase.streams.fields import Fields +from tap_quickbase.streams.get_fields import GetFields +from tap_quickbase.streams.fields_usage import FieldsUsage +from tap_quickbase.streams.get_field_usage import GetFieldUsage + +STREAMS = { + "apps": Apps, + "events": Events, + "roles": Roles, + "app_tables": AppTables, + "tables": Tables, + "table_relationships": TableRelationships, + "table_reports": TableReports, + "get_reports": GetReports, + "fields": Fields, + "get_fields": GetFields, + "fields_usage": FieldsUsage, + "get_field_usage": GetFieldUsage, +} + diff --git a/tap_quickbase/streams/abstracts.py b/tap_quickbase/streams/abstracts.py new file mode 100644 index 0000000..0cfcba6 --- /dev/null +++ b/tap_quickbase/streams/abstracts.py @@ -0,0 +1,302 @@ +from abc import ABC, abstractmethod +import json +from typing import Any, Dict, Tuple, List, Iterator +from singer import ( + Transformer, + get_bookmark, + get_logger, + metrics, + write_bookmark, + write_record, + write_schema, + metadata +) + +LOGGER = get_logger() + + +class BaseStream(ABC): + """ + A Base Class providing structure and boilerplate for generic streams + and required attributes for any kind of stream + ~~~ + Provides: + - Basic Attributes (stream_name,replication_method,key_properties) + - Helper methods for catalog generation + - `sync` and `get_records` method for performing sync + """ + + url_endpoint = "" + path = "" + page_size = 100 + next_page_key = "next" + headers = {'Accept': 
'application/json'} + children = [] + parent = "" + data_key = "" + parent_bookmark_key = "" + http_method = "POST" + + def __init__(self, client=None, catalog=None) -> None: + self.client = client + self.catalog = catalog + self.schema = catalog.schema.to_dict() + self.metadata = metadata.to_map(catalog.metadata) + self.child_to_sync = [] + self.params = {} + self.data_payload = {} + + @property + @abstractmethod + def tap_stream_id(self) -> str: + """Unique identifier for the stream. + + This is allowed to be different from the name of the stream, in + order to allow for sources that have duplicate stream names. + """ + + @property + @abstractmethod + def replication_method(self) -> str: + """Defines the sync mode of a stream.""" + + @property + @abstractmethod + def replication_keys(self) -> List: + """Defines the replication key for incremental sync mode of a + stream.""" + + @property + @abstractmethod + def key_properties(self) -> Tuple[str, str]: + """List of key properties for stream.""" + + def is_selected(self): + return metadata.get(self.metadata, (), "selected") + + @abstractmethod + def sync( + self, + state: Dict, + transformer: Transformer, + parent_obj: Dict = None, + ) -> Dict: + """ + Performs a replication sync for the stream. + ~~~ + Args: + - state (dict): represents the state file for the tap. + - transformer (object): A Object of the singer.transformer class. + - parent_obj (dict): The parent object for the stream. + + Returns: + - bool: The return value. True for success, False otherwise. 
class IncrementalStream(BaseStream):
    """Base class for streams replicated with the `INCREMENTAL` sync mode.

    Adds bookmark handling on top of ``BaseStream`` and a ``sync``
    implementation that only emits records whose replication-key value is at
    or after the stored bookmark.
    """

    def get_bookmark(self, state: dict, stream: str, key: Any = None) -> Any:
        """Wrapper over singer.get_bookmark that falls back to the configured
        `start_date` when no bookmark has been written for `stream` yet."""
        return get_bookmark(
            state,
            stream,
            key or self.replication_keys[0],
            self.client.config["start_date"],
        )

    def write_bookmark(self, state: dict, stream: str, key: Any = None, value: Any = None) -> Dict:
        """Wrapper over singer.write_bookmark that never moves a bookmark
        backwards: the stored value is the max of the current bookmark and
        `value`."""
        if not (key or self.replication_keys):
            # Nothing to bookmark for streams without a replication key.
            return state

        bookmark_key = key or self.replication_keys[0]
        current_bookmark = get_bookmark(
            state, stream, bookmark_key, self.client.config["start_date"]
        )
        return write_bookmark(state, stream, bookmark_key, max(current_bookmark, value))

    def sync(
        self,
        state: Dict,
        transformer: Transformer,
        parent_obj: Dict = None,
    ) -> int:
        """Incremental sync: write records at or after the stored bookmark,
        sync children for each record, then advance the bookmark.

        Args:
            state: tap state dict; mutated in place via write_bookmark.
            transformer: singer.Transformer used to transform records.
            parent_obj: parent record for child streams; None for top-level.

        Returns:
            Number of records written for this stream (was annotated
            ``-> Dict`` but has always returned ``counter.value``).
        """
        bookmark_date = self.get_bookmark(state, self.tap_stream_id)
        current_max_bookmark_date = bookmark_date
        self.update_params(updated_since=bookmark_date)
        # BUG FIX: `update_data_payload` accepts keyword arguments only, so
        # the original positional call `self.update_data_payload(parent_obj)`
        # raised TypeError on every sync (parent_obj is even None for
        # top-level streams). Expand the parent record into keyword args,
        # which is what `dict.update(**kwargs)` was evidently meant to get.
        self.update_data_payload(**(parent_obj or {}))
        self.url_endpoint = self.get_url_endpoint(parent_obj)

        with metrics.record_counter(self.tap_stream_id) as counter:
            for record in self.get_records():
                record = self.modify_object(record, parent_obj)
                transformed_record = transformer.transform(
                    record, self.schema, self.metadata
                )

                # A KeyError here means the record lacks the replication key;
                # deliberately left un-caught so faulty records surface loudly.
                record_bookmark = transformed_record[self.replication_keys[0]]
                if record_bookmark >= bookmark_date:
                    if self.is_selected():
                        write_record(self.tap_stream_id, transformed_record)
                        counter.increment()

                    current_max_bookmark_date = max(
                        current_max_bookmark_date, record_bookmark
                    )

                # Children are synced for every fetched record, not only for
                # records newer than the bookmark (matches FullTableStream).
                for child in self.child_to_sync:
                    child.sync(state=state, transformer=transformer, parent_obj=record)

        state = self.write_bookmark(
            state, self.tap_stream_id, value=current_max_bookmark_date
        )
        return counter.value
class ParentBaseStream(IncrementalStream):
    """Base class for parent streams that drive child-stream syncs.

    The effective bookmark must cover the parent and every selected child,
    so reads take the minimum across them and writes fan out to each child.
    """

    def get_bookmark(self, state: Dict, stream: str, key: Any = None) -> Any:
        """Return the minimum bookmark across this stream (when selected)
        and all children to sync, so no child misses records."""
        min_parent_bookmark = (
            super().get_bookmark(state, stream) if self.is_selected() else None
        )
        # Children store the parent's bookmark under
        # "<parent_stream>_<replication_key>"; the key is loop-invariant,
        # so compute it once instead of per child.
        bookmark_key = f"{self.tap_stream_id}_{self.replication_keys[0]}"
        for child in self.child_to_sync:
            child_bookmark = super().get_bookmark(
                state, child.tap_stream_id, key=bookmark_key
            )
            # Compare on `is None`, not truthiness, so a falsy-but-valid
            # bookmark value can never be silently discarded.
            min_parent_bookmark = (
                child_bookmark
                if min_parent_bookmark is None
                else min(min_parent_bookmark, child_bookmark)
            )

        return min_parent_bookmark

    def write_bookmark(
        self, state: Dict, stream: str, key: Any = None, value: Any = None
    ) -> Dict:
        """Wrapper over singer.write_bookmark: store `value` for this stream
        (when selected) and mirror it into each child's state under the
        parent-scoped bookmark key."""
        if self.is_selected():
            # singer.write_bookmark mutates `state` in place, so the return
            # value can be safely ignored here.
            super().write_bookmark(state, stream, value=value)

        bookmark_key = f"{self.tap_stream_id}_{self.replication_keys[0]}"
        for child in self.child_to_sync:
            super().write_bookmark(
                state, child.tap_stream_id, key=bookmark_key, value=value
            )

        return state
self.bookmark_value = super().get_bookmark(state, stream) + + return self.bookmark_value + diff --git a/tap_quickbase/streams/app_tables.py b/tap_quickbase/streams/app_tables.py new file mode 100644 index 0000000..1b26cd5 --- /dev/null +++ b/tap_quickbase/streams/app_tables.py @@ -0,0 +1,11 @@ +from tap_quickbase.streams.abstracts import ChildBaseStream + +class AppTables(ChildBaseStream): + tap_stream_id = "app_tables" + key_properties = ["id"] + replication_method = "INCREMENTAL" + replication_keys = ["updated"] + path = "v1/tables?appId={appId}" + parent = "apps" + bookmark_value = None + diff --git a/tap_quickbase/streams/apps.py b/tap_quickbase/streams/apps.py new file mode 100644 index 0000000..721b9ba --- /dev/null +++ b/tap_quickbase/streams/apps.py @@ -0,0 +1,9 @@ +from tap_quickbase.streams.abstracts import IncrementalStream + +class Apps(IncrementalStream): + tap_stream_id = "apps" + key_properties = ["id"] + replication_method = "INCREMENTAL" + replication_keys = ["updated"] + path = "v1/apps/{appId}" + diff --git a/tap_quickbase/streams/events.py b/tap_quickbase/streams/events.py new file mode 100644 index 0000000..ac3b7e3 --- /dev/null +++ b/tap_quickbase/streams/events.py @@ -0,0 +1,10 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class Events(FullTableStream): + tap_stream_id = "events" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/apps/{appId}/events" + parent = "apps" + diff --git a/tap_quickbase/streams/fields.py b/tap_quickbase/streams/fields.py new file mode 100644 index 0000000..08b0e20 --- /dev/null +++ b/tap_quickbase/streams/fields.py @@ -0,0 +1,9 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class Fields(FullTableStream): + tap_stream_id = "fields" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/fields?tableId={tableId}" + diff --git a/tap_quickbase/streams/fields_usage.py 
b/tap_quickbase/streams/fields_usage.py new file mode 100644 index 0000000..6d51b44 --- /dev/null +++ b/tap_quickbase/streams/fields_usage.py @@ -0,0 +1,9 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class FieldsUsage(FullTableStream): + tap_stream_id = "fields_usage" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/fields/usage?tableId={tableId}" + diff --git a/tap_quickbase/streams/get_field_usage.py b/tap_quickbase/streams/get_field_usage.py new file mode 100644 index 0000000..0c03516 --- /dev/null +++ b/tap_quickbase/streams/get_field_usage.py @@ -0,0 +1,10 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class GetFieldUsage(FullTableStream): + tap_stream_id = "get_field_usage" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/fields/usage/{fieldId}?tableId={tableId}" + parent = "fields_usage" + diff --git a/tap_quickbase/streams/get_fields.py b/tap_quickbase/streams/get_fields.py new file mode 100644 index 0000000..3e1ed25 --- /dev/null +++ b/tap_quickbase/streams/get_fields.py @@ -0,0 +1,10 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class GetFields(FullTableStream): + tap_stream_id = "get_fields" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/fields/{fieldId}?tableId={tableId}" + parent = "fields" + diff --git a/tap_quickbase/streams/get_reports.py b/tap_quickbase/streams/get_reports.py new file mode 100644 index 0000000..ec2b3e9 --- /dev/null +++ b/tap_quickbase/streams/get_reports.py @@ -0,0 +1,10 @@ +from tap_quickbase.streams.abstracts import FullTableStream + +class GetReports(FullTableStream): + tap_stream_id = "get_reports" + key_properties = ["id"] + replication_method = "FULL_TABLE" + replication_keys = [] + path = "v1/reports/{reportId}?tableId={tableId}" + parent = "table_reports" + diff --git a/tap_quickbase/streams/roles.py 
# --- tap_quickbase/streams/roles.py ---
from tap_quickbase.streams.abstracts import FullTableStream

class Roles(FullTableStream):
    """Full-table child stream of `apps` for Quickbase app roles."""
    # NOTE(review): FullTableStream resolves URLs via
    # BaseStream.get_url_endpoint, which never formats `path`, so the
    # "{appId}" placeholder appears to reach the URL literally — confirm the
    # client substitutes it before issuing the request.
    tap_stream_id = "roles"
    key_properties = ["id"]
    replication_method = "FULL_TABLE"
    replication_keys = []
    path = "v1/apps/{appId}/roles"
    parent = "apps"

# --- tap_quickbase/streams/table_relationships.py ---
from tap_quickbase.streams.abstracts import FullTableStream

class TableRelationships(FullTableStream):
    """Full-table stream for table relationships."""
    tap_stream_id = "table_relationships"
    key_properties = ["id"]
    replication_method = "FULL_TABLE"
    replication_keys = []
    # Records live under the "relationships" key of the API response
    # (read via BaseStream.get_records / data_key).
    data_key = "relationships"
    # NOTE(review): "{tableId}" is never substituted by the visible
    # URL-building code — verify against the client implementation.
    path = "v1/tables/{tableId}/relationships"

# --- tap_quickbase/streams/table_reports.py ---
from tap_quickbase.streams.abstracts import FullTableStream

class TableReports(FullTableStream):
    """Full-table child stream of `tables` for saved table reports."""
    tap_stream_id = "table_reports"
    key_properties = ["id"]
    replication_method = "FULL_TABLE"
    replication_keys = []
    # NOTE(review): "{tableId}" is never substituted by the visible
    # URL-building code — verify against the client implementation.
    path = "v1/reports?tableId={tableId}"
    parent = "tables"

# --- tap_quickbase/streams/tables.py ---
from tap_quickbase.streams.abstracts import ChildBaseStream

class Tables(ChildBaseStream):
    """Incremental child stream of `app_tables` for single-table detail."""
    tap_stream_id = "tables"
    key_properties = ["id"]
    replication_method = "INCREMENTAL"
    replication_keys = ["updated"]
    # NOTE(review): ChildBaseStream.get_url_endpoint calls
    # `self.path.format(parent_obj['id'])` with a single positional argument,
    # but this path uses *named* fields ("{tableId}", "{appId}"), which
    # str.format would reject with a KeyError — confirm how this path is
    # actually resolved.
    path = "v1/tables/{tableId}?appId={appId}"
    parent = "app_tables"
    # Per-instance bookmark cache read by ChildBaseStream.get_bookmark.
    bookmark_value = None
b/tap_quickbase/sync.py new file mode 100644 index 0000000..902e07e --- /dev/null +++ b/tap_quickbase/sync.py @@ -0,0 +1,67 @@ +import singer +from typing import Dict +from tap_quickbase.streams import STREAMS +from tap_quickbase.client import Client + +LOGGER = singer.get_logger() + + +def update_currently_syncing(state: Dict, stream_name: str) -> None: + """ + Update currently_syncing in state and write it + """ + if not stream_name and singer.get_currently_syncing(state): + del state["currently_syncing"] + else: + singer.set_currently_syncing(state, stream_name) + singer.write_state(state) + + +def write_schema(stream, client, streams_to_sync, catalog) -> None: + """ + Write schema for stream and its children + """ + if stream.is_selected(): + stream.write_schema() + + for child in stream.children: + child_obj = STREAMS[child](client, catalog.get_stream(child)) + write_schema(child_obj, client, streams_to_sync, catalog) + if child in streams_to_sync: + stream.child_to_sync.append(child_obj) + + +def sync(client: Client, config: Dict, catalog: singer.Catalog, state) -> None: + """ + Sync selected streams from catalog + """ + + streams_to_sync = [] + for stream in catalog.get_selected_streams(state): + streams_to_sync.append(stream.stream) + LOGGER.info("selected_streams: {}".format(streams_to_sync)) + + last_stream = singer.get_currently_syncing(state) + LOGGER.info("last/currently syncing stream: {}".format(last_stream)) + + with singer.Transformer() as transformer: + for stream_name in streams_to_sync: + + stream = STREAMS[stream_name](client, catalog.get_stream(stream_name)) + if stream.parent: + if stream.parent not in streams_to_sync: + streams_to_sync.append(stream.parent) + continue + + write_schema(stream, client, streams_to_sync, catalog) + LOGGER.info("START Syncing: {}".format(stream_name)) + update_currently_syncing(state, stream_name) + total_records = stream.sync(state=state, transformer=transformer) + + update_currently_syncing(state, None) + 
LOGGER.info( + "FINISHED Syncing: {}, total_records: {}".format( + stream_name, total_records + ) + ) + diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..cb280da --- /dev/null +++ b/tests/base.py @@ -0,0 +1,152 @@ +import copy +import os +import unittest +from datetime import datetime as dt +from datetime import timedelta + +import dateutil.parser +import pytz +from tap_tester import connections, menagerie, runner +from tap_tester.logger import LOGGER +from tap_tester.base_suite_tests.base_case import BaseCase + + +class QuickbaseBaseTest(BaseCase): + """Setup expectations for test sub classes. + + Metadata describing streams. A bunch of shared methods that are used + in tap-tester tests. Shared tap-specific methods (as needed). + """ + start_date = "2019-01-01T00:00:00Z" + PARENT_TAP_STREAM_ID = "parent-tap-stream-id" + + @staticmethod + def tap_name(): + """The name of the tap.""" + return "tap-quickbase" + + @staticmethod + def get_type(): + """The name of the tap.""" + return "platform.quickbase" + + @classmethod + def expected_metadata(cls): + """The expected streams and metadata about the streams.""" + return { + "apps": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.INCREMENTAL, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.API_LIMIT: 100 + }, + "events": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "apps", + cls.API_LIMIT: 100 + }, + "roles": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "apps", + cls.API_LIMIT: 100 + }, + "app_tables": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.INCREMENTAL, + cls.REPLICATION_KEYS: set(), + 
cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "apps", + cls.API_LIMIT: 100 + }, + "tables": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.INCREMENTAL, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "app_tables", + cls.API_LIMIT: 100 + }, + "table_relationships": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.API_LIMIT: 100 + }, + "table_reports": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "tables", + cls.API_LIMIT: 100 + }, + "get_reports": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "table_reports", + cls.API_LIMIT: 100 + }, + "fields": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.API_LIMIT: 100 + }, + "get_fields": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "fields", + cls.API_LIMIT: 100 + }, + "fields_usage": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.API_LIMIT: 100 + }, + "get_field_usage": { + cls.PRIMARY_KEYS: { "id" }, + cls.REPLICATION_METHOD: cls.FULL_TABLE, + cls.REPLICATION_KEYS: set(), + cls.OBEYS_START_DATE: False, + cls.PARENT_TAP_STREAM_ID: "fields_usage", + cls.API_LIMIT: 100 + } + } + + @staticmethod + def get_credentials(): + """Authentication information for the test account.""" + credentials_dict = {} + creds = {'user_token': 'TAP_QUICKBASE_USER_TOKEN'} + + for cred in creds: + credentials_dict[cred] = os.getenv(creds[cred]) + + return 
credentials_dict + + def get_properties(self, original: bool = True): + """Configuration of properties required for the tap.""" + return_value = { + "start_date": "2022-07-01T00:00:00Z" + } + if original: + return return_value + + return_value["start_date"] = self.start_date + return return_value + diff --git a/tests/mock_connection.py b/tests/mock_connection.py deleted file mode 100644 index 3bebe81..0000000 --- a/tests/mock_connection.py +++ /dev/null @@ -1,67 +0,0 @@ -class MockConnection(): - appid = "app_id" - - def get_tables(self): - return [ - { - 'id': '1', - 'name': 'table_name', - 'app_name': 'app_name', - 'app_id': 'app_id', - } - ] - - def get_fields(self, table_id): - fields = {'1': { - '1': { - 'id': '1', - 'name': 'datecreated', - 'type': 'timestamp', - 'base_type': 'int64', - 'parent_field_id': '', - }, - '2': { - 'id': '2', - 'name': 'datemodified', - 'type': 'timestamp', - 'base_type': 'int64', - 'parent_field_id': '', - }, - '3': { - 'id': '3', - 'name': 'text_field', - 'type': 'text', - 'base_type': 'text', - 'parent_field_id': '', - }, - '4': { - 'id': '4', - 'name': 'boolean_field', - 'type': 'checkbox', - 'base_type': 'bool', - 'parent_field_id': '', - }, - '5': { - 'id': '5', - 'name': 'float_field', - 'type': 'float', - 'base_type': 'float', - 'parent_field_id': '', - }, - '6': { - 'id': '6', - 'name': 'child_text_field', - 'type': 'text', - 'base_type': 'float', - 'parent_field_id': '7', - }, - '7': { - 'id':'7', - 'name': 'parent_field', - 'type': 'text', - 'base_type': 'text', - 'parent_field_id': '', - 'composite_fields': ['6'], - } - }} - return fields[table_id] diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py new file mode 100644 index 0000000..6b4bea6 --- /dev/null +++ b/tests/test_all_fields.py @@ -0,0 +1,20 @@ +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest + +KNOWN_MISSING_FIELDS = { + +} + + +class QuickbaseAllFields(AllFieldsTest, QuickbaseBaseTest): + 
"""Ensure running the tap with all streams and fields selected results in + the replication of all fields.""" + + @staticmethod + def name(): + return "tap_tester_quickbase_all_fields_test" + + def streams_to_test(self): + streams_to_exclude = {} + return self.expected_stream_names().difference(streams_to_exclude) + diff --git a/tests/test_automatic_fields.py b/tests/test_automatic_fields.py new file mode 100644 index 0000000..1e937e9 --- /dev/null +++ b/tests/test_automatic_fields.py @@ -0,0 +1,18 @@ +"""Test that with no fields selected for a stream automatic fields are still +replicated.""" +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.automatic_fields_test import MinimumSelectionTest + + +class QuickbaseAutomaticFields(MinimumSelectionTest, QuickbaseBaseTest): + """Test that with no fields selected for a stream automatic fields are + still replicated.""" + + @staticmethod + def name(): + return "tap_tester_quickbase_automatic_fields_test" + + def streams_to_test(self): + streams_to_exclude = {} + return self.expected_stream_names().difference(streams_to_exclude) + diff --git a/tests/test_bookmark.py b/tests/test_bookmark.py new file mode 100644 index 0000000..6a5f76f --- /dev/null +++ b/tests/test_bookmark.py @@ -0,0 +1,20 @@ +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.bookmark_test import BookmarkTest + + +class QuickbaseBookMarkTest(BookmarkTest, QuickbaseBaseTest): + """Test tap sets a bookmark and respects it for the next sync of a + stream.""" + bookmark_format = "%Y-%m-%dT%H:%M:%S.%fZ" + initial_bookmarks = { + "bookmarks": { + } + } + @staticmethod + def name(): + return "tap_tester_quickbase_bookmark_test" + + def streams_to_test(self): + streams_to_exclude = {} + return self.expected_stream_names().difference(streams_to_exclude) + diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..1a04bba --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,15 @@ 
+"""Test tap discovery mode and metadata.""" +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.discovery_test import DiscoveryTest + + +class QuickbaseDiscoveryTest(DiscoveryTest, QuickbaseBaseTest): + """Test tap discovery mode and metadata conforms to standards.""" + + @staticmethod + def name(): + return "tap_tester_quickbase_discovery_test" + + def streams_to_test(self): + return self.expected_stream_names() + diff --git a/tests/test_interrupted_sync.py b/tests/test_interrupted_sync.py new file mode 100644 index 0000000..0fc3b95 --- /dev/null +++ b/tests/test_interrupted_sync.py @@ -0,0 +1,24 @@ + +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.interrupted_sync_test import InterruptedSyncTest + + +class QuickbaseInterruptedSyncTest(QuickbaseBaseTest): + """Test tap sets a bookmark and respects it for the next sync of a + stream.""" + + @staticmethod + def name(): + return "tap_tester_quickbase_interrupted_sync_test" + + def streams_to_test(self): + return self.expected_stream_names() + + + def manipulate_state(self): + return { + "currently_syncing": "prospects", + "bookmarks": { + } + } + diff --git a/tests/test_pagination.py b/tests/test_pagination.py new file mode 100644 index 0000000..7fd2363 --- /dev/null +++ b/tests/test_pagination.py @@ -0,0 +1,16 @@ +from tap_tester.base_suite_tests.pagination_test import PaginationTest +from base import QuickbaseBaseTest + +class QuickbasePaginationTest(PaginationTest, QuickbaseBaseTest): + """ + Ensure tap can replicate multiple pages of data for streams that use pagination. 
+ """ + + @staticmethod + def name(): + return "tap_tester_quickbase_pagination_test" + + def streams_to_test(self): + streams_to_exclude = {} + return self.expected_stream_names().difference(streams_to_exclude) + diff --git a/tests/test_start_date.py b/tests/test_start_date.py new file mode 100644 index 0000000..7837434 --- /dev/null +++ b/tests/test_start_date.py @@ -0,0 +1,24 @@ +from base import QuickbaseBaseTest +from tap_tester.base_suite_tests.start_date_test import StartDateTest + + + +class QuickbaseStartDateTest(StartDateTest, QuickbaseBaseTest): + """Instantiate start date according to the desired data set and run the + test.""" + + @staticmethod + def name(): + return "tap_tester_quickbase_start_date_test" + + def streams_to_test(self): + streams_to_exclude = {} + return self.expected_stream_names().difference(streams_to_exclude) + + @property + def start_date_1(self): + return "2015-03-25T00:00:00Z" + @property + def start_date_2(self): + return "2017-01-25T00:00:00Z" + diff --git a/tests/test_tap.py b/tests/test_tap.py deleted file mode 100644 index 8498278..0000000 --- a/tests/test_tap.py +++ /dev/null @@ -1,114 +0,0 @@ -import unittest -import tap_quickbase -import singer.metadata as singer_metadata - -from .mock_connection import MockConnection - - -class TestDiscoverCatalog(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.conn = MockConnection() - cls.catalog = tap_quickbase.discover_catalog(cls.conn) - - def test_tables_length(self): - self.assertEqual(1, len(self.catalog.streams)) - - def test_tap_stream_id(self): - self.assertEqual("app_name__table_name", self.catalog.streams[0].tap_stream_id) - - def test_app_metadata(self): - metadata = singer_metadata.to_map(self.catalog.streams[0].metadata) - self.assertEqual("app_id", singer_metadata.get(metadata, tuple(), "tap-quickbase.app_id")) - - def test_key_properties(self): - self.assertEqual(1, len(self.catalog.streams[0].key_properties)) - self.assertEqual("rid", 
self.catalog.streams[0].key_properties[0]) - - def test_discovered_properties(self): - api_fields = set([f["name"] for f in self.conn.get_fields('1').values()]) - schema_fields = set(self.catalog.streams[0].schema.properties.keys()) - api_fields.remove('child_text_field') # Children are nested - schema_fields.remove('rid') # rid is added artificially in discovery mode - self.assertEqual(api_fields, schema_fields) - - def test_properties_rid_automatic(self): - metadata = singer_metadata.to_map(self.catalog.streams[0].metadata) - self.assertEqual( - "automatic", - singer_metadata.get(metadata, ("properties", "rid"), "inclusion") - ) - - def test_properties_timestamp(self): - self.assertEqual( - "string", - self.catalog.streams[0].schema.properties['datecreated'].type[1] - ) - self.assertEqual( - "date-time", - self.catalog.streams[0].schema.properties['datecreated'].format - ) - - def test_properties_string(self): - self.assertEqual( - "string", - self.catalog.streams[0].schema.properties['text_field'].type[1] - ) - - def test_properties_boolean(self): - self.assertEqual( - "boolean", - self.catalog.streams[0].schema.properties['boolean_field'].type[1] - ) - - def test_properties_float(self): - self.assertEqual( - "number", - self.catalog.streams[0].schema.properties['float_field'].type[1] - ) - - def test_metadata_length(self): - additional_metadata_count = 2 # app_id root level, and rid record - self.assertEqual(len(self.conn.get_fields('1')) + additional_metadata_count, - len(self.catalog.streams[0].metadata)) - - def test_metadata_datecreated_id(self): - found_breadcrumb = False - for meta in self.catalog.streams[0].metadata: - if tuple(meta['breadcrumb']) == ("properties", "datecreated", ): - found_breadcrumb = True - self.assertEqual("1", meta['metadata']['tap-quickbase.id']) - self.assertTrue(found_breadcrumb) - - def test_child_field(self): - composite_name = tap_quickbase.format_child_field_name("parent_field", "child_text_field") - pieces = 
default_config = {
    "base_url": "https://api.example.com",
    "request_timeout": 30,
    "access_token": "dummy_token",
}

# Fallback timeout the client is expected to apply for falsy/empty config
# values (see test_client_initialization).
DEFAULT_REQUEST_TIMEOUT = 300


class MockResponse:
    """Mocked standard HTTPResponse to test error handling.

    Mimics just enough of ``requests.Response`` for the client's error
    paths: ``status_code``, ``content``, ``headers``, ``reason``,
    ``raise_for_status`` and ``json``.
    """

    def __init__(
        self, status_code, resp="", content=None, headers=None, raise_error=True, text=None
    ):
        # ``content``/``text`` previously defaulted to mutable ``[""]``/``{}``
        # objects evaluated once at definition time, so every instance aliased
        # the same list/dict and one test's mutation leaked into all others.
        # Use ``None`` sentinels and build a fresh object per instance.
        self.json_data = resp
        self.status_code = status_code
        self.content = [""] if content is None else content
        self.headers = headers
        self.raise_error = raise_error
        self.text = {} if text is None else text
        self.reason = "error"

    def raise_for_status(self):
        """Mimic ``Response.raise_for_status``.

        Raises:
            requests.HTTPError: Mock http error, when ``raise_error`` is set.

        Returns:
            int: The status code when no error is simulated.
        """
        if not self.raise_error:
            return self.status_code

        raise requests.HTTPError("mock sample message")

    def json(self):
        """Return the JSON payload (``text``) of the mocked response."""
        return self.text
class TestClient(unittest.TestCase):
    """Unit tests for ``tap_quickbase.client.Client`` construction and verbs."""

    def setUp(self):
        """Set up the client with default configuration."""
        self.client = Client(default_config)

    @parameterized.expand([
        ["empty value", "", DEFAULT_REQUEST_TIMEOUT],
        ["string value", "12", 12.0],
        ["integer value", 10, 10.0],
        ["float value", 20.0, 20.0],
        ["zero value", 0, DEFAULT_REQUEST_TIMEOUT],
    ])
    @patch("tap_quickbase.client.session")
    def test_client_initialization(self, test_name, input_value, expected_value, mock_session):
        """Config ``request_timeout`` values are coerced to float, falling
        back to the default for falsy values."""
        # Build a per-case copy instead of assigning into the module-level
        # ``default_config``: the original mutation leaked the last
        # parameterized value into every other test that reads the dict
        # (e.g. ``setUp``), making test outcomes order-dependent.
        config = {**default_config, "request_timeout": input_value}
        client = Client(config)
        self.assertEqual(client.request_timeout, expected_value)
        self.assertIsInstance(client._session, mock_session().__class__)

    @patch("tap_quickbase.client.Client._Client__make_request")
    def test_client_get(self, mock_make_request):
        """``get`` delegates to ``__make_request`` once and returns its payload."""
        mock_make_request.return_value = {"data": "ok"}
        result = self.client.get("https://api.example.com/resource")
        self.assertEqual(result, {"data": "ok"})
        mock_make_request.assert_called_once()

    @patch("tap_quickbase.client.Client._Client__make_request")
    def test_client_post(self, mock_make_request):
        """``post`` forwards the body to ``__make_request`` and returns its payload."""
        mock_make_request.return_value = {"created": True}
        result = self.client.post("https://api.example.com/resource", body={"key": "value"})
        self.assertEqual(result, {"created": True})
        mock_make_request.assert_called_once()
QuickbaseBadRequestError, "A validation exception has occurred."], + ["401 error", 401, MockResponse(401), QuickbaseUnauthorizedError, "The access token provided is expired, revoked, malformed or invalid for other reasons."], + ["403 error", 403, MockResponse(403), QuickbaseForbiddenError, "You are missing the following required scopes: read"], + ["404 error", 404, MockResponse(404), QuickbaseNotFoundError, "The resource you have specified cannot be found."], + ["409 error", 409, MockResponse(409), QuickbaseConflictError, "The API request cannot be completed because the requested operation would conflict with an existing item."], + ]) + def test_make_request_http_failure_without_retry(self, test_name, error_code, mock_response, error, error_message): + + with patch.object(self.client._session, "request", return_value=mock_response): + with self.assertRaises(error) as e: + self.client._Client__make_request("GET", "https://api.example.com/resource") + + expected_error_message = (f"HTTP-error-code: {error_code}, Error: {error_message}") + self.assertEqual(str(e.exception), expected_error_message) + + @parameterized.expand([ + ["422 error", 422, MockResponse(422), QuickbaseUnprocessableEntityError, "The request content itself is not processable by the server."], + ["429 error", 429, MockResponse(429), QuickbaseRateLimitError, "The API rate limit for your organisation/application pairing has been exceeded."], + ["500 error", 500, MockResponse(500), QuickbaseInternalServerError, "The server encountered an unexpected condition which prevented it from fulfilling the request."], + ["501 error", 501, MockResponse(501), QuickbaseNotImplementedError, "The server does not support the functionality required to fulfill the request."], + ["502 error", 502, MockResponse(502), QuickbaseBadGatewayError, "Server received an invalid response."], + ["503 error", 503, MockResponse(503), QuickbaseServiceUnavailableError, "API service is currently unavailable."], + ]) + 
@patch("time.sleep") + def test_make_request_http_failure_with_retry(self, test_name, error_code, mock_response, error, error_message, mock_sleep): + + with patch.object(self.client._session, "request", return_value=mock_response) as mock_request: + with self.assertRaises(error) as e: + self.client._Client__make_request("GET", "https://api.example.com/resource") + + expected_error_message = (f"HTTP-error-code: {error_code}, Error: {error_message}") + self.assertEqual(str(e.exception), expected_error_message) + self.assertEqual(mock_request.call_count, 5) + + @parameterized.expand([ + ["ConnectionResetError", ConnectionResetError], + ["ConnectionError", ConnectionError], + ["ChunkedEncodingError", ChunkedEncodingError], + ["Timeout", Timeout], + ]) + @patch("time.sleep") + def test_make_request_other_failure_with_retry(self, test_name, error, mock_sleep): + + with patch.object(self.client._session, "request", side_effect=error) as mock_request: + with self.assertRaises(error) as e: + self.client._Client__make_request("GET", "https://api.example.com/resource") + + self.assertEqual(mock_request.call_count, 5) diff --git a/tests/unittests/test_incremental_sync.py b/tests/unittests/test_incremental_sync.py new file mode 100644 index 0000000..d81afdc --- /dev/null +++ b/tests/unittests/test_incremental_sync.py @@ -0,0 +1,54 @@ +import unittest +from unittest.mock import patch, MagicMock +from tap_quickbase.streams.abstracts import IncrementalStream + +class ConcreteParentBaseStream(IncrementalStream): + @property + def key_properties(self): + return ["id"] + + @property + def replication_keys(self): + return ["updated_at"] + + @property + def replication_method(self): + return "INCREMENTAL" + + @property + def tap_stream_id(self): + return "stream_1" + +class TestSync(unittest.TestCase): + @patch("tap_quickbase.streams.abstracts.metadata.to_map") + def setUp(self, mock_to_map): + + mock_catalog = MagicMock() + mock_catalog.schema.to_dict.return_value = {"key": "value"} + 
mock_catalog.metadata = "mock_metadata" + mock_to_map.return_value = {"metadata_key": "metadata_value"} + + self.stream = ConcreteParentBaseStream(catalog=mock_catalog) + self.stream.client = MagicMock() + self.stream.child_to_sync = [] + + @patch("tap_quickbase.streams.abstracts.get_bookmark", return_value=100) + def test_write_bookmark_with_state(self, mock_get_bookmark): + + state = {'bookmarks': {'stream_1': {'updated_at': 100}}} + result = self.stream.write_bookmark(state, "stream_1", "updated_at", 200) + self.assertEqual(result, {'bookmarks': {'stream_1': {'updated_at': 200}}}) + + @patch("tap_quickbase.streams.abstracts.get_bookmark", return_value=100) + def test_write_bookmark_without_state(self, mock_get_bookmark): + + state = {} + result = self.stream.write_bookmark(state, "stream_1", "updated_at", 200) + self.assertEqual(result, {'bookmarks': {'stream_1': {'updated_at': 200}}}) + + @patch("tap_quickbase.streams.abstracts.get_bookmark", return_value=300) + def test_write_bookmark_with_old_value(self, mock_get_bookmark): + + state = {'bookmarks': {'stream_1': {'updated_at': 300}}} + result = self.stream.write_bookmark(state, "stream_1", "updated_at", 200) + self.assertEqual(result, {'bookmarks': {'stream_1': {'updated_at': 300}}}) diff --git a/tests/unittests/test_parent_child_bookmark.py b/tests/unittests/test_parent_child_bookmark.py new file mode 100644 index 0000000..6cfd8eb --- /dev/null +++ b/tests/unittests/test_parent_child_bookmark.py @@ -0,0 +1,94 @@ +import unittest +from unittest.mock import patch, MagicMock +from tap_quickbase.streams.abstracts import ParentBaseStream + +class ConcreteParentBaseStream(ParentBaseStream): + @property + def key_properties(self): + return ["id"] + + @property + def replication_keys(self): + return ["updated_at"] + + @property + def replication_method(self): + return "INCREMENTAL" + + @property + def tap_stream_id(self): + return "parent_stream" + +class TestSync(unittest.TestCase): + 
@patch("tap_quickbase.streams.abstracts.metadata.to_map") + def setUp(self, mock_to_map): + + mock_catalog = MagicMock() + mock_catalog.schema.to_dict.return_value = {"key": "value"} + mock_catalog.metadata = "mock_metadata" + mock_to_map.return_value = {"metadata_key": "metadata_value"} + + self.stream = ConcreteParentBaseStream(catalog=mock_catalog) + self.stream.child_to_sync = [] + + @patch("tap_quickbase.streams.abstracts.ParentBaseStream.is_selected", return_value=True) + @patch("tap_quickbase.streams.abstracts.ParentBaseStream.get_bookmark", return_value=100) + def test_get_bookmark_parent_only_selected(self, mock_get_bookmark, mock_is_selected): + + state = {} + result = self.stream.get_bookmark(state, "parent_stream") + mock_get_bookmark.assert_called_once_with(state, "parent_stream") + self.assertEqual(result, 100) + + @patch("tap_quickbase.streams.abstracts.BaseStream.is_selected", return_value=False) + @patch("tap_quickbase.streams.abstracts.IncrementalStream.get_bookmark", return_value = 100) + def test_get_bookmark_parent_only_but_not_selected(self, mock_get_bookmark, mock_is_selected): + + state = {} + result = self.stream.get_bookmark(state, "parent_stream") + self.assertEqual(result, None) + + @patch("tap_quickbase.streams.abstracts.BaseStream.is_selected", return_value=True) + @patch("tap_quickbase.streams.abstracts.IncrementalStream.get_bookmark", side_effect = [100, 50, 75]) + def test_get_bookmark_with_children(self, mock_get_bookmark, mock_is_selected): + + child1 = MagicMock() + child1.tap_stream_id = "child_stream_1" + child2 = MagicMock() + child2.tap_stream_id = "child_stream_2" + self.stream.child_to_sync = [child1, child2] + + state = {} + result = self.stream.get_bookmark(state, "parent_stream") + + self.assertEqual(mock_get_bookmark.call_count, 3) + mock_get_bookmark.assert_any_call(state, "parent_stream") + mock_get_bookmark.assert_any_call( + state, "child_stream_1", key="parent_stream_updated_at" + ) + 
mock_get_bookmark.assert_any_call( + state, "child_stream_2", key="parent_stream_updated_at" + ) + self.assertEqual(result, 50) + + @patch("tap_quickbase.streams.abstracts.BaseStream.is_selected", return_value=False) + @patch("tap_quickbase.streams.abstracts.IncrementalStream.get_bookmark", side_effect = [75, 50]) + def test_get_bookmark_only_children_selected(self, mock_get_bookmark, mock_is_selected): + + child1 = MagicMock() + child1.tap_stream_id = "child_stream_1" + child2 = MagicMock() + child2.tap_stream_id = "child_stream_2" + self.stream.child_to_sync = [child1, child2] + + state = {} + result = self.stream.get_bookmark(state, "parent_stream") + + self.assertEqual(mock_get_bookmark.call_count, 2) + mock_get_bookmark.assert_any_call( + state, "child_stream_1", key="parent_stream_updated_at" + ) + mock_get_bookmark.assert_any_call( + state, "child_stream_2", key="parent_stream_updated_at" + ) + self.assertEqual(result, 50) diff --git a/tests/unittests/test_sync.py b/tests/unittests/test_sync.py new file mode 100644 index 0000000..d3ef29d --- /dev/null +++ b/tests/unittests/test_sync.py @@ -0,0 +1,126 @@ +import unittest +from unittest.mock import patch, MagicMock +from tap_quickbase.sync import write_schema, sync, update_currently_syncing + +class TestSync(unittest.TestCase): + + def test_write_schema_only_parent_selected(self): + mock_stream = MagicMock() + mock_stream.is_selected.return_value = True + mock_stream.children = ["invoice_payments", "invoice_line_items"] + mock_stream.child_to_sync = [] + + client = MagicMock() + catalog = MagicMock() + catalog.get_stream.return_value = MagicMock() + + write_schema(mock_stream, client, [], catalog) + + mock_stream.write_schema.assert_called_once() + self.assertEqual(len(mock_stream.child_to_sync), 0) + + def test_write_schema_parent_child_both_selected(self): + mock_stream = MagicMock() + mock_stream.is_selected.return_value = True + mock_stream.children = ["invoice_payments", "invoice_line_items"] + 
mock_stream.child_to_sync = [] + + client = MagicMock() + catalog = MagicMock() + catalog.get_stream.return_value = MagicMock() + + write_schema(mock_stream, client, ["invoice_payments"], catalog) + + mock_stream.write_schema.assert_called_once() + self.assertEqual(len(mock_stream.child_to_sync), 1) + + def test_write_schema_child_selected(self): + mock_stream = MagicMock() + mock_stream.is_selected.return_value = False + mock_stream.children = ["invoice_payments", "invoice_line_items"] + mock_stream.child_to_sync = [] + + client = MagicMock() + catalog = MagicMock() + catalog.get_stream.return_value = MagicMock() + + write_schema(mock_stream, client, ["invoice_payments", "invoice_line_items"], catalog) + + self.assertEqual(mock_stream.write_schema.call_count, 0) + self.assertEqual(len(mock_stream.child_to_sync), 2) + + @patch("singer.write_schema") + @patch("singer.get_currently_syncing") + @patch("singer.Transformer") + @patch("singer.write_state") + @patch("tap_quickbase.streams.abstracts.IncrementalStream.sync") + def test_sync_stream1_called(self, mock_sync, mock_write_state, mock_transformer, mock_get_currently_syncing, mock_write_schema): + mock_catalog = MagicMock() + invoice_stream = MagicMock() + invoice_stream.stream = "invoices" + expense_stream = MagicMock() + expense_stream.stream = "expenses" + mock_catalog.get_selected_streams.return_value = [ + invoice_stream, + expense_stream + ] + state = {} + + client = MagicMock() + config = {} + + sync(client, config, mock_catalog, state) + + self.assertEqual(mock_sync.call_count, 2) + + @patch("singer.write_schema") + @patch("singer.get_currently_syncing") + @patch("singer.Transformer") + @patch("singer.write_state") + @patch("tap_quickbase.streams.abstracts.IncrementalStream.sync") + def test_sync_child_selected(self, mock_sync, mock_write_state, mock_transformer, mock_get_currently_syncing, mock_write_schema): + mock_catalog = MagicMock() + invoice_messages_stream = MagicMock() + 
invoice_messages_stream.stream = "invoice_messages" + invoice_payments_stream = MagicMock() + invoice_payments_stream.stream = "invoice_payments" + mock_catalog.get_selected_streams.return_value = [ + invoice_messages_stream, + invoice_payments_stream + ] + state = {} + + client = MagicMock() + config = {} + + sync(client, config, mock_catalog, state) + + self.assertEqual(mock_sync.call_count, 1) + + @patch("singer.get_currently_syncing") + @patch("singer.set_currently_syncing") + @patch("singer.write_state") + def test_remove_currently_syncing(self, mock_write_state, mock_set_currently_syncing, mock_get_currently_syncing): + mock_get_currently_syncing.return_value = "some_stream" + state = {"currently_syncing": "some_stream"} + + update_currently_syncing(state, None) + + mock_get_currently_syncing.assert_called_once_with(state) + mock_set_currently_syncing.assert_not_called() + mock_write_state.assert_called_once_with(state) + self.assertNotIn("currently_syncing", state) + + @patch("singer.get_currently_syncing") + @patch("singer.set_currently_syncing") + @patch("singer.write_state") + def test_set_currently_syncing(self, mock_write_state, mock_set_currently_syncing, mock_get_currently_syncing): + mock_get_currently_syncing.return_value = None + state = {} + + update_currently_syncing(state, "new_stream") + + mock_get_currently_syncing.assert_not_called() + mock_set_currently_syncing.assert_called_once_with(state, "new_stream") + mock_write_state.assert_called_once_with(state) + self.assertNotIn("currently_syncing", state)