From c3975387f05e45d2ee7f18489d8732f5e819be29 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Thu, 18 Aug 2022 16:58:18 +0530 Subject: [PATCH 01/20] added support for JSONL files --- tap_sftp/discover.py | 81 +++++++++++++++++++++++++++++++++++++++++++- tap_sftp/sync.py | 17 ++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index bb8b599..db31dff 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -1,14 +1,93 @@ +import itertools import json import socket import backoff import singer -from singer_encodings import json_schema +from singer_encodings import json_schema, csv, compression from singer import metadata from tap_sftp import client LOGGER= singer.get_logger() +def get_row_iterators_local(iterable, options={}, infer_compression=False): + """Accepts an interable, options and a flag to infer compression and yields + csv.DictReader objects which can be used to yield CSV rows.""" + if infer_compression: + compressed_iterables = compression.infer(iterable, options.get('file_name')) + + for item in compressed_iterables: + file_name_splitted = options.get('file_name').split('.') + extension = file_name_splitted[-1] + # Get the extension of the zipped file + if extension == 'zip': + extension = item.name.split('.')[-1] + # Get the extension of the gzipped file ie. 
file.csv.gz -> csv + if extension == 'gz': + extension = file_name_splitted[-2] + + # If the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' + if extension in ['csv', 'txt']: + yield csv.get_row_iterator(item, options=options) + # If the extension is JSONL then use 'get_JSONL_iterators' + elif extension == 'jsonl': + yield get_JSONL_iterators(item, options) + +def get_JSONL_iterators(iterator, options): + # Get JSOL rows + records = get_JSONL_rows(iterator) + check_jsonl_sample_records, records = itertools.tee(records) + + # Veirfy the 'date_overrides' and 'key_properties' as per the config + check_key_properties_and_date_overrides_for_jsonl_file(options, check_jsonl_sample_records) + return records + +def check_key_properties_and_date_overrides_for_jsonl_file(options, jsonl_sample_records): + + all_keys = set() + for record in jsonl_sample_records: + keys = record.keys() + all_keys.update(keys) + + if options.get('key_properties'): + key_properties = set(options['key_properties']) + if not key_properties.issubset(all_keys): + raise Exception('CSV file missing required headers: {}' + .format(key_properties - all_keys)) + + if options.get('date_overrides'): + date_overrides = set(options['date_overrides']) + if not date_overrides.issubset(all_keys): + raise Exception('CSV file missing date_overrides headers: {}' + .format(date_overrides - all_keys)) + +def get_JSONL_rows(iterator): + # Return JSON rows from JSONL file + for row in iterator: + decoded_row = row.decode('utf-8') + if decoded_row.strip(): + row = json.loads(decoded_row) + # Skipping the empty json. 
+ if len(row) == 0: + continue + else: + continue + + yield row + +# Override singer_endoding's 'get_row_iterators' as per the the Tap's JSONL support +csv.get_row_iterators = get_row_iterators_local + +# Override the '_sdc_extra' column value as per the JSONL supported format +json_schema.SDC_EXTRA_VALUE = { + 'type': 'array', 'items': { + 'anyOf': [ + {'type': 'object', 'properties': {}}, + {'type': 'string'} + ] + } +} + def discover_streams(config): streams = [] diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index 69c5740..2972683 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -67,6 +67,7 @@ def sync_file(conn, f, stream, table_spec): readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True) records_synced = 0 + tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', 'sdc_extra'] for reader in readers: with Transformer() as transformer: @@ -77,6 +78,22 @@ def sync_file(conn, f, stream, table_spec): # index zero, +1 for header row '_sdc_source_lineno': records_synced + 2 } + + # For CSV files, the '_sdc_extra' is handled by 'restkey' in 'csv.DictReader' + # If the file is JSONL then prepare '_sdc_extra' column + if f['filepath'].split('.')[-1] == 'jsonl': + sdc_extra = [] + + # Get the extra fields ie. 
(json keys - fields from catalog - fields added by the tap) + extra_fields = set(row.keys()) - set(stream.schema.to_dict().get('properties').keys() - tap_added_fields) + + # Prepare list of extra fields + for extra_field in extra_fields: + sdc_extra.append({extra_field: row.get(extra_field)}) + # If the record contains extra fields, then add '_sdc_extra' column + if extra_fields: + custom_columns['_sdc_extra'] = sdc_extra + rec = {**row, **custom_columns} to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata)) From 9bf5594eecb8ce78b82afe69f02be6eefee75b8b Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Thu, 18 Aug 2022 17:28:40 +0530 Subject: [PATCH 02/20] updated code --- tap_sftp/discover.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index db31dff..f194997 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -80,7 +80,8 @@ def get_JSONL_rows(iterator): # Override the '_sdc_extra' column value as per the JSONL supported format json_schema.SDC_EXTRA_VALUE = { - 'type': 'array', 'items': { + 'type': 'array', + 'items': { 'anyOf': [ {'type': 'object', 'properties': {}}, {'type': 'string'} From e0728c8b39ec7b9a3663376efdaadeb9b89dcafe Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Tue, 23 Aug 2022 17:39:41 +0530 Subject: [PATCH 03/20] added utils for gzip to handle GZIP files and handled empty jsonl files --- tap_sftp/discover.py | 47 ++++++++++++++++++++++++++-------- tap_sftp/gzip_utils.py | 57 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 10 deletions(-) create mode 100644 tap_sftp/gzip_utils.py diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index f194997..2fc3bf6 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -1,15 +1,37 @@ +import io import itertools import json import socket import backoff import singer +import gzip +import zipfile from singer_encodings import json_schema, 
csv, compression from singer import metadata -from tap_sftp import client +from tap_sftp import client, gzip_utils LOGGER= singer.get_logger() +def compression_infer_local(iterable, file_name): + """Uses the incoming file_name and checks the end of the string for supported compression types""" + if not file_name: + raise Exception("Need file name") + + if file_name.endswith('.tar.gz'): + raise NotImplementedError("tar.gz not supported") + elif file_name.endswith('.gz'): + file_bytes = iterable.read() + # Send file object and file name + yield [gzip.GzipFile(fileobj=io.BytesIO(file_bytes)), \ + gzip_utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes))] + elif file_name.endswith('.zip'): + with zipfile.ZipFile(iterable) as zip: + for name in zip.namelist(): + yield zip.open(name) + else: + yield iterable + def get_row_iterators_local(iterable, options={}, infer_compression=False): """Accepts an interable, options and a flag to infer compression and yields csv.DictReader objects which can be used to yield CSV rows.""" @@ -24,14 +46,19 @@ def get_row_iterators_local(iterable, options={}, infer_compression=False): extension = item.name.split('.')[-1] # Get the extension of the gzipped file ie. 
file.csv.gz -> csv if extension == 'gz': - extension = file_name_splitted[-2] + # Get file name + gzip_file_name = item[1] + # Set iterator 'item' + item = item[0] + # Get file extension + extension = gzip_file_name.split('.')[-1] - # If the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' - if extension in ['csv', 'txt']: - yield csv.get_row_iterator(item, options=options) # If the extension is JSONL then use 'get_JSONL_iterators' - elif extension == 'jsonl': + if extension == 'jsonl': yield get_JSONL_iterators(item, options) + # Assuming the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' + else: + yield csv.get_row_iterator(item, options=options) def get_JSONL_iterators(iterator, options): # Get JSOL rows @@ -67,17 +94,17 @@ def get_JSONL_rows(iterator): decoded_row = row.decode('utf-8') if decoded_row.strip(): row = json.loads(decoded_row) - # Skipping the empty json. - if len(row) == 0: - continue else: continue yield row -# Override singer_endoding's 'get_row_iterators' as per the the Tap's JSONL support +# Override singer_encoding's 'get_row_iterators' as per the the Tap's JSONL support csv.get_row_iterators = get_row_iterators_local +# Override singer_encoding's 'infer' as per the the Tap's JSONL support for GZIP files +compression.infer = compression_infer_local + # Override the '_sdc_extra' column value as per the JSONL supported format json_schema.SDC_EXTRA_VALUE = { 'type': 'array', diff --git a/tap_sftp/gzip_utils.py b/tap_sftp/gzip_utils.py new file mode 100644 index 0000000..71c3f46 --- /dev/null +++ b/tap_sftp/gzip_utils.py @@ -0,0 +1,57 @@ +import gzip +import struct + +def get_file_name_from_gzfile(fileobj=None): + """Reading headers of GzipFile and returning filename.""" + + _gz = gzip.GzipFile(fileobj=fileobj) + _fp = _gz.fileobj + + # the magic 2 bytes: if 0x1f 0x8b (037 213 in octal) + magic = _fp.read(2) + if magic == b'': + return None + + if magic != b'\037\213': + raise OSError('Not a 
gzipped file (%r)' % magic) + + (method, flag, _) = struct.unpack(" + # specifies FNAME is encoded in latin1 + while True: + s = _fp.read(1) + if not s or s == b'\000': + break + _fname.append(s) + return ''.join([s.decode('latin1') for s in _fname]) + + return None + +def _read_exact(fp, n): + """This is the gzip.GzipFile._read_exact() method from the Python library. + """ + data = fp.read(n) + while len(data) < n: + b = fp.read(n - len(data)) + if not b: + raise EOFError("Compressed file ended before the " + "end-of-stream marker was reached") + data += b + return data \ No newline at end of file From 33d0d1e1c8b8ceceedb045c7804ef13029553b1e Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Wed, 24 Aug 2022 14:04:28 +0530 Subject: [PATCH 04/20] added unittests --- tap_sftp/discover.py | 6 +- tests/unittests/test_JSONL.py | 51 ++++++++ tests/unittests/test_JSONL_sync_sdc_fields.py | 31 +++++ tests/unittests/test_gzip_utils.py | 115 ++++++++++++++++++ 4 files changed, 200 insertions(+), 3 deletions(-) create mode 100644 tests/unittests/test_JSONL.py create mode 100644 tests/unittests/test_JSONL_sync_sdc_fields.py create mode 100644 tests/unittests/test_gzip_utils.py diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index 2fc3bf6..6dfc2a5 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -99,13 +99,13 @@ def get_JSONL_rows(iterator): yield row -# Override singer_encoding's 'get_row_iterators' as per the the Tap's JSONL support +# Override singer_encoding's 'get_row_iterators' as per the Tap's JSONL support csv.get_row_iterators = get_row_iterators_local -# Override singer_encoding's 'infer' as per the the Tap's JSONL support for GZIP files +# Override singer_encoding's 'infer' as per the Tap's JSONL support for GZIP files compression.infer = compression_infer_local -# Override the '_sdc_extra' column value as per the JSONL supported format +# Override the '_sdc_extra' column value as per the JSONL-supported format json_schema.SDC_EXTRA_VALUE 
= { 'type': 'array', 'items': { diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py new file mode 100644 index 0000000..2b48ab1 --- /dev/null +++ b/tests/unittests/test_JSONL.py @@ -0,0 +1,51 @@ +import unittest +from parameterized import parameterized +from tap_sftp import discover + +class JSONLIterator: + def __init__(self, data): + self.data = data + + def decode(self, encoding): + return self.data + +class Iterable: + def __init__(self, data): + self.data = data + + def read(self): + return self.data + +class TestCheckJSONLKeyProperties(unittest.TestCase): + """Test cases to verify we raise error if we asr missing Primary Key or Date Overrides value in JSONL data""" + def test_get_JSONL_iterators_positive(self): + options = { + "key_properties": ["id"], + "date_overrides": ["updated_at"] + } + records = [ + JSONLIterator('{"id": 1, "name": "abc", "updated_at": "2022-01-01"}') + ] + discover.get_JSONL_iterators( + options=options, + iterator=records + ) + + @parameterized.expand([ + ["raise_key_properties_error", '{"name": "abc", "updated_at": "2022-01-01"}', "CSV file missing required headers: {\'id\'}"], + ["raise_date_overrides_error", '{"id": 1, "name": "abc"}', "CSV file missing date_overrides headers: {\'updated_at\'}"] + ]) + def test_get_JSONL_iterators(self, name, test_data, expected_data): + options = { + "key_properties": ["id"], + "date_overrides": ["updated_at"] + } + records = [ + JSONLIterator(test_data) + ] + with self.assertRaises(Exception) as e: + discover.get_JSONL_iterators( + options=options, + iterator=records + ) + self.assertEqual(str(e.exception), expected_data) diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py new file mode 100644 index 0000000..a238858 --- /dev/null +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -0,0 +1,31 @@ +import unittest +from parameterized import parameterized +from unittest import mock +from tap_sftp import client, sync 
+import singer + +class TestSyncJSONLsdcFields(unittest.TestCase): + @parameterized.expand([ + ["with_sdc_fields", [{"id": 1}], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}], + ["with_sdc_extra", [{"id": 1, "name": "abc"}], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{"name": "abc"}]}] + ]) + @mock.patch("tap_sftp.client.SFTPConnection.sftp") + @mock.patch('singer.Transformer.transform') + @mock.patch('singer_encodings.csv.get_row_iterators') + def test_sync_JSONL(self, name, test_data, expected_data, mocked_get_row_iterators, mocked_transform, mocked_sftp): + mocked_get_row_iterators.return_value = test_data + conn = client.SFTPConnection("10.0.0.1", "username", port="22") + table_spec = { + "key_properties": [], + "delimiter": None, + "table_name": "test", + "search_prefix": None, + "search_pattern": "data.jsonl" + } + stream = singer.CatalogEntry(tap_stream_id="test", \ + schema=singer.Schema(properties={"id": singer.Schema(type=["null", "integer"])}), metadata=[]) + f = {"filepath": "/root_dir/data.jsonl", "last_modified": "2022-01-01"} + sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) + args = mocked_transform.call_args.args + records = args[0] + self.assertEqual(records, expected_data) diff --git a/tests/unittests/test_gzip_utils.py b/tests/unittests/test_gzip_utils.py new file mode 100644 index 0000000..29288de --- /dev/null +++ b/tests/unittests/test_gzip_utils.py @@ -0,0 +1,115 @@ +import unittest +from unittest import mock +from parameterized import parameterized +from tap_sftp.gzip_utils import get_file_name_from_gzfile , _read_exact + +class FileHandler: + ''' + Class to handle return values for mocked function calls. + ''' + + def __init__(self, file, fileobj): + self.filename = file + self.fileobj = fileobj + +class TestClass(unittest.TestCase): + ''' + Test class to verify working of functions implemented for gz_file. 
+ ''' + + @mock.patch("gzip.GzipFile") + def test_get_file_name_from_gzfile_returns_none(self, mocked_gzip_file): + ''' + Test case to verify that no file name is returned. + ''' + + mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) + mocked_gzip_file.read.return_value = b'' + returned = get_file_name_from_gzfile(fileobj = None) + + self.assertEqual(returned, None) + + @mock.patch("gzip.GzipFile") + def test_get_file_name_from_gzfile_raises_gzipped_file_error(self, mocked_gzip_file): + ''' + Test case to verify that OSError is raised if incorrect file is passed. + ''' + + with self.assertRaises(OSError) as e: + get_file_name_from_gzfile(fileobj = None) + + self.assertTrue("Not a gzipped file" in str(e.exception)) + + @mock.patch("gzip.GzipFile") + def test_get_file_name_from_gzfile_raises_compression_method_error(self , mocked_gzip_file): + ''' + Test case to verify that OSError is raised if file is compressed by unknown methods. + ''' + + mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) + mocked_gzip_file.read.return_value = b'\037\213' + + with self.assertRaises(OSError) as e: + get_file_name_from_gzfile(fileobj = None) + + self.assertEqual(str(e.exception), "Unknown compression method") + + @parameterized.expand([ + ["with_gzip_extension", "test.txt.gzip", "test.txt"], + ["without_gzip_extension", "test.txt", "test.txt"] + ]) + @mock.patch("gzip.GzipFile") + @mock.patch("tap_sftp.gzip_utils._read_exact") + def test_get_file_name_from_gzfile_no_flag(self,name, + test_value1, test_value2, mocked_read_exact, mocked_gzip_file): + ''' + Test case to verify file name when flag = 0. 
+ ''' + + mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) + mocked_gzip_file.read.return_value = b'\037\213' + mocked_read_exact.return_value = b'\x08\x00\x17\x00\x00\x00\x00\x00' + mocked_gzip_file.name = test_value1 + + gz_file_name = get_file_name_from_gzfile(fileobj = "fileobj") + + self.assertEqual(gz_file_name, test_value2) + + @mock.patch("gzip.GzipFile") + @mock.patch("tap_sftp.gzip_utils._read_exact") + def test_get_file_name_from_gzfile_with_flag(self, mocked_read_exact, mocked_gzip_file): + ''' + Test case to verify file name when flag != 0 . + ''' + + mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) + mocked_gzip_file.read.side_effect = [b'\037\213',b'test_file', b'\000'] + mocked_read_exact.side_effect = [b'\x08\x0c\x17\x00\x00\x00\x00\x00', b'\x01\x00', b'\x01'] + + gz_file_name = get_file_name_from_gzfile(fileobj = "fileobj") + + self.assertEqual(gz_file_name, "test_file") + + @mock.patch("gzip.GzipFile") + def test_read_exact(self, mocked_gzip): + ''' + Test case to verify that data of expected length(in bytes) is received. + ''' + + mocked_gzip.read.side_effect = [b'\x00',b'\x05\x03',b'\x01'] + data = _read_exact(mocked_gzip , 2) + + self.assertEqual(data, b'\x00\x05\x03') + + @mock.patch("gzip.GzipFile") + def test_read_exact_raises_error(self, mocked_gzip): + ''' + Test case to verify EOFError is raised. 
+ ''' + + mocked_gzip.read.side_effect = [b'\x05', b'',b'\x01'] + + with self.assertRaises(EOFError) as e: + _read_exact(mocked_gzip , 2) + + self.assertEqual(str(e.exception), 'Compressed file ended before the end-of-stream marker was reached') From d9ef37e0b2f09a174c1ce85a87987dc23efe9bea Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Wed, 24 Aug 2022 14:22:23 +0530 Subject: [PATCH 05/20] added parameterized in config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 8e7f753..c2eb94f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,7 +23,7 @@ jobs: name: 'Unit Tests' command: | source /usr/local/share/virtualenvs/tap-sftp/bin/activate - pip install nose coverage + pip install nose coverage parameterized nosetests --with-coverage --cover-erase --cover-package=tap_sftp --cover-html-dir=htmlcov tests/unittests coverage html - store_test_results: From f7b260d809126c74eb7c6d20dcad5760123cf1f4 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Wed, 24 Aug 2022 14:28:47 +0530 Subject: [PATCH 06/20] updated unittests --- tests/unittests/test_JSONL_sync_sdc_fields.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py index a238858..0381aee 100644 --- a/tests/unittests/test_JSONL_sync_sdc_fields.py +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -6,8 +6,8 @@ class TestSyncJSONLsdcFields(unittest.TestCase): @parameterized.expand([ - ["with_sdc_fields", [{"id": 1}], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}], - ["with_sdc_extra", [{"id": 1, "name": "abc"}], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{"name": "abc"}]}] + ["with_sdc_fields", [[{"id": 1}]], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}], 
+ ["with_sdc_extra", [[{"id": 1, "name": "abc"}]], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{"name": "abc"}]}] ]) @mock.patch("tap_sftp.client.SFTPConnection.sftp") @mock.patch('singer.Transformer.transform') From e5b22d1526e9a047df0c6ae7077493dbba252d78 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Wed, 24 Aug 2022 15:05:43 +0530 Subject: [PATCH 07/20] added test cases for row iterator function --- tests/unittests/test_JSONL.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py index 2b48ab1..442f7ab 100644 --- a/tests/unittests/test_JSONL.py +++ b/tests/unittests/test_JSONL.py @@ -1,4 +1,5 @@ import unittest +from unittest import mock from parameterized import parameterized from tap_sftp import discover @@ -10,8 +11,9 @@ def decode(self, encoding): return self.data class Iterable: - def __init__(self, data): + def __init__(self, data, name=None): self.data = data + self.name = name def read(self): return self.data @@ -49,3 +51,28 @@ def test_get_JSONL_iterators(self, name, test_data, expected_data): iterator=records ) self.assertEqual(str(e.exception), expected_data) + +class TestRowIterators(unittest.TestCase): + """Test cases to verify we call 'row_iterator' for JSONL or CSV as per the file extension""" + @parameterized.expand([ + ["csv", ["test.csv", [Iterable("")]], [1, 0]], + ["jsonl", ["test.jsonl", [Iterable('{"id": 1}')]], [0, 1]], + ["zip_csv", ["test.zip", [Iterable("", "test.csv")]], [1, 0]], + ["zip_jsonl", ["test.zip", [Iterable("", "test.jsonl")]], [0, 1]], + ["gz_csv", ["test.gz", [[Iterable(""), "test.csv"]]], [1, 0]], + ["gz_jsonl", ["test.gz", [[Iterable(""), "test.jsonl"]]], [0, 1]], + ]) + @mock.patch("tap_sftp.discover.get_JSONL_iterators") + @mock.patch("singer_encodings.csv.get_row_iterator") + @mock.patch("singer_encodings.compression.infer") + def 
test_get_row_iterators_local(self, name, test_data, expected_data, mocked_infer, mocked_get_csv_row_iterator, mocked_get_JSONL_iterators): + # Mock file iterable + mocked_infer.return_value = test_data[1] + options = { + "file_name": test_data[0] + } + # Function call + list(discover.get_row_iterators_local(iterable=[], options=options, infer_compression=True)) + # Verify the call count for JSONL or CSV row_iterator + self.assertEqual(mocked_get_csv_row_iterator.call_count, expected_data[0]) + self.assertEqual(mocked_get_JSONL_iterators.call_count, expected_data[1]) From cff80f3d1edf874725b27314bbbedc92d55c4ff6 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Thu, 25 Aug 2022 15:06:21 +0530 Subject: [PATCH 08/20] updated code to skip gz file to no-name flag and handle max csv field width --- tap_sftp/discover.py | 82 +++++++++++++++++++++++++++++++++++++++----- tap_sftp/sync.py | 2 +- 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index 6dfc2a5..b2932e8 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -2,12 +2,13 @@ import itertools import json import socket +import sys import backoff import singer import gzip import zipfile - -from singer_encodings import json_schema, csv, compression +import csv as python_csv +from singer_encodings import json_schema, csv from singer import metadata from tap_sftp import client, gzip_utils @@ -22,9 +23,15 @@ def compression_infer_local(iterable, file_name): raise NotImplementedError("tar.gz not supported") elif file_name.endswith('.gz'): file_bytes = iterable.read() + gz_file_name = None + try: + gz_file_name = gzip_utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes)) + except AttributeError: + # If a file is compressed using gzip command with --no-name attribute, + # It will not return the file name and timestamp. Hence we will skip such files. 
+ LOGGER.warning('Skipping "%s" file as we did not get the original file name.', file_name) # Send file object and file name - yield [gzip.GzipFile(fileobj=io.BytesIO(file_bytes)), \ - gzip_utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes))] + yield [gzip.GzipFile(fileobj=io.BytesIO(file_bytes)), gz_file_name] elif file_name.endswith('.zip'): with zipfile.ZipFile(iterable) as zip: for name in zip.namelist(): @@ -32,11 +39,62 @@ def compression_infer_local(iterable, file_name): else: yield iterable +def maximize_csv_field_width(): + """Set the max filed size as per the system's maxsize""" + + current_field_size_limit = csv.csv.field_size_limit() + field_size_limit = sys.maxsize + + if current_field_size_limit != field_size_limit: + csv.csv.field_size_limit(field_size_limit) + LOGGER.info("Changed the CSV field size limit from %s to %s", + current_field_size_limit, + field_size_limit) + +def sample_file_local(conn, table_spec, f, sample_rate, max_records): + samples = [] + try: + file_handle = conn.get_file_handle(f) + except OSError: + return (False, samples) + + # Add file_name to opts and flag infer_compression to support gzipped files + opts = {'key_properties': table_spec['key_properties'], + 'delimiter': table_spec['delimiter'], + 'file_name': f['filepath']} + + readers = get_row_iterators_local(file_handle, options=opts, infer_compression=True) + + for reader in readers: + current_row = 0 + for row in reader: + if (current_row % sample_rate) == 0: + if row.get(csv.SDC_EXTRA_COLUMN): + row.pop(csv.SDC_EXTRA_COLUMN) + samples.append(row) + + current_row += 1 + + if len(samples) >= max_records: + break + + # Empty sample to show field selection, if needed + empty_file = False + if len(samples) == 0: + empty_file = True + # If the 'reader' is an instance of 'csv.Dictreader' and has + # fieldnames, prepare samples with None field names + # Assumes all reader objects in readers have the same fieldnames + if isinstance(reader, python_csv.DictReader) and 
reader.fieldnames is not None: + samples.append({name: None for name in reader.fieldnames}) + + return (empty_file, samples) + def get_row_iterators_local(iterable, options={}, infer_compression=False): """Accepts an interable, options and a flag to infer compression and yields csv.DictReader objects which can be used to yield CSV rows.""" if infer_compression: - compressed_iterables = compression.infer(iterable, options.get('file_name')) + compressed_iterables = compression_infer_local(iterable, options.get('file_name')) for item in compressed_iterables: file_name_splitted = options.get('file_name').split('.') @@ -51,13 +109,19 @@ def get_row_iterators_local(iterable, options={}, infer_compression=False): # Set iterator 'item' item = item[0] # Get file extension - extension = gzip_file_name.split('.')[-1] + extension = gzip_file_name.split('.')[-1] if gzip_file_name else gzip_file_name + # For GZ files, if the file is gzipped with --no-name, then + # the 'extension' will be 'None'. Hence, send an empty list + if not extension: + yield [] # If the extension is JSONL then use 'get_JSONL_iterators' - if extension == 'jsonl': + elif extension == 'jsonl': yield get_JSONL_iterators(item, options) # Assuming the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' else: + # Maximize the CSV field width + maximize_csv_field_width() yield csv.get_row_iterator(item, options=options) def get_JSONL_iterators(iterator, options): @@ -102,8 +166,8 @@ def get_JSONL_rows(iterator): # Override singer_encoding's 'get_row_iterators' as per the Tap's JSONL support csv.get_row_iterators = get_row_iterators_local -# Override singer_encoding's 'infer' as per the Tap's JSONL support for GZIP files -compression.infer = compression_infer_local +# Override singer_encoding's 'sample_file' as per the Tap's JSONL support +json_schema.sample_file = sample_file_local # Override the '_sdc_extra' column value as per the JSONL-supported format json_schema.SDC_EXTRA_VALUE = { 
diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index 2972683..5a40d9f 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -85,7 +85,7 @@ def sync_file(conn, f, stream, table_spec): sdc_extra = [] # Get the extra fields ie. (json keys - fields from catalog - fields added by the tap) - extra_fields = set(row.keys()) - set(stream.schema.to_dict().get('properties').keys() - tap_added_fields) + extra_fields = set(row.keys()) - set(stream.schema.to_dict().get('properties', {}).keys() - tap_added_fields) # Prepare list of extra fields for extra_field in extra_fields: From af8a88ab1e4d45e74390228f01121d8912a47707 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Thu, 25 Aug 2022 17:36:01 +0530 Subject: [PATCH 09/20] added unittest and tap-tester --- tests/base.py | 4 + tests/test_sftp_max_size.py | 99 +++++++++++++++++++ tests/unittests/test_JSONL.py | 4 +- tests/unittests/test_JSONL_sync_sdc_fields.py | 27 ++++- 4 files changed, 129 insertions(+), 5 deletions(-) create mode 100644 tests/test_sftp_max_size.py diff --git a/tests/base.py b/tests/base.py index 3252c33..e09fc54 100644 --- a/tests/base.py +++ b/tests/base.py @@ -32,6 +32,10 @@ def get_test_connection(self): def random_string_generator(self, size=6, chars=string.ascii_uppercase + string.digits): return ''.join(random.choice(chars) for x in range(size)) + def generate_max_size_csv(self): + """Generate field with max size""" + return [[1, 'a'*131074]] + def generate_simple_csv_lines_typeA(self, num_lines): lines = [] for int_value in range(num_lines): diff --git a/tests/test_sftp_max_size.py b/tests/test_sftp_max_size.py new file mode 100644 index 0000000..63876b6 --- /dev/null +++ b/tests/test_sftp_max_size.py @@ -0,0 +1,99 @@ +from base import TestSFTPBase +from tap_tester import connections, runner +import os +import csv +import json + +class TestSFTPDiscovery(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_maximize_csv_field_width" + + def get_files(self): + return [ + { + "headers": 
['id', 'name'], + "directory": "max_csv", + "files": ["max_csv_file.csv"], + "num_rows": 1, + "generator": self.generate_max_size_csv + } + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPDiscovery.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + + # Add subdirectories + client.mkdir('tap_tester/max_csv') + + # Add csv files + client.chdir('tap_tester') + + for file_group in self.get_files(): + headers = file_group['headers'] + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + writer = csv.writer(f) + lines = [headers] + file_group['generator']() + writer.writerows(lines) + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "csv_with_max_field_width", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "max_csv_file.csv", + "key_properties": ['id'] + } + ]) + return props + + def expected_first_sync_streams(self): + return {'csv_with_max_field_width'} + + def expected_check_streams(self): + return {'csv_with_max_field_width'} + + def expected_pks(self): + return { + 'csv_with_max_field_width': {'id'} + } + + def test_run(self): + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + self.perform_and_verify_table_and_field_selection(conn_id, found_catalogs) + + self.run_and_verify_sync(conn_id) + + expected_records = 1 + record_count = 
runner.get_upserts_from_target_output() + # Verify record counts + self.assertEqual(expected_records, len(record_count)) + + records = runner.get_records_from_target_output() + actual_records = [record.get('data') for record in records.get('csv_with_max_field_width').get('messages')] + # Verify the record we created of length greater than 'csv.field_size_limit' of '131072' is replicated + self.assertEqual(actual_records, [{'id': 1, 'name': '{}'.format('a'*131074), '_sdc_source_file': os.getenv('TAP_SFTP_ROOT_DIR') + '/tap_tester/max_csv/max_csv_file.csv', '_sdc_source_lineno': 2}]) diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py index 442f7ab..5f7b358 100644 --- a/tests/unittests/test_JSONL.py +++ b/tests/unittests/test_JSONL.py @@ -61,10 +61,12 @@ class TestRowIterators(unittest.TestCase): ["zip_jsonl", ["test.zip", [Iterable("", "test.jsonl")]], [0, 1]], ["gz_csv", ["test.gz", [[Iterable(""), "test.csv"]]], [1, 0]], ["gz_jsonl", ["test.gz", [[Iterable(""), "test.jsonl"]]], [0, 1]], + ["gz_csv_no_name", ["test.gz", [[Iterable(""), None]]], [0, 0]], + ["gz_jsonl_no_name", ["test.gz", [[Iterable(""), None]]], [0, 0]], ]) @mock.patch("tap_sftp.discover.get_JSONL_iterators") @mock.patch("singer_encodings.csv.get_row_iterator") - @mock.patch("singer_encodings.compression.infer") + @mock.patch("tap_sftp.discover.compression_infer_local") def test_get_row_iterators_local(self, name, test_data, expected_data, mocked_infer, mocked_get_csv_row_iterator, mocked_get_JSONL_iterators): # Mock file iterable mocked_infer.return_value = test_data[1] diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py index 0381aee..8ee782e 100644 --- a/tests/unittests/test_JSONL_sync_sdc_fields.py +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -4,15 +4,16 @@ from tap_sftp import client, sync import singer +@mock.patch("tap_sftp.client.SFTPConnection.sftp") +@mock.patch('singer.Transformer.transform') 
+@mock.patch('singer_encodings.csv.get_row_iterators') class TestSyncJSONLsdcFields(unittest.TestCase): @parameterized.expand([ ["with_sdc_fields", [[{"id": 1}]], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}], ["with_sdc_extra", [[{"id": 1, "name": "abc"}]], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{"name": "abc"}]}] ]) - @mock.patch("tap_sftp.client.SFTPConnection.sftp") - @mock.patch('singer.Transformer.transform') - @mock.patch('singer_encodings.csv.get_row_iterators') - def test_sync_JSONL(self, name, test_data, expected_data, mocked_get_row_iterators, mocked_transform, mocked_sftp): + def test_sync_JSONL(self, mocked_get_row_iterators, mocked_transform, mocked_sftp, name, test_data, expected_data): + """Test cases to verify we prepare '_sdc_extra' fields for JSONL files""" mocked_get_row_iterators.return_value = test_data conn = client.SFTPConnection("10.0.0.1", "username", port="22") table_spec = { @@ -29,3 +30,21 @@ def test_sync_JSONL(self, name, test_data, expected_data, mocked_get_row_iterato args = mocked_transform.call_args.args records = args[0] self.assertEqual(records, expected_data) + + def test_sync_JSONL_empty_schema(self, mocked_get_row_iterators, mocked_transform, mocked_sftp): + """Test case to verify we do not get 'NoneType' error if schema is empty {}""" + mocked_get_row_iterators.return_value = [[{"id": 1}]] + conn = client.SFTPConnection("10.0.0.1", "username", port="22") + table_spec = { + "key_properties": [], + "delimiter": None, + "table_name": "test", + "search_prefix": None, + "search_pattern": "data.jsonl" + } + stream = singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]) + f = {"filepath": "/root_dir/data.jsonl", "last_modified": "2022-01-01"} + sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) + args = mocked_transform.call_args.args + records = args[0] + self.assertEqual(records, 
{'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{'id': 1}]}) From 03b4fb541462ecd89d6f368ff67471ed14396769 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Thu, 25 Aug 2022 18:50:15 +0530 Subject: [PATCH 10/20] updated JSONL exception message --- tap_sftp/discover.py | 4 ++-- tests/unittests/test_JSONL.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index b2932e8..35e5a9d 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -143,13 +143,13 @@ def check_key_properties_and_date_overrides_for_jsonl_file(options, jsonl_sample if options.get('key_properties'): key_properties = set(options['key_properties']) if not key_properties.issubset(all_keys): - raise Exception('CSV file missing required headers: {}' + raise Exception('JSONL file missing required headers: {}' .format(key_properties - all_keys)) if options.get('date_overrides'): date_overrides = set(options['date_overrides']) if not date_overrides.issubset(all_keys): - raise Exception('CSV file missing date_overrides headers: {}' + raise Exception('JSONL file missing date_overrides headers: {}' .format(date_overrides - all_keys)) def get_JSONL_rows(iterator): diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py index 5f7b358..cec4a53 100644 --- a/tests/unittests/test_JSONL.py +++ b/tests/unittests/test_JSONL.py @@ -34,8 +34,8 @@ def test_get_JSONL_iterators_positive(self): ) @parameterized.expand([ - ["raise_key_properties_error", '{"name": "abc", "updated_at": "2022-01-01"}', "CSV file missing required headers: {\'id\'}"], - ["raise_date_overrides_error", '{"id": 1, "name": "abc"}', "CSV file missing date_overrides headers: {\'updated_at\'}"] + ["raise_key_properties_error", '{"name": "abc", "updated_at": "2022-01-01"}', "JSONL file missing required headers: {\'id\'}"], + ["raise_date_overrides_error", '{"id": 1, "name": "abc"}', "JSONL file missing date_overrides 
headers: {\'updated_at\'}"] ]) def test_get_JSONL_iterators(self, name, test_data, expected_data): options = { From d24bbcc8c1876d85595c1e13c14d8e1678d36d8b Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Fri, 26 Aug 2022 17:08:46 +0530 Subject: [PATCH 11/20] updated the code to not add sdc_extra if schema is empty --- tap_sftp/sync.py | 8 ++++++-- tests/unittests/test_JSONL_sync_sdc_fields.py | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index 5a40d9f..b9ec7bb 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -68,6 +68,7 @@ def sync_file(conn, f, stream, table_spec): records_synced = 0 tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', 'sdc_extra'] + schema_dict = stream.schema.to_dict() for reader in readers: with Transformer() as transformer: @@ -85,7 +86,10 @@ def sync_file(conn, f, stream, table_spec): sdc_extra = [] # Get the extra fields ie. (json keys - fields from catalog - fields added by the tap) - extra_fields = set(row.keys()) - set(stream.schema.to_dict().get('properties', {}).keys() - tap_added_fields) + extra_fields = set() + # Create '_sdc_extra' fields if the schema is not empty + if schema_dict: + extra_fields = set(row.keys()) - set(schema_dict.get('properties', {}).keys() - tap_added_fields) # Prepare list of extra fields for extra_field in extra_fields: @@ -96,7 +100,7 @@ def sync_file(conn, f, stream, table_spec): rec = {**row, **custom_columns} - to_write = transformer.transform(rec, stream.schema.to_dict(), metadata.to_map(stream.metadata)) + to_write = transformer.transform(rec, schema_dict, metadata.to_map(stream.metadata)) singer.write_record(stream.tap_stream_id, to_write) records_synced += 1 diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py index 8ee782e..f155384 100644 --- a/tests/unittests/test_JSONL_sync_sdc_fields.py +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -31,8 +31,8 @@ def 
test_sync_JSONL(self, mocked_get_row_iterators, mocked_transform, mocked_sft records = args[0] self.assertEqual(records, expected_data) - def test_sync_JSONL_empty_schema(self, mocked_get_row_iterators, mocked_transform, mocked_sftp): - """Test case to verify we do not get 'NoneType' error if schema is empty {}""" + def test_sync_JSONL_empty_schema_with_records(self, mocked_get_row_iterators, mocked_transform, mocked_sftp): + """Test case to verify we are not creating sdc extra field if the schema is empty {} for JSONL files""" mocked_get_row_iterators.return_value = [[{"id": 1}]] conn = client.SFTPConnection("10.0.0.1", "username", port="22") table_spec = { @@ -47,4 +47,4 @@ def test_sync_JSONL_empty_schema(self, mocked_get_row_iterators, mocked_transfor sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) args = mocked_transform.call_args.args records = args[0] - self.assertEqual(records, {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{'id': 1}]}) + self.assertEqual(records, {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}) From 3bb11aa8e0fdf7164b938438e3cf959519198220 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Wed, 31 Aug 2022 17:56:04 +0530 Subject: [PATCH 12/20] added sampling for list and dict --- tap_sftp/discover.py | 13 ++- tap_sftp/schema.py | 139 +++++++++++++++++++++++ tests/unittests/test_permission_error.py | 7 +- 3 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 tap_sftp/schema.py diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index 35e5a9d..c221a72 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -10,7 +10,7 @@ import csv as python_csv from singer_encodings import json_schema, csv from singer import metadata -from tap_sftp import client, gzip_utils +from tap_sftp import client, gzip_utils, schema LOGGER= singer.get_logger() @@ -98,18 +98,18 @@ def get_row_iterators_local(iterable, options={}, 
infer_compression=False): for item in compressed_iterables: file_name_splitted = options.get('file_name').split('.') - extension = file_name_splitted[-1] + extension = file_name_splitted[-1].lower() # Get the extension of the zipped file if extension == 'zip': - extension = item.name.split('.')[-1] + extension = item.name.split('.')[-1].lower() # Get the extension of the gzipped file ie. file.csv.gz -> csv - if extension == 'gz': + elif extension == 'gz': # Get file name gzip_file_name = item[1] # Set iterator 'item' item = item[0] # Get file extension - extension = gzip_file_name.split('.')[-1] if gzip_file_name else gzip_file_name + extension = gzip_file_name.split('.')[-1].lower() if gzip_file_name else gzip_file_name # For GZ files, if the file is gzipped with --no-name, then # the 'extension' will be 'None'. Hence, send an empty list @@ -169,6 +169,9 @@ def get_JSONL_rows(iterator): # Override singer_encoding's 'sample_file' as per the Tap's JSONL support json_schema.sample_file = sample_file_local +# Override singer_encoding's 'generate_schema' as the Tap's JSONL support +json_schema.generate_schema = schema.generate_schema + # Override the '_sdc_extra' column value as per the JSONL-supported format json_schema.SDC_EXTRA_VALUE = { 'type': 'array', diff --git a/tap_sftp/schema.py b/tap_sftp/schema.py new file mode 100644 index 0000000..d2451e3 --- /dev/null +++ b/tap_sftp/schema.py @@ -0,0 +1,139 @@ +import singer +LOGGER = singer.get_logger() + +def generate_schema(samples, table_spec): + """Function to generate the schema as the records""" + counts = {} + for sample in samples: + # {'id': {'integer': 45}, 'name': {'string' : 45}} + counts = count_sample(sample, counts, table_spec) + + for key, value in counts.items(): + datatype = pick_datatype(value) + + if 'list.' 
in datatype: + child_datatype = datatype.rsplit('.', maxsplit=1)[-1] + counts[key] = { + 'anyOf': [ + {'type': 'array', 'items': datatype_schema(child_datatype)}, + {'type': ['null', 'string']} + ] + } + elif datatype == 'list': + counts[key] = { + 'anyOf': [ + {'type': 'array', 'items': {'type': ['null', 'string']}}, # NOTE: Wrong? type: null, string + {'type': ['null', 'string']} + ] + } + else: + counts[key] = datatype_schema(datatype) + + return counts + +def datatype_schema(datatype): + """Function to create schema for the field as per the datatype""" + if datatype == 'date-time': + schema = { + 'anyOf': [ + {'type': ['null', 'string'], 'format': 'date-time'}, + {'type': ['null', 'string']} + ] + } + elif datatype == 'dict': + schema = { + 'anyOf': [ + {'type': 'object', 'properties': {}}, + {'type': ['null', 'string']} + ] + } + else: + types = ['null', datatype] + if datatype != 'string': + types.append('string') + schema = { + 'type': types, + } + return schema + +def pick_datatype(counts): + """Function to get the datatype from the counts""" + # Default return + to_return = 'string' + list_of_datatypes = ['list.date-time', 'list.dict', 'list.integer', + 'list.number', 'list.string', 'list', 'date-time', 'dict'] + + for data_types in list_of_datatypes: + if counts.get(data_types, 0) > 0: + return data_types + + # Return the integer or number datatype + if len(counts) == 1: + if counts.get('integer', 0) > 0: + to_return = 'integer' + elif counts.get('number', 0) > 0: + to_return = 'number' + + # If the data is of integer and number, then return number as the datatype + elif(len(counts) == 2 and + counts.get('integer', 0) > 0 and + counts.get('number', 0) > 0): + to_return = 'number' + + return to_return + +def count_sample(sample, counts, table_spec): + """Function to count the records as per the datatype""" + for key, value in sample.items(): + if key not in counts: + counts[key] = {} + + date_overrides = table_spec.get('date_overrides', []) + datatype = 
infer(key, value, date_overrides) + + if datatype is not None: + counts[key][datatype] = counts[key].get(datatype, 0) + 1 + + return counts + +def infer(key, datum, date_overrides, second_call=False): + """Function to return the inferred data type""" + if datum is None or datum == '': + return None + + try: + if isinstance(datum, list): + data_type = 'string' + if second_call: # Use string for nested list + LOGGER.warning( + 'Unsupported type for "%s", List inside list is not supported hence will be treated as a string', key) + elif not datum: # Empty list + data_type = 'list' + else: + data_type = 'list.' + infer(key, datum[0], date_overrides, second_call=True) + return data_type + + if key in date_overrides: + return 'date-time' + + if isinstance(datum, dict): + return 'dict' + + # NOTE: Only float? Data loss: 1.99 -> 1 + try: + int(datum) + return 'integer' + except (ValueError, TypeError): + pass + + try: + # numbers are NOT floats, they are DECIMALS + float(datum) + return 'number' + except (ValueError, TypeError): + pass + + except (ValueError, TypeError): + pass + + return 'string' diff --git a/tests/unittests/test_permission_error.py b/tests/unittests/test_permission_error.py index de52acf..67b7e17 100644 --- a/tests/unittests/test_permission_error.py +++ b/tests/unittests/test_permission_error.py @@ -3,6 +3,7 @@ import tap_sftp.client as client import tap_sftp.sync as sync import paramiko +import singer @mock.patch("tap_sftp.client.SFTPConnection.sftp") @mock.patch("tap_sftp.client.LOGGER.warn") @@ -55,7 +56,7 @@ def test_no_error_during_sync(self, mocked_get_row_iterators, mocked_stats, mock conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, 
singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(1, mocked_get_row_iterators.call_count) @@ -68,7 +69,7 @@ def test_permisison_error_during_sync(self, mocked_get_row_iterators, mocked_log conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(0, mocked_get_row_iterators.call_count) @@ -82,7 +83,7 @@ def test_oserror_during_sync(self, mocked_get_row_iterators, mocked_logger, mock conn = client.SFTPConnection("10.0.0.1", "username", port="22") - rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, None, {"key_properties": ["id"], "delimiter": ","}) + rows_synced = sync.sync_file(conn, {"filepath": "/root_dir/file.csv.gz", "last_modified": "2020-01-01"}, singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]), {"key_properties": ["id"], "delimiter": ","}) # check if "csv.get_row_iterators" is called if it is called then error has not occurred # if it is not called then error has occured and function returned from the except block self.assertEquals(0, mocked_get_row_iterators.call_count) From 67df52af4f631cdf8d608ec77732311058f3484f Mon Sep 
17 00:00:00 2001 From: harshpatel4crest Date: Fri, 2 Sep 2022 14:53:35 +0530 Subject: [PATCH 13/20] updated sampling logic for int and float --- tap_sftp/schema.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tap_sftp/schema.py b/tap_sftp/schema.py index d2451e3..4ddc977 100644 --- a/tap_sftp/schema.py +++ b/tap_sftp/schema.py @@ -22,7 +22,7 @@ def generate_schema(samples, table_spec): elif datatype == 'list': counts[key] = { 'anyOf': [ - {'type': 'array', 'items': {'type': ['null', 'string']}}, # NOTE: Wrong? type: null, string + {'type': 'array', 'items': {'type': ['null', 'string']}}, {'type': ['null', 'string']} ] } @@ -60,8 +60,8 @@ def pick_datatype(counts): """Function to get the datatype from the counts""" # Default return to_return = 'string' - list_of_datatypes = ['list.date-time', 'list.dict', 'list.integer', - 'list.number', 'list.string', 'list', 'date-time', 'dict'] + list_of_datatypes = ['list.date-time', 'list.dict', 'list.number', + 'list.integer', 'list.string', 'list', 'date-time', 'dict'] for data_types in list_of_datatypes: if counts.get(data_types, 0) > 0: @@ -119,16 +119,19 @@ def infer(key, datum, date_overrides, second_call=False): if isinstance(datum, dict): return 'dict' - # NOTE: Only float? Data loss: 1.99 -> 1 try: - int(datum) + # Convert the data into the string before integer conversion + # As for CSV, all the data will be replicated into the string as a result, int("1.1") will result into ValueError + # Whereas for JSONL, all the data will be replicated into original form thus, int(1.1) will not raise any error. 
+ # Hence, wrong datatype will be assigned + int(str(datum)) return 'integer' except (ValueError, TypeError): pass try: # numbers are NOT floats, they are DECIMALS - float(datum) + float(str(datum)) return 'number' except (ValueError, TypeError): pass From 49c7b1f3e6de971af8a679fc0eecfcb04d60a546 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Mon, 5 Sep 2022 12:09:58 +0530 Subject: [PATCH 14/20] added tap-tester tests for JSONL and updated the code to start line count from 1 for JSONL files --- tap_sftp/discover.py | 11 +- tap_sftp/sync.py | 8 +- tests/base.py | 22 ++ tests/test_sftp_empty_csv_in_gz.py | 3 +- tests/test_sftp_gzip.py | 3 +- tests/test_sftp_jsonl.py | 95 ++++++++ tests/test_sftp_jsonl_data.py | 213 ++++++++++++++++++ tests/test_sftp_jsonl_gzip.py | 104 +++++++++ tests/test_sftp_jsonl_zip.py | 114 ++++++++++ tests/unittests/test_JSONL_sync_sdc_fields.py | 10 +- 10 files changed, 569 insertions(+), 14 deletions(-) create mode 100644 tests/test_sftp_jsonl.py create mode 100644 tests/test_sftp_jsonl_data.py create mode 100644 tests/test_sftp_jsonl_gzip.py create mode 100644 tests/test_sftp_jsonl_zip.py diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index c221a72..c00d87e 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -65,7 +65,7 @@ def sample_file_local(conn, table_spec, f, sample_rate, max_records): readers = get_row_iterators_local(file_handle, options=opts, infer_compression=True) - for reader in readers: + for _, reader in readers: current_row = 0 for row in reader: if (current_row % sample_rate) == 0: @@ -114,15 +114,15 @@ def get_row_iterators_local(iterable, options={}, infer_compression=False): # For GZ files, if the file is gzipped with --no-name, then # the 'extension' will be 'None'. 
Hence, send an empty list if not extension: - yield [] + yield (None, []) # If the extension is JSONL then use 'get_JSONL_iterators' elif extension == 'jsonl': - yield get_JSONL_iterators(item, options) + yield ('jsonl', get_JSONL_iterators(item, options)) # Assuming the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' else: # Maximize the CSV field width maximize_csv_field_width() - yield csv.get_row_iterator(item, options=options) + yield ('csv', csv.get_row_iterator(item, options=options)) def get_JSONL_iterators(iterator, options): # Get JSOL rows @@ -158,6 +158,9 @@ def get_JSONL_rows(iterator): decoded_row = row.decode('utf-8') if decoded_row.strip(): row = json.loads(decoded_row) + # Skip if the row is empty + if not row: + continue else: continue diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index b9ec7bb..543caab 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -70,19 +70,21 @@ def sync_file(conn, f, stream, table_spec): tap_added_fields = ['_sdc_source_file', '_sdc_source_lineno', 'sdc_extra'] schema_dict = stream.schema.to_dict() - for reader in readers: + for file_extension, reader in readers: with Transformer() as transformer: + # Row start for files as per the file type + row_start_line = 2 if file_extension == 'csv' else 1 for row in reader: custom_columns = { '_sdc_source_file': f["filepath"], # index zero, +1 for header row - '_sdc_source_lineno': records_synced + 2 + '_sdc_source_lineno': records_synced + row_start_line } # For CSV files, the '_sdc_extra' is handled by 'restkey' in 'csv.DictReader' # If the file is JSONL then prepare '_sdc_extra' column - if f['filepath'].split('.')[-1] == 'jsonl': + if file_extension == 'jsonl': sdc_extra = [] # Get the extra fields ie. 
(json keys - fields from catalog - fields added by the tap) diff --git a/tests/base.py b/tests/base.py index e09fc54..58e4b2d 100644 --- a/tests/base.py +++ b/tests/base.py @@ -58,6 +58,28 @@ def generate_simple_csv_lines_typeC(self, num_lines): lines.append([int_value, self.random_string_generator(), int_value*5, utils.strftime(start_datetime), int_value + random.random()]) return lines + def generate_simple_jsonl_lines_typeA(self, num_lines): + lines = [] + for int_value in range(num_lines): + lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5}) + return lines + + def generate_simple_jsonl_lines_typeB(self, num_lines): + lines = [] + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + for int_value in range(num_lines): + start_datetime = start_datetime + timedelta(days=5) + lines.append({"id": int_value, "string_col": self.random_string_generator(), "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()}) + return lines + + def generate_simple_jsonl_lines_typeC(self, num_lines): + lines = [] + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + for int_value in range(num_lines): + start_datetime = start_datetime + timedelta(days=5) + lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5, "datetime_col": utils.strftime(start_datetime), "number_col": int_value + random.random()}) + return lines + def isdir(path, client): try: return S_ISDIR(client.stat(path).st_mode) diff --git a/tests/test_sftp_empty_csv_in_gz.py b/tests/test_sftp_empty_csv_in_gz.py index 5481b15..069a5f3 100644 --- a/tests/test_sftp_empty_csv_in_gz.py +++ b/tests/test_sftp_empty_csv_in_gz.py @@ -61,9 +61,10 @@ def setUp(self): headers = file_group['headers'] directory = file_group['directory'] for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) client.chdir(directory) 
with client.open(filename, 'w') as direct_file: - with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: # write in file if 'num_rows', used to create an empty 'csv.gz' file if file_group.get('num_rows'): with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: diff --git a/tests/test_sftp_gzip.py b/tests/test_sftp_gzip.py index 9abfdee..fcd5f0f 100644 --- a/tests/test_sftp_gzip.py +++ b/tests/test_sftp_gzip.py @@ -69,9 +69,10 @@ def setUp(self): headers = file_group['headers'] directory = file_group['directory'] for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) client.chdir(directory) with client.open(filename, 'w') as direct_file: - with gzip.GzipFile(fileobj=direct_file, mode='w') as gzip_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: writer = csv.writer(f) lines = [headers] + file_group['generator'](file_group['num_rows']) diff --git a/tests/test_sftp_jsonl.py b/tests/test_sftp_jsonl.py new file mode 100644 index 0000000..224dff8 --- /dev/null +++ b/tests/test_sftp_jsonl.py @@ -0,0 +1,95 @@ +from base import TestSFTPBase +import os +import json + +class TestSFTPJsonl(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl" + + def get_files(self): + return [ + { + "directory": "folderA", + "files": ["table_1_fileA.jsonl", "table_3_fileA.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "directory": "folderB", + "files": ["table_2_fileA.jsonl", "table_2_fileB.jsonl", "table_3_fileB.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "directory": "folderC", + "files": ["table_3_fileC.jsonl"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeC + }, + ] + + def setUp(self): + if not all([x for x in 
[os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPJsonl.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + for record in file_group['generator'](file_group['num_rows']): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_1.*jsonl", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_2.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_3.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() diff --git a/tests/test_sftp_jsonl_data.py b/tests/test_sftp_jsonl_data.py new file mode 100644 index 0000000..673e24c --- /dev/null +++ b/tests/test_sftp_jsonl_data.py @@ -0,0 +1,213 @@ +from datetime import datetime, timezone +from decimal import Decimal +from base 
import TestSFTPBase +from tap_tester import connections, menagerie, runner +import os +import json +from singer import utils + +class TestSFTPJsonlData(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl_data" + + def generate_jsonl_data(self): + start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{"int": 1, "string": "string_data", "float": 1.22, "date": utils.strftime(start_datetime)}] + + def generate_jsonl_dict_data(self): + start_datetime_1 = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + start_datetime_2 = datetime(2019, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{ + "int": 1, + "dict_int": {1: 1, 2: 2}, + "dict_float": {1.2: 2.0}, + "dict_string": {"key1": "value1", "key2": "value2"}, + "dict_dict": {"key": {"name": "john"}}, + "dict_list": {"ids": [1, 2, 3]}, + "dict_datetime": {"date_1": utils.strftime(start_datetime_1), "date_2": utils.strftime(start_datetime_2)} + }] + + def generate_jsonl_list_data(self): + start_datetime_1 = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + start_datetime_2 = datetime(2019, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) + return [{ + "int": 1, + "list_int": [1, 2, 3], + "list_float": [1.2, 2.3, 3.4], + "list_string": ["data", "of", "string"], + "list_dict": [{"id": 1}, {"id": 2}], + "list_list": [[1, 2 , 3], [4, 5, 6]], + "list_datetime": [utils.strftime(start_datetime_1), utils.strftime(start_datetime_2)] + }] + + def get_files(self): + return [ + { + "directory": "mytable", + "files": ["file1.jsonl"], + "generator": self.generate_jsonl_data + }, + { + "directory": "mytable_list", + "files": ["file2.jsonl"], + "generator": self.generate_jsonl_list_data + }, + { + "directory": "mytable_dict", + "files": ["file3.jsonl"], + "generator": self.generate_jsonl_dict_data + } + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + 
#pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPJsonlData.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + client.chdir(directory) + with client.open(filename, 'w') as f: + for record in file_group['generator'](): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "mytable", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file1.*jsonl", + "key_properties": [] + }, + { + "table_name": "mytable_list", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file2.*jsonl", + "key_properties": [] + }, + { + "table_name": "mytable_dict", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "file3.*jsonl", + "key_properties": [] + } + ]) + return props + + def expected_check_streams(self): + return {"mytable", "mytable_dict", "mytable_list"} + + def expected_pks(self): + return { + "mytable": set(), + "mytable_dict": set(), + "mytable_list": set() + } + + def expected_data(self): + return { + "mytable": { + "int": 1, + "string": "string_data", + "float": Decimal(1.22), + "date": "2018-01-01T19:29:14.578000Z", + "_sdc_source_lineno": 1 + }, + "mytable_list": { + "int": 1, + "list_int": [1, 2, 3], + "list_float": [Decimal(1.2), Decimal(2.3), Decimal(3.4)], + 
"list_string": ["data", "of", "string"], + "list_dict": [{"id": 1}, {"id": 2}], + "list_list": ["[1, 2, 3]", "[4, 5, 6]"], + "list_datetime": ["2018-01-01T19:29:14.578000Z", "2019-01-01T19:29:14.578000Z"], + "_sdc_source_lineno": 1 + }, + "mytable_dict": { + "int": 1, + "dict_int": {"1": 1, "2": 2}, + "dict_float": {"1.2": Decimal(2.0)}, + "dict_string": {"key1": "value1", "key2": "value2"}, + "dict_dict": {"key": {"name": "john"}}, + "dict_list": {"ids": [1, 2, 3]}, + "dict_datetime": {"date_1": "2018-01-01T19:29:14.578000Z", "date_2": "2019-01-01T19:29:14.578000Z"}, + "_sdc_source_lineno": 1 + } + } + + def test_run(self): + conn_id = connections.ensure_connection(self) + + # run in discovery mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + # verify the tap discovered the right streams + catalog = menagerie.get_catalogs(conn_id) + found_catalog_names = set(map(lambda c: c['tap_stream_id'], catalog)) + + # assert we find the correct streams + self.assertEqual(self.expected_check_streams(), found_catalog_names) + + for tap_stream_id in self.expected_check_streams(): + found_stream = [c for c in catalog if c['tap_stream_id'] == tap_stream_id][0] + schema_and_metadata = menagerie.get_annotated_schema(conn_id, found_stream['stream_id']) + main_metadata = schema_and_metadata["metadata"] + stream_metadata = [mdata for mdata in main_metadata if mdata["breadcrumb"] == []] + + # assert that the pks are correct + self.assertEqual(self.expected_pks()[tap_stream_id], + set(stream_metadata[0]['metadata']['table-key-properties'])) + + for stream_catalog in catalog: + annotated_schema = menagerie.get_annotated_schema(conn_id, stream_catalog['stream_id']) + connections.select_catalog_and_fields_via_metadata(conn_id, + stream_catalog, + annotated_schema['annotated-schema'], + []) + + # Run sync + 
sync_job_name = runner.run_sync_mode(self, conn_id) + + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # verify the persisted schema was correct + messages_by_stream = runner.get_records_from_target_output() + + for stream in self.expected_check_streams(): + records = [record.get("data") for record in messages_by_stream.get(stream, {}).get("messages", []) + if record.get("action") == "upsert"] + self.assertEqual(len(records), 1) + for record in records: + del record["_sdc_source_file"] + self.assertEqual([self.expected_data().get(stream)], records) diff --git a/tests/test_sftp_jsonl_gzip.py b/tests/test_sftp_jsonl_gzip.py new file mode 100644 index 0000000..4604cb9 --- /dev/null +++ b/tests/test_sftp_jsonl_gzip.py @@ -0,0 +1,104 @@ +from base import TestSFTPBase +import os +import json +import gzip +import io + +class TestSFTPGzip(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_gzip" + + def get_files(self): + return [ + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderA", + "files": ["table_1_fileA.jsonl.gz", "table_3_fileA.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "headers": ['id', 'string_col', 'datetime_col', 'number_col'], + "directory": "folderB", + "files": ["table_2_fileA.jsonl.gz", "table_2_fileB.jsonl.gz", "table_3_fileB.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "headers": ['id', 'string_col', 'integer_col', 'datetime_col', 'number_col'], + "directory": "folderC", + "files": ["table_3_fileC.jsonl.gz"], + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeC + }, + ] + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, 
TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPGzip.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + # Add csv files + for file_group in file_info: + directory = file_group['directory'] + for filename in file_group['files']: + file_to_gzip = ".".join(filename.split(".")[:-1]) + client.chdir(directory) + with client.open(filename, 'w') as direct_file: + with gzip.GzipFile(filename=file_to_gzip, fileobj=direct_file, mode='w') as gzip_file: + with io.TextIOWrapper(gzip_file, encoding='utf-8') as f: + for record in file_group['generator'](file_group['num_rows']): + f.write(json.dumps(record) + "\n") + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_1.*jsonl", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_2.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "delimiter": ",", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "search_pattern": "table_3.*jsonl", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() \ No newline at end of file diff --git a/tests/test_sftp_jsonl_zip.py b/tests/test_sftp_jsonl_zip.py new file mode 100644 index 0000000..d5cdee5 --- /dev/null +++ b/tests/test_sftp_jsonl_zip.py @@ -0,0 +1,114 @@ +from base import 
TestSFTPBase +import os +import json +import zipfile + +class TestSFTPZipJsonl(TestSFTPBase): + + def name(self): + return "tap_tester_sftp_jsonl_zip" + + def get_files(self): + return [ + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderA", + "files": ["table_1_fileA.jsonl", "table_3_fileA.jsonl"], + "archive": 'table_1.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + { + "headers": ['id', 'string_col', 'datetime_col', 'number_col'], + "directory": "folderB", + "files": ["table_2_fileA.jsonl", "table_2_fileB.jsonl", "table_3_fileB.jsonl"], + "archive": 'table_2.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeB + }, + { + "headers": ['id', 'string_col', 'integer_col'], + "directory": "folderC", + "files": ["table_3_fileC.jsonl"], + "archive": 'table_3.zip', + "num_rows": 50, + "generator": self.generate_simple_jsonl_lines_typeA + }, + ] + + def expected_first_sync_row_counts(self): + return { + 'table_1': 100, + 'table_2': 150, + 'table_3': 50 + } + + def setUp(self): + if not all([x for x in [os.getenv('TAP_SFTP_USERNAME'), + os.getenv('TAP_SFTP_PASSWORD'), + os.getenv('TAP_SFTP_ROOT_DIR')]]): + #pylint: disable=line-too-long + raise Exception("set TAP_SFTP_USERNAME, TAP_SFTP_PASSWORD, TAP_SFTP_ROOT_DIR") + + root_dir = os.getenv('TAP_SFTP_ROOT_DIR') + + with self.get_test_connection() as client: + # drop all csv files in root dir + client.chdir(root_dir) + try: + TestSFTPZipJsonl.rm('tap_tester', client) + except FileNotFoundError: + pass + client.mkdir('tap_tester') + client.chdir('tap_tester') + + # Add subdirectories + file_info = self.get_files() + for entry in file_info: + client.mkdir(entry['directory']) + + # Add csv files + for file_group in file_info: + directory = file_group['directory'] + client.chdir(directory) + with client.open(file_group['archive'], 'w') as direct_file: + with zipfile.ZipFile(direct_file, mode='w') as zip_file: + lines = 
file_group['generator'](file_group['num_rows']) + total = '' + for line in lines: + total += json.dumps(line) + '\n' + for file_name in file_group['files']: + zip_file.writestr(file_name, total) + client.chdir('..') + + def get_properties(self): + props = self.get_common_properties() + props['tables'] = json.dumps([ + { + "table_name": "table_1", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_1\.zip", + "key_properties": ['id'] + }, + { + "table_name": "table_2", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_2\.zip", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + }, + { + "table_name": "table_3", + "search_prefix": os.getenv("TAP_SFTP_ROOT_DIR") + "/tap_tester", + "delimiter": ",", + "search_pattern": "table_3\.zip", + "key_properties": ['id'], + "date_overrides": ["datetime_col"] + } + ]) + return props + + def test_run(self): + self.run_test() diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py index f155384..f3a18b9 100644 --- a/tests/unittests/test_JSONL_sync_sdc_fields.py +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -9,12 +9,12 @@ @mock.patch('singer_encodings.csv.get_row_iterators') class TestSyncJSONLsdcFields(unittest.TestCase): @parameterized.expand([ - ["with_sdc_fields", [[{"id": 1}]], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}], - ["with_sdc_extra", [[{"id": 1, "name": "abc"}]], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2, '_sdc_extra': [{"name": "abc"}]}] + ["with_sdc_fields", [{"id": 1}], {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1}], + ["with_sdc_extra", [{"id": 1, "name": "abc"}], {'id': 1, 'name': 'abc', '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1, '_sdc_extra': [{"name": "abc"}]}] ]) def 
test_sync_JSONL(self, mocked_get_row_iterators, mocked_transform, mocked_sftp, name, test_data, expected_data): """Test cases to verify we prepare '_sdc_extra' fields for JSONL files""" - mocked_get_row_iterators.return_value = test_data + mocked_get_row_iterators.return_value = [('jsonl', test_data)] conn = client.SFTPConnection("10.0.0.1", "username", port="22") table_spec = { "key_properties": [], @@ -33,7 +33,7 @@ def test_sync_JSONL(self, mocked_get_row_iterators, mocked_transform, mocked_sft def test_sync_JSONL_empty_schema_with_records(self, mocked_get_row_iterators, mocked_transform, mocked_sftp): """Test case to verify we are not creating sdc extra field if the schema is empty {} for JSONL files""" - mocked_get_row_iterators.return_value = [[{"id": 1}]] + mocked_get_row_iterators.return_value = [('jsonl', [{"id": 1}])] conn = client.SFTPConnection("10.0.0.1", "username", port="22") table_spec = { "key_properties": [], @@ -47,4 +47,4 @@ def test_sync_JSONL_empty_schema_with_records(self, mocked_get_row_iterators, mo sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) args = mocked_transform.call_args.args records = args[0] - self.assertEqual(records, {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 2}) + self.assertEqual(records, {'id': 1, '_sdc_source_file': '/root_dir/data.jsonl', '_sdc_source_lineno': 1}) From 500c5ccdc751674b7020ee1e63ba5e75f2aa2822 Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Mon, 5 Sep 2022 14:10:37 +0530 Subject: [PATCH 15/20] added unittest for schema --- tests/unittests/test_schema.py | 155 +++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 tests/unittests/test_schema.py diff --git a/tests/unittests/test_schema.py b/tests/unittests/test_schema.py new file mode 100644 index 0000000..eb15da8 --- /dev/null +++ b/tests/unittests/test_schema.py @@ -0,0 +1,155 @@ +import unittest +from parameterized import parameterized +from tap_sftp import schema + 
+class SchemaTest(unittest.TestCase): + ''' + Test class to verify an appropriate working of functions in schema.py file in the tap. + ''' + + def test_generate_schema(self): + ''' + Test case to verify that for a given sample, the schema is generated correctly. + ''' + + samples = [{'test_key': 'test_value'}, {'TEST': ['test'], 'int_value' : 27}, {'Test': []}] + table_spec = {'table_name': 'TEST', 'search_prefix': '/test', 'search_pattern': 'jsonl'} + + expected_schema = { + 'test_key': {'type': ['null', 'string']}, + 'TEST': { + 'anyOf': [ + {'items': {'type': ['null', 'string']}, 'type': 'array'}, + {'type': ['null', 'string']} + ] + }, + 'int_value': {'type': ['null', 'integer', 'string']}, + 'Test': { + 'anyOf': [ + {'items': {'type': ['null', 'string']}, 'type': 'array'}, + {'type': ['null', 'string']} + ] + } + } + + actual_schema = schema.generate_schema(samples, table_spec) + + self.assertEqual(expected_schema, actual_schema) + + @parameterized.expand([ + [ + 'date_time_datatype', + "date-time", + { + 'anyOf': [ + {'type': ['null', 'string'], 'format': 'date-time'}, + {'type': ['null', 'string']} + ] + } + ], + [ + "dictionary_datatype", + "dict", + { + 'anyOf': [ + {'type': 'object', 'properties': {}}, + {'type': ['null', 'string']} + ] + } + ], + [ + "string_datatype", + "string", + { + 'type': ['null', "string"], + } + ], + [ + "integer_datatype", + "integer", + { + 'type': ['null', "integer", "string"], + } + ], + [ + "boolean_datatype", + "boolean", + { + 'type': ['null', "boolean", "string"], + } + ], + [ + "float_datatype", + "number", + { + 'type': ['null', "number", "string"], + } + ] + ]) + def test_datatype_schema(self, name, test_datatype, expected_schema): + ''' + Test case to verify that schema is created correctly for individual + fields depending on the datatype of the field. 
+        '''
+
+        actual_schema = schema.datatype_schema(test_datatype)
+
+        self.assertEqual(actual_schema, expected_schema)
+
+    @parameterized.expand([
+        ['dictionary_datatype', {'dict':62}, 'dict'],
+        ['float_datatype', {'number':27}, 'number'],
+        ['integer_datatype', {'integer':32}, 'integer'],
+        ['float_and_integer_datatype', {'number':18, 'integer':81}, 'number'],
+        ['string_datatype', {'string':43}, 'string'],
+        ['no_datatype', {}, 'string']
+    ])
+    def test_pick_datatype(self, name, test_value, expected_datatype):
+        '''
+        Test case to verify that when a datatype with its counts is given,
+        then the datatype is returned. If no counts are there (which means no datatype),
+        then it should return the default value "string".
+        '''
+
+        actual_datatype = schema.pick_datatype(test_value)
+
+        self.assertEqual(expected_datatype, actual_datatype)
+
+    def test_count_sample(self):
+        '''
+        Test case to verify that for a given sample, counts of datatypes are returned
+        correctly as per their occurrences.
+        '''
+
+        sample = {
+            'test_key': {'string_value': '', 'int_value' : 27},
+            'float_value' : '45.99',
+            'string_value': ''
+        }
+        table_spec = {'table_name': 'TEST', 'search_prefix': '/test', 'search_pattern': 'jsonl'}
+        expected_counts = {'test_key': {'dict':1}, 'float_value' : {'number':1}, 'string_value': {}}
+
+        actual_counts = schema.count_sample(sample, {}, table_spec)
+
+        self.assertEqual(expected_counts, actual_counts)
+
+    @parameterized.expand([
+        ['no_datum', 'test', None, None],
+        ['empty_list_as_datum', 'test', [], 'list'],
+        ['non_empty_list_as_datum', 'test', ['test1'], 'list.string'],
+        ['nested_list_as_datum', 'test', [['test2']], 'list.string'],
+        ['key_in_date_overrides', 'TEST', 'test1', 'date-time'],
+        ['dict_as_datum', 'test', {}, 'dict'],
+        ['string_of_integer_as_datum', 'test', '12', 'integer'],
+        ['integer_as_datum', 'test', 21, 'integer'],
+        ['string_of_float_as_datum', 'test', '12.235', 'number'],
+        ['float_as_datum', 'test', 25.487, 'number']
+    ])
+    def test_infer(self, name, test_key, test_datum, expected_datatype):
+        '''
+        Test case to verify that the proper datatype is returned for the corresponding data.
+ ''' + + actual_datatype = schema.infer(key = test_key, datum = test_datum, date_overrides= ['TEST']) + + self.assertEqual(expected_datatype, actual_datatype) From 664aa6c98d9332af91096ad7aa3835d228504b4b Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Tue, 6 Sep 2022 18:15:40 +0530 Subject: [PATCH 16/20] updated jsonl tap-tester test --- tests/base.py | 3 ++- tests/test_sftp_jsonl_data.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/base.py b/tests/base.py index 58e4b2d..0c7ec07 100644 --- a/tests/base.py +++ b/tests/base.py @@ -179,7 +179,8 @@ def get_common_properties(self): return {'start_date' : '2017-01-01T00:00:00Z', 'host' : os.getenv('TAP_SFTP_HOST'), 'port' : os.getenv('TAP_SFTP_PORT'), - 'username' : os.getenv('TAP_SFTP_USERNAME')} + 'username' : os.getenv('TAP_SFTP_USERNAME'), + 'private_key_file': None} def run_test(self): conn_id = connections.ensure_connection(self) diff --git a/tests/test_sftp_jsonl_data.py b/tests/test_sftp_jsonl_data.py index 673e24c..2a536ec 100644 --- a/tests/test_sftp_jsonl_data.py +++ b/tests/test_sftp_jsonl_data.py @@ -13,7 +13,7 @@ def name(self): def generate_jsonl_data(self): start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) - return [{"int": 1, "string": "string_data", "float": 1.22, "date": utils.strftime(start_datetime)}] + return [{"int": 1, "string": "string_data", "float": 1.2, "date": utils.strftime(start_datetime)}] def generate_jsonl_dict_data(self): start_datetime_1 = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc) @@ -135,14 +135,14 @@ def expected_data(self): "mytable": { "int": 1, "string": "string_data", - "float": Decimal(1.22), + "float": Decimal("1.2"), "date": "2018-01-01T19:29:14.578000Z", "_sdc_source_lineno": 1 }, "mytable_list": { "int": 1, "list_int": [1, 2, 3], - "list_float": [Decimal(1.2), Decimal(2.3), Decimal(3.4)], + "list_float": [Decimal("1.2"), Decimal("2.3"), Decimal("3.4")], "list_string": ["data", "of", 
"string"], "list_dict": [{"id": 1}, {"id": 2}], "list_list": ["[1, 2, 3]", "[4, 5, 6]"], From 7c1728eb81277f43b982f255c948913b3810523c Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Tue, 27 Sep 2022 14:12:45 +0530 Subject: [PATCH 17/20] updated the code for JSONL to handle sampling logic on singer-encoding side --- tap_sftp/client.py | 5 +- tap_sftp/discover.py | 234 +++++++++--------- tap_sftp/gzip_utils.py | 57 ----- tap_sftp/schema.py | 142 ----------- tap_sftp/sync.py | 18 +- tests/base.py | 3 +- tests/unittests/test_JSONL.py | 64 +---- tests/unittests/test_JSONL_sync_sdc_fields.py | 2 +- tests/unittests/test_sorted_files.py | 13 +- tests/unittests/test_timeout.py | 2 +- 10 files changed, 150 insertions(+), 390 deletions(-) delete mode 100644 tap_sftp/gzip_utils.py delete mode 100644 tap_sftp/schema.py diff --git a/tap_sftp/client.py b/tap_sftp/client.py index 0f63315..b194552 100644 --- a/tap_sftp/client.py +++ b/tap_sftp/client.py @@ -145,7 +145,10 @@ def get_files_by_prefix(self, prefix): return files - def get_files(self, prefix, search_pattern, modified_since=None): + def get_files(self, table_spec, modified_since=None): + prefix = table_spec['search_prefix'] + search_pattern = table_spec['search_pattern'] + files = self.get_files_by_prefix(prefix) if files: LOGGER.info('Found %s files in "%s"', len(files), prefix) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index c00d87e..8888d2e 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -1,18 +1,16 @@ -import io +import gzip import itertools import json import socket -import sys +import zipfile import backoff import singer -import gzip -import zipfile import csv as python_csv -from singer_encodings import json_schema, csv +from singer_encodings import json_schema, csv, compression, jsonl, schema as se_schema from singer import metadata -from tap_sftp import client, gzip_utils, schema +from tap_sftp import client -LOGGER= singer.get_logger() +LOGGER = singer.get_logger() def 
compression_infer_local(iterable, file_name): """Uses the incoming file_name and checks the end of the string for supported compression types""" @@ -22,16 +20,7 @@ def compression_infer_local(iterable, file_name): if file_name.endswith('.tar.gz'): raise NotImplementedError("tar.gz not supported") elif file_name.endswith('.gz'): - file_bytes = iterable.read() - gz_file_name = None - try: - gz_file_name = gzip_utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes)) - except AttributeError: - # If a file is compressed using gzip command with --no-name attribute, - # It will not return the file name and timestamp. Hence we will skip such files. - LOGGER.warning('Skipping "%s" file as we did not get the original file name.', file_name) - # Send file object and file name - yield [gzip.GzipFile(fileobj=io.BytesIO(file_bytes)), gz_file_name] + yield gzip.GzipFile(fileobj=iterable) elif file_name.endswith('.zip'): with zipfile.ZipFile(iterable) as zip: for name in zip.namelist(): @@ -39,20 +28,76 @@ def compression_infer_local(iterable, file_name): else: yield iterable -def maximize_csv_field_width(): - """Set the max filed size as per the system's maxsize""" +def get_row_iterators_local(iterable, options={}, infer_compression=False, headers_in_catalog=None, with_duplicate_headers=False): + """ + Accepts an interable, options and a flag to infer compression and + yields csv.DictReader objects can be used to yield CSV rows. 
+ """ + if infer_compression: + compressed_iterables = compression_infer_local(iterable, options.get('file_name')) + + for item in compressed_iterables: + + # Try to parse as JSONL + try: + # Duplicate 'item' iterator as 'get_JSONL_iterators' will use 1st row of 'item' to load as JSONL + # Thus, the 'item' will have records after the 1st row on encountering the 'JSONDecodeError' error + # As a result 'csv.get_row_iterator' will sync records after the 1st row + item, item_for_JSONL = itertools.tee(item) + yield ('jsonl', jsonl.get_JSONL_iterators(item_for_JSONL, options)) + continue + except json.JSONDecodeError: + pass + + # Maximize the CSV field width + csv.maximize_csv_field_width() + + # Finally parse as CSV + yield ('csv', csv.get_row_iterator(item, options=options)) + + +# pylint: disable=too-many-arguments +def sample_files_local(conn, table_spec, files, sample_rate=1, max_records=1000, max_files=5): + """Function to sample matched files as per the sampling rate and the max records to sample""" + LOGGER.info("Sampling files (max files: %s)", max_files) + to_return = [] + empty_samples = [] + + files_so_far = 0 + + sorted_files = sorted( + files, key=lambda f: f['last_modified'], reverse=True) + + for f in sorted_files: + empty_file, samples = sample_file_local(conn, table_spec, f, sample_rate, max_records) + + if empty_file: + empty_samples += samples + else: + to_return += samples + + files_so_far += 1 - current_field_size_limit = csv.csv.field_size_limit() - field_size_limit = sys.maxsize + if files_so_far >= max_files: + break + + if len(to_return) == 0: + return empty_samples + + return to_return - if current_field_size_limit != field_size_limit: - csv.csv.field_size_limit(field_size_limit) - LOGGER.info("Changed the CSV field size limit from %s to %s", - current_field_size_limit, - field_size_limit) def sample_file_local(conn, table_spec, f, sample_rate, max_records): + """Function to sample a file and return list of records for that file""" + + 
LOGGER.info('Sampling %s (max records: %s, sample rate: %s)', + f['filepath'], + max_records, + sample_rate) + samples = [] + file_name = f['filepath'] + try: file_handle = conn.get_file_handle(f) except OSError: @@ -60,10 +105,10 @@ def sample_file_local(conn, table_spec, f, sample_rate, max_records): # Add file_name to opts and flag infer_compression to support gzipped files opts = {'key_properties': table_spec['key_properties'], - 'delimiter': table_spec['delimiter'], - 'file_name': f['filepath']} + 'delimiter': table_spec.get('delimiter', ','), + 'file_name': file_name} - readers = get_row_iterators_local(file_handle, options=opts, infer_compression=True) + readers = csv.get_row_iterators(file_handle, options=opts, infer_compression=True) for _, reader in readers: current_row = 0 @@ -78,119 +123,66 @@ def sample_file_local(conn, table_spec, f, sample_rate, max_records): if len(samples) >= max_records: break + LOGGER.info("Sampled %s rows from %s", len(samples), file_name) # Empty sample to show field selection, if needed empty_file = False if len(samples) == 0: empty_file = True # If the 'reader' is an instance of 'csv.Dictreader' and has - # fieldnames, prepare samples with None field names + # fieldnames, prepare samples with 'None' field names # Assumes all reader objects in readers have the same fieldnames if isinstance(reader, python_csv.DictReader) and reader.fieldnames is not None: samples.append({name: None for name in reader.fieldnames}) return (empty_file, samples) -def get_row_iterators_local(iterable, options={}, infer_compression=False): - """Accepts an interable, options and a flag to infer compression and yields - csv.DictReader objects which can be used to yield CSV rows.""" - if infer_compression: - compressed_iterables = compression_infer_local(iterable, options.get('file_name')) +def get_schema_for_table_local(conn, table_spec, sample_rate=1): + """Function to generate schema for the provided data files""" + files = conn.get_files(table_spec) 
- for item in compressed_iterables: - file_name_splitted = options.get('file_name').split('.') - extension = file_name_splitted[-1].lower() - # Get the extension of the zipped file - if extension == 'zip': - extension = item.name.split('.')[-1].lower() - # Get the extension of the gzipped file ie. file.csv.gz -> csv - elif extension == 'gz': - # Get file name - gzip_file_name = item[1] - # Set iterator 'item' - item = item[0] - # Get file extension - extension = gzip_file_name.split('.')[-1].lower() if gzip_file_name else gzip_file_name - - # For GZ files, if the file is gzipped with --no-name, then - # the 'extension' will be 'None'. Hence, send an empty list - if not extension: - yield (None, []) - # If the extension is JSONL then use 'get_JSONL_iterators' - elif extension == 'jsonl': - yield ('jsonl', get_JSONL_iterators(item, options)) - # Assuming the extension is 'csv' of 'txt', then use singer_encoding's 'get_row_iterator' - else: - # Maximize the CSV field width - maximize_csv_field_width() - yield ('csv', csv.get_row_iterator(item, options=options)) - -def get_JSONL_iterators(iterator, options): - # Get JSOL rows - records = get_JSONL_rows(iterator) - check_jsonl_sample_records, records = itertools.tee(records) - - # Veirfy the 'date_overrides' and 'key_properties' as per the config - check_key_properties_and_date_overrides_for_jsonl_file(options, check_jsonl_sample_records) - return records - -def check_key_properties_and_date_overrides_for_jsonl_file(options, jsonl_sample_records): - - all_keys = set() - for record in jsonl_sample_records: - keys = record.keys() - all_keys.update(keys) - - if options.get('key_properties'): - key_properties = set(options['key_properties']) - if not key_properties.issubset(all_keys): - raise Exception('JSONL file missing required headers: {}' - .format(key_properties - all_keys)) - - if options.get('date_overrides'): - date_overrides = set(options['date_overrides']) - if not date_overrides.issubset(all_keys): - raise 
Exception('JSONL file missing date_overrides headers: {}' - .format(date_overrides - all_keys)) - -def get_JSONL_rows(iterator): - # Return JSON rows from JSONL file - for row in iterator: - decoded_row = row.decode('utf-8') - if decoded_row.strip(): - row = json.loads(decoded_row) - # Skip if the row is empty - if not row: - continue - else: - continue + if not files: + return {} - yield row + samples = json_schema.sample_files(conn, table_spec, files, sample_rate=sample_rate) -# Override singer_encoding's 'get_row_iterators' as per the Tap's JSONL support -csv.get_row_iterators = get_row_iterators_local + # Return empty if there is no schema generated + if not any(samples): + return { + 'type': 'object', + 'properties': {}, + } -# Override singer_encoding's 'sample_file' as per the Tap's JSONL support -json_schema.sample_file = sample_file_local + schema = se_schema.generate_schema(samples, table_spec) -# Override singer_encoding's 'generate_schema' as the Tap's JSONL support -json_schema.generate_schema = schema.generate_schema - -# Override the '_sdc_extra' column value as per the JSONL-supported format -json_schema.SDC_EXTRA_VALUE = { - 'type': 'array', - 'items': { - 'anyOf': [ - {'type': 'object', 'properties': {}}, - {'type': 'string'} - ] + data_schema = { + **schema, + **json_schema.get_sdc_columns() } -} + + return { + 'type': 'object', + 'properties': data_schema, + } + + +# Override singer_encoding's 'get_row_iterators' as: +# - The Tap is supporting files created without an extension +csv.get_row_iterators = get_row_iterators_local + +# Override singer_encoding's 'sample_file' as: +# - The Tap is looping over the files in the sorted manner of 'last_modified' +# - The Tap is not supporting the skipping of CSV and JSONL files with the wrong extension +json_schema.sample_files = sample_files_local + +# Override singer_encoding's 'sample_file' as: +# - The Tap is not having support for CSV files with duplicate headers +# - The Tap is creating a sample 
record with 'None' for CSV files with only headers +json_schema.sample_file = sample_file_local def discover_streams(config): streams = [] conn = client.connection(config) - prefix = format(config.get("user_dir", "./")) tables = json.loads(config['tables']) for table_spec in tables: @@ -217,7 +209,7 @@ def discover_streams(config): # generate schema def get_schema(conn, table_spec): LOGGER.info('Sampling records to determine table JSON schema "%s".', table_spec['table_name']) - schema = json_schema.get_schema_for_table(conn, table_spec) + schema = get_schema_for_table_local(conn, table_spec) stream_md = metadata.get_standard_metadata(schema, key_properties=table_spec.get('key_properties'), replication_method='INCREMENTAL') diff --git a/tap_sftp/gzip_utils.py b/tap_sftp/gzip_utils.py deleted file mode 100644 index 71c3f46..0000000 --- a/tap_sftp/gzip_utils.py +++ /dev/null @@ -1,57 +0,0 @@ -import gzip -import struct - -def get_file_name_from_gzfile(fileobj=None): - """Reading headers of GzipFile and returning filename.""" - - _gz = gzip.GzipFile(fileobj=fileobj) - _fp = _gz.fileobj - - # the magic 2 bytes: if 0x1f 0x8b (037 213 in octal) - magic = _fp.read(2) - if magic == b'': - return None - - if magic != b'\037\213': - raise OSError('Not a gzipped file (%r)' % magic) - - (method, flag, _) = struct.unpack(" - # specifies FNAME is encoded in latin1 - while True: - s = _fp.read(1) - if not s or s == b'\000': - break - _fname.append(s) - return ''.join([s.decode('latin1') for s in _fname]) - - return None - -def _read_exact(fp, n): - """This is the gzip.GzipFile._read_exact() method from the Python library. 
- """ - data = fp.read(n) - while len(data) < n: - b = fp.read(n - len(data)) - if not b: - raise EOFError("Compressed file ended before the " - "end-of-stream marker was reached") - data += b - return data \ No newline at end of file diff --git a/tap_sftp/schema.py b/tap_sftp/schema.py deleted file mode 100644 index 4ddc977..0000000 --- a/tap_sftp/schema.py +++ /dev/null @@ -1,142 +0,0 @@ -import singer -LOGGER = singer.get_logger() - -def generate_schema(samples, table_spec): - """Function to generate the schema as the records""" - counts = {} - for sample in samples: - # {'id': {'integer': 45}, 'name': {'string' : 45}} - counts = count_sample(sample, counts, table_spec) - - for key, value in counts.items(): - datatype = pick_datatype(value) - - if 'list.' in datatype: - child_datatype = datatype.rsplit('.', maxsplit=1)[-1] - counts[key] = { - 'anyOf': [ - {'type': 'array', 'items': datatype_schema(child_datatype)}, - {'type': ['null', 'string']} - ] - } - elif datatype == 'list': - counts[key] = { - 'anyOf': [ - {'type': 'array', 'items': {'type': ['null', 'string']}}, - {'type': ['null', 'string']} - ] - } - else: - counts[key] = datatype_schema(datatype) - - return counts - -def datatype_schema(datatype): - """Function to create schema for the field as per the datatype""" - if datatype == 'date-time': - schema = { - 'anyOf': [ - {'type': ['null', 'string'], 'format': 'date-time'}, - {'type': ['null', 'string']} - ] - } - elif datatype == 'dict': - schema = { - 'anyOf': [ - {'type': 'object', 'properties': {}}, - {'type': ['null', 'string']} - ] - } - else: - types = ['null', datatype] - if datatype != 'string': - types.append('string') - schema = { - 'type': types, - } - return schema - -def pick_datatype(counts): - """Function to get the datatype from the counts""" - # Default return - to_return = 'string' - list_of_datatypes = ['list.date-time', 'list.dict', 'list.number', - 'list.integer', 'list.string', 'list', 'date-time', 'dict'] - - for data_types in 
list_of_datatypes: - if counts.get(data_types, 0) > 0: - return data_types - - # Return the integer or number datatype - if len(counts) == 1: - if counts.get('integer', 0) > 0: - to_return = 'integer' - elif counts.get('number', 0) > 0: - to_return = 'number' - - # If the data is of integer and number, then return number as the datatype - elif(len(counts) == 2 and - counts.get('integer', 0) > 0 and - counts.get('number', 0) > 0): - to_return = 'number' - - return to_return - -def count_sample(sample, counts, table_spec): - """Function to count the records as per the datatype""" - for key, value in sample.items(): - if key not in counts: - counts[key] = {} - - date_overrides = table_spec.get('date_overrides', []) - datatype = infer(key, value, date_overrides) - - if datatype is not None: - counts[key][datatype] = counts[key].get(datatype, 0) + 1 - - return counts - -def infer(key, datum, date_overrides, second_call=False): - """Function to return the inferred data type""" - if datum is None or datum == '': - return None - - try: - if isinstance(datum, list): - data_type = 'string' - if second_call: # Use string for nested list - LOGGER.warning( - 'Unsupported type for "%s", List inside list is not supported hence will be treated as a string', key) - elif not datum: # Empty list - data_type = 'list' - else: - data_type = 'list.' + infer(key, datum[0], date_overrides, second_call=True) - return data_type - - if key in date_overrides: - return 'date-time' - - if isinstance(datum, dict): - return 'dict' - - try: - # Convert the data into the string before integer conversion - # As for CSV, all the data will be replicated into the string as a result, int("1.1") will result into ValueError - # Whereas for JSONL, all the data will be replicated into original form thus, int(1.1) will not raise any error. 
- # Hence, wrong datatype will be assigned - int(str(datum)) - return 'integer' - except (ValueError, TypeError): - pass - - try: - # numbers are NOT floats, they are DECIMALS - float(str(datum)) - return 'number' - except (ValueError, TypeError): - pass - - except (ValueError, TypeError): - pass - - return 'string' diff --git a/tap_sftp/sync.py b/tap_sftp/sync.py index 543caab..00d4107 100644 --- a/tap_sftp/sync.py +++ b/tap_sftp/sync.py @@ -27,9 +27,7 @@ def sync_stream(config, state, stream): return 0 table_spec = table_spec[0] - files = conn.get_files(table_spec["search_prefix"], - table_spec["search_pattern"], - modified_since) + files = conn.get_files(table_spec, modified_since) LOGGER.info('Found %s files to be synced.', len(files)) @@ -73,12 +71,14 @@ def sync_file(conn, f, stream, table_spec): for file_extension, reader in readers: with Transformer() as transformer: # Row start for files as per the file type - row_start_line = 2 if file_extension == 'csv' else 1 + row_start_line = 1 if file_extension == 'jsonl' else 2 for row in reader: + # Skipping the empty line + if len(row) == 0: + continue + custom_columns = { '_sdc_source_file': f["filepath"], - - # index zero, +1 for header row '_sdc_source_lineno': records_synced + row_start_line } @@ -87,16 +87,16 @@ def sync_file(conn, f, stream, table_spec): if file_extension == 'jsonl': sdc_extra = [] - # Get the extra fields ie. (json keys - fields from catalog - fields added by the tap) + # Get the extra fields ie. 
(json keys - fields from the catalog - fields added by the tap) extra_fields = set() # Create '_sdc_extra' fields if the schema is not empty - if schema_dict: + if schema_dict.get('properties'): extra_fields = set(row.keys()) - set(schema_dict.get('properties', {}).keys() - tap_added_fields) # Prepare list of extra fields for extra_field in extra_fields: sdc_extra.append({extra_field: row.get(extra_field)}) - # If the record contains extra fields, then add '_sdc_extra' column + # If the record contains extra fields, then add the '_sdc_extra' column if extra_fields: custom_columns['_sdc_extra'] = sdc_extra diff --git a/tests/base.py b/tests/base.py index 0c7ec07..58e4b2d 100644 --- a/tests/base.py +++ b/tests/base.py @@ -179,8 +179,7 @@ def get_common_properties(self): return {'start_date' : '2017-01-01T00:00:00Z', 'host' : os.getenv('TAP_SFTP_HOST'), 'port' : os.getenv('TAP_SFTP_PORT'), - 'username' : os.getenv('TAP_SFTP_USERNAME'), - 'private_key_file': None} + 'username' : os.getenv('TAP_SFTP_USERNAME')} def run_test(self): conn_id = connections.ensure_connection(self) diff --git a/tests/unittests/test_JSONL.py b/tests/unittests/test_JSONL.py index cec4a53..4aa87cd 100644 --- a/tests/unittests/test_JSONL.py +++ b/tests/unittests/test_JSONL.py @@ -2,6 +2,7 @@ from unittest import mock from parameterized import parameterized from tap_sftp import discover +from singer_encodings.jsonl import get_JSONL_iterators class JSONLIterator: def __init__(self, data): @@ -10,61 +11,17 @@ def __init__(self, data): def decode(self, encoding): return self.data -class Iterable: - def __init__(self, data, name=None): - self.data = data - self.name = name - - def read(self): - return self.data - -class TestCheckJSONLKeyProperties(unittest.TestCase): - """Test cases to verify we raise error if we asr missing Primary Key or Date Overrides value in JSONL data""" - def test_get_JSONL_iterators_positive(self): - options = { - "key_properties": ["id"], - "date_overrides": ["updated_at"] - 
} - records = [ - JSONLIterator('{"id": 1, "name": "abc", "updated_at": "2022-01-01"}') - ] - discover.get_JSONL_iterators( - options=options, - iterator=records - ) - - @parameterized.expand([ - ["raise_key_properties_error", '{"name": "abc", "updated_at": "2022-01-01"}', "JSONL file missing required headers: {\'id\'}"], - ["raise_date_overrides_error", '{"id": 1, "name": "abc"}', "JSONL file missing date_overrides headers: {\'updated_at\'}"] - ]) - def test_get_JSONL_iterators(self, name, test_data, expected_data): - options = { - "key_properties": ["id"], - "date_overrides": ["updated_at"] - } - records = [ - JSONLIterator(test_data) - ] - with self.assertRaises(Exception) as e: - discover.get_JSONL_iterators( - options=options, - iterator=records - ) - self.assertEqual(str(e.exception), expected_data) - class TestRowIterators(unittest.TestCase): """Test cases to verify we call 'row_iterator' for JSONL or CSV as per the file extension""" @parameterized.expand([ - ["csv", ["test.csv", [Iterable("")]], [1, 0]], - ["jsonl", ["test.jsonl", [Iterable('{"id": 1}')]], [0, 1]], - ["zip_csv", ["test.zip", [Iterable("", "test.csv")]], [1, 0]], - ["zip_jsonl", ["test.zip", [Iterable("", "test.jsonl")]], [0, 1]], - ["gz_csv", ["test.gz", [[Iterable(""), "test.csv"]]], [1, 0]], - ["gz_jsonl", ["test.gz", [[Iterable(""), "test.jsonl"]]], [0, 1]], - ["gz_csv_no_name", ["test.gz", [[Iterable(""), None]]], [0, 0]], - ["gz_jsonl_no_name", ["test.gz", [[Iterable(""), None]]], [0, 0]], + ["csv", ["test.csv", [[b'id,name\n', b'1,test1\n']]], 1], + ["jsonl", ["test.jsonl", [[b'{"id": 1, "name": "test1"}\n']]], 0], + ["zip_csv", ["test.zip", [[b'id,name\n', b'1,test1\n']]], 1], + ["zip_jsonl", ["test.zip", [[b'{"id": 1, "name": "test1"}\n']]], 0], + ["gz_csv", ["test.gz", [[b'id,name\n', b'1,test1\n']]], 1], + ["gz_jsonl", ["test.gz", [[b'{"id": 1, "name": "test1"}\n']]], 0], ]) - @mock.patch("tap_sftp.discover.get_JSONL_iterators") + 
@mock.patch("singer_encodings.jsonl.get_JSONL_iterators", side_effect=get_JSONL_iterators) @mock.patch("singer_encodings.csv.get_row_iterator") @mock.patch("tap_sftp.discover.compression_infer_local") def test_get_row_iterators_local(self, name, test_data, expected_data, mocked_infer, mocked_get_csv_row_iterator, mocked_get_JSONL_iterators): @@ -76,5 +33,6 @@ def test_get_row_iterators_local(self, name, test_data, expected_data, mocked_in # Function call list(discover.get_row_iterators_local(iterable=[], options=options, infer_compression=True)) # Verify the call count for JSONL or CSV row_iterator - self.assertEqual(mocked_get_csv_row_iterator.call_count, expected_data[0]) - self.assertEqual(mocked_get_JSONL_iterators.call_count, expected_data[1]) + self.assertEqual(mocked_get_csv_row_iterator.call_count, expected_data) + # Verify we try to parse for JSONL first for every file + self.assertEqual(mocked_get_JSONL_iterators.call_count, 1) diff --git a/tests/unittests/test_JSONL_sync_sdc_fields.py b/tests/unittests/test_JSONL_sync_sdc_fields.py index f3a18b9..5c79c14 100644 --- a/tests/unittests/test_JSONL_sync_sdc_fields.py +++ b/tests/unittests/test_JSONL_sync_sdc_fields.py @@ -42,7 +42,7 @@ def test_sync_JSONL_empty_schema_with_records(self, mocked_get_row_iterators, mo "search_prefix": None, "search_pattern": "data.jsonl" } - stream = singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(), metadata=[]) + stream = singer.CatalogEntry(tap_stream_id="test", schema=singer.Schema(properties={}), metadata=[]) f = {"filepath": "/root_dir/data.jsonl", "last_modified": "2022-01-01"} sync.sync_file(conn=conn, f=f, stream=stream, table_spec=table_spec) args = mocked_transform.call_args.args diff --git a/tests/unittests/test_sorted_files.py b/tests/unittests/test_sorted_files.py index ff2f167..f714b2d 100644 --- a/tests/unittests/test_sorted_files.py +++ b/tests/unittests/test_sorted_files.py @@ -40,8 +40,11 @@ def test_sorted_files(self, mocked_all_files): }] 
mocked_all_files.return_value = files_list - - files = conn.get_files("/root", "file[0-9].csv") + table_spec = { + "search_prefix": "/root", + "search_pattern": "file[0-9].csv" + } + files = conn.get_files(table_spec) # expected files in increasing order of "last_modified" expected_files_list = ["/root/file1.csv", "/root/file3.csv", "/root/file2.csv", "/root/file4.csv"] @@ -74,7 +77,11 @@ def test_sorted_files_negative(self, mocked_all_files): # setting "modified_since" to now modified_since = singer.utils.strptime_to_utc(datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat()) - files = conn.get_files("/root", "file[0-9].csv", modified_since) + table_spec = { + "search_prefix": "/root", + "search_pattern": "file[0-9].csv" + } + files = conn.get_files(table_spec, modified_since) # as all the modified date is lesser than "modified_since" thus, no files will be returned expected_files_list = [] diff --git a/tests/unittests/test_timeout.py b/tests/unittests/test_timeout.py index a1e5cda..31802ef 100644 --- a/tests/unittests/test_timeout.py +++ b/tests/unittests/test_timeout.py @@ -140,7 +140,7 @@ class TimeoutBackoff(unittest.TestCase): """ @mock.patch("singer.metadata.get_standard_metadata") - @mock.patch("singer_encodings.json_schema.get_schema_for_table") + @mock.patch("tap_sftp.discover.get_schema_for_table_local") def test_timeout_backoff__get_schema(self, mocked_get_schema_for_table, mocked_get_standard_metadata): """ Test case to verify we backoff and retry for 'get_schema' function From 7fe57fbc2a4ee64620e7644b384268abbbcd969a Mon Sep 17 00:00:00 2001 From: harshpatel4crest Date: Tue, 27 Sep 2022 14:15:11 +0530 Subject: [PATCH 18/20] updated comments --- tap_sftp/discover.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_sftp/discover.py b/tap_sftp/discover.py index 8888d2e..b38d381 100644 --- a/tap_sftp/discover.py +++ b/tap_sftp/discover.py @@ -30,8 +30,8 @@ def compression_infer_local(iterable, file_name): def 
get_row_iterators_local(iterable, options={}, infer_compression=False, headers_in_catalog=None, with_duplicate_headers=False):
     """
-    Accepts an interable, options and a flag to infer compression and
-    yields csv.DictReader objects can be used to yield CSV rows.
+    Accepts an iterable, options, compression flag, catalog headers and flag for duplicate headers
+    to infer compression and yields csv.DictReader objects that can be used to yield CSV rows.
     """
     if infer_compression:
         compressed_iterables = compression_infer_local(iterable, options.get('file_name'))

From 934931772d1c2c50976cb7f144305bc0b927e22e Mon Sep 17 00:00:00 2001
From: harshpatel4crest
Date: Tue, 11 Oct 2022 11:48:32 +0530
Subject: [PATCH 19/20] resolve unittest failure

---
 tests/unittests/test_gzip_utils.py | 115 ---------------------
 tests/unittests/test_schema.py     | 155 -----------------------------
 2 files changed, 270 deletions(-)
 delete mode 100644 tests/unittests/test_gzip_utils.py
 delete mode 100644 tests/unittests/test_schema.py

diff --git a/tests/unittests/test_gzip_utils.py b/tests/unittests/test_gzip_utils.py
deleted file mode 100644
index 29288de..0000000
--- a/tests/unittests/test_gzip_utils.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import unittest
-from unittest import mock
-from parameterized import parameterized
-from tap_sftp.gzip_utils import get_file_name_from_gzfile , _read_exact
-
-class FileHandler:
-    '''
-    Class to handle return values for mocked function calls.
-    '''
-
-    def __init__(self, file, fileobj):
-        self.filename = file
-        self.fileobj = fileobj
-
-class TestClass(unittest.TestCase):
-    '''
-    Test class to verify working of functions implemented for gz_file.
-    '''
-
-    @mock.patch("gzip.GzipFile")
-    def test_get_file_name_from_gzfile_returns_none(self, mocked_gzip_file):
-        '''
-        Test case to verify that no file name is returned. 
- ''' - - mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) - mocked_gzip_file.read.return_value = b'' - returned = get_file_name_from_gzfile(fileobj = None) - - self.assertEqual(returned, None) - - @mock.patch("gzip.GzipFile") - def test_get_file_name_from_gzfile_raises_gzipped_file_error(self, mocked_gzip_file): - ''' - Test case to verify that OSError is raised if incorrect file is passed. - ''' - - with self.assertRaises(OSError) as e: - get_file_name_from_gzfile(fileobj = None) - - self.assertTrue("Not a gzipped file" in str(e.exception)) - - @mock.patch("gzip.GzipFile") - def test_get_file_name_from_gzfile_raises_compression_method_error(self , mocked_gzip_file): - ''' - Test case to verify that OSError is raised if file is compressed by unknown methods. - ''' - - mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) - mocked_gzip_file.read.return_value = b'\037\213' - - with self.assertRaises(OSError) as e: - get_file_name_from_gzfile(fileobj = None) - - self.assertEqual(str(e.exception), "Unknown compression method") - - @parameterized.expand([ - ["with_gzip_extension", "test.txt.gzip", "test.txt"], - ["without_gzip_extension", "test.txt", "test.txt"] - ]) - @mock.patch("gzip.GzipFile") - @mock.patch("tap_sftp.gzip_utils._read_exact") - def test_get_file_name_from_gzfile_no_flag(self,name, - test_value1, test_value2, mocked_read_exact, mocked_gzip_file): - ''' - Test case to verify file name when flag = 0. 
- ''' - - mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) - mocked_gzip_file.read.return_value = b'\037\213' - mocked_read_exact.return_value = b'\x08\x00\x17\x00\x00\x00\x00\x00' - mocked_gzip_file.name = test_value1 - - gz_file_name = get_file_name_from_gzfile(fileobj = "fileobj") - - self.assertEqual(gz_file_name, test_value2) - - @mock.patch("gzip.GzipFile") - @mock.patch("tap_sftp.gzip_utils._read_exact") - def test_get_file_name_from_gzfile_with_flag(self, mocked_read_exact, mocked_gzip_file): - ''' - Test case to verify file name when flag != 0 . - ''' - - mocked_gzip_file.return_value = FileHandler("Test", mocked_gzip_file) - mocked_gzip_file.read.side_effect = [b'\037\213',b'test_file', b'\000'] - mocked_read_exact.side_effect = [b'\x08\x0c\x17\x00\x00\x00\x00\x00', b'\x01\x00', b'\x01'] - - gz_file_name = get_file_name_from_gzfile(fileobj = "fileobj") - - self.assertEqual(gz_file_name, "test_file") - - @mock.patch("gzip.GzipFile") - def test_read_exact(self, mocked_gzip): - ''' - Test case to verify that data of expected length(in bytes) is received. - ''' - - mocked_gzip.read.side_effect = [b'\x00',b'\x05\x03',b'\x01'] - data = _read_exact(mocked_gzip , 2) - - self.assertEqual(data, b'\x00\x05\x03') - - @mock.patch("gzip.GzipFile") - def test_read_exact_raises_error(self, mocked_gzip): - ''' - Test case to verify EOFError is raised. 
- ''' - - mocked_gzip.read.side_effect = [b'\x05', b'',b'\x01'] - - with self.assertRaises(EOFError) as e: - _read_exact(mocked_gzip , 2) - - self.assertEqual(str(e.exception), 'Compressed file ended before the end-of-stream marker was reached') diff --git a/tests/unittests/test_schema.py b/tests/unittests/test_schema.py deleted file mode 100644 index eb15da8..0000000 --- a/tests/unittests/test_schema.py +++ /dev/null @@ -1,155 +0,0 @@ -import unittest -from parameterized import parameterized -from tap_sftp import schema - -class SchemaTest(unittest.TestCase): - ''' - Test class to verify an appropriate working of functions in schema.py file in the tap. - ''' - - def test_generate_schema(self): - ''' - Test case to verify that for a given sample, the schema is generated correctly. - ''' - - samples = [{'test_key': 'test_value'}, {'TEST': ['test'], 'int_value' : 27}, {'Test': []}] - table_spec = {'table_name': 'TEST', 'search_prefix': '/test', 'search_pattern': 'jsonl'} - - expected_schema = { - 'test_key': {'type': ['null', 'string']}, - 'TEST': { - 'anyOf': [ - {'items': {'type': ['null', 'string']}, 'type': 'array'}, - {'type': ['null', 'string']} - ] - }, - 'int_value': {'type': ['null', 'integer', 'string']}, - 'Test': { - 'anyOf': [ - {'items': {'type': ['null', 'string']}, 'type': 'array'}, - {'type': ['null', 'string']} - ] - } - } - - actual_schema = schema.generate_schema(samples, table_spec) - - self.assertEqual(expected_schema, actual_schema) - - @parameterized.expand([ - [ - 'date_time_datatype', - "date-time", - { - 'anyOf': [ - {'type': ['null', 'string'], 'format': 'date-time'}, - {'type': ['null', 'string']} - ] - } - ], - [ - "dictionary_datatype", - "dict", - { - 'anyOf': [ - {'type': 'object', 'properties': {}}, - {'type': ['null', 'string']} - ] - } - ], - [ - "string_datatype", - "string", - { - 'type': ['null', "string"], - } - ], - [ - "integer_datatype", - "integer", - { - 'type': ['null', "integer", "string"], - } - ], - [ - 
"boolean_datatype", - "boolean", - { - 'type': ['null', "boolean", "string"], - } - ], - [ - "float_datatype", - "number", - { - 'type': ['null', "number", "string"], - } - ] - ]) - def test_datatype_schema(self, name, test_datatype, expected_schema): - ''' - Test case to verify that schema is created correctly for individual - fields depending on the datatype of the field. - ''' - - actual_schema = schema.datatype_schema(test_datatype) - - self.assertEqual(actual_schema, expected_schema) - - @parameterized.expand([ - ['dictionary_datatype', {'dict':62}, 'dict'], - ['float_datatype', {'number':27}, 'number'], - ['integer_datatype', {'integer':32}, 'integer'], - ['float_and_integer_datatype', {'number':18, 'integer':81}, 'number'], - ['string_datatype', {'string':43}, 'string'], - ['no_datatype', {}, 'string'] - ]) - def test_pick_datatype(self, name, test_value, expected_datatype): - ''' - Test case to verify that when a datatype with it's counts is given, - then the datatype is returned. If no counts are there (which means no datatype), - then it should return the default value "string". - ''' - - actual_datatype = schema.pick_datatype(test_value) - - self.assertEqual(expected_datatype, actual_datatype) - - def test_count_sample(self): - ''' - Test case to verify that for a given sample, counts of datatypes are returned - correctly as per their occurrences. 
- ''' - - sample = { - 'test_key': {'string_value': '', 'int_value' : 27}, - 'float_value' : '45.99', - 'string_value': '' - } - table_spec = {'table_name': 'TEST', 'search_prefix': '/test', 'search_pattern': 'jsonl'} - expected_counts = {'test_key': {'dict':1}, 'float_value' : {'number':1}, 'string_value': {}} - - actual_counts = schema.count_sample(sample, {}, table_spec) - - self.assertEqual(expected_counts, actual_counts) - - @parameterized.expand([ - ['no_datum', 'test', None, None], - ['empty_list_as_datum', 'test', [], 'list'], - ['non_empty_list_as_datum', 'test', ['test1'], 'list.string'], - ['nested_list_as_datum', 'test', [['test2']], 'list.string'], - ['key_in_date_overrides', 'TEST', 'test1', 'date-time'], - ['dict_as_datum', 'test', {}, 'dict'], - ['string_of_integer_as_datum', 'test', '12', 'integer'], - ['integer_as_datum', 'test', 21, 'integer'], - ['string_of_float_as_datum', 'test', '12.235', 'number'], - ['float_as_datum', 'test', 25.487, 'number'] - ]) - def test_infer(self, name, test_key, test_datum, expected_datatype): - ''' - Test case to verify that proper datatype is returned for corresponding data. 
- '''
-
-        actual_datatype = schema.infer(key = test_key, datum = test_datum, date_overrides= ['TEST'])
-
-        self.assertEqual(expected_datatype, actual_datatype)

From 3bc884cd79f7147f32ea5c238683d3612ae71d69 Mon Sep 17 00:00:00 2001
From: harshpatel4crest
Date: Tue, 11 Oct 2022 15:19:37 +0530
Subject: [PATCH 20/20] added comments for data generation function in base
 test file

---
 tests/base.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/base.py b/tests/base.py
index 58e4b2d..bc65faa 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -37,12 +37,14 @@ def generate_max_size_csv(self):
         return [[1, 'a'*131074]]
 
     def generate_simple_csv_lines_typeA(self, num_lines):
+        """Generate CSV data with integer and string datatype"""
         lines = []
         for int_value in range(num_lines):
             lines.append([int_value, self.random_string_generator(), int_value*5])
         return lines
 
     def generate_simple_csv_lines_typeB(self, num_lines):
+        """Generate CSV data with integer, datetime, float and string datatype"""
         lines = []
         start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
         for int_value in range(num_lines):
@@ -51,6 +53,7 @@ def generate_simple_csv_lines_typeB(self, num_lines):
 
     def generate_simple_csv_lines_typeC(self, num_lines):
+        """Generate CSV data with integer, datetime, float and string datatype"""
         lines = []
         start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
         for int_value in range(num_lines):
@@ -59,12 +62,14 @@ def generate_simple_csv_lines_typeC(self, num_lines):
         return lines
 
     def generate_simple_jsonl_lines_typeA(self, num_lines):
+        """Generate JSONL data with integer and string datatype"""
        lines = []
        for int_value in range(num_lines):
            lines.append({"id": int_value, "string_col": self.random_string_generator(), "integer_col": int_value*5})
        return lines
 
     def generate_simple_jsonl_lines_typeB(self, num_lines):
+        """Generate JSONL data with integer, datetime, float and string datatype"""
        lines = []
        start_datetime = 
datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
        for int_value in range(num_lines):
@@ -73,6 +78,7 @@ def generate_simple_jsonl_lines_typeB(self, num_lines):
        return lines
 
     def generate_simple_jsonl_lines_typeC(self, num_lines):
+        """Generate JSONL data with integer, datetime, float and string datatype"""
        lines = []
        start_datetime = datetime(2018, 1, 1, 19, 29, 14, 578000, tzinfo=timezone.utc)
        for int_value in range(num_lines):