Commit 18f3f48

add S3 Stack for Apache Iceberg table
1 parent 4da3eeb commit 18f3f48

File tree (5 files changed: +54, -14 lines)

* README.md
* app.py
* cdk_stacks/__init__.py
* cdk_stacks/glue_streaming_job.py
* cdk_stacks/s3.py

README.md

Lines changed: 16 additions & 11 deletions
@@ -59,7 +59,6 @@ For example:
     "--table_name": "iceberg_demo_table",
     "--primary_key": "name",
     "--kinesis_table_name": "iceberg_demo_kinesis_stream_table",
-    "--kinesis_stream_arn": "arn:aws:kinesis:us-east-1:123456789012:stream/iceberg-demo-stream",
     "--starting_position_of_kinesis_iterator": "LATEST",
     "--iceberg_s3_path": "s3://glue-iceberg-demo-atq4q5u/iceberg_demo_db",
     "--lock_table_name": "iceberg_lock",
@@ -93,6 +92,7 @@ For example:
 :information_source: The `--primary_key` option should be set to the Iceberg table's primary key column name.

 :warning: **You should create an S3 bucket for the Glue job script and upload the Glue job script file into that S3 bucket.**
+
 At this point you can now synthesize the CloudFormation template for this code.

 <pre>
@@ -108,11 +108,15 @@ command.
 ## Run Test

 1. Set up **Apache Iceberg connector for AWS Glue** to use Apache Iceberg with AWS Glue jobs.
-2. Create a Kinesis data stream
+2. Create an S3 bucket for the Apache Iceberg table
+   <pre>
+   (.venv) $ cdk deploy IcebergS3Path
+   </pre>
+3. Create a Kinesis data stream
    <pre>
    (.venv) $ cdk deploy KinesisStreamAsGlueStreamingJobDataSource
    </pre>
-3. Define a schema for the streaming data
+4. Define a schema for the streaming data
    <pre>
    (.venv) $ cdk deploy GlueSchemaOnKinesisStream
    </pre>
@@ -137,7 +141,7 @@ command.

    (11) Choose **Finish**

-4. Upload the **AWS SDK for Java 2.x** jar file into S3
+5. Upload the **AWS SDK for Java 2.x** jar file into S3
    <pre>
    (.venv) $ wget https://repo1.maven.org/maven2/software/amazon/awssdk/aws-sdk-java/2.17.224/aws-sdk-java-2.17.224.jar
    (.venv) $ aws s3 cp aws-sdk-java-2.17.224.jar s3://aws-glue-assets-123456789012-atq4q5u/extra-jars/aws-sdk-java-2.17.224.jar
@@ -153,7 +157,7 @@ command.
    --user-jars-first true
    </pre>
    In order to do this, we might need to upload the **AWS SDK for Java 2.x** jar file into S3.
-5. Create the Glue Streaming Job
+6. Create the Glue Streaming Job

    * (step 1) Select one of the Glue job scripts and upload it into S3

@@ -180,7 +184,7 @@ command.
    GrantLFPermissionsOnGlueJobRole \
    GlueStreamingSinkToIceberg
    </pre>
-6. Make sure the Glue job can access the Kinesis Data Streams table in the Glue Catalog database; otherwise, grant the Glue job the necessary permissions
+7. Make sure the Glue job can access the Kinesis Data Streams table in the Glue Catalog database; otherwise, grant the Glue job the necessary permissions

    We can grant the permissions by running the following command:
    <pre>
@@ -193,7 +197,7 @@ command.
    --permissions SELECT DESCRIBE ALTER INSERT DELETE \
    --resource '{ "Table": {"DatabaseName": "<i>iceberg_demo_db</i>", "TableWildcard": {}} }'
    </pre>
-7. Create a table with partitioned data in Amazon Athena
+8. Create a table with partitioned data in Amazon Athena

    Go to [Athena](https://console.aws.amazon.com/athena/home) on the AWS Management console.<br/>
    * (step 1) Create a database
@@ -236,11 +240,11 @@ command.
    --resource '{ "Table": {"DatabaseName": "<i>iceberg_demo_db</i>", "TableWildcard": {}} }'
    </pre>

-8. Run the Glue job to load data from Kinesis Data Streams into S3
+9. Run the Glue job to load data from Kinesis Data Streams into S3
    <pre>
    (.venv) $ aws glue start-job-run --job-name <i>streaming_data_from_kds_into_iceberg_table</i>
    </pre>
-9. Generate streaming data
+10. Generate streaming data

    We can synthetically generate data in JSON format using a simple Python application.
    <pre>
@@ -287,7 +291,7 @@ command.
    {"name": "Micheal", "age": 44, "m_time": "2023-12-14 09:02:57"}
    {"name": "Takisha", "age": 24, "m_time": "2023-12-30 12:38:23"}
    </pre>
-10. Check streaming data in S3
+11. Check streaming data in S3

    After 3~5 minutes, you can see that the streaming data has been delivered from **Kinesis Data Streams** to **S3** and stored in a folder structure partitioned by year, month, day, and hour.

@@ -296,7 +300,7 @@ command.
    ![iceberg-table](./assets/iceberg-data-level-02.png)
    ![iceberg-table](./assets/iceberg-data-level-03.png)

-11. Run test query
+12. Run test query

    Enter the following SQL statement and execute the query.
    <pre>
@@ -354,6 +358,7 @@ command.
    </pre>
 * (10) [Apache Iceberg - Maintenance for streaming tables (v0.14.0)](https://iceberg.apache.org/docs/0.14.0/spark-structured-streaming/#maintenance-for-streaming-tables)
 * (11) [awsglue python package](https://github.com/awslabs/aws-glue-libs): The awsglue Python package contains the Python portion of the AWS Glue library. This library extends PySpark to support serverless ETL on AWS.
+* (12) [AWS Glue Notebook Samples](https://github.com/aws-samples/aws-glue-samples/tree/master/examples/notebooks) - sample iPython notebook files that show how to use open data lake formats (Apache Hudi, Delta Lake, and Apache Iceberg) on AWS Glue Interactive Sessions and AWS Glue Studio Notebook.

 ## Troubleshooting

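The renumbered walkthrough now deploys the `IcebergS3Path` stack first; everything after the new step 2 is unchanged apart from the numbering. Step 10 describes the generator only as "a simple Python application"; as a rough sketch (not the repo's actual script; the stream name and the use of boto3 are assumptions), it could look like this:

```python
# Hypothetical generator for records shaped like the README's sample output,
# e.g. {"name": "Takisha", "age": 24, "m_time": "2023-12-30 12:38:23"}.
# STREAM_NAME and the boto3 approach are assumptions, not taken from this repo.
import datetime
import json
import random

import boto3

STREAM_NAME = "iceberg-demo-stream"  # assumed; use your Kinesis stack's stream name

def gen_record():
    return {
        "name": random.choice(["Micheal", "Takisha", "Arleen"]),
        "age": random.randint(20, 60),
        "m_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

kinesis = boto3.client("kinesis")
for _ in range(10):
    record = gen_record()
    # The partition key determines shard placement; the table's primary key
    # column ("name") is a natural choice here.
    kinesis.put_record(StreamName=STREAM_NAME,
                       Data=json.dumps(record).encode("utf-8"),
                       PartitionKey=record["name"])
```
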
app.py

Lines changed: 7 additions & 2 deletions
@@ -8,15 +8,19 @@
   GlueJobRoleStack,
   GlueStreamDataSchemaStack,
   GlueStreamingJobStack,
-  DataLakePermissionsStack
+  DataLakePermissionsStack,
+  S3BucketStack
 )

 APP_ENV = cdk.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'),
   region=os.getenv('CDK_DEFAULT_REGION'))

 app = cdk.App()

+s3_bucket = S3BucketStack(app, 'IcebergS3Path')
+
 kds_stack = KdsStack(app, 'KinesisStreamAsGlueStreamingJobDataSource')
+kds_stack.add_dependency(s3_bucket)

 glue_job_role = GlueJobRoleStack(app, 'GlueStreamingSinkToIcebergJobRole')
 glue_job_role.add_dependency(kds_stack)
@@ -33,7 +37,8 @@
 grant_lake_formation_permissions.add_dependency(glue_stream_schema)

 glue_streaming_job = GlueStreamingJobStack(app, 'GlueStreamingSinkToIceberg',
-  glue_job_role.glue_job_role
+  glue_job_role.glue_job_role,
+  kds_stack.kinesis_stream
 )
 glue_streaming_job.add_dependency(grant_lake_formation_permissions)

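Two details are worth noting in this wiring. First, `kds_stack.add_dependency(s3_bucket)` forces the `IcebergS3Path` stack to deploy before the Kinesis stack. Second, passing `kds_stack.kinesis_stream` into `GlueStreamingJobStack` creates a cross-stack reference, so the stream ARN no longer needs to be hard-coded in `glue_job_input_arguments`. A minimal sketch of that pattern, with illustrative stack names that are not part of this repo:

```python
# Minimal sketch of the cross-stack reference pattern used in app.py above.
import aws_cdk as cdk
from aws_cdk import Stack, aws_kinesis as kinesis
from constructs import Construct

class ProducerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # Exposed as an attribute so other stacks can consume it.
        self.kinesis_stream = kinesis.Stream(self, "SourceStream")

class ConsumerStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, stream, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # Reading stream.stream_arn here makes CDK synthesize a CloudFormation
        # Export in ProducerStack and an Fn::ImportValue in this stack.
        cdk.CfnOutput(self, "StreamArn", value=stream.stream_arn)

app = cdk.App()
producer = ProducerStack(app, "Producer")
consumer = ConsumerStack(app, "Consumer", producer.kinesis_stream)
consumer.add_dependency(producer)  # explicit deploy ordering, as in app.py
app.synth()
```
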
cdk_stacks/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -3,3 +3,4 @@
 from .glue_stream_data_schema import GlueStreamDataSchemaStack
 from .glue_streaming_job import GlueStreamingJobStack
 from .lakeformation_permissions import DataLakePermissionsStack
+from .s3 import S3BucketStack

cdk_stacks/glue_streaming_job.py

Lines changed: 2 additions & 1 deletion
@@ -9,14 +9,15 @@

 class GlueStreamingJobStack(Stack):

-  def __init__(self, scope: Construct, construct_id: str, glue_job_role, **kwargs) -> None:
+  def __init__(self, scope: Construct, construct_id: str, glue_job_role, kinesis_stream, **kwargs) -> None:
     super().__init__(scope, construct_id, **kwargs)

     glue_assets_s3_bucket_name = self.node.try_get_context('glue_assets_s3_bucket_name')
     glue_job_script_file_name = self.node.try_get_context('glue_job_script_file_name')
     glue_job_input_arguments = self.node.try_get_context('glue_job_input_arguments')

     glue_job_default_arguments = {
+      "--kinesis_stream_arn": kinesis_stream.stream_arn,
       "--enable-metrics": "true",
       "--enable-spark-ui": "true",
       "--spark-event-logs-path": f"s3://{glue_assets_s3_bucket_name}/sparkHistoryLogs/",

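On the consuming side, Glue exposes default arguments to the job script at run time. A minimal sketch of how a job script could read the injected ARN, assuming it uses the standard `awsglue` helper (this is not the repo's actual job script):

```python
# Sketch of reading the injected default argument inside a Glue job script.
# getResolvedOptions strips the leading "--" from argument names.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'kinesis_stream_arn'])
kinesis_stream_arn = args['kinesis_stream_arn']
print(f"Consuming from: {kinesis_stream_arn}")
```
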
cdk_stacks/s3.py

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+from urllib.parse import urlparse
+
+import aws_cdk as cdk
+
+from aws_cdk import (
+  Stack,
+  aws_s3 as s3
+)
+
+from constructs import Construct
+
+
+class S3BucketStack(Stack):
+
+  def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
+    super().__init__(scope, construct_id, **kwargs)
+
+    glue_job_input_arguments = self.node.try_get_context('glue_job_input_arguments')
+    s3_path = glue_job_input_arguments["--iceberg_s3_path"]
+    s3_bucket_name = urlparse(s3_path).netloc
+
+    s3_bucket = s3.Bucket(self, "s3bucket",
+      removal_policy=cdk.RemovalPolicy.DESTROY, #XXX: Default: cdk.RemovalPolicy.RETAIN - The bucket will be orphaned
+      bucket_name=s3_bucket_name)
+
+    self.s3_bucket_name = s3_bucket.bucket_name
+
+    cdk.CfnOutput(self, f'{self.stack_name}_S3Bucket', value=self.s3_bucket_name)
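The stack derives the bucket name from the existing `--iceberg_s3_path` context value instead of introducing a new setting, which keeps the Glue job arguments and the bucket in sync. A quick check of the parsing, using the sample path from the README:

```python
# How S3BucketStack derives the bucket name; the sample path is the one
# shown in the README's example arguments.
from urllib.parse import urlparse

parsed = urlparse("s3://glue-iceberg-demo-atq4q5u/iceberg_demo_db")
assert parsed.netloc == "glue-iceberg-demo-atq4q5u"  # bucket name
assert parsed.path == "/iceberg_demo_db"             # key prefix inside the bucket
```

Note that `RemovalPolicy.DESTROY` only removes the bucket on `cdk destroy` if it is empty; deleting a non-empty bucket additionally requires `auto_delete_objects=True` on the `s3.Bucket` construct.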
