diff --git a/.gitignore b/.gitignore
index eafe382..1b1b67d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,8 @@ assets/cloudwatch-dashboard.rendered.json
 samconfig.toml
 .aws-sam
 .env.local.json
-events/my.event.json
\ No newline at end of file
+events/my.event.json
+lambda/tests/.pytest_cache
+lambda/tests/test_db
+lambda/tests/__pycache__
+lambda/__pycache__
\ No newline at end of file
diff --git a/README.md b/README.md
index 91215bf..154e2e1 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,6 @@ This repository provides you with a sample solution that collects metrics of exi
 ### Solution Tenets
 * Solution is designed to provide time-series metrics for Apache Iceberg to monitor Apache Iceberg tables over time to recognize trends and anomalies.
 * Solution is designed to be lightweight and collect metrics exclusively from the Apache Iceberg metadata layer without scanning the data layer, hence without the need for heavy compute capacity.
-* In the future we strive to reduce the dependency on AWS Glue in favor of using AWS Lambda compute when required features are available in [PyIceberg](https://py.iceberg.apache.org) library.
 
 ### Technical implementation
 
@@ -90,35 +89,43 @@ https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/i
 
 ### Build and Deploy
 
-> ! Important - The guidance below uses AWS Serverless Application Model (SAM) for easier packaging and deployment of AWS Lambda. However if you use your own packaging tool or if you want to deploy AWS Lambda manually you can explore following files:
+> ! Important - The guidance below uses AWS Serverless Application Model (SAM) and Amazon ECR for easier packaging and deployment of AWS Lambda. However, if you use your own packaging tool or want to deploy AWS Lambda manually, you can explore the following files:
 > - template.yaml
 > - lambda/requirements.txt
 > - lambda/app.py
+> Once you've installed [Docker](#install-docker) and [SAM CLI](#install-sam-cli) you are ready to build the AWS Lambda. Open your terminal and run the commands below.
 
-#### 1. Build AWS Lambda using AWS SAM CLI
-
-Once you've installed [Docker](#install-docker) and [SAM CLI](#install-sam-cli) you are ready to build the AWS Lambda. Open your terminal and run command below.
+#### 1. Build and Deploy Script
 
 ```bash
+export CLOUDWATCH_NAMESPACE={{ cw_namespace }}
+export AWS_REGION={{ aws_region }}
+export aws_account_id={{ aws_account_id }}
+export ecr_repository_name={{ repository_name }}
+export STACK_NAME={{ your_stack_name }}
+export S3_ARTIFACTS_BUCKET_NAME={{ s3_bucket_name }}
+export S3_ARTIFACTS_PATH={{ s3_bucket_path }}
+export ecr_repository_uri=${aws_account_id}.dkr.ecr.$AWS_REGION.amazonaws.com/${ecr_repository_name}
+
+docker build -f Dockerfile --platform linux/amd64 -t ${ecr_repository_name}:main --build-arg CLOUDWATCH_NAMESPACE=$CLOUDWATCH_NAMESPACE .
 sam build --use-container
-```
-
-#### 2. Deploy AWS Lambda using AWS SAM CLI
-
-Once build is finished you can deploy your AWS Lambda. SAM will upload packaged code and deploy AWS Lambda resource using AWS CloudFormation. Run below command using your terminal.
-
-```bash
-sam deploy --guided
+aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${aws_account_id}.dkr.ecr.$AWS_REGION.amazonaws.com
+aws ecr create-repository --repository-name $ecr_repository_name --region $AWS_REGION --image-scanning-configuration scanOnPush=true --image-tag-mutability MUTABLE
+docker tag ${ecr_repository_name}:main ${ecr_repository_uri}:latest
+docker push ${ecr_repository_uri}:latest
+
+sam deploy --guided --debug --region $AWS_REGION \
+  --parameter-overrides ImageURL=${ecr_repository_uri}:latest \
+  --image-repository $ecr_repository_uri \
+  --stack-name $STACK_NAME --capabilities CAPABILITY_IAM CAPABILITY_AUTO_EXPAND \
+  --s3-bucket $S3_ARTIFACTS_BUCKET_NAME --s3-prefix $S3_ARTIFACTS_PATH
 ```
 
 ##### Parameters
-- `CWNamespace` - A namespace is a container for CloudWatch metrics.
-- `GlueServiceRole` - AWS Glue Role arn you created [earlier](#configuring-iam-permissions-for-aws-glue).
-- `Warehouse` - Required catalog property to determine the root path of the data warehouse on S3. This can be any path on your S3 bucket. Not critical for the solution.
+- `CLOUDWATCH_NAMESPACE` - A namespace is a container for CloudWatch metrics.
 
-
-#### 3. Configure EventBridge Trigger
+#### 2. Configure EventBridge Trigger
 
 In this section you will configure an EventBridge Rule that triggers the Lambda function on every transaction commit to an Apache Iceberg table. The default rule listens to the `Glue Data Catalog Table State Change` event from all the tables in the Glue Data Catalog. Lambda code knows to skip non-Iceberg tables.
 
@@ -168,7 +175,7 @@ events_client.put_targets(
 print(f"Pattern updated = {event_pattern_dump}")
 ```
 
-#### 4. (Optional) Create CloudWatch Dashboard
+#### 3. (Optional) Create CloudWatch Dashboard
 
 Once your Iceberg Table metrics are submitted to CloudWatch you can start using them to monitor and create alarms. CloudWatch also lets you visualize metrics using CloudWatch Dashboards. 
 `assets/cloudwatch-dashboard.template.json` is a sample CloudWatch dashboard configuration that uses a fraction of the submitted metrics and combines it with AWS Glue native metrics for Apache Iceberg.
@@ -235,6 +242,16 @@ sam local invoke IcebergMetricsLambda --env-vars .env.local.json
 
 `.env.local.json` - The JSON file that contains values for the Lambda function's environment variables. Lambda code is dependent on the env vars that you pass in the deploy section. You need to create the file and include the relevant [parameters](#parameters) before calling `sam local invoke`.
 
+### Unit Tests
+
+You can test the metrics generation locally through unit tests:
+
+```bash
+cd lambda
+docker build -f tests/Dockerfile -t iceberg-metrics-tests .
+docker run --rm iceberg-metrics-tests
+```
+
 ## Dependencies
 
 PyIceberg is a Python implementation for accessing Iceberg tables, without the need of a JVM. \
diff --git a/assets/arch.png b/assets/arch.png
index 9bfe91f..816d57e 100644
Binary files a/assets/arch.png and b/assets/arch.png differ
diff --git a/lambda/Dockerfile b/lambda/Dockerfile
new file mode 100644
index 0000000..1432af8
--- /dev/null
+++ b/lambda/Dockerfile
@@ -0,0 +1,12 @@
+FROM public.ecr.aws/lambda/python:3.10
+
+COPY . ${LAMBDA_TASK_ROOT}
+
+# Install the function's dependencies
+RUN pip install --upgrade pip && \
+    pip install -r requirements.txt
+
+ARG CLOUDWATCH_NAMESPACE
+ENV CW_NAMESPACE=$CLOUDWATCH_NAMESPACE
+
+CMD [ "app.lambda_handler" ]
\ No newline at end of file
diff --git a/lambda/app.py b/lambda/app.py
index f0de489..64add29 100644
--- a/lambda/app.py
+++ b/lambda/app.py
@@ -1,20 +1,17 @@
-import json
 import boto3
+import numpy as np
 from datetime import datetime, timezone
 from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import Snapshot
 import os
-import time
-import uuid
-import pandas as pd
-import numpy as np
+import pyarrow.compute as pc
 import logging
 
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 
-glue_client = boto3.client('glue')
-
-required_vars = ['CW_NAMESPACE', 'GLUE_SERVICE_ROLE', 'SPARK_CATALOG_S3_WAREHOUSE']
+required_vars = ['CW_NAMESPACE']
 for var in required_vars:
     # Retrieve the environment variable value
     if os.getenv(var) is None:
@@ -22,12 +19,6 @@
         raise EnvironmentError(f"Required environment variable '{var}' is not set.")
 
 cw_namespace = os.environ.get('CW_NAMESPACE')
-glue_service_role = os.environ.get('GLUE_SERVICE_ROLE')
-warehouse_path = os.environ.get('SPARK_CATALOG_S3_WAREHOUSE')
-
-glue_session_tags = {
-    "app": "monitor-iceberg"
-}
 
 def send_custom_metric( metric_name, dimensions, value, unit, namespace, timestamp=None):
     """
@@ -58,104 +49,51 @@ def send_custom_metric( metric_name, dimensions, value, unit, namespace, timesta
         Namespace=namespace,
         MetricData=[metric_data]
     )
-
-def wait_for_session(session_id,interval=1):
-    while True:
-        response = glue_client.get_session(
-            Id=session_id
-        )
-        status = response["Session"]["Status"]
-        if status in ['READY','FAILED','TIMEOUT','STOPPED']:
-            logger.info(f"Session {session_id} status={status}")
-            break
-        time.sleep(interval)
-
-def wait_for_statement(session_id,statement_id,interval=1):
-    while True:
-        response = glue_client.get_statement(
-            SessionId=session_id,
-            Id=statement_id,
-        )
-        status = response["Statement"]["State"]
-        if status in ['AVAILABLE','CANCELLED','ERROR']:
-            logger.info(f"Statement status={status}")
-            return response
-        time.sleep(interval)
-
-
-
-def parse_spark_show_output(output):
-    lines = output.strip().split('\n')
-    header = lines[1]  # Column names are typically in the second line
-    columns = [col.strip() for col in header.split('|') if col.strip()]  # Clean and split by '|'
-
-    data = []
-    # Start reading data from the third line and skip the last line which is a border
-    for row in lines[3:-1]:
-        # Remove border and split
-        row_data = [cell.strip() for cell in row.split('|') if cell.strip()]
-        if row_data:
-            data.append(row_data)
-    # Create DataFrame
-    return pd.DataFrame(data, columns=columns)
 
+def normalize_metrics(metrics: dict):
+    return {k: v.item() if isinstance(v, np.int64) or isinstance(v, np.float64)
+            else v
+            for k, v in metrics.items()}
 
-def send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id):
-    sql_stmt = f"SELECT CAST(AVG(record_count) as INT) as avg_record_count, MAX(record_count) as max_record_count, MIN(record_count) as min_record_count, CAST(AVG(file_size_in_bytes) as INT) as avg_file_size, MAX(file_size_in_bytes) as max_file_size, MIN(file_size_in_bytes) as min_file_size FROM glue_catalog.{glue_db_name}.{glue_table_name}.files"
-    run_stmt_response = glue_client.run_statement(
-        SessionId=session_id,
-        Code=f"df = spark.sql(\"{sql_stmt}\");df.show(df.count(),truncate=False)"
-    )
-    stmt_id = run_stmt_response["Id"]
-    logger.info(f"select files statement_id={stmt_id}")
-    stmt_response = wait_for_statement(session_id, run_stmt_response["Id"])
-    data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"]
-    logger.info(stmt_response)
-    df = parse_spark_show_output(data_str)
-    df = df.applymap(int)
-    file_metrics = {
-        "avg_record_count": df.iloc[0]["avg_record_count"],
-        "max_record_count": df.iloc[0]["max_record_count"],
-        "min_record_count": df.iloc[0]["min_record_count"],
-        "avg_file_size": df.iloc[0]['avg_file_size'],
-        "max_file_size": df.iloc[0]['max_file_size'],
-        "min_file_size": df.iloc[0]['min_file_size'],
-    }
-    logger.info("file_metrics=")
-    logger.info(file_metrics)
-    # loop over file_metrics, use key as metric name and value as metric value
-    # loop over partition_metrics, use key as metric name and value as metric value
-    for metric_name, metric_value in file_metrics.items():
-        logger.info(f"metric_name=files.{metric_name}, metric_value={metric_value.item()}")
+def send_metrics(metrics: dict, namespace: str, table: Table, snapshot: Snapshot):
+    normalized_metrics = normalize_metrics(metrics)
+    for metric_name, metric_value in normalized_metrics.items():
+        logger.info(f"metric_name={namespace}.{metric_name}, metric_value={metric_value}")
         send_custom_metric(
-            metric_name=f"files.{metric_name}",
+            metric_name=f"{namespace}.{metric_name}",
             dimensions=[
-                {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"}
+                {'Name': 'table_name', 'Value': f"{table.name()[1]}.{table.name()[2]}"}
             ],
-            value=metric_value.item(),
+            value=metric_value,
             unit='Bytes' if "size" in metric_name else "Count",
-            namespace=os.getenv('CW_NAMESPACE'),
+            namespace=cw_namespace,
             timestamp = snapshot.timestamp_ms,
         )
-
-def send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id):
-    sql_stmt = f"select partition,record_count,file_count from glue_catalog.{glue_db_name}.{glue_table_name}.partitions"
-    run_stmt_response = glue_client.run_statement(
-        SessionId=session_id,
-        Code=f"df = spark.sql(\"{sql_stmt}\");df.show(df.count(),truncate=False)"
-    )
+def send_files_metrics(table: Table, snapshot: Snapshot):
+    logger.info(f"send_files_metrics() -> snapshot_id={snapshot.snapshot_id}")
+    df = table.inspect.files().to_pandas()
+    file_metrics = {
+        "avg_record_count": df["record_count"].astype(int).mean().astype(int),
+        "max_record_count": df["record_count"].astype(int).max(),
+        "min_record_count": df["record_count"].astype(int).min(),
+        "avg_file_size": df['file_size_in_bytes'].astype(int).mean().astype(int),
+        "max_file_size": df['file_size_in_bytes'].astype(int).max(),
+        "min_file_size": df['file_size_in_bytes'].astype(int).min()
+    }
+
+    logger.info("file_metrics=")
+    logger.info(file_metrics)
+    send_metrics(file_metrics, "files", table, snapshot)
 
-    stmt_id = run_stmt_response["Id"]
-    logger.info(f"send_partition_metrics() -> statement_id={stmt_id}")
-    stmt_response = wait_for_statement(session_id, stmt_id)
-    data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"]
-    if data_str == "":
+def send_partition_metrics(table: Table, snapshot: Snapshot):
+    logger.info(f"send_partition_metrics() -> snapshot_id={snapshot.snapshot_id}")
+    if not table.metadata.partition_specs:
         logger.info("No partitions found")
         return
-    df = parse_spark_show_output(data_str)
+    df = table.inspect.partitions().to_pandas()
     partition_metrics = {
         "avg_record_count": df["record_count"].astype(int).mean().astype(int),
         "max_record_count": df["record_count"].astype(int).max(),
@@ -168,25 +106,14 @@ def send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id):
         "deviation_file_count": df['file_count'].astype(int).std().round(2),
         "skew_file_count": df['file_count'].astype(int).skew().round(2),
     }
+    logger.info("partition_metrics=")
     logger.info(partition_metrics)
-    # loop over aggregated partition_metrics, use key as metric name and value as metric value
-    for metric_name, metric_value in partition_metrics.items():
-        logger.info(f"metric_name=partitions.{metric_name}, metric_value={metric_value.item()}")
-        send_custom_metric(
-            metric_name=f"partitions.{metric_name}",
-            dimensions=[
-                {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"}
-            ],
-            value=metric_value.item(),
-            unit='Count',
-            namespace=os.getenv('CW_NAMESPACE'),
-            timestamp = snapshot.timestamp_ms,
-        )
+    send_metrics(partition_metrics, "partitions", table, snapshot)
 
     for index, row in df.iterrows():
-        partition_name = row['partition']
+        partition_name = "_".join(f"{val}" for val in row['partition'].values())
         record_count = row['record_count']
         file_count = row['file_count']
         logger.info(f"partition_name={partition_name}, record_count={record_count}, file_count={file_count}")
@@ -194,129 +121,53 @@ def send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id):
         send_custom_metric(
             metric_name=f"partitions.record_count",
             dimensions=[
-                {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"},
+                {'Name': 'table_name', 'Value': f"{table.name()[1]}.{table.name()[2]}"},
                 {'Name': 'partition_name', 'Value': partition_name}
             ],
             value=int(record_count),
             unit='Count',
-            namespace=os.getenv('CW_NAMESPACE'),
+            namespace=cw_namespace,
             timestamp = snapshot.timestamp_ms,
         )
         send_custom_metric(
             metric_name=f"partitions.file_count",
             dimensions=[
-                {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"},
+                {'Name': 'table_name', 'Value': f"{table.name()[1]}.{table.name()[2]}"},
                 {'Name': 'partition_name', 'Value': partition_name}
             ],
             value=int(file_count),
             unit='Count',
-            namespace=os.getenv('CW_NAMESPACE'),
+            namespace=cw_namespace,
             timestamp = snapshot.timestamp_ms,
         )
-
-def get_all_sessions():
-    sessions = []
-    next_token = None
-
-    while True:
-        if next_token:
-            response = glue_client.list_sessions(Tags=glue_session_tags, NextToken=next_token)
-        else:
-            response = glue_client.list_sessions(Tags=glue_session_tags)
-
-        sessions.extend(response['Sessions'])
-        next_token = response.get('NextToken')
-
-        if not next_token:
-            break
-
-    return sessions
+    return partition_metrics
 
-def create_or_reuse_glue_session():
-    session_id = None
-
-    glue_sessions = get_all_sessions()
-
-    for session in glue_sessions:
-        if(session["Status"] == "READY"):
-            session_id = session["Id"]
-            logger.info(f"Found existing session_id={session_id}")
-            break
-
-    if(session_id is None):
-        generated_uuid_string = str(uuid.uuid4())
-        session_id = f"iceberg-metadata-lambda-{generated_uuid_string}"
-        logger.info(f"No active Glue session found, creating new glue session with ID = {session_id}")
-        glue_client.create_session(
-            Id=session_id,
-            Role=glue_service_role,
-            Command={'Name': 'glueetl', "PythonVersion": "3"},
-            Timeout=120,
-            DefaultArguments={
-                "--enable-glue-datacatalog": "true",
-                "--enable-observability-metrics": "true",
-                "--conf": f"spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse={warehouse_path} --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
-                "--datalake-formats": "iceberg"
-            },
-            GlueVersion="4.0",
-            NumberOfWorkers=2,
-            WorkerType="G.1X",
-            IdleTimeout=30,
-            Tags=glue_session_tags,
-        )
-        wait_for_session(session_id)
-    return session_id
-
-
-def dt_to_ts(dt_str):
-    dt_obj = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
-    timestamp_seconds = dt_obj.timestamp()
-    return int(timestamp_seconds * 1000)
-
-def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id):
+def send_snapshot_metrics(table: Table, snapshot: Snapshot):
     logger.info("send_snapshot_metrics")
-    sql_stmt = f"select committed_at,snapshot_id,operation,summary from glue_catalog.{glue_db_name}.{glue_table_name}.snapshots where snapshot_id={snapshot_id}"
-    logger.debug(sql_stmt)
-    run_stmt_response = glue_client.run_statement(
-        SessionId=session_id,
-        Code=f"df=spark.sql(\"{sql_stmt}\");json_rdd=df.toJSON();json_strings=json_rdd.collect();print(json_strings)"
-    )
-    stmt_id = run_stmt_response["Id"]
-    logger.info(f"send_snapshot_metrics() -> statement_id={stmt_id}")
-    stmt_response = wait_for_statement(session_id, stmt_id)
-    json_list_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"].replace("\'", "")
-    snapshots = json.loads(json_list_str)
-    logger.info("send_snapshot_metrics()->response")
-    logger.info(json.dumps(snapshots, indent=4))
-    snapshot = snapshots[0]
-
+    snapshot_id = snapshot.snapshot_id
+    logger.info(f"send_snapshot_metrics() -> snapshot_id={snapshot_id}")
+    expr = pc.field("snapshot_id") == snapshot_id
+    snapshots = table.inspect.snapshots().filter(expr).to_pylist()
+    snapshot_metrics_obj = snapshots[0]
+    snapshot_metrics = dict(snapshot_metrics_obj["summary"])
     metrics = [
         "added-data-files", "added-records", "changed-partition-count",
         "total-records","total-data-files", "total-delete-files",
         "added-files-size", "total-files-size", "added-position-deletes"
     ]
-    for metric in metrics:
-        normalized_metric_name = metric.replace("-", "_")
-        if snapshot["summary"].get(metric) is None:
-            snapshot["summary"][metric] = 0
-        metric_value = snapshot["summary"][metric]
-        timestamp_ms = dt_to_ts(snapshot["committed_at"])
-        logger.info(f"metric_name=snapshot.{normalized_metric_name}, value={metric_value}")
-        send_custom_metric(
-            metric_name=f"snapshot.{normalized_metric_name}",
-            dimensions=[
-                {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"}
-            ],
-            value=int(metric_value),
-            unit='Bytes' if "size" in normalized_metric_name else "Count",
-            namespace=os.getenv('CW_NAMESPACE'),
-            timestamp = timestamp_ms,
-        )
+    snapshot_metrics = { k.replace("-", "_"): int(snapshot_metrics.get(k, 0)) for k in metrics }
+
+    logger.info("snapshot_metrics=")
+    logger.info(snapshot_metrics)
+    send_metrics(snapshot_metrics, "snapshots", table, snapshot)
+
 
 # check if glue table is of iceberg format, return boolean
 def check_table_is_of_iceberg_format(event):
+    glue_client = boto3.client('glue')
     response = glue_client.get_table(
         DatabaseName=event["detail"]["databaseName"],
         Name=event["detail"]["tableName"],
@@ -342,11 +193,8 @@ def lambda_handler(event, context):
     catalog = GlueCatalog(glue_db_name)
     table = catalog.load_table((glue_db_name, glue_table_name))
 
-    logger.info(f"current snapshot id={table.metadata.current_snapshot_id}")
-    snapshot = table.metadata.snapshot_by_id(table.metadata.current_snapshot_id)
-    logger.info("Using glue IS to produce metrics")
-    session_id = create_or_reuse_glue_session()
+    snapshot = table.current_snapshot()
 
-    send_snapshot_metrics(glue_db_name, glue_table_name, table.metadata.current_snapshot_id, session_id)
-    send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id)
-    send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id)
\ No newline at end of file
+    send_snapshot_metrics(table, snapshot)
+    send_partition_metrics(table, snapshot)
+    send_files_metrics(table, snapshot)
\ No newline at end of file
diff --git a/lambda/requirements.txt b/lambda/requirements.txt
index 83afb4f..6ff4cc3 100644
--- a/lambda/requirements.txt
+++ b/lambda/requirements.txt
@@ -2,4 +2,5 @@ boto3==1.34.51
 botocore==1.34.51
 pyiceberg[s3fs,glue]
 pandas
-typing_extensions
\ No newline at end of file
+typing_extensions
+pyarrow
\ No newline at end of file
diff --git a/template.yaml b/lambda/template.yaml
similarity index 55%
rename from template.yaml
rename to lambda/template.yaml
index 12a0981..fde2733 100644
--- a/template.yaml
+++ b/lambda/template.yaml
@@ -1,14 +1,7 @@
 AWSTemplateFormatVersion: '2010-09-09'
 Transform: AWS::Serverless-2016-10-31
 Parameters:
-  CWNamespace:
-    Description: Amazon CloudWatch custom metric namespace
-    Type: String
-  GlueServiceRole:
-    Description: AWS Glue Service Role with permissions to create session.
-    Type: String
-  Warehouse:
-    Description: warehouse is a required catalog property to determine the root path of the data warehouse in storage.
+  ImageURL:
     Type: String
 Globals:
   Function:
@@ -18,29 +11,14 @@ Resources:
   IcebergMetricsLambda:
     Type: AWS::Serverless::Function
     Properties:
-      CodeUri: lambda/
-      Handler: app.lambda_handler
-      Runtime: python3.9
+      PackageType: Image
+      ImageUri: !Ref ImageURL
       Policies:
         - CloudWatchPutMetricPolicy: {}
         - AWSLambdaBasicExecutionRole
        - AmazonS3ReadOnlyAccess
+        - AmazonEC2ContainerRegistryReadOnly
         - Statement:
-          - Sid: GlueInteractiveSessionPolicy
-            Effect: Allow
-            Action:
-              - glue:GetSession
-              - glue:CreateSession
-              - glue:GetStatement
-              - glue:RunStatement
-              - glue:GetSession
-              - glue:ListStatements
-            Resource: !Sub 'arn:aws:glue:*:${AWS::AccountId}:session/*'
-          - Sid: IAMPassRoleForGlueIS
-            Effect: Allow
-            Action:
-              - iam:PassRole
-            Resource: !Ref GlueServiceRole
           - Sid: GlueDataCatalogPolicy
             Effect: Allow
             Action:
@@ -54,13 +32,16 @@ Resources:
               - glue:GetPartitions
              - glue:ListSessions
             Resource: '*'
+        - Statement:
+          - Sid: ECRReadOnlyAdditionsPolicy
+            Effect: Allow
+            Action:
+              - ecr:DescribeRegistry
+              - ecr:InitiateLayerUpload
+              - ecr:SetRepositoryPolicy
+            Resource: '*'
       Architectures:
         - x86_64
-      Environment:
-        Variables:
-          CW_NAMESPACE: !Ref CWNamespace
-          GLUE_SERVICE_ROLE: !Ref GlueServiceRole
-          SPARK_CATALOG_S3_WAREHOUSE: !Ref Warehouse
   PermissionForEventBridgeToInvokeLambda:
     Type: AWS::Lambda::Permission
     Properties:
diff --git a/lambda/tests/Dockerfile b/lambda/tests/Dockerfile
new file mode 100644
index 0000000..b542dd1
--- /dev/null
+++ b/lambda/tests/Dockerfile
@@ -0,0 +1,21 @@
+# Use an official Python runtime as a parent image
+FROM python:3.10-slim
+
+# Set the working directory in the container
+WORKDIR /lambda
+
+# Copy the current directory contents into the container at /lambda
+COPY . /lambda
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --no-cache-dir -r tests/requirements.txt
+
+# Install SQLite
+RUN apt-get update && \
+    apt-get install -y sqlite3
+
+ENV CW_NAMESPACE=TestNamespace \
+    PYTHONPATH=/lambda
+
+# Run the test suite
+CMD ["pytest", "/lambda/tests/test_app.py"]
diff --git a/lambda/tests/__init__.py b/lambda/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/lambda/tests/requirements.txt b/lambda/tests/requirements.txt
new file mode 100644
index 0000000..27a54e5
--- /dev/null
+++ b/lambda/tests/requirements.txt
@@ -0,0 +1,8 @@
+boto3==1.34.51
+botocore==1.34.51
+pyiceberg[s3fs,glue]==0.7.0
+SQLAlchemy==2.0.30
+pyarrow==17.0.0
+pandas==2.2.2
+pytest==7.1.2
+unittest2==1.1.0
\ No newline at end of file
diff --git a/lambda/tests/test_app.py b/lambda/tests/test_app.py
new file mode 100644
index 0000000..750b9db
--- /dev/null
+++ b/lambda/tests/test_app.py
@@ -0,0 +1,114 @@
+import os
+import shutil
+import unittest
+from numbers import Number
+from unittest.mock import patch
+from pyiceberg.schema import Schema
+from pyiceberg.types import LongType, StringType, NestedField
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.catalog.sql import SqlCatalog
+import numpy as np
+import pyarrow as pa
+from app import send_files_metrics, send_partition_metrics, send_snapshot_metrics, normalize_metrics
+
+# Mock AWS credentials
+os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
+os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
+os.environ['AWS_SECURITY_TOKEN'] = 'testing'
+os.environ['AWS_SESSION_TOKEN'] = 'testing'
+
+class TestIcebergMetrics(unittest.TestCase):
+
+    @patch.dict(os.environ, {'CW_NAMESPACE': 'TestNamespace'})
+    def setUp(self):
+        self.schema = Schema(
+            NestedField(1, 'id', LongType(), False),
+            NestedField(2, 'data', StringType(), False)
+        )
+        self.partition_spec = PartitionSpec(
+            fields=[
+                PartitionField(source_id=2, field_id=1000, name="data", transform="identity")
+            ]
+        )
+
+        catalog_path = './tests/test_db'
+        if os.path.exists(catalog_path):
+            shutil.rmtree(catalog_path)
+        os.makedirs(catalog_path)
+        warehouse_path = os.path.abspath(catalog_path)
+        self.catalog = SqlCatalog(
+            "default",
+            **{
+                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
+                "warehouse": f"file://{warehouse_path}",
+            },
+        )
+        self.catalog.create_namespace('default')
+        self.catalog.create_table(
+            'default.test_table',
+            schema=self.schema,
+            partition_spec=self.partition_spec
+        )
+
+        # Load the table and insert some data
+        self.table = self.catalog.load_table(('default', 'test_table'))
+        self.update_table(0, 5)
+
+
+    def create_arrow_table(self, range_start, range_end):
+        data = {
+            'id': pa.array(range(range_start, range_end), pa.int64()),
+            'data': pa.array(['data' + str(i) for i in range(range_start, range_end)], pa.string())
+        }
+        return pa.Table.from_pydict(data)
+
+
+    def assert_metrics(self, expected, table, snapshot, method_to_test):
+        def send_metrics_stub(metrics, namespace, table, snapshot):
+            normalized_metrics = normalize_metrics(metrics)
+            self.assertDictEqual(normalized_metrics, expected)
+
+        with patch('app.send_metrics', side_effect=send_metrics_stub):
+            method_to_test(table, snapshot)
+
+
+    def test_send_files_metrics(self):
+        expected_file_metrics = {'avg_record_count': 1, 'max_record_count': 1, 'min_record_count': 1, 'avg_file_size': 1068, 'max_file_size': 1068, 'min_file_size': 1068}
+        self.assert_metrics(expected_file_metrics, self.table, self.snapshot, send_files_metrics)
+
+
+    @patch('app.send_custom_metric')
+    def test_send_partition_metrics(self, mock_send_custom_metric):
+        expected_partition_metrics = {'avg_record_count': 1, 'max_record_count': 1, 'min_record_count': 1, 'deviation_record_count': 0.0, 'skew_record_count': 0.0, 'avg_file_count': 1, 'max_file_count': 1, 'min_file_count': 1, 'deviation_file_count': 0.0, 'skew_file_count': 0.0}
+        self.assert_metrics(expected_partition_metrics, self.table, self.snapshot, send_partition_metrics)
+
+
+    def test_send_snapshot_metrics(self):
+        expected_snapshot_metrics = {'added_data_files': 5, 'added_records': 5, 'changed_partition_count': 5, 'total_records': 5, 'total_data_files': 5, 'total_delete_files': 0, 'added_files_size': 5340, 'total_files_size': 5340, 'added_position_deletes': 0}
+        self.assert_metrics(expected_snapshot_metrics, self.table, self.snapshot, send_snapshot_metrics)
+
+
+    def update_table(self, range_start, range_end):
+        # Perform an update operation on the Iceberg table
+        arrow_table = self.create_arrow_table(range_start, range_end)
+        self.table.append(arrow_table)
+        self.table.refresh()
+        self.snapshot = self.table.current_snapshot()
+
+
+    @patch('app.send_custom_metric')
+    def test_metrics_after_update(self, mock_send_custom_metric):
+        self.update_table(5, 10)
+
+        expected_file_metrics = {'avg_record_count': 1, 'max_record_count': 1, 'min_record_count': 1, 'avg_file_size': 1068, 'max_file_size': 1068, 'min_file_size': 1068}
+        self.assert_metrics(expected_file_metrics, self.table, self.snapshot, send_files_metrics)
+
+        expected_partition_metrics = {'avg_record_count': 1, 'max_record_count': 1, 'min_record_count': 1, 'deviation_record_count': 0.0, 'skew_record_count': 0.0, 'avg_file_count': 1, 'max_file_count': 1, 'min_file_count': 1, 'deviation_file_count': 0.0, 'skew_file_count': 0.0}
+        self.assert_metrics(expected_partition_metrics, self.table, self.snapshot, send_partition_metrics)
+
+        expected_snapshot_metrics = {'added_data_files': 5, 'added_records': 5, 'changed_partition_count': 5, 'total_records': 10, 'total_data_files': 10, 'total_delete_files': 0, 'added_files_size': 5340, 'total_files_size': 10680, 'added_position_deletes': 0}
+        self.assert_metrics(expected_snapshot_metrics, self.table, self.snapshot, send_snapshot_metrics)
+
+
+if __name__ == '__main__':
+    unittest.main()