diff --git a/.circleci/config.yml b/.circleci/config.yml index 8e7f753..c2eb94f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,7 +23,7 @@ jobs: name: 'Unit Tests' command: | source /usr/local/share/virtualenvs/tap-sftp/bin/activate - pip install nose coverage + pip install nose coverage parameterized nosetests --with-coverage --cover-erase --cover-package=tap_sftp --cover-html-dir=htmlcov tests/unittests coverage html - store_test_results: diff --git a/tap_sftp/client.py b/tap_sftp/client.py index 0f63315..b194552 100644 --- a/tap_sftp/client.py +++ b/tap_sftp/client.py @@ -145,7 +145,10 @@ def get_files_by_prefix(self, prefix): return files - def get_files(self, prefix, search_pattern, modified_since=None): + def get_files(self, table_spec, modified_since=None): + prefix = table_spec['search_prefix'] + search_pattern = table_spec['search_pattern'] + files = self.get_files_by_prefix(prefix) if files: LOGGER.info('Found %s files in "%s"', len(files), prefix) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index bb8b599..b38d381 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -1,19 +1,188 @@ +import gzip +import itertools import json import socket +import zipfile import backoff import singer - -from singer_encodings import json_schema +import csv as python_csv +from singer_encodings import json_schema, csv, compression, jsonl, schema as se_schema from singer import metadata from tap_sftp import client -LOGGER= singer.get_logger() +LOGGER = singer.get_logger() + +def compression_infer_local(iterable, file_name): + """Uses the incoming file_name and checks the end of the string for supported compression types""" + if not file_name: + raise Exception("Need file name") + + if file_name.endswith('.tar.gz'): + raise NotImplementedError("tar.gz not supported") + elif file_name.endswith('.gz'): + yield gzip.GzipFile(fileobj=iterable) + elif file_name.endswith('.zip'): + with zipfile.ZipFile(iterable) as zip: + for name in zip.namelist(): + yield zip.open(name) + else: + yield iterable + +def get_row_iterators_local(iterable, options={}, infer_compression=False, headers_in_catalog=None, with_duplicate_headers=False): + """ + Accepts an iterable, options, compression flag, catalog headers and flag for duplicate headers + to infer compression and yields csv.DictReader objects can be used to yield CSV rows. + """ + if infer_compression: + compressed_iterables = compression_infer_local(iterable, options.get('file_name')) + + for item in compressed_iterables: + + # Try to parse as JSONL + try: + # Duplicate 'item' iterator as 'get_JSONL_iterators' will use 1st row of 'item' to load as JSONL + # Thus, the 'item' will have records after the 1st row on encountering the 'JSONDecodeError' error + # As a result 'csv.get_row_iterator' will sync records after the 1st row + item, item_for_JSONL = itertools.tee(item) + yield ('jsonl', jsonl.get_JSONL_iterators(item_for_JSONL, options)) + continue + except json.JSONDecodeError: + pass + + # Maximize the CSV field width + csv.maximize_csv_field_width() + + # Finally parse as CSV + yield ('csv', csv.get_row_iterator(item, options=options)) + + +# pylint: disable=too-many-arguments +def sample_files_local(conn, table_spec, files, sample_rate=1, max_records=1000, max_files=5): + """Function to sample matched files as per the sampling rate and the max records to sample""" + LOGGER.info("Sampling files (max files: %s)", max_files) + to_return = [] + empty_samples = [] + + files_so_far = 0 + + sorted_files = sorted( + files, key=lambda f: f['last_modified'], reverse=True) + + for f in sorted_files: + empty_file, samples = sample_file_local(conn, table_spec, f, sample_rate, max_records) + + if empty_file: + empty_samples += samples + else: + to_return += samples + + files_so_far += 1 + + if files_so_far >= max_files: + break + + if len(to_return) == 0: + return empty_samples + + return to_return + + +def sample_file_local(conn, table_spec, f, sample_rate, max_records): + """Function to sample a file and return list of records for that file""" + + LOGGER.info('Sampling %s (max records: %s, sample rate: %s)', + f['filepath'], + max_records, + sample_rate) + + samples = [] + file_name = f['filepath'] + + try: + file_handle = conn.get_file_handle(f) + except OSError: + return (False, samples) + + # Add file_name to opts and flag infer_compression to support gzipped files + opts = {'key_properties': table_spec['key_properties'], + 'delimiter': table_spec.get('delimiter', ','), + 'file_name': file_name} + + readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True) + + for _, reader in readers: + current_row = 0 + for row in reader: + if (current_row % sample_rate) == 0: + if row.get(csv.SDC_EXTRA_COLUMN): + row.pop(csv.SDC_EXTRA_COLUMN) + samples.append(row) + + current_row += 1 + + if len(samples) >= max_records: + break + + LOGGER.info("Sampled %s rows from %s", len(samples), file_name) + # Empty sample to show field selection, if needed + empty_file = False + if len(samples) == 0: + empty_file = True + # If the 'reader' is an instance of 'csv.Dictreader' and has + # fieldnames, prepare samples with 'None' field names + # Assumes all reader objects in readers have the same fieldnames + if isinstance(reader, python_csv.DictReader) and reader.fieldnames is not None: + samples.append({name: None for name in reader.fieldnames}) + + return (empty_file, samples) + +def get_schema_for_table_local(conn, table_spec, sample_rate=1): + """Function to generate schema for the provided data files""" + files = conn.get_files(table_spec) + + if not files: + return {} + + samples = json_schema.sample_files(conn, table_spec, files, sample_rate=sample_rate) + + # Return empty if there is no schema generated + if not any(samples): + return { + 'type': 'object', + 'properties': {}, + } + + schema = se_schema.generate_schema(samples, table_spec) + + data_schema = { + **schema, + **json_schema.get_sdc_columns() + } + + return { + 'type': 'object', + 'properties': data_schema, + } + + +# Override singer_encoding's 'get_row_iterators' as: +# - The Tap is supporting files created without an extension +csv.get_row_iterators = get_row_iterators_local + +# Override singer_encoding's 'sample_file' as: +# - The Tap is looping over the files in the sorted manner of 'last_modified' +# - The Tap is not supporting the skipping of CSV and JSONL files with the wrong extension +json_schema.sample_files = sample_files_local + +# Override singer_encoding's 'sample_file' as: +# - The Tap is not having support for CSV files with duplicate headers +# - The Tap is creating a sample record with 'None' for CSV files with only headers +json_schema.sample_file = sample_file_local def discover_streams(config): streams = [] conn = client.connection(config) - prefix = format(config.get("user_dir", "./")) tables = json.loads(config['tables']) for table_spec in tables: @@ -40,7 +209,7 @@ def discover_streams(config): # generate schema def get_schema(conn, table_spec): LOGGER.info('Sampling records to determine table JSON schema "%s".', table_spec['table_name']) - schema = json_schema.get_schema_for_table(conn, table_spec) + schema = get_schema_for_table_local(conn, table_spec) stream_md = metadata.get_standard_metadata(schema, key_properties=table_spec.get('key_properties'), replication_method='INCREMENTAL') diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index 69c5740..00d4107 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -27,9 +27,7 @@ def sync_stream(config, state, stream): return 0 table_spec = table_spec[0] - files = conn.get_files(table_spec["search_prefix"], - table_spec["search_pattern"], - modified_since) + files = conn.get_files(table_spec, modified_since) LOGGER.info('Found %s files to be synced.', len(files)) @@ -67,19 +65,44 @@ def sync_file(conn, f, stream, table_spec): readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True) records_synced = 0 + tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', 'sdc_extra'] + schema_dict = stream.schema.to_dict() - for reader in readers: + for file_extension, reader in readers: with Transformer() as transformer: + # Row start for files as per the file type + row_start_line = 1 if file_extension == 'jsonl' else 2 for row in reader: + # Skipping the empty line + if len(row) == 0: + continue + custom_columns = { '_sdc_source_file': f["filepath"], - - # index zero, +1 for header row - '_sdc_source_lineno': records_synced + 2 + '_sdc_source_lineno': records_synced + row_start_line } + + # For CSV files, the '_sdc_extra' is handled by 'restkey' in 'csv.DictReader' + # If the file is JSONL then prepare '_sdc_extra' column + if file_extension == 'jsonl': + sdc_extra = [] + + # Get the extra fields ie. (json keys - fields from the catalog - fields added by the tap) + extra_fields = set() + # Create '_sdc_extra' fields if the schema is not empty + if schema_dict.get('properties'): + extra_fields = set(row.keys()) - set(schema_dict.get('properties', {}).keys() - tap_added_fields) + + # Prepare list of extra fields + for extra_field in extra_fields: + sdc_extra.append({extra_field: row.get(extra_field)}) + # If the record contains extra fields, then add the '_sdc_extra' column + if extra_fields: + custom_columns['_sdc_extra'] = sdc_extra + rec = {**row, **custom_columns} - to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata)) + to_write = transformer.transform(rec, schema_dict, metadata.to_map(stream.metadata)) singer.write_record(stream.tap_stream_id, to_write) records_synced += 1 diff --git a/tests/base.py b/tests/base.py index 3252c33..bc65faa 100644 --- a/tests/base.py +++ b/tests/base.py @@ -32,13 +32,19 @@ def get_test_connection(self): def random_string_generator(self, size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) + def generate_max_size_csv(self): + """Generate field with max size""" + return [[1, 'a'*131074]] + def generate_simple_csv_lines_typeA(self, num_lines): + """Generate CSV data with integer and string dataype""" lines = [] for int_value in range(num_lines): lines.append([int_value, self.random_string_generator(), int_value*5]) return lines def generate_simple_csv_lines_typeB(self, num_lines): + """Generate CSV data with integer, datetime, float and string dataype""" lines = [] start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) for int_value in range(num_lines): @@ -47,6 +53,7 @@ def generate_simple_csv_lines_typeB(self, num_lines): return lines def generate_simple_csv_lines_typeC(self, num_lines): + """Generate CSV data with integer, datetime, float and string dataype""" lines = [] start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) for int_value in range(num_lines): @@ -54,6 +61,31 @@ def generate_simple_csv_lines_typeC(self, num_lines): lines.append([int_value, self.random_string_generator(), int_value*5, utils.strftime(start_datetime), int_value + random.random()]) return lines + def generate_simple_jsonl_lines_typeA(self, num_lines): + """Generate JSONL data with integer and string dataype""" + lines = [] + for int_value in range(num_lines): + lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5}) + return lines + + def generate_simple_jsonl_lines_typeB(self, num_lines): + """Generate JSONL data with integer, datetime, float and string dataype""" + lines = [] + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + for int_value in range(num_lines): + start_datetime = start_datetime + timedelta(days=5) + lines.append({"id": int_value, "string_col": self.random_string_generator(), "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()}) + return lines + + def generate_simple_jsonl_lines_typeC(self, num_lines): + """Generate JSONL data with integer, datetime, float and string dataype""" + lines = [] + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + for int_value in range(num_lines): + start_datetime = start_datetime + timedelta(days=5) + lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5, "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()}) + return lines + def isdir(path, client): try: return S_ISDIR(client.stat(path).st_mode) diff --git a/tests/test_sftp_empty_csv_in_gz.py b/tests/test_sftp_empty_csv_in_gz.py index 5481b15..069a5f3 100644 --- a/tests/test_sftp_empty_csv_in_gz.py +++ b/tests/test_sftp_empty_csv_in_gz.py @@ -61,9 +61,10 @@ def setUp(self): headers = file_group['headers'] directory = file_group['directory'] for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) client.chdir(directory) with client.open(filename, 'w') as direct_file: - with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: # write in file if 'num_rows', used to create an empty 'csv.gz' file if file_group.get('num_rows'): with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: diff --git a/tests/test_sftp_gzip.py b/tests/test_sftp_gzip.py index 9abfdee..fcd5f0f 100644 --- a/tests/test_sftp_gzip.py +++ b/tests/test_sftp_gzip.py @@ -69,9 +69,10 @@ def setUp(self): headers = file_group['headers'] directory = file_group['directory'] for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) client.chdir(directory) with client.open(filename, 'w') as direct_file: - with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: writer = csv.writer(f) lines = [headers] + file_group['generator'](file_group['num_rows']) diff --git a/tests/test_sftp_jsonl.py b/tests/test_sftp_jsonl.py new file mode 100644 index 0000000..224dff8 --- /dev/null +++ b/tests/test_sftp_jsonl.py @@ -0,0 +1,95 @@ +from base import TestSFTPBase +import os +import json + +class TestSFTPJsonl(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl" + + def get_files(self): + return [ + { + "directory": "folderA", + "files": ["table_1_fileA.jsonl", "table_3_fileA.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "directory": "folderB", + "files": ["table_2_fileA.jsonl", "table_2_fileB.jsonl", "table_3_fileB.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "directory": "folderC", + "files": ["table_3_fileC.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeC + }, + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPJsonl.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + for record in file_group['generator'](file_group['num_rows']): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_1.*jsonl", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_2.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_3.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() diff --git a/tests/test_sftp_jsonl_data.py b/tests/test_sftp_jsonl_data.py new file mode 100644 index 0000000..2a536ec --- /dev/null +++ b/tests/test_sftp_jsonl_data.py @@ -0,0 +1,213 @@ +from datetime import datetime, timezone +from decimal import Decimal +from base import TestSFTPBase +from tap_tester import connections, menagerie, runner +import os +import json +from singer import utils + +class TestSFTPJsonlData(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl_data" + + def generate_jsonl_data(self): + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{"int": 1, "string": "string_data", "float": 1.2, "date": utils.strftime(start_datetime)}] + + def generate_jsonl_dict_data(self): + start_datetime_1 = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + start_datetime_2 = datetime(2019, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{ + "int": 1, + "dict_int": {1: 1, 2: 2}, + "dict_float": {1.2: 2.0}, + "dict_string": {"key1": "value1", "key2": "value2"}, + "dict_dict": {"key": {"name": "john"}}, + "dict_list": {"ids": [1, 2, 3]}, + "dict_datetime": {"date_1": utils.strftime(start_datetime_1), "date_2": utils.strftime(start_datetime_2)} + }] + + def generate_jsonl_list_data(self): + start_datetime_1 = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + start_datetime_2 = datetime(2019, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{ + "int": 1, + "list_int": [1, 2, 3], + "list_float": [1.2, 2.3, 3.4], + "list_string": ["data", "of", "string"], + "list_dict": [{"id": 1}, {"id": 2}], + "list_list": [[1, 2 , 3], [4, 5, 6]], + "list_datetime": [utils.strftime(start_datetime_1), utils.strftime(start_datetime_2)] + }] + + def get_files(self): + return [ + { + "directory": "mytable", + "files": ["file1.jsonl"], + "generator": self.generate_jsonl_data + }, + { + "directory": "mytable_list", + "files": ["file2.jsonl"], + "generator": self.generate_jsonl_list_data + }, + { + "directory": "mytable_dict", + "files": ["file3.jsonl"], + "generator": self.generate_jsonl_dict_data + } + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPJsonlData.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + for record in file_group['generator'](): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "mytable", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file1.*jsonl", + "key_properties": [] + }, + { + "table_name": "mytable_list", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file2.*jsonl", + "key_properties": [] + }, + { + "table_name": "mytable_dict", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file3.*jsonl", + "key_properties": [] + } + ]) + return props + + def expected_check_streams(self): + return {"mytable", "mytable_dict", "mytable_list"} + + def expected_pks(self): + return { + "mytable": set(), + "mytable_dict": set(), + "mytable_list": set() + } + + def expected_data(self): + return { + "mytable": { + "int": 1, + "string": "string_data", + "float": Decimal("1.2"), + "date": "2018-01-01T19:29:14.578000Z", + "_sdc_source_lineno": 1 + }, + "mytable_list": { + "int": 1, + "list_int": [1, 2, 3], + "list_float": [Decimal("1.2"), Decimal("2.3"), Decimal("3.4")], + "list_string": ["data", "of", "string"], + "list_dict": [{"id": 1}, {"id": 2}], + "list_list": ["[1, 2, 3]", "[4, 5, 6]"], + "list_datetime": ["2018-01-01T19:29:14.578000Z", "2019-01-01T19:29:14.578000Z"], + "_sdc_source_lineno": 1 + }, + "mytable_dict": { + "int": 1, + "dict_int": {"1": 1, "2": 2}, + "dict_float": {"1.2": Decimal(2.0)}, + "dict_string": {"key1": "value1", "key2": "value2"}, + "dict_dict": {"key": {"name": "john"}}, + "dict_list": {"ids": [1, 2, 3]}, + "dict_datetime": {"date_1": "2018-01-01T19:29:14.578000Z", "date_2": "2019-01-01T19:29:14.578000Z"}, + "_sdc_source_lineno": 1 + } + } + + def test_run(self): + conn_id = connections.ensure_connection(self) + + # run in discovery mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + # verify the tap discovered the right streams + catalog = menagerie.get_catalogs(conn_id) + found_catalog_names = set(map(lambda c: c['tap_stream_id'], catalog)) + + # assert we find the correct streams + self.assertEqual(self.expected_check_streams(), found_catalog_names) + + for tap_stream_id in self.expected_check_streams(): + found_stream = [c for c in catalog if c['tap_stream_id'] == tap_stream_id][0] + schema_and_metadata = menagerie.get_annotated_schema(conn_id, found_stream['stream_id']) + main_metadata = schema_and_metadata["metadata"] + stream_metadata = [mdata for mdata in main_metadata if mdata["breadcrumb"] == []] + + # assert that the pks are correct + self.assertEqual(self.expected_pks()[tap_stream_id], + set(stream_metadata[0]['metadata']['table-key-properties'])) + + for stream_catalog in catalog: + annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id']) + connections.select_catalog_and_fields_via_metadata(conn_id, + stream_catalog, + annotated_schema['annotated-schema'], + []) + + # Run sync + sync_job_name = runner.run_sync_mode(self, conn_id) + + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # verify the persisted schema was correct + messages_by_stream = runner.get_records_from_target_output() + + for stream in self.expected_check_streams(): + records = [record.get("data") for record in messages_by_stream.get(stream, {}).get("messages", []) + if record.get("action") == "upsert"] + self.assertEqual(len(records), 1) + for record in records: + del record["_sdc_source_file"] + self.assertEqual([self.expected_data().get(stream)], records) diff --git a/tests/test_sftp_jsonl_gzip.py b/tests/test_sftp_jsonl_gzip.py new file mode 100644 index 0000000..4604cb9 --- /dev/null +++ b/tests/test_sftp_jsonl_gzip.py @@ -0,0 +1,104 @@ +from base import TestSFTPBase +import os +import json +import gzip +import io + +class TestSFTPGzip(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_gzip" + + def get_files(self): + return [ + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderA", + "files": ["table_1_fileA.jsonl.gz", "table_3_fileA.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "headers": ['id', 'string_col', 'datetime_col', 'number_col'], + "directory": "folderB", + "files": ["table_2_fileA.jsonl.gz", "table_2_fileB.jsonl.gz", "table_3_fileB.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "headers": ['id', 'string_col', 'integer_col', 'datetime_col', 'number_col'], + "directory": "folderC", + "files": ["table_3_fileC.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeC + }, + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPGzip.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + # Add csv files + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) + client.chdir(directory) + with client.open(filename, 'w') as direct_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: + with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: + for record in file_group['generator'](file_group['num_rows']): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_1.*jsonl", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_2.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_3.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() \ No newline at end of file diff --git a/tests/test_sftp_jsonl_zip.py b/tests/test_sftp_jsonl_zip.py new file mode 100644 index 0000000..d5cdee5 --- /dev/null +++ b/tests/test_sftp_jsonl_zip.py @@ -0,0 +1,114 @@ +from base import TestSFTPBase +import os +import json +import zipfile + +class TestSFTPZipJsonl(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl_zip" + + def get_files(self): + return [ + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderA", + "files": ["table_1_fileA.jsonl", "table_3_fileA.jsonl"], + "archive": 'table_1.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "headers": ['id', 'string_col', 'datetime_col', 'number_col'], + "directory": "folderB", + "files": ["table_2_fileA.jsonl", "table_2_fileB.jsonl", "table_3_fileB.jsonl"], + "archive": 'table_2.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderC", + "files": ["table_3_fileC.jsonl"], + "archive": 'table_3.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + ] + + def expected_first_sync_row_counts(self): + return { + 'table_1': 100, + 'table_2': 150, + 'table_3': 50 + } + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPZipJsonl.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + # Add csv files + for file_group in file_info: + directory = file_group['directory'] + client.chdir(directory) + with client.open(file_group['archive'], 'w') as direct_file: + with zipfile.ZipFile(direct_file, mode='w') as zip_file: + lines = file_group['generator'](file_group['num_rows']) + total = '' + for line in lines: + total += json.dumps(line) + '\n' + for file_name in file_group['files']: + zip_file.writestr(file_name, total) + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_1\.zip", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_2\.zip", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_3\.zip", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() diff --git a/tests/test_sftp_max_size.py b/tests/test_sftp_max_size.py new file mode 100644 index 0000000..63876b6 --- /dev/null +++ b/tests/test_sftp_max_size.py @@ -0,0 +1,99 @@ +from base import TestSFTPBase +from tap_tester import connections, runner +import os +import csv +import json + +class TestSFTPDiscovery(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_maximize_csv_field_width" + + def get_files(self): + return [ + { + "headers": ['id', 'name'], + "directory": "max_csv", + "files": ["max_csv_file.csv"], + "num_rows": 1, + "generator": self.generate_max_size_csv + } + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPDiscovery.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + + # Add subdirectories + client.mkdir('tap_tester/max_csv') + + # Add csv files + client.chdir('tap_tester') + + for file_group in self.get_files(): + headers = file_group['headers'] + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + writer = csv.writer(f) + lines = [headers] + file_group['generator']() + writer.writerows(lines) + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "csv_with_max_field_width", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "max_csv_file.csv", + "key_properties": ['id'] + } + ]) + return props + + def expected_first_sync_streams(self): + return {'csv_with_max_field_width'} + + def expected_check_streams(self): + return {'csv_with_max_field_width'} + + def expected_pks(self): + return { + 'csv_with_max_field_width': {'id'} + } + + def test_run(self): + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + self.perform_and_verify_table_and_field_selection(conn_id, found_catalogs) + + self.run_and_verify_sync(conn_id) + + expected_records = 1 + record_count = runner.get_upserts_from_target_output() + # Verify record counts + self.assertEqual(expected_records, len(record_count)) + + records = runner.get_records_from_target_output() + actual_records = [record.get('data') for record in records.get('csv_with_max_field_width').get('messages')] + # Verify the record we created of length greater than 'csv.field_size_limit' of '131072' is replicated + self.assertEqual(actual_records, [{'id': 1, 'name': '{}'.format('a'*131074), '_sdc_source_file': os.getenv('TAP_SFTP_ROOT_DIR') + '/tap_tester/max_csv/max_csv_file.csv', '_sdc_source_lineno': 2}]) diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py new file mode 100644 index 0000000..4aa87cd --- /dev/null +++ b/tests/unittests/test_JSONL.py @@ -0,0 +1,38 @@ +import unittest +from unittest import mock +from parameterized import parameterized +from tap_sftp import discover +from singer_encodings.jsonl import get_JSONL_iterators + +class JSONLIterator: + def __init__(self, data): + self.data = data + + def decode(self, encoding): + return self.data + +class TestRowIterators(unittest.TestCase): + """Test cases to verify we call 'row_iterator' for JSONL or CSV as per the file extension""" + @parameterized.expand([ + ["csv", ["test.csv", [[b'id,name\n', b'1,test1\n']]], 1], + ["jsonl", ["test.jsonl", [[b'{"id": 1, "name": "test1"}\n']]], 0], + ["zip_csv", ["test.zip", [[b'id,name\n', b'1,test1\n']]], 1], + ["zip_jsonl", ["test.zip", [[b'{"id": 1, "name": "test1"}\n']]], 0], + ["gz_csv", ["test.gz", [[b'id,name\n', b'1,test1\n']]], 1], + ["gz_jsonl", ["test.gz", [[b'{"id": 1, "name": "test1"}\n']]], 0], + ]) + @mock.patch("singer_encodings.jsonl.get_JSONL_iterators", side_effect=get_JSONL_iterators) + @mock.patch("singer_encodings.csv.get_row_iterator") + @mock.patch("tap_sftp.discover.compression_infer_local") + def test_get_row_iterators_local(self, name, test_data, expected_data, mocked_infer, mocked_get_csv_row_iterator, mocked_get_JSONL_iterators): + # Mock file iterable + mocked_infer.return_value = test_data[1] + options = { + "file_name": test_data[0] + } + # Function call + list(discover.get_row_iterators_local(iterable=[], options=options, infer_compression=True)) + # Verify the call count for JSONL or CSV row_iterator + self.assertEqual(mocked_get_csv_row_iterator.call_count, expected_data) + # Verify we try to parse for JSONL first for every file + self.assertEqual(mocked_get_JSONL_iterators.call_count, 1) diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py new file mode 100644 index 0000000..5c79c14 --- /dev/null +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -0,0 +1,50 @@ +import unittest +from parameterized import parameterized +from unittest import mock +from tap_sftp import client, sync +import singer + +@mock.patch("tap_sftp.client.SFTPConnection.sftp") +@mock.patch('singer.Transformer.transform') +@mock.patch('singer_encodings.csv.get_row_iterators') +class TestSyncJSONLsdcFields(unittest.TestCase): + @parameterized.expand([ + ["with_sdc_fields", [{"id": 1}], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1}], + ["with_sdc_extra", [{"id": 1, "name": "abc"}], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1, '_sdc_extra': [{"name": "abc"}]}] + ]) + def test_sync_JSONL(self, mocked_get_row_iterators, mocked_transform, mocked_sftp, name, test_data, expected_data): + """Test cases to verify we prepare '_sdc_extra' fields for JSONL files""" + mocked_get_row_iterators.return_value = [('jsonl', test_data)] + conn = client.SFTPConnection("10.0.0.1", "username", port="22") + table_spec = { + "key_properties": [], + "delimiter": None, + "table_name": "test", + "search_prefix": None, + "search_pattern": "data.jsonl" + } + stream = singer.CatalogEntry(tap_stream_id="test", \ + schema=singer.Schema(properties={"id": singer.Schema(type=["null", "integer"])}), metadata=[]) + f = {"filepath": "/root_dir/data.jsonl", "last_modified": "2022-01-01"} + sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) + args = mocked_transform.call_args.args + records = args[0] + self.assertEqual(records, expected_data) + + def test_sync_JSONL_empty_schema_with_records(self, mocked_get_row_iterators, mocked_transform, mocked_sftp): + """Test case to verify we are not creating sdc extra field if the schema is empty {} for JSONL files""" + mocked_get_row_iterators.return_value = [('jsonl', [{"id": 1}])] + conn = client.SFTPConnection("10.0.0.1", "username", port="22") + table_spec = { + "key_properties": [], + "delimiter": None, + "table_name": "test", + "search_prefix": None, + "search_pattern": "data.jsonl" + } + stream = singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(properties={}), metadata=[]) + f = {"filepath": "/root_dir/data.jsonl", "last_modified": "2022-01-01"} + sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) + args = mocked_transform.call_args.args + records = args[0] + self.assertEqual(records, {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1}) diff --git a/tests/unittests/test_permission_error.py b/tests/unittests/test_permission_error.py index de52acf..67b7e17 100644 --- a/tests/unittests/test_permission_error.py +++ b/tests/unittests/test_permission_error.py @@ -3,6 +3,7 @@ import tap_sftp.client as client import tap_sftp.sync as sync import paramiko +import singer @mock.patch("tap_sftp.client.SFTPConnection.sftp") @mock.patch("tap_sftp.client.LOGGER.warn") @@ -55,7 +56,7 @@ def test_no_error_during_sync(self, mocked_get_row_iterators, mocked_stats, mock conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(1, mocked_get_row_iterators.call_count) @@ -68,7 +69,7 @@ def test_permisison_error_during_sync(self, mocked_get_row_iterators, mocked_log conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(0, mocked_get_row_iterators.call_count) @@ -82,7 +83,7 @@ def test_oserror_during_sync(self, mocked_get_row_iterators, mocked_logger, mock conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(0, mocked_get_row_iterators.call_count) diff --git a/tests/unittests/test_sorted_files.py b/tests/unittests/test_sorted_files.py index ff2f167..f714b2d 100644 --- a/tests/unittests/test_sorted_files.py +++ b/tests/unittests/test_sorted_files.py @@ -40,8 +40,11 @@ def test_sorted_files(self, mocked_all_files): }] mocked_all_files.return_value = files_list - - files = conn.get_files("/root", "file[0-9].csv") + table_spec = { + "search_prefix": "/root", + "search_pattern": "file[0-9].csv" + } + files = conn.get_files(table_spec) # expected files in increasing order of "last_modified" expected_files_list = ["/root/file1.csv", "/root/file3.csv", "/root/file2.csv", "/root/file4.csv"] @@ -74,7 +77,11 @@ def test_sorted_files_negative(self, mocked_all_files): # setting "modified_since" to now modified_since = singer.utils.strptime_to_utc(datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat()) - files = conn.get_files("/root", "file[0-9].csv", modified_since) + table_spec = { + "search_prefix": "/root", + "search_pattern": "file[0-9].csv" + } + files = conn.get_files(table_spec, modified_since) # as all the modified date is lesser than "modified_since" thus, no files will be returned expected_files_list = [] diff --git a/tests/unittests/test_timeout.py b/tests/unittests/test_timeout.py index a1e5cda..31802ef 100644 --- a/tests/unittests/test_timeout.py +++ b/tests/unittests/test_timeout.py @@ -140,7 +140,7 @@ class TimeoutBackoff(unittest.TestCase): """ @mock.patch("singer.metadata.get_standard_metadata") - @mock.patch("singer_encodings.json_schema.get_schema_for_table") + @mock.patch("tap_sftp.discover.get_schema_for_table_local") def test_timeout_backoff__get_schema(self, mocked_get_schema_for_table, mocked_get_standard_metadata): """ Test case to verify we backoff and retry for 'get_schema' function