
Commit 3ba3fb4

[sdlf-datalakelibrary] simplified datalake-library (#540)
* [sdlf-datalakelibrary] simplified datalake-library
* [sdlf-stage-lambda/glue] add DEPRECATED title in README
* [sdlf-stage-A/B] simplify and update with datalakeLibrary
1 parent 5f68eea commit 3ba3fb4

File tree

36 files changed (+418, -2087 lines)


sdlf-datalakeLibrary/README.md

Lines changed: 148 additions & 33 deletions
@@ -1,36 +1,151 @@
 # Datalake Library
-The data lake library repository is where a team pushes the transformation code (i.e. business logic) that they wish to apply to their datasets. After each new commit, the repository is automatically packaged into a Lambda Layer and mounted to the individual Lambda functions of the pipelines belonging to the team. The repository also holds helper functions that automate boilerplate code such as SQS, S3, and DynamoDB operations.
-
-
-## IMPORTANT
-Please ensure that you follow this file structure, with a folder named `python` at the root containing all the Lambda code that should be part of the Layer. The automated build process depends on the file structure being as follows:
-
-./
-├── README.md
-└── python
-    └── datalake_library
-        ├── configuration
-        ├── interfaces
-        ├── octagon
-        ├── tests
-        └── transforms
-            ├── stage_a_transforms
-            │   ├── light_transform_blueprint.py
-            │   ├── ...
-            ├── stage_b_transforms
-            │   ├── heavy_transform_blueprint.py
-            │   └── ...
-            └── transform_handler.py
-
-## Adding Transformations
-When adding custom transformations to the Lambda Layer, simply add your code to this repository (see the example of `light_transform_blueprint.py` in the file structure above) in the relevant location (e.g. `stage_a_transforms` for light transformations in StageA). Any changes to this repository should stay in branches while in development; once tested and stable, they can be merged into the relevant environment branch (`dev`, `test` or `main`). The pipeline triggers on commits to that branch and releases the changes automatically.
+
+A modern, unified Python library for AWS data lake operations. The datalake_library provides clean interfaces for S3, DynamoDB, Step Functions, SQS, and KMS operations with shared boto3 sessions for optimal performance.
+
+## Architecture
+
+The library is built around a unified `DataLakeClient` that provides access to all AWS services through dedicated interfaces:
+
+```
+datalake_library/
+├── interfaces/              # Service-specific interfaces
+│   ├── base_interface.py    # BaseInterface
+│   ├── s3_interface.py      # S3 operations
+│   ├── dynamo_interface.py  # DynamoDB operations
+│   ├── states_interface.py  # Step Functions operations
+│   ├── sqs_interface.py     # SQS operations
+│   └── kms_interface.py     # KMS operations
+├── client.py                # Unified DataLakeClient
+├── __init__.py              # Public API exports
+└── commons.py               # Shared utilities
+```
+
+## Installation
+
+The library is packaged as a Lambda Layer and automatically mounted to Lambda functions. No manual installation is required.
+
+## Usage
+
+### Basic Usage
+
+```python
+from datalake_library import DataLakeClient
+
+# Initialize client with team/pipeline/stage context
+client = DataLakeClient(team="engineering", pipeline="stage", stage="a")
+
+# Access all services through the client
+client.s3.download_object(bucket, key)
+client.dynamo.update_object_metadata_catalog(metadata)
+client.states.run_state_machine(arn, payload)
+client.sqs.send_message_to_fifo_queue(message, group_id, queue_url)
+client.kms.data_kms_key
+```
+
+### Individual Interfaces
+
+```python
+import boto3
+
+from datalake_library import S3Interface, DynamoInterface
+
+# Use individual interfaces with a shared session
+session = boto3.Session()
+
+s3 = S3Interface(team="engineering", session=session)
+dynamo = DynamoInterface(team="engineering", session=session)
+```
+
+## API Reference
+
+### DataLakeClient
+
+```python
+client = DataLakeClient(
+    team="engineering",     # Team name for configuration lookup
+    dataset="legislators",  # Dataset name (optional)
+    pipeline="stage",       # Pipeline name
+    stage="a",              # Stage name
+    log_level="INFO",       # Logging level (optional)
+    session=None            # Boto3 session (optional)
+)
+```
+
+### S3Interface
+
+```python
+# Download object to /tmp
+local_path = client.s3.download_object(bucket, key)
+
+# Upload object with KMS encryption
+client.s3.upload_object(local_path, bucket, key, kms_key=kms_key)
+
+# Copy object between buckets
+client.s3.copy_object(src_bucket, src_key, dest_bucket, dest_key)
+
+# Get object metadata
+size, last_modified = client.s3.get_size_and_last_modified(bucket, key)
+
+# Access bucket names
+client.s3.raw_bucket
+client.s3.stage_bucket
+client.s3.analytics_bucket
+```
+
+### DynamoInterface
+
+```python
+# Update object metadata catalog
+metadata = {
+    "bucket": bucket,
+    "key": key,
+    "team": team,
+    "dataset": dataset
+}
+client.dynamo.update_object_metadata_catalog(metadata)
+
+# Put item to any table
+client.dynamo.put_item(table_name, item)
+
+# Access table names
+client.dynamo.object_metadata_table
+client.dynamo.manifests_table
+```
+
+### StatesInterface
+
+```python
+# Run state machine
+response = client.states.run_state_machine(arn, payload)
+
+# Access state machine ARN
+client.states.state_machine_arn
+```
+
+### SQSInterface
+
+```python
+# Send message to FIFO queue
+client.sqs.send_message_to_fifo_queue(message, group_id, queue_url)
+
+# Receive messages
+messages = client.sqs.receive_messages(max_messages, queue_url)
+
+# Access queue URLs
+client.sqs.stage_queue_url
+client.sqs.stage_dlq_url
+```
+
+### KMSInterface
+
+```python
+# Access KMS key ARN
+kms_key = client.kms.data_kms_key
+```
 
 ## Pipeline
-The CICD pipeline for this repository is defined in the `sdlf-team` repository for each team (`nested-stacks/template-cicd.yaml`). A CodeBuild job packages the code in this repository into a `.zip` file, leaving out any `__pycache__` files, and publishes it as a Lambda Layer. Due to package size limits, the code in this repository must not exceed 50MB zipped and 250MB unzipped.
-
-Configuration details, e.g. the name of the Lambda Layer built from this repository, are defined in the template containing the **sdlf-pipeline** infrastructure. Some of the configuration details available for customization:
-1. Through the pipeline:
-   1. Main Git branch to use (currently set to `dev`)
-2. Through the CodeBuild job:
-   1. Name of the resulting Layer
-   2. Compatible runtimes
-   3. SSM parameter used to store the ARN of the latest version
+
+The library is automatically packaged into a Lambda Layer via CodeBuild when changes are committed to the environment branch (`dev`, `test`, or `main`). The layer is then made available to all Lambda functions in the data lake pipeline.
+
+**Size Limits:**
+- Zipped: 50MB maximum
+- Unzipped: 250MB maximum
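
For orientation, a minimal Python sketch of what the packaging step described above amounts to. The layer name, SSM parameter name, and runtime below are placeholders (the real values come from the sdlf-pipeline templates), and the actual pipeline performs these steps from a CodeBuild buildspec rather than a script:

```python
import shutil

import boto3

# Placeholder names; the real values are defined in the sdlf-pipeline templates.
LAYER_NAME = "example-datalake-library"
SSM_PARAM = "/SDLF/Lambda/ExampleLatestDatalakeLibraryLayer"

# Zip the python/ folder. shutil.make_archive excludes nothing by itself;
# the real CodeBuild job also strips __pycache__ before zipping.
archive = shutil.make_archive("datalake_library_layer", "zip", root_dir=".", base_dir="python")

# Publish the zip as a new Lambda Layer version (must stay under the 50MB zipped limit).
lambda_client = boto3.client("lambda")
with open(archive, "rb") as f:
    layer = lambda_client.publish_layer_version(
        LayerName=LAYER_NAME,
        Content={"ZipFile": f.read()},
        CompatibleRuntimes=["python3.12"],  # placeholder runtime
    )

# Store the ARN of the latest version so other stacks can look it up.
boto3.client("ssm").put_parameter(
    Name=SSM_PARAM,
    Value=layer["LayerVersionArn"],
    Type="String",
    Overwrite=True,
)
```

This mirrors the customization points called out in the previous README: the layer name, the compatible runtimes, and the SSM parameter that stores the ARN of the latest version.
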
sdlf-datalakeLibrary/python/datalake_library/__init__.py

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+from .client import DataLakeClient
+from .interfaces import DynamoInterface, KMSInterface, S3Interface, SQSInterface, StatesInterface
+
+__all__ = ["DataLakeClient", "S3Interface", "DynamoInterface", "StatesInterface", "SQSInterface", "KMSInterface"]
sdlf-datalakeLibrary/python/datalake_library/client.py

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+import boto3
+
+from .interfaces import DynamoInterface, KMSInterface, S3Interface, SQSInterface, StatesInterface
+
+
+class DataLakeClient:
+    def __init__(self, team=None, dataset=None, pipeline=None, stage=None, log_level=None, session=None):
+        """
+        Unified client for all data lake operations with shared boto3 session
+        """
+        # Shared session across all interfaces
+        self.session = session or boto3.Session()
+
+        # Initialize all interfaces with shared session
+        self.s3 = S3Interface(team, dataset, pipeline, stage, log_level, self.session)
+        self.dynamo = DynamoInterface(team, dataset, pipeline, stage, log_level, self.session)
+        self.states = StatesInterface(team, dataset, pipeline, stage, log_level, self.session)
+        self.sqs = SQSInterface(team, dataset, pipeline, stage, log_level, self.session)
+        self.kms = KMSInterface(team, dataset, pipeline, stage, log_level, self.session)
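
A usage sketch only (not part of the committed code): a Lambda function might build the client once at module scope so the shared boto3 session is reused across warm invocations. The `bucket`/`key` event fields are assumed for illustration and are not the pipeline's actual payload.

```python
from datalake_library import DataLakeClient

# Built once at import time: every interface reuses the same boto3 session,
# so warm Lambda invocations keep their existing connection pool.
client = DataLakeClient(team="engineering", pipeline="stage", stage="a")


def lambda_handler(event, context):
    # Assumed event shape, for illustration only.
    bucket = event["bucket"]
    key = event["key"]

    local_path = client.s3.download_object(bucket, key)
    return {"local_path": local_path}
```
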

sdlf-datalakeLibrary/python/datalake_library/commons.py

Lines changed: 1 addition & 8 deletions
@@ -1,7 +1,7 @@
 import logging
 from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional
 
-from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
+from boto3.dynamodb.types import TypeSerializer
 
 if TYPE_CHECKING:
     from mypy_boto3_dynamodb.type_defs import (
@@ -23,10 +23,3 @@ def serialize_dynamodb_item(
 ) -> Dict[str, "AttributeValueTypeDef"]:
     serializer = serializer if serializer else TypeSerializer()
     return {k: serializer.serialize(v) for k, v in item.items()}
-
-
-def deserialize_dynamodb_item(
-    item: Mapping[str, "AttributeValueTypeDef"], deserializer: Optional[TypeDeserializer] = None
-) -> Dict[str, Any]:
-    deserializer = deserializer if deserializer else TypeDeserializer()
-    return {k: deserializer.deserialize(v) for k, v in item.items()}
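
The retained `serialize_dynamodb_item` helper converts a plain Python item into the low-level DynamoDB attribute-value format. A small sketch, with made-up field names rather than the actual metadata catalog schema:

```python
from datalake_library.commons import serialize_dynamodb_item

# Illustrative item; these field names are not the real catalog schema.
item = {"id": "engineering/legislators/raw.json", "stage": "a", "size_bytes": 1024}

# Produces DynamoDB attribute values, e.g.
# {"id": {"S": "..."}, "stage": {"S": "a"}, "size_bytes": {"N": "1024"}}
serialized = serialize_dynamodb_item(item)
```
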

sdlf-datalakeLibrary/python/datalake_library/data_quality/schema_validator.py

Lines changed: 0 additions & 164 deletions
This file was deleted.
