Skip to content

Commit 9b97be0

Browse files
committed
fix(sharding): add rollback state tracking and improve rollback flow
- Record operation_group metadata in state for observability and resumability
- Enhance rollback sequence to respect dependencies and track executed commands
- Log rollback failures and mark rolled_back status to enable safe resume
- Escape operation_group in queries to prevent injection issues
- Improve error handling to avoid breaking core flow during state writes
1 parent 923451f commit 9b97be0

File tree

2 files changed

+124
-45
lines changed

2 files changed

+124
-45
lines changed

src/Plugin/Sharding/Queue.php

Lines changed: 116 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,16 @@ public function add(string $nodeId, string $query, string $rollbackQuery, ?strin
9191
"
9292
);
9393

94+
// Record operation_group metadata in sharding state for observability/resumability
95+
try {
96+
$state = new State($this->client);
97+
if ($operationGroup !== '') {
98+
$state->set("operation_group:{$operationGroup}", ['added_id' => $id, 'created_at' => $mt]);
99+
}
100+
} catch (\Throwable $_) {
101+
// ignore state write failures to avoid breaking core flow
102+
}
103+
94104
return $id;
95105
}
96106

@@ -346,11 +356,14 @@ public function rollbackOperationGroup(string $operationGroup): bool {
346356
return true; // Nothing to rollback is considered success
347357
}
348358

349-
return $this->executeRollbackSequence($rollbackCommands);
359+
return $this->executeRollbackSequence($rollbackCommands, $operationGroup);
350360
} catch (\Throwable $e) {
351361
Buddy::debugvv("Rollback failed for group {$operationGroup}: " . $e->getMessage());
352362
return false;
353363
}
364+
}
365+
return false;
366+
}
354367
}
355368

356369
/**
@@ -359,58 +372,116 @@ public function rollbackOperationGroup(string $operationGroup): bool {
359372
* @return array<array{id:int,node:string,rollback_query:string,rollback_wait_for_id:int}>
360373
* Rollback commands in reverse execution order
361374
*/
362-
protected function getRollbackCommands(string $operationGroup): array {
363-
$table = $this->cluster->getSystemTableName($this->table);
364-
365-
// Get completed commands with rollback queries in reverse order
366-
$query = "
367-
SELECT id, node, rollback_query, rollback_wait_for_id
368-
FROM {$table}
369-
WHERE operation_group = '{$operationGroup}'
370-
AND status = 'processed'
371-
AND rollback_query != ''
372-
ORDER BY id DESC
373-
";
374-
375-
/** @var array{0?:array{data?:array<array{id:int,node:string,rollback_query:string,rollback_wait_for_id:int}>}} */
376-
$result = $this->client->sendRequest($query)->getResult();
377-
378-
return $result[0]['data'] ?? [];
379-
}
375+
protected function getRollbackCommands(string $operationGroup): array {
376+
$table = $this->cluster->getSystemTableName($this->table);
377+
378+
// Get completed commands with rollback queries in reverse order
379+
$escapedGroup = addcslashes($operationGroup, "'");
380+
$query = "
381+
SELECT id, node, rollback_query, rollback_wait_for_id, operation_group
382+
FROM {$table}
383+
WHERE operation_group = '{$escapedGroup}'
384+
AND status != 'rolled_back'
385+
AND rollback_query != ''
386+
ORDER BY id DESC
387+
";
388+
389+
/** @var array{0?:array{data?:array<array{id:int,node:string,rollback_query:string,rollback_wait_for_id:int,operation_group:string}>}} */
390+
$result = $this->client->sendRequest($query)->getResult();
391+
392+
return $result[0]['data'] ?? [];
393+
}
380394

