-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy paths3.py
More file actions
532 lines (432 loc) · 19.7 KB
/
s3.py
File metadata and controls
532 lines (432 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
import itertools
import functools
import re
import io
import json
import gzip
import sys
import backoff
import boto3
import singer
from botocore.credentials import (
AssumeRoleCredentialFetcher,
CredentialResolver,
DeferredRefreshableCredentials,
JSONFileCache
)
from botocore.exceptions import ClientError, ConnectTimeoutError, ReadTimeoutError
from botocore.session import Session
from botocore.config import Config
from botocore.paginate import PageIterator
from singer_encodings import (
compression,
csv
)
from tap_s3_csv import (
utils,
conversion
)
LOGGER = singer.get_logger()
SDC_SOURCE_BUCKET_COLUMN = "_sdc_source_bucket"
SDC_SOURCE_FILE_COLUMN = "_sdc_source_file"
SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno"
SDC_EXTRA_COLUMN = "_sdc_extra"
skipped_files_count = 0
# timeout request after 300 seconds
REQUEST_TIMEOUT = 300
def is_access_denied_error(error):
"""
This function checks whether the URLError contains 'Access Denied' substring
and return boolean values accordingly, to decide whether to backoff or not.
"""
# retry if the error string contains 'Access Denied'
if 'Access Denied' in str(error) or 'AccessDenied' in str(error):
return True
return False
def retry_pattern(fnc):
@backoff.on_exception(backoff.expo,
(ConnectTimeoutError, ReadTimeoutError),
max_tries=5,
on_backoff=log_backoff_attempt,
factor=2)
@backoff.on_exception(backoff.expo,
ClientError,
max_tries=5,
on_backoff=log_backoff_attempt,
giveup=is_access_denied_error, # Giveup if we do not have the access
factor=10)
@functools.wraps(fnc)
def wrapper(*args, **kwargs):
return fnc(*args, **kwargs)
return wrapper
def log_backoff_attempt(details):
LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries"))
# Added decorator over functions of botocore SDK as functions from SDK returns generator and
# tap is yielding data from that function so backoff is not working over tap function(list_files_in_bucket()).
PageIterator._make_request = retry_pattern(PageIterator._make_request)
class AssumeRoleProvider():
METHOD = 'assume-role'
def __init__(self, fetcher):
self._fetcher = fetcher
def load(self):
return DeferredRefreshableCredentials(
self._fetcher.fetch_credentials,
self.METHOD
)
@retry_pattern
def setup_aws_client(config):
role_arn = "arn:aws:iam::{}:role/{}".format(config['account_id'].replace('-', ''),
config['role_name'])
session = Session()
fetcher = AssumeRoleCredentialFetcher(
session.create_client,
session.get_credentials(),
role_arn,
extra_args={
'DurationSeconds': 3600,
'RoleSessionName': 'TapS3CSV',
'ExternalId': config['external_id']
},
cache=JSONFileCache()
)
refreshable_session = Session()
refreshable_session.register_component(
'credential_provider',
CredentialResolver([AssumeRoleProvider(fetcher)])
)
LOGGER.info("Attempting to assume_role on RoleArn: %s", role_arn)
boto3.setup_default_session(botocore_session=refreshable_session)
def get_sampled_schema_for_table(config, table_spec):
LOGGER.info('Sampling records to determine table schema.')
s3_files_gen = get_input_files_for_table(config, table_spec)
samples = [sample for sample in sample_files(config, table_spec, s3_files_gen)]
if skipped_files_count:
LOGGER.warning("%s files got skipped during the last sampling.",skipped_files_count)
if not samples:
#Return empty properties for accept everything from data if no samples found
return {
'type': 'object',
'properties': {}
}
metadata_schema = {
SDC_SOURCE_BUCKET_COLUMN: {'type': 'string'},
SDC_SOURCE_FILE_COLUMN: {'type': 'string'},
SDC_SOURCE_LINENO_COLUMN: {'type': 'integer'},
SDC_EXTRA_COLUMN: {'type': 'array', 'items': {
'anyOf': [{'type': 'object', 'properties': {}}, {'type': 'string'}]}}
}
data_schema = conversion.generate_schema(samples, table_spec)
return {
'type': 'object',
'properties': merge_dicts(data_schema, metadata_schema)
}
def merge_dicts(first, second):
to_return = first.copy()
for key in second:
if key in first:
if isinstance(first[key], dict) and isinstance(second[key], dict):
to_return[key] = merge_dicts(first[key], second[key])
else:
to_return[key] = second[key]
else:
to_return[key] = second[key]
return to_return
def maximize_csv_field_width():
current_field_size_limit = csv.csv.field_size_limit()
field_size_limit = sys.maxsize
if current_field_size_limit != field_size_limit:
csv.csv.field_size_limit(field_size_limit)
LOGGER.info("Changed the CSV field size limit from %s to %s",
current_field_size_limit,
field_size_limit)
def get_records_for_csv(s3_path, sample_rate, iterator):
current_row = 0
sampled_row_count = 0
maximize_csv_field_width()
for row in iterator:
# Skipping the empty line of CSV.
if len(row) == 0:
current_row += 1
continue
if (current_row % sample_rate) == 0:
if row.get(csv.SDC_EXTRA_COLUMN):
row.pop(csv.SDC_EXTRA_COLUMN)
sampled_row_count += 1
if (sampled_row_count % 200) == 0:
LOGGER.info("Sampled %s rows from %s",
sampled_row_count, s3_path)
yield row
current_row += 1
LOGGER.info("Sampled %s rows from %s", sampled_row_count, s3_path)
def get_records_for_jsonl(s3_path, sample_rate, iterator):
current_row = 0
sampled_row_count = 0
for row in iterator:
if (current_row % sample_rate) == 0:
decoded_row = row.decode('utf-8')
if decoded_row.strip():
row = json.loads(decoded_row)
# Skipping the empty json.
if len(row) == 0:
current_row += 1
continue
else:
current_row += 1
continue
sampled_row_count += 1
if (sampled_row_count % 200) == 0:
LOGGER.info("Sampled %s rows from %s",
sampled_row_count, s3_path)
yield row
current_row += 1
LOGGER.info("Sampled %s rows from %s", sampled_row_count, s3_path)
def check_key_properties_and_date_overrides_for_jsonl_file(table_spec, jsonl_sample_records, s3_path):
all_keys = set()
for record in jsonl_sample_records:
keys = record.keys()
all_keys.update(keys)
if table_spec.get('key_properties'):
key_properties = set(table_spec['key_properties'])
if not key_properties.issubset(all_keys):
raise Exception('JSONL file "{}" is missing required key_properties key: {}'
.format(s3_path, key_properties - all_keys))
if table_spec.get('date_overrides'):
date_overrides = set(table_spec['date_overrides'])
if not date_overrides.issubset(all_keys):
raise Exception('JSONL file "{}" is missing date_overrides key: {}'
.format(s3_path, date_overrides - all_keys))
#pylint: disable=global-statement
def sampling_gz_file(table_spec, s3_path, file_handle, sample_rate):
global skipped_files_count
if s3_path.endswith(".tar.gz"):
LOGGER.warning('Skipping "%s" file as .tar.gz extension is not supported',s3_path)
skipped_files_count = skipped_files_count + 1
return []
file_bytes = file_handle.read()
gz_file_obj = gzip.GzipFile(fileobj=io.BytesIO(file_bytes))
try:
gz_file_name = utils.get_file_name_from_gzfile(fileobj=io.BytesIO(file_bytes))
except AttributeError as err:
# If a file is compressed using gzip command with --no-name attribute,
# It will not return the file name and timestamp. Hence we will skip such files.
# We also seen this issue occur when tar is used to compress the file
LOGGER.warning('Skipping "%s" file as we did not get the original file name',s3_path)
skipped_files_count = skipped_files_count + 1
return []
if gz_file_name:
if gz_file_name.endswith(".gz"):
LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path)
skipped_files_count = skipped_files_count + 1
return []
gz_file_extension = gz_file_name.split(".")[-1].lower()
return sample_file(table_spec, s3_path + "/" + gz_file_name, io.BytesIO(gz_file_obj.read()), sample_rate, gz_file_extension)
raise Exception('"{}" file has some error(s)'.format(s3_path))
#pylint: disable=global-statement
def sample_file(table_spec, s3_path, file_handle, sample_rate, extension):
global skipped_files_count
# Check whether file is without extension or not
if not extension or s3_path.lower() == extension:
LOGGER.warning('"%s" without extension will not be sampled.',s3_path)
skipped_files_count = skipped_files_count + 1
return []
if extension in ["csv", "txt"]:
# If file object read from s3 bucket file else use extracted file object from zip or gz
file_handle = file_handle._raw_stream if hasattr(file_handle, "_raw_stream") else file_handle #pylint:disable=protected-access
iterator = csv.get_row_iterator(file_handle, table_spec, None, True)
csv_records = []
if iterator:
csv_records = get_records_for_csv(s3_path, sample_rate, iterator)
else:
LOGGER.warning('Skipping "%s" file as it is empty',s3_path)
skipped_files_count = skipped_files_count + 1
return csv_records
if extension == "gz":
return sampling_gz_file(table_spec, s3_path, file_handle, sample_rate)
if extension == "jsonl":
# If file object read from s3 bucket file else use extracted file object from zip or gz
file_handle = file_handle._raw_stream if hasattr(file_handle, "_raw_stream") else file_handle
records = get_records_for_jsonl(
s3_path, sample_rate, file_handle)
check_jsonl_sample_records, records = itertools.tee(
records)
jsonl_sample_records = list(check_jsonl_sample_records)
if len(jsonl_sample_records) == 0:
LOGGER.warning('Skipping "%s" file as it is empty', s3_path)
skipped_files_count = skipped_files_count + 1
check_key_properties_and_date_overrides_for_jsonl_file(
table_spec, jsonl_sample_records, s3_path)
return records
if extension == "zip":
LOGGER.warning('Skipping "%s" file as it contains nested compression.',s3_path)
skipped_files_count = skipped_files_count + 1
return []
LOGGER.warning('"%s" having the ".%s" extension will not be sampled.',s3_path,extension)
skipped_files_count = skipped_files_count + 1
return []
#pylint: disable=global-statement
def get_files_to_sample(config, s3_files, max_files):
"""
Returns the list of files for sampling, it checks the s3_files whether any zip or gz file exists or not
if exists then extract if and append in the list of files
Args:
config dict(): Configuration
s3_files list(): List of S3 Bucket files
Returns:
list(dict()) : List of Files for sampling
|_ s3_path str(): S3 Bucket File path
|_ file_handle StreamingBody(): file object
|_ type str(): Type of file which is used for extracted file
|_ extension str(): extension of file (for normal files only)
"""
global skipped_files_count
sampled_files = []
OTHER_FILES = ["csv","gz","jsonl","txt"]
for s3_file in s3_files:
file_key = s3_file.get('key')
if len(sampled_files) >= max_files:
break
if file_key:
file_name = file_key.split("/").pop()
extension = file_name.split(".").pop().lower()
file_handle = get_file_handle(config, file_key)
# Check whether file is without extension or not
if not extension or file_name.lower() == extension:
LOGGER.warning('"%s" without extension will not be sampled.',file_key)
skipped_files_count = skipped_files_count + 1
elif file_key.endswith(".tar.gz"):
LOGGER.warning('Skipping "%s" file as .tar.gz extension is not supported', file_key)
skipped_files_count = skipped_files_count + 1
elif extension == "zip":
files = compression.infer(io.BytesIO(file_handle.read()), file_name)
# Add only those extracted files which are supported by tap
# Prepare dictionary contains the zip file name, type i.e. unzipped and file object of extracted file
sampled_files.extend([{ "type" : "unzipped", "s3_path" : file_key, "file_handle" : de_file } for de_file in files if de_file.name.split(".")[-1].lower() in OTHER_FILES and not de_file.name.endswith(".tar.gz") ])
elif extension in OTHER_FILES:
# Prepare dictionary contains the s3 file path, extension of file and file object
sampled_files.append({ "s3_path" : file_key , "file_handle" : file_handle, "extension" : extension })
else:
LOGGER.warning('"%s" having the ".%s" extension will not be sampled.',file_key,extension)
skipped_files_count = skipped_files_count + 1
return sampled_files
# pylint: disable=too-many-arguments,global-statement
def sample_files(config, table_spec, s3_files,
sample_rate=5, max_records=1000, max_files=5):
global skipped_files_count
LOGGER.info("Sampling files (max files: %s)", max_files)
for s3_file in itertools.islice(get_files_to_sample(config, s3_files, max_files), max_files):
s3_path = s3_file.get("s3_path","")
file_handle = s3_file.get("file_handle")
file_type = s3_file.get("type")
extension = s3_file.get("extension")
# Check whether the file is extracted from zip file.
if file_type and file_type == "unzipped":
# Append the extracted file name with zip file.
s3_path += "/" + file_handle.name
extension = file_handle.name.split(".")[-1].lower()
LOGGER.info('Sampling %s (max records: %s, sample rate: %s)',
s3_path,
max_records,
sample_rate)
try:
yield from itertools.islice(sample_file(table_spec, s3_path, file_handle, sample_rate, extension), max_records)
except (UnicodeDecodeError,json.decoder.JSONDecodeError):
# UnicodeDecodeError will be raised if non csv file parsed to csv parser
# JSONDecodeError will be reaised if non JSONL file parsed to JSON parser
# Handled both error and skipping file with wrong extension.
LOGGER.warn("Skipping %s file as parsing failed. Verify an extension of the file.",s3_path)
skipped_files_count = skipped_files_count + 1
#pylint: disable=global-statement
def get_input_files_for_table(config, table_spec, modified_since=None):
global skipped_files_count
bucket = config['bucket']
to_return = []
pattern = table_spec['search_pattern']
try:
matcher = re.compile(pattern)
except re.error as e:
raise ValueError(
("search_pattern for table `{}` is not a valid regular "
"expression. See "
"https://docs.python.org/3.9/library/re.html#regular-expression-syntax").format(table_spec['table_name']),
pattern) from e
LOGGER.info(
'Checking bucket "%s" for keys matching "%s"', bucket, pattern)
matched_files_count = 0
unmatched_files_count = 0
max_files_before_log = 30000
for s3_object in list_files_in_bucket(config, table_spec.get('search_prefix')):
key = s3_object['Key']
last_modified = s3_object['LastModified']
if s3_object['Size'] == 0:
LOGGER.warning('Skipping matched file "%s" as it is empty', key)
skipped_files_count = skipped_files_count + 1
unmatched_files_count += 1
continue
if matcher.search(key):
matched_files_count += 1
if modified_since is None or modified_since < last_modified:
LOGGER.info('Will download key "%s" as it was last modified %s',
key,
last_modified)
yield {'key': key, 'last_modified': last_modified}
else:
unmatched_files_count += 1
if (unmatched_files_count + matched_files_count) % max_files_before_log == 0:
# Are we skipping greater than 50% of the files?
if (unmatched_files_count / (matched_files_count + unmatched_files_count)) > 0.5:
LOGGER.warn(("Found %s matching files and %s non-matching files. "
"You should consider adding a `search_prefix` to the config "
"or removing non-matching files from the bucket."),
matched_files_count, unmatched_files_count)
else:
LOGGER.info("Found %s matching files and %s non-matching files",
matched_files_count, unmatched_files_count)
if matched_files_count == 0:
raise Exception("No files found matching pattern {}".format(pattern))
def get_request_timeout(config):
# Get `request_timeout` value from config.
config_request_timeout = config.get('request_timeout')
# if config request_timeout is other than 0,"0" or "" then use request_timeout
if config_request_timeout and float(config_request_timeout):
request_timeout = float(config_request_timeout)
else:
# If value is 0,"0","" or not passed then it set default to 300 seconds.
request_timeout = REQUEST_TIMEOUT
return request_timeout
@retry_pattern
def list_files_in_bucket(config, search_prefix=None):
# Set connect and read timeout for resource
timeout = get_request_timeout(config)
bucket_region_name = config.get('region_name')
client_config = Config(connect_timeout=timeout, read_timeout=timeout, region_name=bucket_region_name)
s3_client = boto3.client('s3', config=client_config)
s3_object_count = 0
max_results = 1000
bucket = config['bucket']
args = {
'Bucket': bucket,
'MaxKeys': max_results,
}
if search_prefix is not None:
args['Prefix'] = search_prefix
paginator = s3_client.get_paginator('list_objects_v2')
pages = 0
for page in paginator.paginate(**args):
pages += 1
LOGGER.debug("On page %s", pages)
s3_object_count += len(page['Contents'])
yield from page['Contents']
if s3_object_count > 0:
LOGGER.info("Found %s files.", s3_object_count)
else:
LOGGER.warning('Found no files for bucket "%s" that match prefix "%s"', bucket, search_prefix)
@retry_pattern
def get_file_handle(config, s3_path):
bucket = config['bucket']
# Set connect and read timeout for resource
timeout = get_request_timeout(config)
client_config = Config(connect_timeout=timeout, read_timeout=timeout)
s3_client = boto3.resource('s3', config=client_config)
s3_bucket = s3_client.Bucket(bucket)
s3_object = s3_bucket.Object(s3_path)
return s3_object.get()['Body']