Skip to content

Commit e0d006c

Browse files
committed
feat(sharding): add automatic rollback, health monitoring, and cleanup
- Introduce atomic operation groups with rollback commands - Add rollback execution with error handling and schema migration - Implement rebalancing controls: stop, pause, resume, and progress tracking - Add HealthMonitor for issue detection and automatic recovery - Provide CleanupManager for orphaned resources and stale states - Extend queue schema and APIs to support rollback metadata - Update documentation with usage examples and migration steps
1 parent ff540bf commit e0d006c

File tree

14 files changed

+2589
-62
lines changed

14 files changed

+2589
-62
lines changed

doc/sharding/01-components.md

Lines changed: 166 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,46 @@ public static function rebalanceWithNewNodes(Vector $schema, Set $newNodes, int
5959

6060
### 3. Queue Class (`src/Plugin/Sharding/Queue.php`)
6161

62-
Manages asynchronous command execution across cluster nodes.
62+
Manages asynchronous command execution across cluster nodes with rollback support.
6363

6464
**Key Features:**
6565
- Command queuing with dependencies
6666
- `wait_for_id` synchronization mechanism
6767
- Node-specific command targeting
6868
- Parallel execution support
69+
- **Rollback command storage** (NEW)
70+
- **Operation group management** (NEW)
71+
- **Automatic rollback execution** (NEW)
6972

7073
**Core Responsibilities:**
7174
- Command ordering and dependencies
7275
- Asynchronous execution management
7376
- Inter-node communication
7477
- Operation synchronization
78+
- **Rollback orchestration** (NEW)
79+
- **Group-based atomic operations** (NEW)
80+
81+
**Enhanced Methods (NEW):**
82+
- `addWithRollback()`: Add command with rollback support
83+
- `rollbackOperationGroup()`: Execute rollback for entire group
84+
- `getRollbackCommands()`: Retrieve rollback commands
85+
- `executeRollbackSequence()`: Execute rollback in reverse order
86+
- `hasRollbackSupport()`: Check for rollback column support
87+
- `migrateForRollbackSupport()`: Upgrade existing tables
7588

7689
**Synchronization Pattern:**
7790
```php
78-
// Command A
79-
$idA = $queue->add($node, "COMMAND A");
91+
// Command A with rollback
92+
$idA = $queue->addWithRollback($node, "CREATE TABLE t1", "DROP TABLE t1", $group);
8093

81-
// Command B waits for A to complete
94+
// Command B waits for A and has rollback
8295
$queue->setWaitForId($idA);
83-
$idB = $queue->add($node, "COMMAND B");
96+
$idB = $queue->addWithRollback($node, "ALTER CLUSTER c1 ADD t1", "ALTER CLUSTER c1 DROP t1", $group);
8497

85-
// Command C waits for B to complete
86-
$queue->setWaitForId($idB);
87-
$idC = $queue->add($node, "COMMAND C");
98+
// On failure, rollback entire group
99+
if ($error) {
100+
$queue->rollbackOperationGroup($group);
101+
}
88102
```
89103

90104
### 4. State Class (`src/Plugin/Sharding/State.php`)
@@ -251,3 +265,147 @@ class TestableQueue {
251265
- **Integration Testing**: Test with real dependencies where possible
252266

253267
This component architecture provides a robust, scalable foundation for distributed table sharding with comprehensive testing coverage and production-ready reliability.
268+
269+
## New Components (Rollback & Recovery System)
270+
271+
### 7. RollbackCommandGenerator Class (`src/Plugin/Sharding/RollbackCommandGenerator.php`)
272+
273+
Generates reverse SQL commands for rollback operations.
274+
275+
**Key Features:**
276+
- Automatic rollback command generation
277+
- Pattern matching for SQL commands
278+
- Safety checks for rollback operations
279+
- Batch generation support
280+
281+
**Supported Conversions:**
282+
- `CREATE TABLE``DROP TABLE IF EXISTS`
283+
- `CREATE CLUSTER``DELETE CLUSTER`
284+
- `ALTER CLUSTER ADD``ALTER CLUSTER DROP`
285+
- `ALTER CLUSTER DROP``ALTER CLUSTER ADD`
286+
- `JOIN CLUSTER``DELETE CLUSTER`
287+
288+
**Usage Pattern:**
289+
```php
290+
// Automatic generation
291+
$rollback = RollbackCommandGenerator::generate("CREATE TABLE users");
292+
// Returns: "DROP TABLE IF EXISTS users"
293+
294+
// Specific generators
295+
$rollback = RollbackCommandGenerator::forCreateTable("users");
296+
$rollback = RollbackCommandGenerator::forAlterClusterAdd("c1", "users");
297+
298+
// Safety check
299+
if (RollbackCommandGenerator::isSafeToRollback($command)) {
300+
$rollback = RollbackCommandGenerator::generate($command);
301+
}
302+
```
303+
304+
### 8. CleanupManager Class (`src/Plugin/Sharding/CleanupManager.php`)
305+
306+
Manages cleanup of orphaned resources and failed operations.
307+
308+
**Key Features:**
309+
- Orphaned cluster detection and removal
310+
- Failed operation group cleanup
311+
- Expired queue item removal
312+
- Stale state entry cleanup
313+
314+
**Core Methods:**
315+
- `performFullCleanup()`: Comprehensive cleanup of all resources
316+
- `cleanupOrphanedTemporaryClusters()`: Remove temp clusters >1 hour old
317+
- `cleanupFailedOperationGroups()`: Clean failed groups >24 hours old
318+
- `cleanupExpiredQueueItems()`: Remove queue items >7 days old
319+
- `cleanupStaleStateEntries()`: Clean state entries >30 days old
320+
321+
**Usage Pattern:**
322+
```php
323+
$cleanup = new CleanupManager($client, $cluster);
324+
$results = $cleanup->performFullCleanup();
325+
// Returns: ['resources_cleaned' => 42, 'actions_taken' => [...]]
326+
```
327+
328+
### 9. HealthMonitor Class (`src/Plugin/Sharding/HealthMonitor.php`)
329+
330+
Monitors system health and performs auto-recovery.
331+
332+
**Key Features:**
333+
- Comprehensive health checking
334+
- Automatic issue detection
335+
- Auto-recovery mechanisms
336+
- Recommendation generation
337+
338+
**Health Checks:**
339+
- Stuck rebalancing operations (>30 minutes)
340+
- Failed operations detection
341+
- Orphaned resource identification
342+
- Queue depth monitoring
343+
344+
**Core Methods:**
345+
- `performHealthCheck()`: Complete system health assessment
346+
- `performAutoRecovery()`: Automatic recovery from issues
347+
- `checkStuckOperations()`: Detect stuck rebalancing
348+
- `checkFailedOperations()`: Find failed operations
349+
- `checkOrphanedResources()`: Identify orphaned resources
350+
- `checkQueueHealth()`: Monitor queue depth
351+
352+
**Usage Pattern:**
353+
```php
354+
$monitor = new HealthMonitor($client, $cluster);
355+
$health = $monitor->performHealthCheck();
356+
357+
if ($health['overall_status'] !== 'healthy') {
358+
// Automatic recovery
359+
$recovery = $monitor->performAutoRecovery();
360+
361+
// Or manual intervention based on recommendations
362+
foreach ($health['recommendations'] as $recommendation) {
363+
echo $recommendation;
364+
}
365+
}
366+
```
367+
368+
## Component Interaction Flows
369+
370+
### Rollback Flow
371+
372+
```
373+
Table.shard()
374+
├── Creates operation_group
375+
├── Queue.addWithRollback() [multiple times]
376+
├── On failure:
377+
│ └── Queue.rollbackOperationGroup()
378+
│ ├── getRollbackCommands()
379+
│ └── executeRollbackSequence()
380+
└── On success: Complete
381+
```
382+
383+
### Health Monitoring Flow
384+
385+
```
386+
HealthMonitor.performHealthCheck()
387+
├── checkStuckOperations()
388+
├── checkFailedOperations()
389+
├── checkOrphanedResources()
390+
├── checkQueueHealth()
391+
└── generateRecommendations()
392+
└── performAutoRecovery() [if needed]
393+
├── recoverStuckOperation()
394+
├── recoverFailedOperation()
395+
└── CleanupManager.performFullCleanup()
396+
```
397+
398+
### Rebalancing Control Flow
399+
400+
```
401+
Table.rebalance()
402+
├── Create operation_group
403+
├── Check stop signal
404+
├── Execute operations
405+
│ └── Check stop signal between steps
406+
├── On stop signal:
407+
│ └── rollbackOperationGroup()
408+
└── On completion: Clear operation_group
409+
```
410+
411+
The enhanced component architecture now provides automatic rollback, health monitoring, and resource cleanup capabilities, making the sharding system more robust and production-ready.

doc/sharding/04-queue-system.md

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,28 @@ The Queue class manages distributed command execution with the following key fea
1313
- **Node Targeting**: Routes commands to specific cluster nodes
1414
- **Parallel Execution**: Supports concurrent operations where safe
1515
- **Synchronization Points**: Ensures critical operations complete before proceeding
16+
- **Rollback Support**: Stores rollback commands alongside forward commands
17+
- **Operation Groups**: Groups related commands for atomic execution
18+
- **Automatic Rollback**: Executes rollback sequence on failure
19+
20+
### Enhanced Queue Table Structure
21+
22+
```sql
23+
CREATE TABLE system.sharding_queue (
24+
`id` bigint, -- Primary key
25+
`node` string, -- Target node
26+
`query` string, -- Forward command
27+
`rollback_query` string, -- Rollback command (NEW)
28+
`wait_for_id` bigint, -- Forward dependency
29+
`rollback_wait_for_id` bigint, -- Rollback dependency (NEW)
30+
`operation_group` string, -- Operation group ID (NEW)
31+
`tries` int, -- Retry count
32+
`status` string, -- Command status
33+
`created_at` bigint, -- Creation timestamp
34+
`updated_at` bigint, -- Last update timestamp
35+
`duration` int -- Execution duration
36+
)
37+
```
1638

1739
### Queue Command Structure
1840

@@ -24,6 +46,17 @@ $command = [
2446
'query' => $sqlCommand,
2547
'wait_for_id' => $dependencyId, // Optional dependency
2648
];
49+
50+
// Enhanced command with rollback support
51+
$command = [
52+
'id' => $queueId,
53+
'node' => $nodeId,
54+
'query' => $sqlCommand,
55+
'rollback_query' => $rollbackCommand, // Reverse operation
56+
'wait_for_id' => $dependencyId,
57+
'rollback_wait_for_id' => 0, // Rollback dependency
58+
'operation_group' => $groupId, // Group identifier
59+
];
2760
```
2861

2962
## Command Dependencies and Synchronization
@@ -48,6 +81,77 @@ $idC = $queue->add($node, "COMMAND C");
4881
$queue->resetWaitForId();
4982
```
5083

84+
## Rollback Operations
85+
86+
### Adding Commands with Rollback
87+
88+
The queue system now supports adding commands with automatic rollback:
89+
90+
```php
91+
// Add command with automatic rollback generation
92+
$queue->addWithRollback(
93+
$nodeId,
94+
"CREATE TABLE users_s0 (id bigint)",
95+
null, // Auto-generate rollback
96+
$operationGroup
97+
);
98+
99+
// Add command with explicit rollback
100+
$queue->addWithRollback(
101+
$nodeId,
102+
"ALTER CLUSTER c1 ADD users_s0",
103+
"ALTER CLUSTER c1 DROP users_s0", // Explicit rollback
104+
$operationGroup
105+
);
106+
```
107+
108+
### Operation Groups
109+
110+
Related commands are grouped for atomic execution:
111+
112+
```php
113+
$operationGroup = "shard_create_users_" . uniqid();
114+
115+
// All commands in the same group
116+
$queue->addWithRollback($node1, $cmd1, $rollback1, $operationGroup);
117+
$queue->addWithRollback($node2, $cmd2, $rollback2, $operationGroup);
118+
$queue->addWithRollback($node3, $cmd3, $rollback3, $operationGroup);
119+
120+
// On failure, rollback entire group
121+
if ($error) {
122+
$queue->rollbackOperationGroup($operationGroup);
123+
}
124+
```
125+
126+
### Rollback Execution Flow
127+
128+
When a rollback is triggered:
129+
130+
1. **Get Rollback Commands**: Retrieve all completed commands in the group
131+
2. **Reverse Order**: Sort commands by ID descending (reverse order)
132+
3. **Execute Rollbacks**: Run each rollback command
133+
4. **Continue on Error**: If a rollback fails, continue with others
134+
5. **Report Status**: Return overall rollback success/failure
135+
136+
```php
137+
protected function executeRollbackSequence(array $rollbackCommands): bool {
138+
$allSuccess = true;
139+
140+
foreach ($rollbackCommands as $command) {
141+
try {
142+
$this->client->sendRequest($command['rollback_query']);
143+
Buddy::debugvv("Rollback successful: {$command['rollback_query']}");
144+
} catch (\Throwable $e) {
145+
Buddy::debugvv("Rollback failed: " . $e->getMessage());
146+
$allSuccess = false;
147+
// Continue with other rollback commands
148+
}
149+
}
150+
151+
return $allSuccess;
152+
}
153+
```
154+
51155
### Critical Synchronization Points
52156

53157
1. **Table Creation**: Must complete before cluster operations

0 commit comments

Comments
 (0)