Utility package to query multiple S3 objects using S3 Select. More information on AWS S3 Select: https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html
- Engine modes:
  - Parallel - each file is queried in a separate process
  - Sequential - all files are queried sequentially
  - User-defined query engine (more on this below)
- Cost estimation (before query) and calculation (after query)
- Possibility to add user defined functions at process level (useful for in-flight transformations)
- Support for formats: JSON, CSV and Parquet files
- Support for compressions: GZIP, BZIP2
- Support for input and output serialization using Serializer types or a dictionary config
- Support for user defined SQL Query
- Scan Range functionality exposed
- SSE functionality exposed
- Ability to select the AWS profile used for S3 connections (for how scan range, SSE, and profiles map to the underlying boto3 call, see the sketch after this list)
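For reference, scan range, SSE, and profile selection correspond to standard boto3 concepts. The snippet below is a minimal boto3-level sketch (not this package's API) of the `select_object_content` parameters and `Session(profile_name=...)` mechanism these features build on; the bucket, key, and values are placeholders.

```python
import boto3

# Placeholders only; this illustrates the underlying boto3 call, not select_plus itself.
session = boto3.Session(profile_name='my-profile')  # profile selection
s3 = session.client('s3')

response = s3.select_object_content(
    Bucket='bucket-name',
    Key='s3-key-prefix/file.json',
    Expression='SELECT * FROM s3object s',
    ExpressionType='SQL',
    InputSerialization={'JSON': {'Type': 'LINES'}},
    OutputSerialization={'JSON': {'RecordDelimiter': '\n'}},
    ScanRange={'Start': 0, 'End': 1048576},  # scan range: only the first 1 MiB
    # SSE-C parameters would go here for customer-provided keys, e.g.:
    # SSECustomerAlgorithm='AES256', SSECustomerKey=..., SSECustomerKeyMD5=...,
)

# The response is an event stream; 'Records' events carry the selected rows.
for event in response['Payload']:
    if 'Records' in event:
        print(event['Records']['Payload'].decode('utf-8'))
```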
Parallelising the per-file queries makes querying many files considerably faster. The performance tests below were run against many small files of different types on an 8-core CPU (network transfer speeds ignored):
| case | engine | file_type | files | total_size | columns | responses | time_taken_sec | cost (USD) |
|------|------------|-----------|-------|------------|---------|-----------|----------------|-------------|
| 1 | sequential | json | 200 | 100 MB | 1 | 200 | 42.53 | 0.000232982 |
| 2 | parallel | json | 200 | 100 MB | 1 | 200 | 5.79 | 0.000232982 |
| 3 | sequential | json | 200 | 100 MB | 5 | 200 | 44.71 | 0.000256532 |
| 4 | parallel | json | 200 | 100 MB | 5 | 200 | 6.81 | 0.000256532 |
| 5 | sequential | csv | 100 | 100 MB | 1 | 100 | 26.54 | 0.000247681 |
| 6 | parallel | csv | 100 | 100 MB | 1 | 100 | 7.14 | 0.000247681 |
| 7 | sequential | csv | 100 | 100 MB | 5 | 100 | 38.26 | 0.000373264 |
| 8 | parallel | csv | 100 | 100 MB | 5 | 100 | 16.2 | 0.000373264 |
| 9 | sequential | parquet | 100 | 100 MB | 1 | 100 | 43.79 | 7.39638e-05 |
| 10 | parallel | parquet | 100 | 100 MB | 1 | 100 | 21.05 | 7.39638e-05 |
| 11 | sequential | parquet | 100 | 100 MB | 5 | 100 | 64.34 | 0.00034574 |
| 12 | parallel | parquet | 100 | 100 MB | 5 | 100 | 28.34 | 0.00034574 |
| 13 | sequential | json | 2000 | 1 GB | 1 | 2000 | 552.23 | 0.00232982 |
| 14 | parallel | json | 2000 | 1 GB | 1 | 2000 | 330.23 | 0.00232982 |
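The cost column is the S3 Select request cost in USD. S3 Select is billed per GB of data scanned plus per GB of data returned, so a rough estimate can be derived from the stats the package reports. The rates in the sketch below are assumptions (approximate us-east-1 prices); replace them with the current S3 pricing for your region.

```python
# Rough S3 Select cost estimate from bytes scanned / bytes returned.
# The rates are assumptions; check current S3 pricing for your region.
GB = 1024 ** 3
PRICE_PER_GB_SCANNED = 0.002    # USD per GB scanned
PRICE_PER_GB_RETURNED = 0.0007  # USD per GB returned

def estimate_select_cost(bytes_scanned: int, bytes_returned: int) -> float:
    """Approximate S3 Select cost in USD for one run."""
    return (bytes_scanned / GB) * PRICE_PER_GB_SCANNED + (bytes_returned / GB) * PRICE_PER_GB_RETURNED

# Example: roughly 100 MB scanned and 30 MB returned
print(estimate_select_cost(100 * 1024 ** 2, 30 * 1024 ** 2))
```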
Install with pip:

```
pip3 install s3select_plus
```

By default, the parallel engine is used and JSON files are queried:

```python
from select_plus import SSP

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

est_cost = ssp.estimate_cost()
print(f'Estimated Cost: ${format(est_cost, "f")}')

# The guard below must be added when executing with the ParallelEngine,
# because each file is queried in a separate process and Python's
# "multiprocessing" package requires it.
if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s'
    )
    print(result.payload)
```

An "extra function" can be defined to run additional steps on each per-file query result within its process, for example to process or transform the results before they are combined into the final result. Extra arguments can also be passed to this function via "extra function arguments":

```python
from select_plus import SSP

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

def transform(response, arg1, arg2):
    # Assuming the response from the query looks like: {"column1": 1}
    response['new_column'] = arg1
    response['newer_column'] = arg2

    # This function must always return something
    return response

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s',
        extra_func=transform,
        extra_func_args={
            "arg1": 1,
            "arg2": 2
        }
    )
    print(result.payload)
```

To query the files sequentially instead, pass the SequentialEngine:

```python
from select_plus import SSP, SequentialEngine
ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix',
    engine=SequentialEngine
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s'
    )
    print(result.payload)
```

The parallel engine is the default, but it can also be set explicitly. The result object exposes the combined payload as well as query statistics:

```python
from select_plus import SSP, ParallelEngine
ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix',
    engine=ParallelEngine
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s'
    )
    print(result.payload)
    print(result.stats.cost)  # dollars
    print(result.stats.bytes_processed)
    print(result.stats.bytes_returned)
    print(result.stats.bytes_scanned)
    print(result.stats.files_processed)
```

The select_object_content S3 API call expects serialization parameters that depend on the type of files being queried.
Serialization works by default with all engine types.
A full list of the parameters accepted by the serializers is described in the boto3 documentation:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.select_object_content
To query JSON files with specific types and parameters, one can use the pre-defined serialization objects:

```python
from select_plus import SSP
from select_plus.serializers import CompressionTypes, InputSerialization, OutputSerialization, JSONInputSerialization, JSONOutputSerialization

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s',
        input_serialization=InputSerialization(
            compression_type=CompressionTypes.gzip,
            json=JSONInputSerialization(
                Type='DOCUMENT'
            )
        ),
        output_serialization=OutputSerialization(
            json=JSONOutputSerialization(
                record_delimiter='\n'
            )
        )
    )
    print(result.payload)
```

To query CSV files with specific types and parameters, one can use the pre-defined serialization objects:

```python
from select_plus import SSP
from select_plus.serializers import CompressionTypes, InputSerialization, OutputSerialization, CSVInputSerialization, CSVOutputSerialization

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s',
        input_serialization=InputSerialization(
            compression_type=CompressionTypes.gzip,
            csv=CSVInputSerialization(
                file_header_info='USE'
            )
        ),
        output_serialization=OutputSerialization(
            csv=CSVOutputSerialization(
                record_delimiter='\n'
            )
        )
    )
    print(result.payload)
```

To query Parquet files with specific types and parameters, one can use the pre-defined serialization objects:

```python
from select_plus import SSP
from select_plus.serializers import CompressionTypes, InputSerialization, OutputSerialization, CSVOutputSerialization

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s',
        input_serialization=InputSerialization(
            compression_type=CompressionTypes.gzip,
            parquet={}
        ),
        output_serialization=OutputSerialization(
            csv=CSVOutputSerialization(
                record_delimiter='\n'
            )
        )
    )
    print(result.payload)
```

One can also supply the serialization settings as plain dictionaries, which is convenient when they are part of a configuration file, for example:

```python
from select_plus import SSP

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix'
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s',
        input_serialization={
            "CompressionType": 'GZIP',
            "JSON": {
                "Type": "LINES"
            }
        },
        output_serialization={
            "JSON": {
                'RecordDelimiter': '\n'
            }
        }
    )
    print(result.payload)
```

One downside of this package is that an S3 client cannot be passed into the main call: each S3 client must be initialised once per process (an AWS SDK restriction) and cannot be pickled. To work around this, one can create a custom engine and implement their own S3 client (or resource) inside it:

```python
from select_plus import SSP, BaseEngine
from select_plus.src.aws.s3 import S3


class MyCustomEngine(BaseEngine):

    def execute(self, sql_query: str, extra_func: callable = None, extra_func_args: dict = None) -> list:
        pass

    def _make_func_args(self, sql_query: str, extra_func: callable, extra_func_args: dict):
        """
        Gets a list of all keys to be processed.
        """
        s3 = S3(client='my-custom-client')  # This is where you can customize your own S3 client, or even change the entire S3 functionality.
        s3_keys = s3.list_objects(bucket_name=self.bucket_name, prefix=self.prefix)
        keys = s3_keys['keys']

        func_args = [(key, sql_query, extra_func, extra_func_args) for key in keys]

        return func_args

    def _make_func(self, key: str, sql_query: str, extra_func: callable, extra_func_args: dict):
        """
        Performs the SQL query against one single key.
        This runs inside a single process, so all boto initialisation must happen inside this function.
        It also applies any extra function added by the user.
        """
        s3 = S3()  # This is where you can customize your own S3 client, or even change the entire S3 functionality.
        response = s3.select(bucket_name=self.bucket_name, key=key, sql_string=sql_query)

        if extra_func:
            response = self._apply_extra_func(response, extra_func, extra_func_args)

        return response

    def _wrapper_func(self, args):
        """
        Wrapper for the function executed inside each process.
        It exists because tqdm (the progress bar) does not work with "pool.starmap",
        so multiple parameters cannot be passed to the function directly.
        """
        result = self._make_func(*args)
        return result

    @staticmethod
    def _apply_extra_func(response: dict, extra_func, extra_func_args):
        """
        A user can add a function, applied at each process level, to transform each chunk of data
        before the results from all processes are merged.
        The function only receives the payload, not the statistics, so the cost can still be
        computed when the results are compiled after the processes have ended.
        """
        block_response = {
            "stats": response['stats'],
            "payload": None
        }

        func_response = extra_func(response['payload'], **extra_func_args)
        block_response['payload'] = func_response

        return block_response

ssp = SSP(
    bucket_name='bucket-name',
    prefix='s3-key-prefix',
    engine=MyCustomEngine
)

if __name__ == '__main__':
    result = ssp.select(
        threads=8,
        sql_query='SELECT * FROM s3object[*] s'
    )
    print(result.payload)
```

Similarly, one can develop entirely new engines, for example using Dask or PySpark, as sketched below.
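As a rough illustration only, a Dask-based engine could reuse the methods of the MyCustomEngine example above and fan the per-key queries out through Dask instead of multiprocessing. This is an untested sketch under those assumptions (it relies on the interface shown above, with execute returning a list of per-file responses) and uses Dask's threaded scheduler.

```python
from dask import delayed, compute


class DaskEngine(MyCustomEngine):
    """Sketch: run the per-key S3 Select queries through Dask's threaded scheduler."""

    def execute(self, sql_query: str, extra_func: callable = None, extra_func_args: dict = None) -> list:
        # Build the same per-key argument list used by the custom engine above.
        func_args = self._make_func_args(sql_query, extra_func, extra_func_args)

        # One delayed task per S3 key; each task runs the single-key query.
        tasks = [delayed(self._wrapper_func)(args) for args in func_args]

        # The work is network-bound, so the threaded scheduler is usually sufficient.
        results = compute(*tasks, scheduler='threads')
        return list(results)
```

Such an engine would then be passed to SSP in the same way as the others, i.e. engine=DaskEngine.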