|
171 | 171 | (defn- run!-with-jdbc-conn [node tx-batches] |
172 | 172 | (with-open [conn (xtdb/get-node-connection node)] |
173 | 173 | (let [tx-in-progress? (atom false) |
174 | | - res (mapv (fn [txs] |
175 | | - (mapv |
176 | | - (fn [statement] |
177 | | - (log/debug "beta executing statement:" statement) |
178 | | - ;; Check if this is a PRAGMA statement |
179 | | - (if-let [pragma (pragma-statement? (first statement))] |
180 | | - ;; Handle PRAGMA statements |
181 | | - (case pragma |
182 | | - :finish-block |
183 | | - (do |
184 | | - (log/info "Executing PRAGMA finish_block") |
185 | | - (.finishBlock (.getLogProcessor (db-catalog/primary-db node))) |
186 | | - {:result [[:status] ["Block finished"]] |
187 | | - :warnings []}) |
188 | | - ;; Unknown pragma |
189 | | - {:error {:message (str "Unknown PRAGMA: " pragma)}}) |
190 | | - ;; Handle regular SQL statements |
191 | | - (do |
192 | | - (when (str/includes? (str/upper-case (first statement)) "BEGIN") |
193 | | - (reset! tx-in-progress? true)) |
194 | | - (try |
195 | | - (let [[rs warnings] (xtdb/jdbc-execute! conn statement) |
196 | | - res (xform-result rs)] |
197 | | - (when (str/includes? (str/upper-case (first statement)) "COMMIT") |
198 | | - (reset! tx-in-progress? false)) |
199 | | - (log/info :run-with-jdbc-conn-warnings warnings) |
200 | | - {:result res |
201 | | - :warnings warnings}) |
202 | | - (catch Exception ex |
203 | | - (log/error "Exception while running statement" (ex-message ex)) |
204 | | - (when @tx-in-progress? |
205 | | - (log/warn "Rolling back transaction") |
206 | | - (xtdb/jdbc-execute! conn ["ROLLBACK"])) |
207 | | - {:error {:message (ex-message ex) |
208 | | - :exception (.getClass ex) |
209 | | - :data (ex-data ex)}}))))) |
210 | | - txs)) |
211 | | - (transform-statements tx-batches))] |
| 174 | + ;; Keep track of which batches have system-time |
| 175 | + batches-with-system-time (set (keep-indexed |
| 176 | + (fn [idx batch] |
| 177 | + (when (:system-time batch) idx)) |
| 178 | + tx-batches)) |
| 179 | + transformed (transform-statements tx-batches) |
| 180 | + res (map-indexed |
| 181 | + (fn [batch-idx txs] |
| 182 | + (let [has-system-time? (contains? batches-with-system-time batch-idx)] |
| 183 | + (->> (mapv |
| 184 | + (fn [statement] |
| 185 | + (log/debug "beta executing statement:" statement) |
| 186 | + (let [upper-stmt (str/upper-case (first statement)) |
| 187 | + is-begin? (str/includes? upper-stmt "BEGIN") |
| 188 | + is-commit? (str/includes? upper-stmt "COMMIT") |
| 189 | + ;; Skip rendering BEGIN/COMMIT if they were auto-added for system-time |
| 190 | + skip-result? (and has-system-time? (or is-begin? is-commit?))] |
| 191 | + ;; Check if this is a PRAGMA statement |
| 192 | + (if-let [pragma (pragma-statement? (first statement))] |
| 193 | + ;; Handle PRAGMA statements |
| 194 | + (case pragma |
| 195 | + :finish-block |
| 196 | + (do |
| 197 | + (log/info "Executing PRAGMA finish_block") |
| 198 | + (.finishBlock (.getLogProcessor (db-catalog/primary-db node))) |
| 199 | + {:result [[:status] ["Block finished"]] |
| 200 | + :warnings []}) |
| 201 | + ;; Unknown pragma |
| 202 | + {:error {:message (str "Unknown PRAGMA: " pragma)}}) |
| 203 | + ;; Handle regular SQL statements |
| 204 | + (do |
| 205 | + (when is-begin? |
| 206 | + (reset! tx-in-progress? true)) |
| 207 | + (try |
| 208 | + (let [[rs warnings] (xtdb/jdbc-execute! conn statement) |
| 209 | + res (xform-result rs)] |
| 210 | + (when is-commit? |
| 211 | + (reset! tx-in-progress? false)) |
| 212 | + (log/info :run-with-jdbc-conn-warnings warnings) |
| 213 | + (if skip-result? |
| 214 | + nil ;; Return nil for auto-added BEGIN/COMMIT |
| 215 | + {:result res |
| 216 | + :warnings warnings})) |
| 217 | + (catch Exception ex |
| 218 | + (log/error "Exception while running statement" (ex-message ex)) |
| 219 | + (when @tx-in-progress? |
| 220 | + (log/warn "Rolling back transaction") |
| 221 | + (xtdb/jdbc-execute! conn ["ROLLBACK"])) |
| 222 | + {:error {:message (ex-message ex) |
| 223 | + :exception (.getClass ex) |
| 224 | + :data (ex-data ex)}})))))) |
| 225 | + txs) |
| 226 | + ;; Remove nils (filtered BEGIN/COMMIT results) |
| 227 | + (remove nil?) |
| 228 | + vec))) |
| 229 | + transformed) |
| 230 | + res (vec res)] |
212 | 231 | (log/debug "run!-with-jdbc-conn-res" res) |
213 | 232 | res))) |
214 | 233 |
|
|
0 commit comments