|
6 | 6 | [xt-play.util :as util] |
7 | 7 | [xt-play.xtdb :as xtdb])) |
8 | 8 |
|
9 | | -(defn- encode-txs [tx-type txs] |
| 9 | +(defn- encode-txs [tx-type query? txs] |
10 | 10 | (case (keyword tx-type) |
11 | | - :sql (if (string? txs) |
| 11 | + :sql (if (and (not query?) |
| 12 | + (string? txs)) |
12 | 13 | (->> (str/split txs #";") |
13 | 14 | (remove str/blank?) |
14 | 15 | (map #(do [:sql %])) |
15 | 16 | (vec)) |
16 | 17 | txs) |
17 | | - :xtql (util/read-edn (str "[" txs "]")) |
| 18 | + :xtql (util/read-edn (if query? txs (str "[" txs "]"))) |
18 | 19 | ;;else |
19 | 20 | txs)) |
20 | 21 |
|
| 22 | +(defn- dml? [statement] |
| 23 | + (when statement |
| 24 | + (let [upper-st (str/upper-case statement)] |
| 25 | + (or (str/includes? upper-st "INSERT") |
| 26 | + (str/includes? upper-st "UPDATE") |
| 27 | + (str/includes? upper-st "DELETE") |
| 28 | + (str/includes? upper-st "ERASE") |
| 29 | + (str/includes? upper-st "MERGE") |
| 30 | + (str/includes? upper-st "PATCH"))))) |
| 31 | + |
| 32 | +(defn split-sql [sql] |
| 33 | + (loop [chars (seq sql) current [] statements [] in-string? false escape? false] |
| 34 | + (if (empty? chars) |
| 35 | + (conj statements (apply str current)) |
| 36 | + (let [c (first chars) |
| 37 | + rest-chars (rest chars)] |
| 38 | + (cond |
| 39 | + (and (= c \') (not escape?)) |
| 40 | + (recur rest-chars (conj current c) statements (not in-string?) false) |
| 41 | + |
| 42 | + (and in-string? (= c \\)) |
| 43 | + (recur rest-chars (conj current c) statements in-string? (not escape?)) |
| 44 | + |
| 45 | + (and (= c \;) (not in-string?)) |
| 46 | + (recur rest-chars [] (conj statements (apply str current)) in-string? false) |
| 47 | + |
| 48 | + :else |
| 49 | + (recur rest-chars (conj current c) statements in-string? false)))))) |
| 50 | + |
21 | 51 | (defn- prepare-statements |
22 | 52 | "Takes a batch of transactions and prepares the jdbc execution args to |
23 | | - be run sequentially" |
| 53 | + be run sequentially. It groups statements by type and wraps DMLs in transactions if system time specified." |
24 | 54 | [tx-batches] |
25 | 55 | (for [{:keys [txs system-time]} tx-batches] |
26 | 56 | (remove nil? |
27 | | - (concat |
28 | | - (when system-time |
29 | | - [[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]]) |
30 | | - (mapv (fn [q] (vector (str q ";"))) |
31 | | - (str/split txs #"\s*;\s*")) |
32 | | - (when system-time |
33 | | - [["COMMIT"]]))))) |
| 57 | + (when txs |
| 58 | + (let [statements (split-sql txs) |
| 59 | + by-type (partition-by dml? statements)] |
| 60 | + (log/warn "by-type" by-type) |
| 61 | + (mapcat |
| 62 | + (fn [grp] |
| 63 | + (let [dmls? (dml? (first grp))] |
| 64 | + (concat |
| 65 | + (when (and dmls? system-time) |
| 66 | + [[(format "BEGIN AT SYSTEM_TIME TIMESTAMP '%s'" system-time)]]) |
| 67 | + (vec |
| 68 | + (keep (fn [q] (when-not (empty? q) |
| 69 | + [(str/trim q)])) |
| 70 | + grp)) |
| 71 | + (when (and dmls? system-time) |
| 72 | + [["COMMIT"]])))) |
| 73 | + by-type) |
| 74 | + ))))) |
34 | 75 |
|
35 | 76 | (defn format-system-time [s] |
36 | 77 | (when s (read-instant-date s))) |
|
54 | 95 | (mapv PG->clj row)) |
55 | 96 | result)) |
56 | 97 |
|
57 | | -(defn- run!-tx [node tx-type tx-batches query] |
| 98 | +(defn- detect-xtql-queries [batch] |
| 99 | + (if (:query batch) |
| 100 | + batch |
| 101 | + (if (and (string? (:txs batch)) |
| 102 | + (re-matches #"(?i)^\s*(\(->)?\s*\((from|unify|rel).+" (:txs batch))) |
| 103 | + (assoc batch :query true) |
| 104 | + batch))) |
| 105 | + |
| 106 | +(defn- run!-tx [node tx-type tx-batches] |
58 | 107 | (let [tx-batches (->> tx-batches |
| 108 | + (map detect-xtql-queries) |
59 | 109 | (map #(update % :system-time format-system-time)) |
60 | | - (map #(update % :txs (partial encode-txs tx-type))))] |
61 | | - (doseq [{:keys [system-time txs] :as batch} tx-batches] |
62 | | - (log/info tx-type "running batch: " batch) |
63 | | - (xtdb/submit! node txs {:system-time system-time}))) |
64 | | - (log/info tx-type "running query:" query) |
65 | | - (let [res (xtdb/query node query)] |
66 | | - (log/info tx-type "XTDB query response:" res) |
67 | | - res)) |
68 | | - |
69 | | -(defn- run!-with-jdbc-conn [tx-batches query] |
| 110 | + (map #(update % :txs (partial encode-txs tx-type (:query %))))) |
| 111 | + tx-results |
| 112 | + (doall (mapv |
| 113 | + (fn [{:keys [system-time query txs] :as batch}] |
| 114 | + (log/info tx-type "running batch: " batch) |
| 115 | + (try |
| 116 | + (if query |
| 117 | + (xtdb/query node txs) |
| 118 | + (xtdb/submit! node txs {:system-time system-time})) |
| 119 | + (catch Throwable ex |
| 120 | + (log/error "Exception while running transaction" (ex-message ex)) |
| 121 | + (parse-result |
| 122 | + [{:message (ex-message ex) |
| 123 | + :exception (.getClass ex) |
| 124 | + :data (ex-data ex)}])))) |
| 125 | + tx-batches))] |
| 126 | + (log/warn "run!-tx-res" tx-results) |
| 127 | + tx-results)) |
| 128 | + |
| 129 | + |
| 130 | +(defn- run!-with-jdbc-conn [tx-batches] |
70 | 131 | (xtdb/with-jdbc |
71 | 132 | (fn [conn] |
72 | | - (doseq [txs (prepare-statements tx-batches) |
73 | | - statement txs] |
74 | | - (log/info "beta executing statement:" statement) |
75 | | - (xtdb/jdbc-execute! conn statement)) |
76 | | - (log/info "beta running query:" query) |
77 | | - (let [res (xtdb/jdbc-execute! conn [query])] |
78 | | - (log/info "beta query response" res) |
79 | | - (parse-result res))))) |
| 133 | + (let [tx-in-progress? (atom false) |
| 134 | + res (mapv (fn [txs] |
| 135 | + (vec |
| 136 | + (mapcat |
| 137 | + (fn [statement] |
| 138 | + (log/info "beta executing statement:" statement) |
| 139 | + (when (str/includes? (str/upper-case (first statement)) "BEGIN") |
| 140 | + (reset! tx-in-progress? true)) |
| 141 | + (try |
| 142 | + (let [res (parse-result (xtdb/jdbc-execute! conn statement))] |
| 143 | + (when (str/includes? (str/upper-case (first statement)) "COMMIT") |
| 144 | + (reset! tx-in-progress? false)) |
| 145 | + (if-not (vector? (ffirst res)) |
| 146 | + [res] |
| 147 | + res)) |
| 148 | + (catch Exception ex |
| 149 | + (log/error "Exception while running statement" (ex-message ex)) |
| 150 | + (when @tx-in-progress? |
| 151 | + (log/warn "Rolling back transaction") |
| 152 | + (xtdb/jdbc-execute! conn ["ROLLBACK"])) |
| 153 | + (parse-result |
| 154 | + [{:message (ex-message ex) |
| 155 | + :exception (.getClass ex) |
| 156 | + :data (ex-data ex)}])))) |
| 157 | + txs))) |
| 158 | + (prepare-statements tx-batches))] |
| 159 | + (log/info "run!-with-jdbc-conn-res" res) |
| 160 | + res)))) |
80 | 161 |
|
81 | 162 | (defn run!! |
82 | 163 | "Given transaction batches, a query and the type of transaction to |
83 | 164 | use, will run transaction batches and queries sequentially, |
84 | 165 | returning the last query response in column format." |
85 | | - [{:keys [tx-batches query tx-type]}] |
86 | | - (let [query (if (= "xtql" tx-type) (util/read-edn query) query)] |
87 | | - (xtdb/with-xtdb |
88 | | - (fn [node] |
89 | | - (if (= "sql-v2" tx-type) |
90 | | - (run!-with-jdbc-conn tx-batches query) |
91 | | - (let [res (run!-tx node tx-type tx-batches query)] |
92 | | - (util/map-results->rows res))))))) |
| 166 | + [{:keys [tx-batches tx-type]}] |
| 167 | + (xtdb/with-xtdb |
| 168 | + (fn [node] |
| 169 | + (if (#{"sql-v2" "sql"} tx-type) |
| 170 | + (run!-with-jdbc-conn tx-batches) |
| 171 | + (let [res (run!-tx node tx-type tx-batches)] |
| 172 | + (log/warn "run!!" res) |
| 173 | + (mapv (comp vector util/map-results->rows) res)))))) |
93 | 174 |
|
94 | 175 | (defn docs-run!! |
95 | 176 | "Given transaction batches and a query from the docs, will return the query |
|
98 | 179 | (xtdb/with-xtdb |
99 | 180 | (fn [node] |
100 | 181 | (run!-tx node "sql" |
101 | | - (mapv #(update % :txs util/read-edn) tx-batches) |
102 | | - (util/read-edn query))))) |
| 182 | + (vec |
| 183 | + (conj (mapv #(update % :txs util/read-edn) tx-batches) |
| 184 | + {:txs (util/read-edn query) :query true})))))) |
0 commit comments