Skip to content

Commit 38bb129

Browse files
committed
refactor(sharding): unify replication and rollback handling
- Merge rollback logic into unified add and replication methods - Auto-generate rollback queries when supported - Use rollback columns with schema fallback - Simplify queue insertion and processing with operation groups - Remove redundant rollback-specific methods and cleanup code
1 parent e0d006c commit 38bb129

File tree

7 files changed

+254
-373
lines changed

7 files changed

+254
-373
lines changed

src/Plugin/Sharding/CleanupManager.php

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ final class CleanupManager {
1313
public function __construct(
1414
private Client $client,
1515
private Cluster $cluster
16-
) {}
16+
) {
17+
}
1718

1819
/**
1920
* Comprehensive cleanup of all orphaned resources
@@ -47,10 +48,9 @@ public function performFullCleanup(): array {
4748
$stateCleanup = $this->cleanupStaleStateEntries();
4849
$results['actions_taken'][] = "Cleaned {$stateCleanup['cleaned_count']} stale state entries";
4950
$results['resources_cleaned'] += $stateCleanup['cleaned_count'];
50-
5151
} catch (\Throwable $e) {
5252
$results['errors'][] = $e->getMessage();
53-
Buddy::debugvv("Cleanup error: " . $e->getMessage());
53+
Buddy::debugvv('Cleanup error: ' . $e->getMessage());
5454
}
5555

5656
return $results;
@@ -65,7 +65,7 @@ public function cleanupOrphanedTemporaryClusters(): array {
6565

6666
try {
6767
// Get all clusters
68-
$clusterResult = $this->client->sendRequest("SHOW CLUSTERS");
68+
$clusterResult = $this->client->sendRequest('SHOW CLUSTERS');
6969
/** @var array{0?:array{data?:array<array{cluster:string}>}} */
7070
$clusterData = $clusterResult->getResult();
7171
$clusters = $clusterData[0]['data'] ?? [];
@@ -74,22 +74,25 @@ public function cleanupOrphanedTemporaryClusters(): array {
7474
$clusterName = $cluster['cluster'] ?? '';
7575

7676
// Check if it's a temporary cluster (starts with temp_move_)
77-
if (strpos($clusterName, 'temp_move_') === 0) {
78-
// Check if it's orphaned (older than 1 hour)
79-
if ($this->isClusterOrphaned($clusterName)) {
80-
try {
81-
$this->client->sendRequest("DELETE CLUSTER {$clusterName}");
82-
$results['cleaned_count']++;
83-
Buddy::debugvv("Cleaned orphaned cluster: {$clusterName}");
84-
} catch (\Throwable $e) {
85-
$results['errors'][] = "Failed to clean cluster {$clusterName}: " . $e->getMessage();
86-
}
87-
}
77+
if (strpos($clusterName, 'temp_move_') !== 0) {
78+
continue;
8879
}
89-
}
9080

81+
// Check if it's orphaned (older than 1 hour)
82+
if (!$this->isClusterOrphaned($clusterName)) {
83+
continue;
84+
}
85+
86+
try {
87+
$this->client->sendRequest("DELETE CLUSTER {$clusterName}");
88+
$results['cleaned_count']++;
89+
Buddy::debugvv("Cleaned orphaned cluster: {$clusterName}");
90+
} catch (\Throwable $e) {
91+
$results['errors'][] = "Failed to clean cluster {$clusterName}: " . $e->getMessage();
92+
}
93+
}
9194
} catch (\Throwable $e) {
92-
$results['errors'][] = "Failed to list clusters: " . $e->getMessage();
95+
$results['errors'][] = 'Failed to list clusters: ' . $e->getMessage();
9396
}
9497

9598
return $results;
@@ -115,13 +118,15 @@ public function cleanupFailedOperationGroups(): array {
115118
return $results;
116119
}
117120

