Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/totes_bags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ jobs:

- name: Install Poetry
run: curl -sSL https://install.python-poetry.org | python3 -
# uses: snok/install-poetry@v1

- name: Install dependencies
run: make requirements
Expand All @@ -43,8 +42,8 @@ jobs:
- name: Coverage tests
run: make check-coverage

# - name: Pylint on src
# run: make run-pylint
- name: Pylint
run: make run-pylint


deploy:
Expand Down
17 changes: 7 additions & 10 deletions src/load/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def connect_to_db(credentials):
)
return connection
except Exception as e:
logger.error(f"Error connecting to the database: {e}")
logger.error("Error connecting to the database: %s", e)
raise

def process_gzip_file(connection, bucket_name, object_key, credentials):
Expand All @@ -39,17 +39,15 @@ def process_gzip_file(connection, bucket_name, object_key, credentials):
s3_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
with gzip.GzipFile(fileobj=io.BytesIO(s3_object['Body'].read())) as gzipfile:
content = gzipfile.read().decode('utf-8')

table_name = determine_table_name(object_key)

if table_name:
logger.info(f"Inserting data into table {table_name}")
logger.info("Inserting data into table %s", table_name)
insert_data_into_db(connection, content, credentials['schema'], table_name)
else:
logger.warning(f"Unrecognized file {object_key}. Skipping.")
logger.warning("Unrecognized file %s. Skipping.", object_key)

except Exception as e:
logger.error(f"Error processing file {object_key} from bucket {bucket_name}: {e}")
logger.error("Error processing file %s from bucket %s: %s", object_key, bucket_name, e)
raise

def determine_table_name(object_key):
Expand Down Expand Up @@ -80,12 +78,11 @@ def insert_data_into_db(connection, content, schema, table_name):
for row in data:
values = row.split(',')
cursor.execute(insert_query, values)

connection.commit()
logger.info(f"Data successfully inserted into table {table_name}")
logger.info("Data successfully inserted into table %s", table_name)
except Exception as e:
logger.error(f"Error inserting data into table {table_name}: {e}")
logger.error("Error inserting data into table %s: %s", table_name, e)
connection.rollback()
raise
finally:
cursor.close()
cursor.close()
3 changes: 2 additions & 1 deletion src/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# pylint: disable=unused-argument
def lambda_handler(event, context):
"""Lambda function that retrieves secrets from AWS, uses them to connect
to our data warehouse, connects to a processed s3 bucket, and uploads
Expand Down Expand Up @@ -39,5 +40,5 @@ def lambda_handler(event, context):
'body': 'Files processed successfully.'
}
except Exception as e:
logger.error(f"Error processing event: {e}")
logger.error("Error processing event: %s", e)
raise