|
| 1 | +(ns spark-query.core |
| 2 | + (:require |
| 3 | + [clojure.data.json :as json] |
| 4 | + [clojure.java.io :as io] |
| 5 | + [clojure.pprint :refer [pprint]] |
| 6 | + [org.httpkit.server :as server] |
| 7 | + [compojure.core :refer [defroutes GET POST]]) |
| 8 | + (:import |
| 9 | + [java.util Properties] |
| 10 | + [org.apache.iceberg |
| 11 | + CatalogProperties] |
| 12 | + [org.apache.iceberg.catalog |
| 13 | + Catalog |
| 14 | + Namespace |
| 15 | + TableIdentifier] |
| 16 | + [org.apache.iceberg.data |
| 17 | + IcebergGenerics |
| 18 | + Record] |
| 19 | + [org.apache.iceberg.rest RESTCatalog] |
| 20 | + [org.apache.iceberg.aws.s3 S3FileIOProperties] |
| 21 | + [org.apache.hadoop.conf Configuration]) |
| 22 | + (:gen-class)) |
| 23 | + |
(def PORT
  "TCP port the HTTP server listens on."
  8090)
| 25 | + |
(defn- record->vec
  "Converts an Iceberg `Record` into a map of column name -> column value.

  Despite the historical name, the result is a map (built with `zipmap`),
  not a vector. Values that are themselves `Record`s (nested struct columns)
  are converted recursively so the result contains plain maps that the JSON
  writer can serialize; all other values are passed through unchanged."
  [^Record record]
  (let [;; Column names, in schema order, taken from the record's struct type.
        columns (mapv #(.name %) (.fields (.struct record)))
        ;; Positional values; position i corresponds to (nth columns i).
        values  (mapv (fn [idx]
                        (let [v (.get record (int idx))]
                          (if (instance? Record v)
                            (record->vec v)
                            v)))
                      (range (.size record)))]
    (zipmap columns values)))
| 35 | + |
(defn open-catalog
  "Creates a new `RESTCatalog`, gives it a fresh Hadoop `Configuration`,
  initializes it under the name \"demo\" with the hard-coded connection
  properties below, and returns it."
  []
  (let [props {CatalogProperties/CATALOG_IMPL       "org.apache.iceberg.rest.RESTCatalog"
               CatalogProperties/URI                "http://iceberg-rest:8181"
               CatalogProperties/WAREHOUSE_LOCATION "s3a://warehouse/wh"
               CatalogProperties/FILE_IO_IMPL       "org.apache.iceberg.aws.s3.S3FileIO"
               S3FileIOProperties/ENDPOINT          "http://minio.net:9000"}]
    ;; doto returns the catalog itself after both setup calls have run.
    (doto (RESTCatalog.)
      (.setConf (Configuration.))
      (.initialize "demo" props))))
| 49 | + |
(def CATALOG
  "Atom caching the opened catalog; holds nil until a catalog is stored."
  (atom nil))
| 51 | + |
(defn get-catalog
  "Returns the cached catalog from the `CATALOG` atom, opening and caching it
  on first use.

  Initialization is guarded by a lock with a second check inside it, so
  concurrent first requests from different server threads open the catalog
  only once instead of each creating (and all but one leaking) their own.
  If `open-catalog` throws, nothing is cached and the next call retries."
  []
  (or @CATALOG
      (locking CATALOG
        ;; Re-check under the lock: another thread may have won the race
        ;; between our unlocked read and acquiring the monitor.
        (or @CATALOG
            (reset! CATALOG (open-catalog))))))
| 59 | + |
(defn load-table
  "Loads a table from `catalog`.

  `ns` is used as a single-level namespace name and `table` as the table
  name within it. Returns whatever `.loadTable` on the catalog returns."
  [catalog ns table]
  (->> table
       (TableIdentifier/of (Namespace/of (into-array String [ns])))
       (.loadTable catalog)))
| 66 | + |
(defn scan-table
  "Reads every row of table `table-in` in namespace `ns-in` and returns a
  response map whose `:body` is a JSON array with one object per row.

  The scan result is a closeable iterable; it is consumed eagerly inside
  `with-open` so the underlying resources are released once all rows have
  been read, even if conversion or serialization throws. The response also
  declares its `Content-Type` as JSON so clients can interpret the body."
  [ns-in table-in]
  (let [table (load-table (get-catalog) ns-in table-in)]
    (with-open [^java.io.Closeable records (.build (IcebergGenerics/read table))]
      ;; mapv and write-str are both eager, so no lazy sequence escapes the
      ;; with-open scope and touches the scan after it has been closed.
      {:headers {"Content-Type" "application/json"}
       :body    (json/write-str (mapv record->vec records))})))
| 79 | + |
(defroutes app-routes
  "HTTP routes: GET /scan/:ns/:table passes the two path parameters to
  `scan-table` and returns its result."
  (GET "/scan/:ns/:table" [ns table]
    (scan-table ns table)))
| 82 | + |
(defn- block-forever
  "Never returns: sleeps the calling thread in 60-second intervals, forever."
  []
  (loop []
    (Thread/sleep 60000)
    (recur)))
| 87 | + |
(defn -main
  "Program entry point. Starts the HTTP server for `app-routes` on `PORT`,
  then blocks the calling thread indefinitely. Command-line arguments are
  ignored. If an `Exception` is thrown, its message and stack trace are
  printed and the process exits with status 1."
  [& _args]
  (try
    (println "starting server on port" PORT)
    (server/run-server app-routes {:port PORT})
    (println "started server on port" PORT)
    (block-forever)
    (catch Exception ex
      (-> ex .getMessage println)
      (.printStackTrace ex)
      (System/exit 1))))
0 commit comments