|
6 | 6 | [cider.nrepl :refer [cider-nrepl-handler]] |
7 | 7 | [nrepl.server :as nrepl-server] |
8 | 8 | [org.httpkit.server :as server] |
| 9 | + [ring.middleware.reload :refer [wrap-reload]] |
| 10 | + [ring.util.request :as ring-req] |
9 | 11 | [compojure.core :refer [defroutes GET POST]]) |
10 | 12 | (:import |
11 | 13 | [java.util Properties] |
|
20 | 22 | Record] |
21 | 23 | [org.apache.iceberg.rest RESTCatalog] |
22 | 24 | [org.apache.iceberg.aws.s3 S3FileIOProperties] |
23 | | - [org.apache.hadoop.conf Configuration]) |
| 25 | + [org.apache.hadoop.conf Configuration] |
| 26 | + [org.apache.spark.sql SparkSession]) |
24 | 27 | (:gen-class)) |
25 | 28 |
|
26 | 29 | (def PORT 8090) |
| 30 | +(def NREPL-PORT 7890) |
27 | 31 |
|
28 | 32 | (defn- record->vec |
29 | 33 | [record] |
|
49 | 53 | (.initialize "demo" catalog-props))] |
50 | 54 | catalog)) |
51 | 55 |
|
| 56 | +(defn spark-session |
| 57 | + [] |
| 58 | + (-> (SparkSession/builder) |
| 59 | + (.config "spark.sql.defaultCatalog" "demo") |
| 60 | + (.config "spark.sql.catalog.demo" "org.apache.iceberg.spark.SparkCatalog") |
| 61 | + (.config "spark.sql.catalog.demo.type" "rest") |
| 62 | + (.config "spark.sql.catalog.demo.uri" "http://iceberg-rest:8181") |
| 63 | + (.config "spark.sql.catalog.demo.io-impl" "org.apache.iceberg.aws.s3.S3FileIO") |
| 64 | + (.config "spark.sql.catalog.demo.warehouse" "s3://warehouse/wh/") |
| 65 | + (.config "spark.sql.catalog.demo.s3.endpoint" "http://minio.net:9000") |
| 66 | + (.master "local") |
| 67 | + .getOrCreate)) |
| 68 | + |
52 | 69 | (def CATALOG (atom nil)) |
53 | 70 |
|
54 | 71 | (defn get-catalog |
|
108 | 125 | .partition |
109 | 126 | partition-data->vec))))) |
110 | 127 |
|
111 | | -(defn handle-scan-table [ns-in table-in] |
| 128 | +(defn handle-scan-table |
| 129 | + [ns-in table-in] |
112 | 130 | (let [table (load-table (get-catalog) ns-in table-in) |
113 | 131 | rows (scan-table table) |
114 | 132 | response-body (->> rows |
115 | 133 | (mapv record->vec) |
116 | 134 | json/write-str)] |
117 | 135 | {:body response-body})) |
118 | 136 |
|
119 | | -(defn handle-table-partitions [ns-in table-in] |
| 137 | +(defn handle-table-partitions |
| 138 | + [ns-in table-in] |
120 | 139 | (let [table (load-table (get-catalog) ns-in table-in) |
121 | 140 | partitions-from-meta (table-partitions-from-meta table) |
122 | 141 | partitions-from-data (table-partitions-from-data table) |
|
125 | 144 | response-body (json/write-str response)] |
126 | 145 | {:body response-body})) |
127 | 146 |
|
| 147 | +(defn handle-spark-sql |
| 148 | + [request] |
| 149 | + (let [sql (ring-req/body-string request) |
| 150 | + session (spark-session) |
| 151 | + dataset (.sql session sql) |
| 152 | + _ (.show dataset) |
| 153 | + response-body (-> dataset |
| 154 | + .toJSON |
| 155 | + .toLocalIterator |
| 156 | + iterator-seq |
| 157 | + (->> (map json/read-str)) |
| 158 | + json/write-str)] |
| 159 | + {:body response-body})) |
| 160 | + |
128 | 161 | (defroutes app-routes |
129 | 162 | (GET "/scan/:ns/:table" [ns table] (handle-scan-table ns table)) |
130 | | - (GET "/partitions/:ns/:table" [ns table] (handle-table-partitions ns table))) |
| 163 | + (GET "/partitions/:ns/:table" [ns table] (handle-table-partitions ns table)) |
| 164 | + (POST "/sql" request (handle-spark-sql request))) |
131 | 165 |
|
132 | 166 | (defn- block-forever |
133 | 167 | [] |
134 | 168 | (while true |
135 | 169 | (Thread/sleep 60000))) |
136 | 170 |
|
137 | 171 | (defn -main |
138 | | - [& args] |
| 172 | + [& _args] |
139 | 173 | (try |
| 174 | + (println "starting nREPL server on port" NREPL-PORT) |
| 175 | + (nrepl-server/start-server :port NREPL-PORT :bind "0.0.0.0" :handler cider-nrepl-handler) |
| 176 | + (println "started nREPL server on port" NREPL-PORT) |
140 | 177 | (println "starting server on port" PORT) |
141 | | - (nrepl-server/start-server :port 7890 :bind "0.0.0.0" :handler cider-nrepl-handler) |
142 | | - (server/run-server app-routes {:port PORT}) |
| 178 | + (server/run-server (wrap-reload #'app-routes) {:port PORT}) |
143 | 179 | (println "started server on port" PORT) |
144 | 180 | (block-forever) |
145 | 181 | (catch Exception e |
|
0 commit comments