@@ -66,16 +66,16 @@ $command = [
6666The queue system uses ` wait_for_id ` to ensure proper command ordering:
6767
6868``` php
69- // Command A
70- $idA = $queue->add($node, "COMMAND A");
69+ // Command A with mandatory rollback
70+ $idA = $queue->add($node, "COMMAND A", "ROLLBACK A", $operationGroup );
7171
7272// Command B waits for A to complete
7373$queue->setWaitForId($idA);
74- $idB = $queue->add($node, "COMMAND B");
74+ $idB = $queue->add($node, "COMMAND B", "ROLLBACK B", $operationGroup );
7575
7676// Command C waits for B to complete
7777$queue->setWaitForId($idB);
78- $idC = $queue->add($node, "COMMAND C");
78+ $idC = $queue->add($node, "COMMAND C", "ROLLBACK C", $operationGroup );
7979
8080// Reset dependencies for independent commands
8181$queue->resetWaitForId();
@@ -85,19 +85,19 @@ $queue->resetWaitForId();
8585
8686### Adding Commands with Rollback
8787
88- The queue system now supports adding commands with automatic rollback:
88+ All queue commands now require explicit rollback commands :
8989
9090``` php
91- // Add command with automatic rollback generation
92- $queue->addWithRollback (
91+ // Add command with explicit rollback (rollback is mandatory)
92+ $queue->add (
9393 $nodeId,
9494 "CREATE TABLE users_s0 (id bigint)",
95- null , // Auto-generate rollback
95+ "DROP TABLE IF EXISTS users_s0" , // Explicit rollback required
9696 $operationGroup
9797);
9898
99- // Add command with explicit rollback
100- $queue->addWithRollback (
99+ // Add cluster command with rollback
100+ $queue->add (
101101 $nodeId,
102102 "ALTER CLUSTER c1 ADD users_s0",
103103 "ALTER CLUSTER c1 DROP users_s0", // Explicit rollback
@@ -112,10 +112,10 @@ Related commands are grouped for atomic execution:
112112``` php
113113$operationGroup = "shard_create_users_" . uniqid();
114114
115- // All commands in the same group
116- $queue->addWithRollback ($node1, $cmd1, $rollback1, $operationGroup);
117- $queue->addWithRollback ($node2, $cmd2, $rollback2, $operationGroup);
118- $queue->addWithRollback ($node3, $cmd3, $rollback3, $operationGroup);
115+ // All commands in the same group (rollback required for each)
116+ $queue->add ($node1, $cmd1, $rollback1, $operationGroup);
117+ $queue->add ($node2, $cmd2, $rollback2, $operationGroup);
118+ $queue->add ($node3, $cmd3, $rollback3, $operationGroup);
119119
120120// On failure, rollback entire group
121121if ($error) {
@@ -177,39 +177,41 @@ protected function moveShardWithIntermediateCluster(
177177 $tempClusterName = "temp_move_{$shardId}_" . uniqid();
178178
179179 // Step 1: Create shard table on target node
180- $createQueueId = $queue->add($targetNode, $this->getCreateTableShardSQL($shardId));
180+ $createQueueId = $queue->add($targetNode, $this->getCreateTableShardSQL($shardId), "DROP TABLE IF EXISTS {$shardName}" );
181181
182182 // Step 2: Create temporary cluster on SOURCE node (where the data IS)
183183 // CRITICAL: Use cluster name as path to ensure uniqueness
184184 $clusterQueueId = $queue->add(
185185 $sourceNode,
186- "CREATE CLUSTER {$tempClusterName} '{$tempClusterName}' as path"
186+ "CREATE CLUSTER {$tempClusterName} '{$tempClusterName}' as path",
187+ "DELETE CLUSTER {$tempClusterName}"
187188 );
188189
189190 // Step 3: Add shard to cluster on SOURCE node FIRST (before JOIN)
190191 $queue->setWaitForId($clusterQueueId);
191- $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} ADD {$shardName}");
192+ $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} ADD {$shardName}", "ALTER CLUSTER {$tempClusterName} DROP {$shardName}" );
192193
193194 // Step 4: TARGET node joins the cluster that SOURCE created
194195 // Wait for table creation on target node to complete first
195196 $queue->setWaitForId($createQueueId);
196197 $joinQueueId = $queue->add(
197198 $targetNode,
198- "JOIN CLUSTER {$tempClusterName} AT '{$sourceNode}' '{$tempClusterName}' as path"
199+ "JOIN CLUSTER {$tempClusterName} AT '{$sourceNode}' '{$tempClusterName}' as path",
200+ "DELETE CLUSTER {$tempClusterName}"
199201 );
200202
201203 // Step 5: CRITICAL - Wait for JOIN to complete (data is now synced)
202204 // JOIN CLUSTER is synchronous, so once it's processed, data is fully copied
203205 $queue->setWaitForId($joinQueueId);
204- $dropQueueId = $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} DROP {$shardName}");
206+ $dropQueueId = $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} DROP {$shardName}", "ALTER CLUSTER {$tempClusterName} ADD {$shardName}" );
205207
206208 // Step 6: Only after DROP from cluster, remove the table from source
207209 $queue->setWaitForId($dropQueueId);
208- $deleteQueueId = $queue->add($sourceNode, "DROP TABLE {$shardName}");
210+ $deleteQueueId = $queue->add($sourceNode, "DROP TABLE {$shardName}", "" );
209211
210212 // Step 7: Clean up temporary cluster ONLY on SOURCE node after all operations complete
211213 $queue->setWaitForId($deleteQueueId);
212- return $queue->add($sourceNode, "DELETE CLUSTER {$tempClusterName}");
214+ return $queue->add($sourceNode, "DELETE CLUSTER {$tempClusterName}", "" );
213215}
214216```
215217
@@ -253,15 +255,15 @@ protected function handleRFNNewNodes(Queue $queue, Vector $schema, Vector $newSc
253255
254256 foreach ($shardsForNewNode as $shard) {
255257 // Create shard table on new node
256- $queue->add($newNode, $this->getCreateTableShardSQL($shard));
258+ $queue->add($newNode, $this->getCreateTableShardSQL($shard), "DROP TABLE IF EXISTS {$shardName}" );
257259
258260 // Set up replication (no wait needed - parallel creation)
259261 $existingNodes = $this->getExistingNodesForShard($schema, $shard);
260262 $connectedNodes = $existingNodes->merge(new Set([$newNode]));
261263 $primaryNode = $existingNodes->first();
262264
263265 // Use existing cluster replication mechanism
264- $this->handleReplication($primaryNode, $queue, $connectedNodes, $clusterMap, $shard);
266+ $this->handleReplication($primaryNode, $queue, $connectedNodes, $clusterMap, $shard, $operationGroup );
265267 }
266268 }
267269}
@@ -279,7 +281,7 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
279281 // Phase 1: Drop all distributed tables with proper force option
280282 foreach ($newSchema as $row) {
281283 $sql = "DROP TABLE IF EXISTS {$this->name} OPTION force=1";
282- $queueId = $queue->add($row['node'], $sql);
284+ $queueId = $queue->add($row['node'], $sql, "" );
283285 $dropQueueIds->add($queueId);
284286 }
285287
@@ -297,7 +299,7 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
297299 }
298300
299301 $sql = $this->getCreateShardedTableSQLWithSchema($row['shards'], $newSchema);
300- $queue->add($row['node'], $sql);
302+ $queue->add($row['node'], $sql, "DROP TABLE IF EXISTS {$this->name}" );
301303 }
302304
303305 // Reset wait dependencies
@@ -310,14 +312,16 @@ protected function createDistributedTablesFromSchema(Queue $queue, Vector $newSc
310312### Queue Command Addition
311313
312314``` php
313- public function add(string $nodeId, string $query): int {
315+ public function add(string $nodeId, string $query, string $rollbackQuery, ?string $operationGroup = null ): int {
314316 $queueId = $this->generateNextId();
315317
316318 $command = [
317319 'id' => $queueId,
318320 'node' => $nodeId,
319321 'query' => $query,
322+ 'rollback_query' => $rollbackQuery,
320323 'wait_for_id' => $this->currentWaitForId,
324+ 'operation_group' => $operationGroup ?? '',
321325 'created_at' => time(),
322326 'status' => 'pending',
323327 ];
@@ -373,14 +377,14 @@ public function process(Node $node): void {
373377When operations must be strictly ordered:
374378
375379``` php
376- // Pattern: Each operation waits for the previous to complete
377- $step1Id = $queue->add($node, "STEP 1 COMMAND");
380+ // Pattern: Each operation waits for the previous to complete (rollback required)
381+ $step1Id = $queue->add($node, "STEP 1 COMMAND", "STEP 1 ROLLBACK" );
378382
379383$queue->setWaitForId($step1Id);
380- $step2Id = $queue->add($node, "STEP 2 COMMAND");
384+ $step2Id = $queue->add($node, "STEP 2 COMMAND", "STEP 2 ROLLBACK" );
381385
382386$queue->setWaitForId($step2Id);
383- $step3Id = $queue->add($node, "STEP 3 COMMAND");
387+ $step3Id = $queue->add($node, "STEP 3 COMMAND", "STEP 3 ROLLBACK" );
384388
385389$queue->resetWaitForId();
386390```
@@ -390,35 +394,35 @@ $queue->resetWaitForId();
390394When some operations can run in parallel but must converge:
391395
392396``` php
393- // Phase 1: Parallel operations
397+ // Phase 1: Parallel operations (rollback required for each)
394398$queue->resetWaitForId(); // Ensure no dependencies
395- $parallel1Id = $queue->add($node1, "PARALLEL COMMAND 1");
396- $parallel2Id = $queue->add($node2, "PARALLEL COMMAND 2");
397- $parallel3Id = $queue->add($node3, "PARALLEL COMMAND 3");
399+ $parallel1Id = $queue->add($node1, "PARALLEL COMMAND 1", "PARALLEL ROLLBACK 1" );
400+ $parallel2Id = $queue->add($node2, "PARALLEL COMMAND 2", "PARALLEL ROLLBACK 2" );
401+ $parallel3Id = $queue->add($node3, "PARALLEL COMMAND 3", "PARALLEL ROLLBACK 3" );
398402
399403// Phase 2: Wait for all parallel operations to complete
400404$maxParallelId = max($parallel1Id, $parallel2Id, $parallel3Id);
401405$queue->setWaitForId($maxParallelId);
402406
403407// Phase 3: Sequential operations that depend on all parallel operations
404- $finalStepId = $queue->add($node1, "FINAL COMMAND");
408+ $finalStepId = $queue->add($node1, "FINAL COMMAND", "FINAL ROLLBACK" );
405409```
406410
407411### Cross-Node Synchronization
408412
409413When operations on different nodes must be coordinated:
410414
411415``` php
412- // Node A prepares
413- $prepareId = $queue->add($nodeA, "PREPARE OPERATION");
416+ // Node A prepares (rollback required)
417+ $prepareId = $queue->add($nodeA, "PREPARE OPERATION", "PREPARE ROLLBACK" );
414418
415419// Node B waits for Node A to prepare, then joins
416420$queue->setWaitForId($prepareId);
417- $joinId = $queue->add($nodeB, "JOIN OPERATION");
421+ $joinId = $queue->add($nodeB, "JOIN OPERATION", "JOIN ROLLBACK" );
418422
419423// Node A waits for Node B to join, then finalizes
420424$queue->setWaitForId($joinId);
421- $finalizeId = $queue->add($nodeA, "FINALIZE OPERATION");
425+ $finalizeId = $queue->add($nodeA, "FINALIZE OPERATION", "FINALIZE ROLLBACK" );
422426```
423427
424428## Error Handling in Queue Operations
0 commit comments