---
title: Spark
pcx_content_type: example
---

Below is an example of how you can build an [Apache Spark](https://spark.apache.org/) application (with Scala) which connects to the R2 Data Catalog. This application is built to run locally, but it can be adapted to run on a cluster.

## Prerequisites

- Sign up for a [Cloudflare account](https://dash.cloudflare.com/sign-up/workers-and-pages).
- Create an [R2 bucket](/r2/buckets/) and enable the data catalog (see the example commands after this list).
- Create an [R2 API token](/r2/api/tokens/) with both [R2 and data catalog permissions](/r2/api/tokens/#permissions).
- Install Java 17, Spark 3.5.3, and sbt 1.10.11.
  - Note: The specific versions of these tools are important for getting this example to work.
  - Tip: [SDKMAN](https://sdkman.io/) is a convenient package manager for installing SDKs.
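
If you prefer the command line, the bucket and catalog steps above can be done with [Wrangler](/workers/wrangler/). This is only a sketch: the bucket name `mydemo-bucket` is a placeholder, and it assumes a recent Wrangler version that includes the `r2 bucket catalog` commands.

```bash
# Create the R2 bucket (placeholder name).
npx wrangler r2 bucket create mydemo-bucket

# Enable the data catalog for that bucket.
npx wrangler r2 bucket catalog enable mydemo-bucket
```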

## Example usage

To start, create a new empty project directory somewhere on your machine. Inside that directory, create the following file at `src/main/scala/com/example/R2DataCatalogDemo.scala`. This will serve as the main entry point for your Spark application.

```scala
package com.example

import org.apache.spark.sql.SparkSession

object R2DataCatalogDemo {
  def main(args: Array[String]): Unit = {

    // Connection details for R2 Data Catalog, provided via environment variables.
    val uri = sys.env("CATALOG_URI")
    val warehouse = sys.env("WAREHOUSE")
    val token = sys.env("TOKEN")

    // Register an Iceberg REST catalog named "mydemo" that points at R2 Data Catalog.
    val spark = SparkSession.builder()
      .appName("My R2 Data Catalog Demo")
      .master("local[*]")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.mydemo", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.mydemo.type", "rest")
      .config("spark.sql.catalog.mydemo.uri", uri)
      .config("spark.sql.catalog.mydemo.warehouse", warehouse)
      .config("spark.sql.catalog.mydemo.token", token)
      .getOrCreate()

    import spark.implicits._

    // Some sample rows to write.
    val data = Seq(
      (1, "Alice", 25),
      (2, "Bob", 30),
      (3, "Charlie", 35),
      (4, "Diana", 40)
    ).toDF("id", "name", "age")

    // Switch to the R2 catalog and create a namespace for the table.
    spark.sql("USE mydemo")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS demoNamespace")

    // Create (or replace) an Iceberg table and write the sample data to it.
    data.writeTo("demoNamespace.demotable").createOrReplace()

    // Read back a filtered subset to confirm the round trip.
    val readResult = spark.sql("SELECT * FROM demoNamespace.demotable WHERE age > 30")
    println("Records with age > 30:")
    readResult.show()
  }
}
```
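
The table created above is a regular Iceberg table, so once it exists you can keep working with it through the same catalog. As a small sketch (not part of the example above, and assuming the same `SparkSession` and implicits), you could append more rows with the DataFrameWriterV2 API or inspect the table's history through Iceberg's metadata tables:

```scala
// Hypothetical follow-up, assuming the same SparkSession, catalog, and imports as above.

// Append additional rows to the existing table instead of replacing it.
val moreData = Seq((5, "Evan", 28), (6, "Fiona", 45)).toDF("id", "name", "age")
moreData.writeTo("demoNamespace.demotable").append()

// Iceberg exposes table metadata as queryable tables, for example the snapshot history.
spark.sql("SELECT snapshot_id, committed_at, operation FROM demoNamespace.demotable.snapshots").show()
```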

For building this application and managing dependencies, we'll use [sbt ("simple build tool")](https://www.scala-sbt.org/). The following is an example `build.sbt` file to place at the root of your project. It is configured to produce a "fat JAR", bundling all required dependencies.

```scala
name := "R2DataCatalogDemo"

version := "1.0"

val sparkVersion = "3.5.3"
val icebergVersion = "1.8.1"

// You need Spark binaries compiled for Scala 2.12 or 2.13; 2.12 is more common.
// Spark 3.5.3 installed via SDKMAN comes with Scala 2.12.18.
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.iceberg" % "iceberg-core" % icebergVersion,
  "org.apache.iceberg" % "iceberg-spark-runtime-3.5_2.12" % icebergVersion,
  "org.apache.iceberg" % "iceberg-aws-bundle" % icebergVersion
)

// Build a fat JAR with all dependencies.
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case "reference.conf" => MergeStrategy.concat
  case "application.conf" => MergeStrategy.concat
  case x if x.endsWith(".properties") => MergeStrategy.first
  case x => MergeStrategy.first
}

// For Java 17 compatibility.
Compile / javacOptions ++= Seq("--release", "17")
```

To enable the [sbt-assembly plugin](https://github.com/sbt/sbt-assembly) (used to build fat JARs), add the following to a new file at `project/assembly.sbt`:

```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
```
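
Optionally, you can also pin the sbt version for the project in `project/build.properties` (this is the `build.properties` file shown in the directory layout below). A minimal sketch, using the sbt version from the prerequisites:

```
sbt.version=1.10.11
```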

Make sure Java, Spark, and sbt are installed and available in your shell. If you're using SDKMAN, you can install them as shown below:

```bash
sdk install java 17.0.14-amzn
sdk install spark 3.5.3
sdk install sbt 1.10.11
```
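
You can confirm the tools are on your `PATH` and at the expected versions (the exact output format varies by tool):

```bash
java -version          # should report a 17.x runtime
spark-submit --version # should report Spark 3.5.3
sbt --version          # should report sbt 1.10.11
```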

With everything installed, you can now build the project using sbt. This will generate a single bundled JAR file.

```bash
sbt clean assembly
```

After building, the output JAR should be located at `target/scala-2.12/R2DataCatalogDemo-assembly-1.0.jar`.
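
A quick way to confirm the build produced the expected artifact:

```bash
ls -lh target/scala-2.12/R2DataCatalogDemo-assembly-1.0.jar
```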

To run the application, you'll use `spark-submit`. Below is an example shell script (`submit.sh`) that includes the necessary Java compatibility flags for running Spark on Java 17:

```bash
#!/bin/bash

# We need to set these "--add-opens" flags so that Spark can run on Java 17 (it needs access to
# parts of the JVM which have been modularized and made internal).
JAVA_17_COMPATIBILITY="--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED"

spark-submit \
  --conf "spark.driver.extraJavaOptions=$JAVA_17_COMPATIBILITY" \
  --conf "spark.executor.extraJavaOptions=$JAVA_17_COMPATIBILITY" \
  --class com.example.R2DataCatalogDemo \
  target/scala-2.12/R2DataCatalogDemo-assembly-1.0.jar
```

Before running it, make sure the script is executable:

```bash
chmod +x submit.sh
```

At this point, your project directory should be structured like this (sbt-generated directories such as `target/` are omitted):

```
.
├── build.sbt
├── project
│   ├── assembly.sbt
│   └── build.properties
├── submit.sh
└── src
    └── main
        └── scala
            └── com
                └── example
                    └── R2DataCatalogDemo.scala
```

Before submitting the job, make sure you have the required environment variables set for your catalog URI, warehouse, and [Cloudflare API token](/r2/api/tokens/). You can find the catalog URI and warehouse name in your bucket's data catalog settings.

```bash
export CATALOG_URI=
export WAREHOUSE=
export TOKEN=
```

You're now ready to run the job:

```bash
./submit.sh
```
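
If everything is wired up correctly, somewhere in Spark's fairly verbose log output you should see the filtered records printed, roughly like this (row order may vary):

```
Records with age > 30:
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 35|
|  4|  Diana| 40|
+---+-------+---+
```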