|
155 | 155 | (doall (mapv |
156 | 156 | (fn [{:keys [system-time query txs] :as batch}] |
157 | 157 | (log/debug tx-type "running batch: " batch) |
158 | | - (try |
159 | | - (if query |
160 | | - (xtdb/query node txs) |
161 | | - (xtdb/submit! node txs {:system-time system-time})) |
162 | | - (catch Throwable ex |
163 | | - (log/error "Exception while running transaction" (ex-message ex)) |
164 | | - {:error {:message (ex-message ex) |
165 | | - :exception (.getClass ex) |
166 | | - :data (ex-data ex)}}))) |
| 158 | + (let [start-time (System/nanoTime)] |
| 159 | + (try |
| 160 | + (let [result (if query |
| 161 | + (xtdb/query node txs) |
| 162 | + (xtdb/submit! node txs {:system-time system-time})) |
| 163 | + elapsed-ms (Math/round (/ (- (System/nanoTime) start-time) 1000000.0))] |
| 164 | + (if query |
| 165 | + (assoc {} :result result :timing-ms elapsed-ms) |
| 166 | + (assoc result :timing-ms elapsed-ms))) |
| 167 | + (catch Throwable ex |
| 168 | + (let [elapsed-ms (Math/round (/ (- (System/nanoTime) start-time) 1000000.0))] |
| 169 | + (log/error "Exception while running transaction" (ex-message ex)) |
| 170 | + {:error {:message (ex-message ex) |
| 171 | + :exception (.getClass ex) |
| 172 | + :data (ex-data ex)} |
| 173 | + :timing-ms elapsed-ms}))))) |
167 | 174 | tx-batches))] |
168 | 175 | (log/info "run!-tx-res" tx-results) |
169 | 176 | tx-results)) |
|
173 | 180 | (let [tx-in-progress? (atom false) |
174 | 181 | ;; Keep track of which batches have system-time |
175 | 182 | batches-with-system-time (set (keep-indexed |
176 | | - (fn [idx batch] |
177 | | - (when (:system-time batch) idx)) |
178 | | - tx-batches)) |
| 183 | + (fn [idx batch] |
| 184 | + (when (:system-time batch) idx)) |
| 185 | + tx-batches)) |
179 | 186 | transformed (transform-statements tx-batches) |
180 | 187 | res (map-indexed |
181 | 188 | (fn [batch-idx txs] |
182 | 189 | (let [has-system-time? (contains? batches-with-system-time batch-idx)] |
183 | 190 | (->> (mapv |
184 | 191 | (fn [statement] |
185 | 192 | (log/debug "beta executing statement:" statement) |
186 | | - (let [upper-stmt (str/upper-case (first statement)) |
| 193 | + (let [start-time (System/nanoTime) |
| 194 | + upper-stmt (str/upper-case (first statement)) |
187 | 195 | is-begin? (str/includes? upper-stmt "BEGIN") |
188 | 196 | is-commit? (str/includes? upper-stmt "COMMIT") |
189 | 197 | ;; Skip rendering BEGIN/COMMIT if they were auto-added for system-time |
190 | | - skip-result? (and has-system-time? (or is-begin? is-commit?))] |
191 | | - ;; Check if this is a PRAGMA statement |
192 | | - (if-let [pragma (pragma-statement? (first statement))] |
193 | | - ;; Handle PRAGMA statements |
194 | | - (case pragma |
195 | | - :finish-block |
196 | | - (do |
197 | | - (log/info "Executing PRAGMA finish_block") |
198 | | - (.finishBlock (.getLogProcessor (db-catalog/primary-db node))) |
199 | | - {:result [[:status] ["Block finished"]] |
200 | | - :warnings []}) |
201 | | - ;; Unknown pragma |
202 | | - {:error {:message (str "Unknown PRAGMA: " pragma)}}) |
203 | | - ;; Handle regular SQL statements |
204 | | - (do |
205 | | - (when is-begin? |
206 | | - (reset! tx-in-progress? true)) |
207 | | - (try |
208 | | - (let [[rs warnings] (xtdb/jdbc-execute! conn statement) |
209 | | - res (xform-result rs)] |
210 | | - (when is-commit? |
211 | | - (reset! tx-in-progress? false)) |
212 | | - (log/info :run-with-jdbc-conn-warnings warnings) |
213 | | - (if skip-result? |
214 | | - nil ;; Return nil for auto-added BEGIN/COMMIT |
215 | | - {:result res |
216 | | - :warnings warnings})) |
217 | | - (catch Exception ex |
218 | | - (log/error "Exception while running statement" (ex-message ex)) |
219 | | - (when @tx-in-progress? |
220 | | - (log/warn "Rolling back transaction") |
221 | | - (xtdb/jdbc-execute! conn ["ROLLBACK"])) |
222 | | - {:error {:message (ex-message ex) |
223 | | - :exception (.getClass ex) |
224 | | - :data (ex-data ex)}})))))) |
| 198 | + skip-result? (and has-system-time? (or is-begin? is-commit?)) |
| 199 | + result |
| 200 | + ;; Check if this is a PRAGMA statement |
| 201 | + (if-let [pragma (pragma-statement? (first statement))] |
| 202 | + ;; Handle PRAGMA statements |
| 203 | + (case pragma |
| 204 | + :finish-block |
| 205 | + (do |
| 206 | + (log/info "Executing PRAGMA finish_block") |
| 207 | + (.finishBlock (.getLogProcessor (db-catalog/primary-db node))) |
| 208 | + {:result [[:status] ["Block finished"]] |
| 209 | + :warnings []}) |
| 210 | + ;; Unknown pragma |
| 211 | + {:error {:message (str "Unknown PRAGMA: " pragma)}}) |
| 212 | + ;; Handle regular SQL statements |
| 213 | + (do |
| 214 | + (when is-begin? |
| 215 | + (reset! tx-in-progress? true)) |
| 216 | + (try |
| 217 | + (let [[rs warnings] (xtdb/jdbc-execute! conn statement) |
| 218 | + res (xform-result rs)] |
| 219 | + (when is-commit? |
| 220 | + (reset! tx-in-progress? false)) |
| 221 | + (log/info :run-with-jdbc-conn-warnings warnings) |
| 222 | + (if skip-result? |
| 223 | + nil ;; Return nil for auto-added BEGIN/COMMIT |
| 224 | + {:result res |
| 225 | + :warnings warnings})) |
| 226 | + (catch Exception ex |
| 227 | + (log/error "Exception while running statement" (ex-message ex)) |
| 228 | + (when @tx-in-progress? |
| 229 | + (log/warn "Rolling back transaction") |
| 230 | + (xtdb/jdbc-execute! conn ["ROLLBACK"])) |
| 231 | + {:error {:message (ex-message ex) |
| 232 | + :exception (.getClass ex) |
| 233 | + :data (ex-data ex)}})))) |
| 234 | + elapsed-ms (Math/round (/ (- (System/nanoTime) start-time) 1000000.0))] |
| 235 | + (when result |
| 236 | + (assoc result :timing-ms elapsed-ms)))) |
225 | 237 | txs) |
226 | 238 | ;; Remove nils (filtered BEGIN/COMMIT results) |
227 | 239 | (remove nil?) |
|
254 | 266 | (mapv #(update % :txs util/read-edn) tx-batches)) |
255 | 267 | (let [res (run!-tx node "sql" |
256 | 268 | [{:txs (util/read-edn query) :query true}])] |
257 | | - (first res))))) |
| 269 | + (:result (first res)))))) |
0 commit comments