|
49 | 49 | import java.util.concurrent.BlockingQueue; |
50 | 50 | import java.util.concurrent.ExecutionException; |
51 | 51 | import java.util.concurrent.Future; |
| 52 | +import java.util.concurrent.atomic.AtomicBoolean; |
52 | 53 | import java.util.function.Consumer; |
53 | 54 | import java.util.regex.Matcher; |
54 | 55 | import java.util.regex.Pattern; |
@@ -207,8 +208,7 @@ private Stream<RowResult> runManyStatements( |
207 | 208 | runSchemaStatementsInTx( |
208 | 209 | scanner, internalQueue, params, addStatistics, timeout, reportError, fileName); |
209 | 210 | } else { |
210 | | - runDataStatementsInTx( |
211 | | - scanner, internalQueue, params, addStatistics, timeout, reportError, fileName); |
| 211 | + runDataStatementsInTx(scanner, internalQueue, params, addStatistics, reportError, fileName); |
212 | 212 | } |
213 | 213 | }, |
214 | 214 | RowResult.TOMBSTONE); |
@@ -246,40 +246,47 @@ private void runDataStatementsInTx( |
246 | 246 | BlockingQueue<RowResult> queue, |
247 | 247 | Map<String, Object> params, |
248 | 248 | boolean addStatistics, |
249 | | - long timeout, |
250 | 249 | boolean reportError, |
251 | 250 | String fileName) { |
252 | 251 | while (scanner.hasNext()) { |
253 | 252 | String stmt = removeShellControlCommands(scanner.next()); |
254 | 253 | if (stmt.trim().isEmpty()) continue; |
255 | | - boolean schemaOperation; |
256 | | - try { |
257 | | - schemaOperation = isSchemaOperation(stmt); |
258 | | - } catch (Exception e) { |
259 | | - collectError(queue, reportError, e, fileName); |
260 | | - return; |
261 | | - } |
262 | 254 |
|
263 | | - if (!schemaOperation) { |
264 | | - if (isPeriodicOperation(stmt)) { |
265 | | - Util.inThread(pools, () -> { |
266 | | - try { |
267 | | - return db.executeTransactionally( |
268 | | - stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); |
269 | | - } catch (Exception e) { |
270 | | - collectError(queue, reportError, e, fileName); |
271 | | - return null; |
272 | | - } |
273 | | - }); |
274 | | - } else { |
| 255 | + // Periodic operations cannot be schema operations, so no need to check that here (will fail as invalid |
| 256 | + // query) |
| 257 | + if (isPeriodicOperation(stmt)) { |
| 258 | + Util.inThread(pools, () -> { |
| 259 | + try { |
| 260 | + return db.executeTransactionally( |
| 261 | + stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); |
| 262 | + } catch (Exception e) { |
| 263 | + collectError(queue, reportError, e, fileName); |
| 264 | + return null; |
| 265 | + } |
| 266 | + }); |
| 267 | + } else { |
| 268 | + AtomicBoolean isSchemaError = new AtomicBoolean(false); |
| 269 | + try { |
275 | 270 | Util.inTx(db, pools, threadTx -> { |
276 | 271 | try (Result result = threadTx.execute(stmt, params)) { |
277 | 272 | return consumeResult(result, queue, addStatistics, tx, fileName); |
278 | 273 | } catch (Exception e) { |
279 | | - collectError(queue, reportError, e, fileName); |
| 274 | + // APOC historically skips schema operations |
| 275 | + if (!(e.getMessage().contains("Schema operations on database") |
| 276 | + && e.getMessage().contains("are not allowed"))) { |
| 277 | + collectError(queue, reportError, e, fileName); |
| 278 | + return null; |
| 279 | + } |
| 280 | + isSchemaError.set(true); |
280 | 281 | return null; |
281 | 282 | } |
282 | 283 | }); |
| 284 | + } catch (Exception e) { |
| 285 | + // An error thrown by a schema operation |
| 286 | + if (isSchemaError.get()) { |
| 287 | + continue; |
| 288 | + } |
| 289 | + throw e; |
283 | 290 | } |
284 | 291 | } |
285 | 292 | } |
|
0 commit comments