
Commit c554f01

update the scala example
1 parent 01cd9a7 commit c554f01

File tree

1 file changed: +54 −29 lines changed
  • docs/integrations/data-ingestion/aws-glue


docs/integrations/data-ingestion/aws-glue/index.md

Lines changed: 54 additions & 29 deletions
@@ -61,42 +61,67 @@ To add the required jars manually, please follow the following:
 <TabItem value="Java" label="Java" default>
 
 ```java
-import com.amazonaws.services.glue.util.Job
-import com.amazonaws.services.glue.util.GlueArgParser
 import com.amazonaws.services.glue.GlueContext
-import org.apache.spark.SparkContext
+import com.amazonaws.services.glue.util.GlueArgParser
+import com.amazonaws.services.glue.util.Job
+import com.clickhouseScala.Native.NativeSparkRead.spark
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.DataFrame
-import scala.collection.JavaConverters._
-import com.amazonaws.services.glue.log.GlueLogger
 
+import scala.collection.JavaConverters._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
 
-// Initialize Glue job
-object GlueJob {
+object ClickHouseGlueExample {
   def main(sysArgs: Array[String]) {
-    val sc: SparkContext = new SparkContext()
-    val glueContext: GlueContext = new GlueContext(sc)
-    val spark: SparkSession = glueContext.getSparkSession
-    val logger = new GlueLogger
-    import spark.implicits._
-    // @params: [JOB_NAME]
     val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
-    Job.init(args("JOB_NAME"), glueContext, args.asJava)
-
-    // JDBC connection details
-    val jdbcUrl = "jdbc:ch://{host}:{port}/{schema}"
-    val jdbcProperties = new java.util.Properties()
-    jdbcProperties.put("user", "default")
-    jdbcProperties.put("password", "*******")
-    jdbcProperties.put("driver", "com.clickhouse.jdbc.ClickHouseDriver")
-
-    // Load the table from ClickHouse
-    val df: DataFrame = spark.read.jdbc(jdbcUrl, "my_table", jdbcProperties)
 
-    // Show the Spark df, or use it for whatever you like
-    df.show()
-
-    // Commit the job
+    val sparkSession: SparkSession = SparkSession.builder
+      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
+      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
+      .config("spark.sql.catalog.clickhouse.protocol", "https")
+      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
+      .config("spark.sql.catalog.clickhouse.user", "default")
+      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
+      .config("spark.sql.catalog.clickhouse.database", "default")
+      // for ClickHouse cloud
+      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
+      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
+      .getOrCreate
+
+    val glueContext = new GlueContext(sparkSession.sparkContext)
+    Job.init(args("JOB_NAME"), glueContext, args.asJava)
+    import sparkSession.implicits._
+
+    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"
+
+    val schema = StructType(Seq(
+      StructField("radio", StringType, nullable = false),
+      StructField("mcc", IntegerType, nullable = false),
+      StructField("net", IntegerType, nullable = false),
+      StructField("area", IntegerType, nullable = false),
+      StructField("cell", LongType, nullable = false),
+      StructField("unit", IntegerType, nullable = false),
+      StructField("lon", DoubleType, nullable = false),
+      StructField("lat", DoubleType, nullable = false),
+      StructField("range", IntegerType, nullable = false),
+      StructField("samples", IntegerType, nullable = false),
+      StructField("changeable", IntegerType, nullable = false),
+      StructField("created", TimestampType, nullable = false),
+      StructField("updated", TimestampType, nullable = false),
+      StructField("averageSignal", IntegerType, nullable = false)
+    ))
+
+    val df = sparkSession.read
+      .option("header", "true")
+      .schema(schema)
+      .csv(url)
+
+    // Write to ClickHouse
+    df.writeTo("clickhouse.default.cell_towers").append()
+
+
+    // Read from ClickHouse
+    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
     Job.commit()
   }
 }
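Note: the updated example appends to clickhouse.default.cell_towers and assumes that table already exists. A minimal sketch of creating a compatible table through the catalog configured above, assuming the Spark ClickHouse connector's CREATE TABLE support; the MergeTree engine and order_by key here are illustrative, not taken from the commit:

```scala
// Hedged sketch: create the target table via Spark SQL before the append.
// Assumes the `sparkSession` built in the example above; the engine and
// order_by values are illustrative assumptions, not part of the commit.
sparkSession.sql(
  """CREATE TABLE IF NOT EXISTS clickhouse.default.cell_towers (
    |  radio         STRING    NOT NULL,
    |  mcc           INT       NOT NULL,
    |  net           INT       NOT NULL,
    |  area          INT       NOT NULL,
    |  cell          BIGINT    NOT NULL,
    |  unit          INT       NOT NULL,
    |  lon           DOUBLE    NOT NULL,
    |  lat           DOUBLE    NOT NULL,
    |  range         INT       NOT NULL,
    |  samples       INT       NOT NULL,
    |  changeable    INT       NOT NULL,
    |  created       TIMESTAMP NOT NULL,
    |  updated       TIMESTAMP NOT NULL,
    |  averageSignal INT       NOT NULL
    |) USING clickhouse
    |TBLPROPERTIES (
    |  engine = 'MergeTree()',
    |  order_by = 'mcc'
    |)""".stripMargin)
```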

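One more detail worth flagging: the final read uses spark.sql(...), where spark comes from the com.clickhouseScala.Native.NativeSparkRead.spark import rather than the sparkSession built in the job. A sketch of the same read kept on the job's own session, under the same catalog assumptions as the example:

```scala
// Hedged sketch: read back through the job's own SparkSession instead of the
// externally imported `spark` value; the table name matches the example above.
val dfRead = sparkSession.sql("SELECT * FROM clickhouse.default.cell_towers")
dfRead.show(10)         // print a few rows as a sanity check
println(dfRead.count()) // row count after the append
```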