# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
This script is used by the AWS Glue _getting started with crawlers and jobs_
scenario to perform extract, transform, and load (ETL) operations on sample
flight data. As part of the example, it is uploaded to an Amazon Simple
Storage Service (Amazon S3) bucket so that AWS Glue can access it.
"""

# AWS Glue job scripts import the built-in transforms with a wildcard, so
# names like ApplyMapping (used below) look undefined to static analysis;
# the undefined-variable check is disabled for this file.
# pylint: disable=undefined-variable

import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

"""
These custom arguments must be passed in the Arguments parameter of the
StartJobRun request, with each key prefixed by '--':
    --input_database The name of a metadata database that is contained in your
                     AWS Glue Data Catalog and that contains tables that describe
                     the data to be processed.
    --input_table The name of a table in the database that describes the data to
                  be processed.
    --output_bucket_url An S3 bucket that receives the transformed output data.
"""
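
# For example, a caller might start this job with the AWS SDK for Python
# (boto3). This is a minimal sketch, kept as a comment because it runs
# outside the job itself; the job name and argument values below are
# placeholders, not values defined by this example.
#
#     import boto3
#
#     glue_client = boto3.client("glue")
#     glue_client.start_job_run(
#         JobName="doc-example-job",
#         Arguments={
#             "--input_database": "doc-example-database",
#             "--input_table": "doc-example-table",
#             "--output_bucket_url": "s3://doc-example-bucket/output/",
#         },
#     )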
args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"]
)

# Set up the Spark and AWS Glue contexts and initialize the job so that
# AWS Glue can track its run state (required for features like job bookmarks).
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 Flight Data. Reads the source data described
# by the Data Catalog table into an AWS Glue DynamicFrame.
S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog(
    database=args["input_database"],
    table_name=args["input_table"],
    transformation_ctx="S3FlightData_node1",
)
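
# During development, you might inspect the frame before transforming it.
# A minimal sketch, left commented out so the production job stays quiet:
#
#     S3FlightData_node1.printSchema()
#     print(f"Record count: {S3FlightData_node1.count()}")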

# This mapping performs two main functions:
# 1. It simplifies the output by removing most of the fields from the data.
# 2. It renames some fields and narrows some types. For example, `fl_date` is
#    renamed to `flight_date`, and `month` is narrowed from long to tinyint.
# Each tuple is (source field, source type, target field, target type).
ApplyMapping_node2 = ApplyMapping.apply(
    frame=S3FlightData_node1,
    mappings=[
        ("year", "long", "year", "long"),
        ("month", "long", "month", "tinyint"),
        ("day_of_month", "long", "day", "tinyint"),
        ("fl_date", "string", "flight_date", "string"),
        ("carrier", "string", "carrier", "string"),
        ("fl_num", "long", "flight_num", "long"),
        ("origin_city_name", "string", "origin_city_name", "string"),
        ("origin_state_abr", "string", "origin_state_abr", "string"),
        ("dest_city_name", "string", "dest_city_name", "string"),
        ("dest_state_abr", "string", "dest_state_abr", "string"),
        ("dep_time", "long", "departure_time", "long"),
        ("wheels_off", "long", "wheels_off", "long"),
        ("wheels_on", "long", "wheels_on", "long"),
        ("arr_time", "long", "arrival_time", "long"),
        ("mon", "string", "mon", "string"),
    ],
    transformation_ctx="ApplyMapping_node2",
)
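
# To spot-check the transformed records, the DynamicFrame can be converted to
# a Spark DataFrame. A minimal sketch, commented out, for debugging only:
#
#     ApplyMapping_node2.toDF().show(5)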

# Script generated for node Revised Flight Data. Writes the transformed
# records to the output bucket as JSON, without partitioning.
RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options(
    frame=ApplyMapping_node2,
    connection_type="s3",
    format="json",
    connection_options={"path": args["output_bucket_url"], "partitionKeys": []},
    transformation_ctx="RevisedFlightData_node3",
)

# Commit the job to signal successful completion and update its run state.
job.commit()