Skip to content

Commit caadbbc

Browse files
authored
Add Delta Lake Support (#18865)
* Add DeltaLake * Add DeltaLakeUnitTest.java * Create pom.xml
1 parent 9baeb86 commit caadbbc

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

apache-spark-2/pom.xml

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>apache-spark-2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>apache-spark</name>

    <parent>
        <groupId>com.baeldung</groupId>
        <artifactId>parent-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <!-- Delta Lake storage layer; the _2.12 suffix must match the Spark Scala binary version. -->
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.12</artifactId>
            <version>${delta-core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${org.apache.spark.spark-core.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${org.apache.spark.spark-sql.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Produces a fat jar (jar-with-dependencies) during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- Was hard-coded to 3.3.0, leaving the declared property unused;
                     reference the property so the version is managed in one place. -->
                <version>${maven-assembly-plugin.version}</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>SparkPackagesRepo</id>
            <url>https://repos.spark-packages.org</url>
        </repository>
    </repositories>

    <properties>
        <!-- delta-core 2.4.x is the line compatible with Spark 3.4.x. -->
        <delta-core.version>2.4.0</delta-core.version>
        <org.apache.spark.spark-core.version>3.4.0</org.apache.spark.spark-core.version>
        <org.apache.spark.spark-sql.version>3.4.0</org.apache.spark.spark-sql.version>
        <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
    </properties>
</project>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.baeldung.delta;
2+
3+
import org.apache.spark.sql.*;
4+
import java.io.Serializable;
5+
import java.nio.file.Files;
6+
7+
public class DeltaLake {
8+
public static SparkSession createSession() {
9+
return SparkSession.builder()
10+
.appName("DeltaLake")
11+
.master("local[*]")
12+
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
13+
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
14+
.getOrCreate();
15+
}
16+
17+
public static String preparePeopleTable(SparkSession spark) {
18+
try {
19+
String tablePath = Files.createTempDirectory("delta-table-").toAbsolutePath().toString();
20+
21+
Dataset<Row> data = spark.createDataFrame(
22+
java.util.Arrays.asList(
23+
new Person(1, "Alice"),
24+
new Person(2, "Bob")
25+
),
26+
Person.class
27+
);
28+
29+
data.write().format("delta").mode("overwrite").save(tablePath);
30+
spark.sql("DROP TABLE IF EXISTS people");
31+
spark.sql("CREATE TABLE IF NOT EXISTS people USING DELTA LOCATION '" + tablePath + "'");
32+
return tablePath;
33+
} catch (Exception e) {
34+
throw new RuntimeException(e);
35+
}
36+
}
37+
38+
public static void cleanupPeopleTable(SparkSession spark) {
39+
spark.sql("DROP TABLE IF EXISTS people");
40+
}
41+
42+
public static void stopSession(SparkSession spark) {
43+
if (spark != null) {
44+
spark.stop();
45+
}
46+
}
47+
48+
public static class Person implements Serializable {
49+
private int id;
50+
private String name;
51+
52+
public Person() {}
53+
public Person(int id, String name) { this.id = id; this.name = name; }
54+
55+
public int getId() { return id; }
56+
public void setId(int id) { this.id = id; }
57+
public String getName() { return name; }
58+
public void setName(String name) { this.name = name; }
59+
}
60+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.baeldung.delta;
2+
3+
import org.apache.spark.sql.Dataset;
4+
import org.apache.spark.sql.Row;
5+
import org.apache.spark.sql.SparkSession;
6+
import org.junit.jupiter.api.AfterAll;
7+
import org.junit.jupiter.api.BeforeAll;
8+
import org.junit.jupiter.api.Test;
9+
10+
import static org.junit.jupiter.api.Assertions.assertEquals;
11+
import static org.junit.jupiter.api.Assertions.assertTrue;
12+
13+
public class DeltaLakeUnitTest {
14+
15+
private static SparkSession spark;
16+
private static String tablePath;
17+
18+
@BeforeAll
19+
static void setUp() {
20+
spark = DeltaLake.createSession();
21+
tablePath = DeltaLake.preparePeopleTable(spark);
22+
}
23+
24+
@AfterAll
25+
static void tearDown() {
26+
try {
27+
DeltaLake.cleanupPeopleTable(spark);
28+
} finally {
29+
DeltaLake.stopSession(spark);
30+
}
31+
}
32+
33+
@Test
34+
void givenDeltaLake_whenUsingDeltaFormat_thenPrintAndValidate() {
35+
Dataset<Row> df = spark.sql("DESCRIBE DETAIL people");
36+
df.show(false);
37+
38+
Row row = df.first();
39+
assertEquals("file:"+tablePath, row.getAs("location"));
40+
assertEquals("delta", row.getAs("format"));
41+
assertTrue(row.<Long>getAs("numFiles") >= 1);
42+
}
43+
}

0 commit comments

Comments (0)