118-
$result = $this->client->sendRequest("
121+
$result = $this->client->sendRequest(
122+
"
119123
SELECT DISTINCT operation_group
120124
FROM {$queueTable}
121125
WHERE operation_group != ''
122126
AND created_at < {$cutoffTime}
123127
AND status IN ('failed', 'error')
124-
");
128+
"
129+
);
125130

126131
/** @var array{0?:array{data?:array<array{operation_group:string}>}} */
127132
$data = $result->getResult();
@@ -132,21 +137,21 @@ public function cleanupFailedOperationGroups(): array {
132137

133138
try {
134139
// Delete all queue items for this operation group
135-
$this->client->sendRequest("
140+
$this->client->sendRequest(
141+
"
136142
DELETE FROM {$queueTable}
137143
WHERE operation_group = '{$operationGroup}'
138-
");
144+
"
145+
);
139146

140147
$results['cleaned_count']++;
141148
Buddy::debugvv("Cleaned failed operation group: {$operationGroup}");
142-
143149
} catch (\Throwable $e) {
144150
$results['errors'][] = "Failed to clean operation group {$operationGroup}: " . $e->getMessage();
145151
}
146152
}
147-
148153
} catch (\Throwable $e) {
149-
$results['errors'][] = "Failed to clean operation groups: " . $e->getMessage();
154+
$results['errors'][] = 'Failed to clean operation groups: ' . $e->getMessage();
150155
}
151156

152157
return $results;
@@ -164,31 +169,34 @@ public function cleanupExpiredQueueItems(): array {
164169
$cutoffTime = (time() - 604800) * 1000; // 7 days ago in milliseconds
165170

166171
// Count items to be deleted
167-
$countResult = $this->client->sendRequest("
172+
$countResult = $this->client->sendRequest(
173+
"
168174
SELECT COUNT(*) as count
169175
FROM {$queueTable}
170176
WHERE created_at < {$cutoffTime}
171177
AND status IN ('processed', 'failed', 'error')
172-
");
178+
"
179+
);
173180

174181
/** @var array{0?:array{data?:array{0?:array{count:int}}}} */
175182
$countData = $countResult->getResult();
176183
$count = $countData[0]['data'][0]['count'] ?? 0;
177184

178185
if ($count > 0) {
179186
// Delete expired items
180-
$this->client->sendRequest("
187+
$this->client->sendRequest(
188+
"
181189
DELETE FROM {$queueTable}
182190
WHERE created_at < {$cutoffTime}
183191
AND status IN ('processed', 'failed', 'error')
184-
");
192+
"
193+
);
185194

186195
$results['cleaned_count'] = $count;
187196
Buddy::debugvv("Cleaned {$count} expired queue items");
188197
}
189-
190198
} catch (\Throwable $e) {
191-
$results['errors'][] = "Failed to clean expired queue items: " . $e->getMessage();
199+
$results['errors'][] = 'Failed to clean expired queue items: ' . $e->getMessage();
192200
}
193201

194202
return $results;
@@ -208,11 +216,13 @@ public function cleanupStaleStateEntries(): array {
208216
// Clean up old error entries (older than 30 days)
209217
$cutoffTime = time() - 2592000; // 30 days
210218

211-
$errorEntries = $this->client->sendRequest("
219+
$errorEntries = $this->client->sendRequest(
220+
"
212221
SELECT key FROM {$stateTable}
213222
WHERE key LIKE 'rebalance_error:%'
214223
AND updated_at < {$cutoffTime}
215-
");
224+
"
225+
);
216226

217227
/** @var array{0?:array{data?:array<array{key:string}>}} */
218228
$data = $errorEntries->getResult();
@@ -221,18 +231,19 @@ public function cleanupStaleStateEntries(): array {
221231
foreach ($entries as $entry) {
222232
try {
223233
// Delete the stale entry
224-
$this->client->sendRequest("
234+
$this->client->sendRequest(
235+
"
225236
DELETE FROM {$stateTable}
226237
WHERE key = '{$entry['key']}'
227-
");
238+
"
239+
);
228240
$results['cleaned_count']++;
229241
} catch (\Throwable $e) {
230242
$results['errors'][] = "Failed to clean state entry {$entry['key']}: " . $e->getMessage();
231243
}
232244
}
233-
234245
} catch (\Throwable $e) {
235-
$results['errors'][] = "Failed to clean state entries: " . $e->getMessage();
246+
$results['errors'][] = 'Failed to clean state entries: ' . $e->getMessage();
236247
}
237248

238249
return $results;
@@ -265,12 +276,14 @@ private function isClusterOrphaned(string $clusterName): bool {
265276
*/
266277
private function hasOperationGroupColumn(string $queueTable): bool {
267278
try {
268-
$result = $this->client->sendRequest("
279+
$result = $this->client->sendRequest(
280+
"
269281
SELECT operation_group FROM {$queueTable} LIMIT 1
270-
");
282+
"
283+
);
271284
return !$result->hasError();
272285
} catch (\Throwable $e) {
273286
return false;
274287
}
275288
}
276-
}
289+
}

0 commit comments

Comments
 (0)