Commit 2c1bd45

Merge pull request #3344 from hasethuraman/hari/retry-recover-migrations
fix: reconcile loop to watch migrations if earlier submission had failed
2 parents 9c3f52a + 6d435b9 commit 2c1bd45

File tree

6 files changed: 758 additions & 5 deletions


pkg/azuredisk/azuredisk.go

Lines changed: 6 additions & 3 deletions
@@ -289,7 +289,6 @@ func NewDriver(options *DriverOptions) *Driver {

 	if kubeClient != nil && driver.NodeID == "" && driver.enableMigrationMonitor {
 		eventBroadcaster := record.NewBroadcaster()
-		eventBroadcaster.StartStructuredLogging(0)
 		eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
 			Interface: kubeClient.CoreV1().Events(""),
 		})
@@ -302,8 +301,12 @@ func NewDriver(options *DriverOptions) *Driver {
 		// Recover any ongoing migrations after restart
 		go func() {
 			time.Sleep(30 * time.Second) // Wait for controller to fully start
-			if err := driver.recoverMigrationMonitorsFromLabels(context.Background()); err != nil {
-				klog.Errorf("Failed to recover migration monitors: %v", err)
+			// Periodically check every 10 minutes - in the worst case a Kubernetes administrator can add the label to subscribe to the events
+			for {
+				if err := driver.recoverMigrationMonitorsFromLabels(context.Background()); err != nil {
+					klog.Errorf("Failed to recover migration monitors: %v", err)
+				}
+				time.Sleep(10 * time.Minute)
 			}
 		}()
 	}
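The hunk above turns the one-shot recovery call into an endless for/time.Sleep loop, so a recovery that fails (for example because the API server is briefly unreachable) is retried every 10 minutes instead of being lost until the next restart. Below is a minimal standalone sketch of that retry pattern, not the driver's actual code: the recoverFn callback is a hypothetical stand-in for driver.recoverMigrationMonitorsFromLabels, and the ticker plus context cancellation are added so the goroutine can be stopped, which the in-tree loop does not need because it lives for the life of the controller.

```go
package main

import (
	"context"
	"log"
	"time"
)

// runPeriodicRecovery calls recoverFn immediately and then on every tick until
// ctx is cancelled. Errors are logged and retried on the next interval, which
// mirrors the retry-forever behavior the commit adds to NewDriver.
func runPeriodicRecovery(ctx context.Context, interval time.Duration, recoverFn func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := recoverFn(ctx); err != nil {
			log.Printf("failed to recover migration monitors: %v", err)
		}
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Hypothetical stand-in for driver.recoverMigrationMonitorsFromLabels.
	recoverFn := func(context.Context) error {
		log.Println("scanning PVs labeled as migration-in-progress")
		return nil
	}
	runPeriodicRecovery(ctx, time.Second, recoverFn)
}
```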

pkg/azuredisk/controllerserver_test.go

Lines changed: 249 additions & 0 deletions
@@ -36,6 +36,7 @@ import (
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/timestamppb"
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/tools/record"
@@ -799,6 +800,254 @@ collect:
 	d.GetMigrationMonitor().Stop()
 }

+func TestCreateVolume_RecoveryLoopStartsMigrationAfterInitialMonitorUnavailable(t *testing.T) {
+	origInterval := migrationCheckInterval
+	defer func() { migrationCheckInterval = origInterval }()
+	migrationCheckInterval = 25 * time.Millisecond
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	d := getFakeDriverWithKubeClient(ctrl)
+	// Simulate transient unavailability – monitor not ready at provisioning time
+	d.SetMigrationMonitor(nil)
+
+	snapshotID := "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Compute/snapshots/snap-recov1"
+	volName := "pv-recover-transient-start"
+	sizeGi := int64(10)
+	capBytes := sizeGi * 1024 * 1024 * 1024
+	diskURI := fmt.Sprintf("/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Compute/disks/%s", volName)
+
+	// Mock disk & snapshot clients
+	diskClient := mock_diskclient.NewMockInterface(ctrl)
+	d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().
+		GetDiskClientForSub(gomock.Any()).Return(diskClient, nil).AnyTimes()
+
+	snapshotClient := mock_snapshotclient.NewMockInterface(ctrl)
+	d.getClientFactory().(*mock_azclient.MockClientFactory).EXPECT().
+		GetSnapshotClientForSub(gomock.Any()).
+		Return(snapshotClient, nil).AnyTimes()
+
+	// Snapshot SKU (source = Premium_LRS)
+	snapshotClient.EXPECT().Get(gomock.Any(), "rg", "snap-recov1").Return(
+		&armcompute.Snapshot{
+			Name: to.Ptr("snap-recov1"),
+			SKU:  &armcompute.SnapshotSKU{Name: to.Ptr(armcompute.SnapshotStorageAccountTypesPremiumLRS)},
+			Properties: &armcompute.SnapshotProperties{
+				CreationData: &armcompute.CreationData{},
+			},
+		}, nil).AnyTimes()
+
+	// Track creation & progress
+	var createDone atomic.Bool
+	var progressPolls atomic.Int32
+
+	// GET behavior:
+	// - Before CreateOrUpdate: return NotFound
+	// - After CreateOrUpdate: return disk with ProvisioningState=Succeeded and evolving CompletionPercent
+	diskClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).
+		DoAndReturn(func(_ context.Context, _ string, _ string) (*armcompute.Disk, error) {
+			if !createDone.Load() {
+				// Simulate disk not existing yet
+				return nil, apierrors.NewInternalError(fmt.Errorf("transient error"))
+			}
+			p := progressPolls.Add(1)
+			var pct float32
+			if p >= 5 {
+				pct = 100
+			} else {
+				pct = 0
+			}
+			return &armcompute.Disk{
+				ID:  to.Ptr(diskURI),
+				SKU: &armcompute.DiskSKU{Name: to.Ptr(armcompute.DiskStorageAccountTypesPremiumLRS)},
+				Properties: &armcompute.DiskProperties{
+					DiskSizeGB:        to.Ptr[int32](int32(sizeGi)),
+					ProvisioningState: to.Ptr("Succeeded"),
+					CompletionPercent: to.Ptr(pct),
+				},
+			}, nil
+		}).AnyTimes()
+
+	// CreateOrUpdate sets createDone
+	diskClient.EXPECT().CreateOrUpdate(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
+		DoAndReturn(func(_ context.Context, _ string, _ string, _ armcompute.Disk) (*armcompute.Disk, error) {
+			return &armcompute.Disk{
+				ID:  to.Ptr(diskURI),
+				SKU: &armcompute.DiskSKU{Name: to.Ptr(armcompute.DiskStorageAccountTypesPremiumLRS)},
+				Properties: &armcompute.DiskProperties{
+					DiskSizeGB:        to.Ptr[int32](int32(sizeGi)),
+					ProvisioningState: to.Ptr("Succeeded"),
+				},
+			}, nil
+		}).Times(1)
+
+	// Patch used during migration monitor after recovery
+	diskClient.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
+		Return(&armcompute.Disk{
+			ID:  to.Ptr(diskURI),
+			SKU: &armcompute.DiskSKU{Name: to.Ptr(armcompute.DiskStorageAccountTypesPremiumV2LRS)},
+		}, nil).AnyTimes()
+
+	req := &csi.CreateVolumeRequest{
+		Name: volName,
+		CapacityRange: &csi.CapacityRange{
+			RequiredBytes: capBytes,
+		},
+		Parameters: map[string]string{
+			consts.SkuNameField: string(armcompute.DiskStorageAccountTypesPremiumV2LRS),
+		},
+		VolumeCapabilities: []*csi.VolumeCapability{
+			{
+				AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
+				AccessMode: &csi.VolumeCapability_AccessMode{
+					Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
+				},
+			},
+		},
+		VolumeContentSource: &csi.VolumeContentSource{
+			Type: &csi.VolumeContentSource_Snapshot{
+				Snapshot: &csi.VolumeContentSource_SnapshotSource{
+					SnapshotId: snapshotID,
+				},
+			},
+		},
+	}
+
+	_, err := d.CreateVolume(context.Background(), req)
+	assert.NoError(t, err, "CreateVolume should succeed")
+	assert.True(t, d.GetMigrationMonitor().IsMigrationActive(diskURI), "Migration for volume %s should be active", diskURI)
+
+	// Simulate restart
+	d.GetMigrationMonitor().Stop()
+	// Ensure cleanup (task removed)
+	cleanupDeadline := time.Now().Add(1 * time.Second)
+	for {
+		if !d.GetMigrationMonitor().IsMigrationActive(diskURI) {
+			break
+		}
+		if time.Now().After(cleanupDeadline) {
+			t.Fatalf("Migration task still active after completion")
+		}
+		time.Sleep(25 * time.Millisecond)
+	}
+	createDone.Store(true)
+
+	eventRecorder := record.NewFakeRecorder(100)
+	d.SetMigrationMonitor(NewMigrationProgressMonitor(d.getCloud().KubeClient, eventRecorder, d.GetDiskController()))
+
+	// Prepare labeled PV/PVC for recovery
+	coreMock := d.getCloud().KubeClient.CoreV1().(*mockcorev1.MockInterface)
+	pvIf := d.getCloud().KubeClient.CoreV1().PersistentVolumes().(*mockpersistentvolume.MockInterface)
+	pvcIf := mockpersistentvolumeclaim.NewMockPersistentVolumeClaimInterface(ctrl)
+	coreMock.EXPECT().PersistentVolumeClaims(gomock.Any()).Return(pvcIf).AnyTimes()
+
+	pvcName := "pvc-" + volName
+	ns := "default"
+
+	pv := &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: volName,
+			Labels: map[string]string{
+				LabelMigrationInProgress: "true",
+			},
+		},
+		Spec: v1.PersistentVolumeSpec{
+			Capacity: v1.ResourceList{
+				v1.ResourceName("storage"): *resource.NewQuantity(capBytes, resource.BinarySI),
+			},
+			ClaimRef: &v1.ObjectReference{Name: pvcName, Namespace: ns},
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				CSI: &v1.CSIPersistentVolumeSource{
+					Driver:       "disk.csi.azure.com",
+					VolumeHandle: diskURI,
+					VolumeAttributes: map[string]string{
+						"storageAccountType": string(armcompute.DiskStorageAccountTypesPremiumV2LRS),
+						"skuName":            string(armcompute.DiskStorageAccountTypesPremiumV2LRS),
+					},
+				},
+			},
+		},
+	}
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: ns},
+		Spec:       v1.PersistentVolumeClaimSpec{VolumeName: pv.Name},
+	}
+
+	pvIf.EXPECT().List(gomock.Any(), gomock.Any()).
+		Return(&v1.PersistentVolumeList{Items: []v1.PersistentVolume{*pv}}, nil).AnyTimes()
+	pvIf.EXPECT().Get(gomock.Any(), pv.Name, gomock.Any()).
+		Return(pv.DeepCopy(), nil).AnyTimes()
+	pvIf.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Any()).
+		DoAndReturn(func(_ context.Context, updated *v1.PersistentVolume, _ metav1.UpdateOptions) (*v1.PersistentVolume, error) {
+			return updated, nil
+		}).AnyTimes()
+	pvcIf.EXPECT().Get(gomock.Any(), pvc.Name, gomock.Any()).
+		Return(pvc.DeepCopy(), nil).AnyTimes()
+
+	// Accelerated recovery loop (instead of 30s + 10m)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				_ = d.RecoverMigrationMonitor(context.Background())
+				time.Sleep(40 * time.Millisecond)
+			}
+		}
+	}()
+
+	// Wait for migration activation
+	activateDeadline := time.Now().Add(2 * time.Second)
+	for {
+		if d.GetMigrationMonitor().IsMigrationActive(diskURI) {
+			break
+		}
+		if time.Now().After(activateDeadline) {
+			t.Fatalf("Migration did not activate via recovery")
+		}
+		time.Sleep(25 * time.Millisecond)
+	}
+
+	// Collect events until Started & Completed
+	foundStart, foundCompletion := false, false
+	timeout := time.After(6 * time.Second)
+	for !(foundStart && foundCompletion) {
+		select {
+		case e := <-eventRecorder.Events:
+			if strings.Contains(e, ReasonSKUMigrationStarted) {
+				foundStart = true
+			}
+			if strings.Contains(e, ReasonSKUMigrationCompleted) {
+				foundCompletion = true
+			}
+		case <-timeout:
+			t.Fatalf("Timed out waiting for migration start/completion (start=%v completion=%v)", foundStart, foundCompletion)
+		case <-time.After(30 * time.Millisecond):
+		}
+	}
+
+	assert.True(t, foundStart, "Expected migration start event")
+	assert.True(t, foundCompletion, "Expected migration completion event")
+
+	// Ensure cleanup (task removed)
+	cleanupDeadline = time.Now().Add(1 * time.Second)
+	for {
+		if !d.GetMigrationMonitor().IsMigrationActive(diskURI) {
+			break
+		}
+		if time.Now().After(cleanupDeadline) {
+			t.Fatalf("Migration task still active after completion")
+		}
+		time.Sleep(25 * time.Millisecond)
+	}
+
+	d.GetMigrationMonitor().Stop()
+}
+
 func TestDeleteVolume(t *testing.T) {
 	cntl := gomock.NewController(t)
 	defer cntl.Finish()
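The new test waits on three conditions (monitor cleanup, re-activation via recovery, and cleanup again) with the same hand-rolled poll-until-deadline loop. As a sketch only, with hypothetical names that are not part of the test file, that pattern could be factored into a small helper:

```go
package azuredisk

import "time"

// waitFor polls cond every tick until it returns true or timeout elapses.
// It returns false on timeout so the caller can fail the test with its own message.
func waitFor(cond func() bool, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}
```

With such a helper, the cleanup check in the test would read: if !waitFor(func() bool { return !d.GetMigrationMonitor().IsMigrationActive(diskURI) }, time.Second, 25*time.Millisecond) { t.Fatalf("Migration task still active after completion") }.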

pkg/azuredisk/migration_monitor.go

Lines changed: 16 additions & 2 deletions
@@ -35,6 +35,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
+	"sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
 )

 const (
@@ -625,12 +626,25 @@ func (d *Driver) recoverMigrationMonitorsFromLabels(ctx context.Context) error {
 		diskURI := pv.Spec.CSI.VolumeHandle

 		// For recovery, we need to determine fromSKU and toSKU from the disk properties
-		klog.V(2).Infof("Recovering migration monitor for PV: %s", pv.Name)
+		klog.V(3).Infof("Recovering migration monitor for PV: %s", pv.Name)

 		// For now, we'll use placeholders - when more SKUs involve for migration, need to get this from disk properties
 		fromSKU := string(armcompute.DiskStorageAccountTypesPremiumLRS) // we may incorrectly say PremiumLRS instead of StandardZRS, lets keep it simple for now
 		toSKU := armcompute.DiskStorageAccountTypesPremiumV2LRS

+		if pv.Spec.CSI.VolumeAttributes != nil {
+			if sku, exists := azureutils.ParseDiskParametersForKey(pv.Spec.CSI.VolumeAttributes, "storageAccountType"); exists {
+				if !strings.EqualFold(sku, string(toSKU)) {
+					continue
+				}
+			}
+			if sku, exists := azureutils.ParseDiskParametersForKey(pv.Spec.CSI.VolumeAttributes, "skuName"); exists {
+				if !strings.EqualFold(sku, string(toSKU)) {
+					continue
+				}
+			}
+		}
+
 		storageQtyVal := pv.Spec.Capacity[corev1.ResourceStorage]
 		storageQty := &storageQtyVal
 		volumeSizeInBytes := storageQty.Value()
@@ -643,6 +657,6 @@ func (d *Driver) recoverMigrationMonitorsFromLabels(ctx context.Context) error {
 		}
 	}

-	klog.V(2).Infof("Recovered %d migration monitors from labels", recoveredCount)
+	klog.V(4).Infof("Recovered %d migration monitors from labels", recoveredCount)
 	return nil
 }
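The recovery hunk above skips any labeled PV whose storageAccountType or skuName volume attribute does not name the migration target SKU (PremiumV2_LRS). Below is a self-contained sketch of that check using a plain map lookup with strings.EqualFold instead of the driver's azureutils.ParseDiskParametersForKey helper, so it is an approximation of the in-tree logic rather than the exact implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// targetsSKU reports whether the PV's CSI volume attributes request the given SKU.
// An attribute that is present but names a different SKU disqualifies the PV;
// absent attributes are treated as a match, as in the recovery loop above.
func targetsSKU(volumeAttributes map[string]string, targetSKU string) bool {
	for _, key := range []string{"storageAccountType", "skuName"} {
		if v, ok := volumeAttributes[key]; ok && !strings.EqualFold(v, targetSKU) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(targetsSKU(map[string]string{"skuName": "PremiumV2_LRS"}, "PremiumV2_LRS")) // true
	fmt.Println(targetsSKU(map[string]string{"skuName": "Premium_LRS"}, "PremiumV2_LRS"))   // false
}
```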
