Skip to content

Commit d092d13

Browse files
committed
style(test): clean up QueueRollbackIntegrationTest.php
- Replace count() with sizeof() for array counts - Extract health recovery and cleanup into methods - Remove unused object instantiations - Reformat method signatures and code blocks for consistency
1 parent a6b651b commit d092d13

File tree

8 files changed

+170
-69
lines changed

8 files changed

+170
-69
lines changed

src/Plugin/Sharding/CleanupManager.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ public function cleanupFailedOperationGroups(): array {
106106
$results = ['cleaned_count' => 0, 'errors' => []];
107107

108108
try {
109-
$queue = new Queue($this->cluster, $this->client);
110109
$queueTable = $this->cluster->getSystemTableName('system.sharding_queue');
111110

112111
// Find operation groups that are older than 24 hours and have failed status
@@ -210,7 +209,6 @@ public function cleanupStaleStateEntries(): array {
210209
$results = ['cleaned_count' => 0, 'errors' => []];
211210

212211
try {
213-
$state = new State($this->client);
214212
$stateTable = $this->cluster->getSystemTableName('system.sharding_state');
215213

216214
// Clean up old error entries (older than 30 days)

src/Plugin/Sharding/Cluster.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,12 @@ public function remove(?Queue $queue = null): int {
102102
* @param string|null $operationGroup Optional operation group
103103
* @return int
104104
*/
105-
protected function runQuery(?Queue $queue, string $query, ?string $rollbackQuery = null, ?string $operationGroup = null): int {
105+
protected function runQuery(
106+
?Queue $queue,
107+
string $query,
108+
?string $rollbackQuery = null,
109+
?string $operationGroup = null
110+
): int {
106111
if ($queue) {
107112
$queueId = $queue->add($this->nodeId, $query, $rollbackQuery, $operationGroup);
108113
} else {

src/Plugin/Sharding/HealthMonitor.php

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function performHealthCheck(): array {
3636
$health['overall_status'] = 'unhealthy';
3737
$health['issues'][] = [
3838
'type' => 'stuck_operations',
39-
'count' => count($stuckCheck['stuck_tables']),
39+
'count' => sizeof($stuckCheck['stuck_tables']),
4040
'tables' => $stuckCheck['stuck_tables'],
4141
];
4242
}
@@ -48,7 +48,7 @@ public function performHealthCheck(): array {
4848
$health['overall_status'] = 'unhealthy';
4949
$health['issues'][] = [
5050
'type' => 'failed_operations',
51-
'count' => count($failedCheck['failed_tables']),
51+
'count' => sizeof($failedCheck['failed_tables']),
5252
'tables' => $failedCheck['failed_tables'],
5353
];
5454
}
@@ -101,50 +101,81 @@ public function performAutoRecovery(): array {
101101
return $results;
102102
}
103103

104-
// Recover stuck operations
105-
foreach ($healthCheck['issues'] as $issue) {
104+
$this->processHealthIssues($healthCheck['issues'], $results);
105+
$this->performCleanupIfNeeded($healthCheck['warnings'], $results);
106+
107+
return $results;
108+
}
109+
110+
/**
111+
* Process health issues and attempt recovery
112+
* @param array $issues
113+
* @param array &$results
114+
*/
115+
private function processHealthIssues(array $issues, array &$results): void {
116+
foreach ($issues as $issue) {
106117
if ($issue['type'] === 'stuck_operations') {
107-
foreach ($issue['tables'] as $tableName) {
108-
$recovery = $this->recoverStuckOperation($tableName);
109-
if ($recovery['success']) {
110-
$results['recovered_tables'][] = $tableName;
111-
$results['actions_taken'][] = "Recovered stuck operation for {$tableName}";
112-
} else {
113-
$results['failed_recoveries'][] = [
114-
'table' => $tableName,
115-
'error' => $recovery['error'],
116-
];
117-
}
118-
}
118+
$this->recoverStuckOperations($issue['tables'], $results);
119+
} elseif ($issue['type'] === 'failed_operations') {
120+
$this->recoverFailedOperations($issue['tables'], $results);
119121
}
122+
}
123+
}
120124

121-
if ($issue['type'] !== 'failed_operations') {
122-
continue;
125+
/**
126+
* Recover stuck operations for given tables
127+
* @param array $tables
128+
* @param array &$results
129+
*/
130+
private function recoverStuckOperations(array $tables, array &$results): void {
131+
foreach ($tables as $tableName) {
132+
$recovery = $this->recoverStuckOperation($tableName);
133+
if ($recovery['success']) {
134+
$results['recovered_tables'][] = $tableName;
135+
$results['actions_taken'][] = "Recovered stuck operation for {$tableName}";
136+
} else {
137+
$results['failed_recoveries'][] = [
138+
'table' => $tableName,
139+
'error' => $recovery['error'],
140+
];
123141
}
142+
}
143+
}
124144

125-
foreach ($issue['tables'] as $tableName) {
126-
$recovery = $this->recoverFailedOperation($tableName);
127-
if ($recovery['success']) {
128-
$results['recovered_tables'][] = $tableName;
129-
$results['actions_taken'][] = "Recovered failed operation for {$tableName}";
130-
} else {
131-
$results['failed_recoveries'][] = [
132-
'table' => $tableName,
133-
'error' => $recovery['error'],
134-
];
135-
}
145+
/**
146+
* Recover failed operations for given tables
147+
* @param array $tables
148+
* @param array &$results
149+
*/
150+
private function recoverFailedOperations(array $tables, array &$results): void {
151+
foreach ($tables as $tableName) {
152+
$recovery = $this->recoverFailedOperation($tableName);
153+
if ($recovery['success']) {
154+
$results['recovered_tables'][] = $tableName;
155+
$results['actions_taken'][] = "Recovered failed operation for {$tableName}";
156+
} else {
157+
$results['failed_recoveries'][] = [
158+
'table' => $tableName,
159+
'error' => $recovery['error'],
160+
];
136161
}
137162
}
163+
}
138164

139-
// Perform cleanup for warnings
140-
if (!empty($healthCheck['warnings'])) {
141-
$cleanupManager = new CleanupManager($this->client, $this->cluster);
142-
$cleanupResults = $cleanupManager->performFullCleanup();
143-
$results['cleanup_performed'] = true;
144-
$results['actions_taken'][] = "Performed cleanup: {$cleanupResults['resources_cleaned']} resources cleaned";
165+
/**
166+
* Perform cleanup if warnings exist
167+
* @param array $warnings
168+
* @param array &$results
169+
*/
170+
private function performCleanupIfNeeded(array $warnings, array &$results): void {
171+
if (empty($warnings)) {
172+
return;
145173
}
146174

147-
return $results;
175+
$cleanupManager = new CleanupManager($this->client, $this->cluster);
176+
$cleanupResults = $cleanupManager->performFullCleanup();
177+
$results['cleanup_performed'] = true;
178+
$results['actions_taken'][] = "Performed cleanup: {$cleanupResults['resources_cleaned']} resources cleaned";
148179
}
149180

150181
/**
@@ -155,7 +186,6 @@ private function checkStuckOperations(): array {
155186
$results = ['stuck_tables' => [], 'check_time' => time()];
156187

157188
try {
158-
$state = new State($this->client);
159189
$stateTable = $this->cluster->getSystemTableName('system.sharding_state');
160190

161191
// Find operations running for more than 30 minutes
@@ -199,7 +229,6 @@ private function checkFailedOperations(): array {
199229
$results = ['failed_tables' => [], 'check_time' => time()];
200230

201231
try {
202-
$state = new State($this->client);
203232
$stateTable = $this->cluster->getSystemTableName('system.sharding_state');
204233

205234
$failedOps = $this->client->sendRequest(
@@ -305,7 +334,8 @@ private function generateRecommendations(array $health): array {
305334
$recommendations[] = 'Reset stuck operations for tables: ' . implode(', ', $issue['tables']);
306335
break;
307336
case 'failed_operations':
308-
$recommendations[] = 'Investigate and recover failed operations for tables: ' . implode(', ', $issue['tables']);
337+
$recommendations[] = 'Investigate and recover failed operations for tables: '
338+
. implode(', ', $issue['tables']);
309339
break;
310340
}
311341
}

src/Plugin/Sharding/Table.php

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,6 @@ public function getRebalancingProgress(): array {
595595

596596
if ($operationGroup) {
597597
// Get queue progress for this operation group
598-
$queue = new Queue($this->cluster, $this->client);
599598
$queueProgress = $this->getQueueProgress($operationGroup);
600599
$progress = array_merge($progress, $queueProgress);
601600
}
@@ -651,7 +650,12 @@ protected function clearStopSignal(): void {
651650
* @param ?State $state
652651
* @return void
653652
*/
654-
protected function handleRebalancingFailure(\Throwable $error, string $operationGroup, Queue $queue, ?State $state = null): void {
653+
protected function handleRebalancingFailure(
654+
\Throwable $error,
655+
string $operationGroup,
656+
Queue $queue,
657+
?State $state = null
658+
): void {
655659
if (!$state) {
656660
$state = new State($this->client);
657661
}
@@ -1137,7 +1141,11 @@ protected function moveShardWithIntermediateCluster(
11371141
$tempClusterName = "temp_move_{$shardId}_" . uniqid();
11381142

11391143
// Step 1: Create shard table on target node
1140-
$createQueueId = $queue->add($targetNode, $this->getCreateTableShardSQL($shardId), "DROP TABLE IF EXISTS {$shardName}");
1144+
$createQueueId = $queue->add(
1145+
$targetNode,
1146+
$this->getCreateTableShardSQL($shardId),
1147+
"DROP TABLE IF EXISTS {$shardName}"
1148+
);
11411149

11421150
// Step 2: Create temporary cluster on SOURCE node (where the data IS)
11431151
// CRITICAL: Use cluster name as path to ensure uniqueness for intermediate clusters
@@ -1149,7 +1157,11 @@ protected function moveShardWithIntermediateCluster(
11491157

11501158
// Step 3: Add shard to cluster on SOURCE node FIRST (before JOIN)
11511159
$queue->setWaitForId($clusterQueueId);
1152-
$queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} ADD {$shardName}", "ALTER CLUSTER {$tempClusterName} DROP {$shardName}");
1160+
$queue->add(
1161+
$sourceNode,
1162+
"ALTER CLUSTER {$tempClusterName} ADD {$shardName}",
1163+
"ALTER CLUSTER {$tempClusterName} DROP {$shardName}"
1164+
);
11531165

11541166
// Step 4: NEW node joins the cluster that SOURCE created
11551167
// Wait for table creation on target node to complete first
@@ -1164,7 +1176,11 @@ protected function moveShardWithIntermediateCluster(
11641176
// Step 5: CRITICAL - Wait for JOIN to complete (data is now synced)
11651177
// JOIN CLUSTER is synchronous, so once it's processed, data is fully copied
11661178
$queue->setWaitForId($joinQueueId);
1167-
$dropQueueId = $queue->add($sourceNode, "ALTER CLUSTER {$tempClusterName} DROP {$shardName}", "ALTER CLUSTER {$tempClusterName} ADD {$shardName}");
1179+
$dropQueueId = $queue->add(
1180+
$sourceNode,
1181+
"ALTER CLUSTER {$tempClusterName} DROP {$shardName}",
1182+
"ALTER CLUSTER {$tempClusterName} ADD {$shardName}"
1183+
);
11681184

11691185
// Step 6: Only after DROP from cluster, remove the table from source
11701186
$queue->setWaitForId($dropQueueId);

test/Plugin/Sharding/QueueCommandVerificationTest.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,11 @@ private function generateRF1Commands(Queue|TestableQueue $queue): void {
277277
$queue->add('127.0.0.1:3312', "CREATE TABLE IF NOT EXISTS test_table_s1 (id bigint) type='rt'", '');
278278

279279
// Intermediate cluster for shard movement (RF=1 specific)
280-
$queue->add('127.0.0.1:1312', "CREATE CLUSTER temp_move_test_cluster 'temp_move_test_cluster' as path", '');
280+
$queue->add(
281+
'127.0.0.1:1312',
282+
"CREATE CLUSTER temp_move_test_cluster 'temp_move_test_cluster' as path",
283+
''
284+
);
281285
$queue->add('127.0.0.1:3312', "JOIN CLUSTER temp_move_test_cluster at '127.0.0.1:1312'", '');
282286
$queue->add('127.0.0.1:1312', 'ALTER CLUSTER temp_move_test_cluster ADD test_table_s0', '');
283287
$queue->add('127.0.0.1:1312', 'ALTER CLUSTER temp_move_test_cluster DROP test_table_s0', '');

test/Plugin/Sharding/QueueRollbackIntegrationTest.php

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,24 @@ public function testTableCreationWithRollback(): void {
3535
// This simulates what would happen during table shard creation
3636

3737
// Add some typical table creation commands with rollback
38-
$this->queue->add('node1', 'CREATE TABLE test_s0 (id bigint)', 'DROP TABLE IF EXISTS test_s0', 'shard_create_test');
39-
$this->queue->add('node1', 'CREATE TABLE test_s1 (id bigint)', 'DROP TABLE IF EXISTS test_s1', 'shard_create_test');
40-
$this->queue->add('node1', 'CREATE TABLE test type=\'distributed\' local=\'test_s0,test_s1\'', 'DROP TABLE IF EXISTS test', 'shard_create_test');
38+
$this->queue->add(
39+
'node1',
40+
'CREATE TABLE test_s0 (id bigint)',
41+
'DROP TABLE IF EXISTS test_s0',
42+
'shard_create_test'
43+
);
44+
$this->queue->add(
45+
'node1',
46+
'CREATE TABLE test_s1 (id bigint)',
47+
'DROP TABLE IF EXISTS test_s1',
48+
'shard_create_test'
49+
);
50+
$this->queue->add(
51+
'node1',
52+
'CREATE TABLE test type=\'distributed\' local=\'test_s0,test_s1\'',
53+
'DROP TABLE IF EXISTS test',
54+
'shard_create_test'
55+
);
4156

4257
// Verify queue has commands with rollback
4358
$commands = $this->queue->getCapturedCommands();
@@ -92,7 +107,12 @@ public function testOperationGroupsForAtomicRollback(): void {
92107
// Add multiple commands with same operation group
93108
$this->queue->add('node1', 'CREATE TABLE test_s0 (id bigint)', 'DROP TABLE IF EXISTS test_s0', $operationGroup);
94109
$this->queue->add('node1', 'CREATE TABLE test_s1 (id bigint)', 'DROP TABLE IF EXISTS test_s1', $operationGroup);
95-
$this->queue->add('node1', 'CREATE TABLE test type=\'distributed\'', 'DROP TABLE IF EXISTS test', $operationGroup);
110+
$this->queue->add(
111+
'node1',
112+
'CREATE TABLE test type=\'distributed\'',
113+
'DROP TABLE IF EXISTS test',
114+
$operationGroup
115+
);
96116

97117
// Verify all commands have the same operation group
98118
$commands = $this->queue->getCapturedCommands();
@@ -133,13 +153,21 @@ public function testRebalancingWithRollback(): void {
133153
['node2', 'JOIN CLUSTER temp_move_0_123', 'DELETE CLUSTER temp_move_0_123'],
134154

135155
// Complete the move
136-
['node1', 'ALTER CLUSTER temp_move_0_123 DROP test_s0', 'ALTER CLUSTER temp_move_0_123 ADD test_s0'],
156+
[
157+
'node1',
158+
'ALTER CLUSTER temp_move_0_123 DROP test_s0',
159+
'ALTER CLUSTER temp_move_0_123 ADD test_s0',
160+
],
137161
['node1', 'DROP TABLE test_s0', ''], // Original shard removal (destructive)
138162
['node1', 'DELETE CLUSTER temp_move_0_123', ''], // Cleanup (destructive)
139163

140164
// Update distributed table
141165
['node1', 'DROP TABLE test', ''],
142-
['node1', 'CREATE TABLE test type=\'distributed\' local=\'test_s1\' agent=\'node2:test_s0\'', 'DROP TABLE IF EXISTS test'],
166+
[
167+
'node1',
168+
'CREATE TABLE test type=\'distributed\' local=\'test_s1\' agent=\'node2:test_s0\'',
169+
'DROP TABLE IF EXISTS test',
170+
],
143171
];
144172

145173
foreach ($rebalanceCommands as [$node, $query, $rollback]) {
@@ -148,7 +176,7 @@ public function testRebalancingWithRollback(): void {
148176

149177
// Verify rebalancing commands have proper rollback
150178
$commands = $this->queue->getCapturedCommands();
151-
$this->assertCount(count($rebalanceCommands), $commands);
179+
$this->assertCount(sizeof($rebalanceCommands), $commands);
152180

153181
// Check for shard movement commands with rollback
154182
$shardMovementCommands = array_filter(
@@ -195,9 +223,9 @@ public function testRollbackCommandPatterns(): void {
195223
}
196224

197225
$commands = $this->queue->getCapturedCommands();
198-
$this->assertCount(count($testOperations), $commands);
226+
$this->assertCount(sizeof($testOperations), $commands);
199227

200-
for ($i = 0; $i < count($testOperations); $i++) {
228+
for ($i = 0; $i < sizeof($testOperations); $i++) {
201229
$this->assertEquals($testOperations[$i][0], $commands[$i]['query']);
202230
$this->assertEquals($testOperations[$i][1], $commands[$i]['rollback_query']);
203231
}
@@ -253,15 +281,18 @@ public function testRollbackWithDifferentCommandTypes(): void {
253281
['DELETE CLUSTER temp_cluster', ''],
254282

255283
// Complex operations
256-
['CREATE TABLE distributed_users type=\'distributed\' local=\'users\'', 'DROP TABLE IF EXISTS distributed_users'],
284+
[
285+
'CREATE TABLE distributed_users type=\'distributed\' local=\'users\'',
286+
'DROP TABLE IF EXISTS distributed_users',
287+
],
257288
];
258289

259290
foreach ($commands as [$query, $rollback]) {
260291
$this->queue->add('node1', $query, $rollback, $operationGroup);
261292
}
262293

263294
$capturedCommands = $this->queue->getCapturedCommands();
264-
$this->assertCount(count($commands), $capturedCommands);
295+
$this->assertCount(sizeof($commands), $capturedCommands);
265296

266297
// Verify rollback patterns
267298
foreach ($capturedCommands as $i => $command) {

0 commit comments

Comments
 (0)