381395
/**
382396
* Execute rollback commands in sequence
383-
* @param array<array{id:int,node:string,rollback_query:string,rollback_wait_for_id:int}> $rollbackCommands
384-
* @return bool Success status
397+
* @param array<array{id:int,node:string,rollback_query:string,rollback_wait_for_id:int,operation_group:string}> $rollbackCommands
385398
*/
386-
protected function executeRollbackSequence(array $rollbackCommands): bool {
387-
$allSuccess = true;
399+
protected function executeRollbackSequence(array $rollbackCommands, string $operationGroup = ''): bool {
400+
401+
// Keep track of executed ids to satisfy rollback_wait_for_id without DB lookup
402+
$executed = [];
403+
$allSuccess = true;
404+
405+
foreach ($rollbackCommands as $command) {
406+
$id = (int)$command['id'];
407+
$waitFor = (int)$command['rollback_wait_for_id'];
408+
409+
// If there is dependency, wait until that id is rolled back (or we executed it)
410+
if ($waitFor > 0 && !in_array($waitFor, $executed, true)) {
411+
while (true) {
412+
$dep = $this->getById($waitFor);
413+
// If dependency row does not exist, proceed — nothing to wait for
414+
if (empty($dep)) {
415+
Buddy::debugvv("Rollback: dependency {$waitFor} not found for command {$id}, proceeding");
416+
break;
417+
}
388418

389-
foreach ($rollbackCommands as $command) {
390-
try {
391-
// Execute rollback command on the specific node
392-
$nodeId = $command['node'];
393-
$rollbackQuery = $command['rollback_query'];
394-
395-
Buddy::debugvv("Executing rollback on {$nodeId}: {$rollbackQuery}");
396-
397-
// Execute the rollback query
398-
$res = $this->client->sendRequest($rollbackQuery);
399-
if ($res->hasError()) {
400-
$error = $res->getError();
401-
Buddy::debugvv("Rollback command failed: {$rollbackQuery} - Error: {$error}");
402-
$allSuccess = false;
403-
// Continue with other rollback commands even if one fails
404-
} else {
405-
Buddy::debugvv("Rollback successful: {$rollbackQuery}");
419+
if ($dep['status'] === 'rolled_back' || in_array($waitFor, $executed, true)) {
420+
break;
406421
}
407-
} catch (\Throwable $e) {
408-
Buddy::debugvv("Rollback command exception: {$command['rollback_query']} - " . $e->getMessage());
409-
$allSuccess = false;
410-
// Continue with other rollback commands
422+
423+
// Sleep briefly to avoid busy loop
424+
usleep(100_000); // 100ms
411425
}
412426
}
413427

414-
return $allSuccess;
428+
try {
429+
// Execute rollback command on the specific node
430+
$nodeId = $command['node'];
431+
$rollbackQuery = $command['rollback_query'];
432+
433+
Buddy::debugvv("Executing rollback on {$nodeId}: {$rollbackQuery}");
434+
435+
// Execute the rollback query
436+
$res = $this->client->sendRequest($rollbackQuery);
437+
if ($res->hasError()) {
438+
$error = $res->getError();
439+
Buddy::debugvv("Rollback command failed: {$rollbackQuery} - Error: {$error}");
440+
441+
// Record rollback failure in state and stop sequence so it can be resumed later
442+
try {
443+
$state = new State($this->client);
444+
$state->set("rollback_log:{$operationGroup}:{$id}", $error);
445+
} catch (\Throwable $_) {
446+
// ignore state write failures
447+
}
448+
449+
return false;
450+
}
451+
452+
Buddy::debugvv("Rollback successful: {$rollbackQuery}");
453+
454+
// Mark this queue row as rolled_back
455+
try {
456+
$this->updateStatus($id, 'rolled_back', 0, 0);
457+
} catch (\Throwable $u) {
458+
Buddy::debugvv("Failed to mark rolled_back for {$id}: " . $u->getMessage());
459+
try {
460+
$state = new State($this->client);
461+
$state->set("rollback_log:{$operationGroup}:{$id}", 'mark_failed:' . $u->getMessage());
462+
} catch (\Throwable $_) {
463+
// ignore
464+
}
465+
466+
return false;
467+
}
468+
469+
$executed[] = $id;
470+
} catch (\Throwable $e) {
471+
Buddy::debugvv("Rollback command exception: {$command['rollback_query']} - " . $e->getMessage());
472+
473+
try {
474+
$state = new State($this->client);
475+
$state->set("rollback_log:{$operationGroup}:{$id}", $e->getMessage());
476+
} catch (\Throwable $_) {
477+
// ignore
478+
}
479+
480+
// Stop sequence on exception to allow manual/automated resume
481+
return false;
482+
}
415483
}
484+
485+
return $allSuccess;
486+
}
416487
}

src/Plugin/Sharding/Table.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,14 @@ public function shard(
241241
$operationGroup = "shard_create_{$this->name}_" . uniqid();
242242
$result['operation_group'] = $operationGroup;
243243

244+
// Record operation_group metadata in state for observability and resumability
245+
try {
246+
$state = new State($this->client);
247+
$state->set("operation_group:{$operationGroup}", json_encode(['table' => $this->name, 'created_at' => time()]));
248+
} catch (\Throwable $_) {
249+
// ignore state write errors
250+
}
251+
// Begin main operation scope
244252
try {
245253
/** @var Map<string,Set<int>> */
246254
$nodeShardsMap = new Map;

0 commit comments

Comments
 (0)