4 changes: 3 additions & 1 deletion .gitignore
@@ -4,4 +4,6 @@ assets/cloudwatch-dashboard.rendered.json
samconfig.toml
.aws-sam
.env.local.json
events/my.event.json
lambda/tests/.pytest_cache
lambda/tests/test_db
10 changes: 9 additions & 1 deletion README.md
@@ -18,7 +18,6 @@ This repository provides you with a sample solution that collects metrics of exi
### Solution Tenets
* The solution is designed to provide time-series metrics for monitoring Apache Iceberg tables over time, to recognize trends and anomalies.
* The solution is designed to be lightweight: it collects metrics exclusively from the Apache Iceberg metadata layer, without scanning the data layer, and hence without the need for heavy compute capacity.
* In the future we strive to reduce the dependency on AWS Glue in favor of using AWS Lambda compute when required features are available in [PyIceberg](https://py.iceberg.apache.org) library.

### Technical implementation

@@ -235,6 +234,15 @@ sam local invoke IcebergMetricsLambda --env-vars .env.local.json
`.env.local.json` - The JSON file that contains values for the Lambda function's environment variables. The Lambda code depends on the environment variables you pass in the deploy section. You need to create this file and include the relevant [parameters](#parameters) before calling `sam local invoke`.
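
For reference, the `--env-vars` file is a JSON map keyed by the function's logical ID. A minimal sketch, assuming `CW_NAMESPACE` is one of the variables your deployment uses (check the [parameters](#parameters) section for the full list; the value shown is illustrative):

```json
{
  "IcebergMetricsLambda": {
    "CW_NAMESPACE": "IcebergMetrics"
  }
}
```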


### Unit Tests

You can test the metrics generation locally through unit tests. From the `lambda` folder:

```bash
docker build -f tests/Dockerfile -t iceberg-metrics-tests .
docker run --rm iceberg-metrics-tests
```
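
Because the image's `CMD` is the pytest invocation, you can override it at `docker run` time to select a subset of tests, for example:

```bash
# Run a single test from the suite by overriding the default CMD
docker run --rm iceberg-metrics-tests pytest /lambda/tests/test_app.py -k test_send_snapshot_metrics
```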

## Dependencies

PyIceberg is a Python implementation for accessing Iceberg tables, without the need of a JVM. \
272 changes: 59 additions & 213 deletions lambda/app.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion lambda/requirements.txt
@@ -2,4 +2,5 @@ boto3==1.34.51
botocore==1.34.51
pyiceberg[s3fs,glue]
pandas
typing_extensions
pyarrow
21 changes: 21 additions & 0 deletions lambda/tests/Dockerfile
@@ -0,0 +1,21 @@
# Use an official Python runtime as a parent image
FROM python:3.10-slim

# Set the working directory in the container
WORKDIR /lambda

# Copy the current directory contents into the container at /lambda
COPY . /lambda

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r tests/requirements.txt

# Install SQLite and clean up apt lists to keep the image small
RUN apt-get update && \
    apt-get install -y --no-install-recommends sqlite3 && \
    rm -rf /var/lib/apt/lists/*

ENV CW_NAMESPACE=TestNamespace \
PYTHONPATH=/lambda

# Run the test suite
CMD ["pytest", "/lambda/tests/test_app.py"]
Empty file added lambda/tests/__init__.py
8 changes: 8 additions & 0 deletions lambda/tests/requirements.txt
@@ -0,0 +1,8 @@
boto3==1.34.51
botocore==1.34.51
pyiceberg[s3fs,glue]==0.7.0
SQLAlchemy==2.0.30
pyarrow==17.0.0
pandas==2.2.2
pytest==7.1.2
unittest2==1.1.0
114 changes: 114 additions & 0 deletions lambda/tests/test_app.py
@@ -0,0 +1,114 @@
import os
import shutil
import unittest
from numbers import Number
from unittest.mock import patch
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, StringType, NestedField
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.transforms import IdentityTransform
from pyiceberg.catalog.sql import SqlCatalog
import numpy as np
import pyarrow as pa
from app import send_files_metrics, send_partition_metrics, send_snapshot_metrics

# Mock AWS credentials
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'

class TestIcebergMetrics(unittest.TestCase):

    def setUp(self):
        # patch.dict used as a decorator on setUp would be reverted as soon as
        # setUp returns; start it manually so CW_NAMESPACE stays set for the
        # whole test and is cleaned up afterwards
        patcher = patch.dict(os.environ, {'CW_NAMESPACE': 'TestNamespace'})
        patcher.start()
        self.addCleanup(patcher.stop)
self.schema = Schema(
NestedField(1, 'id', LongType(), False),
NestedField(2, 'data', StringType(), False)
)
self.partition_spec = PartitionSpec(
fields=[
                PartitionField(source_id=2, field_id=1000, name="data", transform=IdentityTransform())
]
)

catalog_path = './tests/test_db'
if os.path.exists(catalog_path):
shutil.rmtree(catalog_path)
os.makedirs(catalog_path)
warehouse_path = os.path.abspath(catalog_path)
self.catalog = SqlCatalog(
"default",
**{
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
},
)
self.catalog.create_namespace('default')
self.catalog.create_table(
'default.test_table',
schema=self.schema,
partition_spec=self.partition_spec
)

# Load the table and insert some data
self.table = self.catalog.load_table(('default', 'test_table'))
self.update_table(0, 5)


def create_arrow_table(self, range_start, range_end):
data = {
'id': pa.array(range(range_start, range_end), pa.int64()),
'data': pa.array(['data' + str(i) for i in range(range_start, range_end)], pa.string())
}
return pa.Table.from_pydict(data)


def assert_metrics(self, expected, table, snapshot, method_to_test):
def send_metrics_stub(metrics, namespace, table, snapshot):
metrics = {k: v.item() if not isinstance(v, Number) else v for k, v in metrics.items()}
self.assertDictEqual(metrics, expected)

with patch('app.send_metrics', side_effect=send_metrics_stub):
method_to_test(table, snapshot)


def test_send_files_metrics(self):
expected_file_metrics = {'avg_record_count': np.int64(1), 'max_record_count': np.int64(1), 'min_record_count': np.int64(1), 'avg_file_size': np.int64(1068), 'max_file_size': np.int64(1068), 'min_file_size': np.int64(1068)}
self.assert_metrics(expected_file_metrics, self.table, self.snapshot, send_files_metrics)


@patch('app.send_custom_metric')
def test_send_partition_metrics(self, mock_send_custom_metric):
expected_partition_metrics = {'avg_record_count': np.int64(1), 'max_record_count': np.int64(1), 'min_record_count': np.int64(1), 'deviation_record_count': np.float64(0.0), 'skew_record_count': np.float64(0.0), 'avg_file_count': np.int64(1), 'max_file_count': np.int64(1), 'min_file_count': np.int64(1), 'deviation_file_count': np.float64(0.0), 'skew_file_count': np.float64(0.0)}
self.assert_metrics(expected_partition_metrics, self.table, self.snapshot, send_partition_metrics)


def test_send_snapshot_metrics(self):
expected_snapshot_metrics = {'added_data_files': 5, 'added_records': 5, 'changed_partition_count': 5, 'total_records': 5, 'total_data_files': 5, 'total_delete_files': 0, 'added_files_size': 5340, 'total_files_size': 5340, 'added_position_deletes': 0}
self.assert_metrics(expected_snapshot_metrics, self.table, self.snapshot, send_snapshot_metrics)


def update_table(self, range_start, range_end):
# Perform an update operation on the Iceberg table
arrow_table = self.create_arrow_table(range_start, range_end)
self.table.append(arrow_table)
self.table.refresh()
self.snapshot = self.table.current_snapshot()


@patch('app.send_custom_metric')
def test_metrics_after_update(self, mock_send_custom_metric):
self.update_table(5, 10)

expected_file_metrics = {'avg_record_count': np.int64(1), 'max_record_count': np.int64(1), 'min_record_count': np.int64(1), 'avg_file_size': np.int64(1068), 'max_file_size': np.int64(1068), 'min_file_size': np.int64(1068)}
self.assert_metrics(expected_file_metrics, self.table, self.snapshot, send_files_metrics)

expected_partition_metrics = {'avg_record_count': np.int64(1), 'max_record_count': np.int64(1), 'min_record_count': np.int64(1), 'deviation_record_count': np.float64(0.0), 'skew_record_count': np.float64(0.0), 'avg_file_count': np.int64(1), 'max_file_count': np.int64(1), 'min_file_count': np.int64(1), 'deviation_file_count': np.float64(0.0), 'skew_file_count': np.float64(0.0)}
self.assert_metrics(expected_partition_metrics, self.table, self.snapshot, send_partition_metrics)

expected_snapshot_metrics = {'added_data_files': 5, 'added_records': 5, 'changed_partition_count': 5, 'total_records': 10, 'total_data_files': 10, 'total_delete_files': 0, 'added_files_size': 5340, 'total_files_size': 10680, 'added_position_deletes': 0}
self.assert_metrics(expected_snapshot_metrics, self.table, self.snapshot, send_snapshot_metrics)


if __name__ == '__main__':
unittest.main()
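
If you want to run this suite without Docker, a rough local equivalent of what the Dockerfile sets up (assumes Python 3.10 and that you start from the `lambda` folder; `CW_NAMESPACE` mirrors the Dockerfile's `ENV`):

```bash
# Install the test dependencies, then run pytest with the env vars the Dockerfile sets
pip install -r tests/requirements.txt
CW_NAMESPACE=TestNamespace PYTHONPATH=. python -m pytest tests/test_app.py
```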