@@ -20,23 +20,125 @@ limitations under the License.
20
20
package volume
21
21
22
22
import (
23
+ "context"
24
+ "fmt"
23
25
"path/filepath"
26
+ "sync/atomic"
24
27
"syscall"
25
28
26
29
"os"
27
30
"time"
28
31
29
32
v1 "k8s.io/api/core/v1"
33
+ "k8s.io/client-go/tools/record"
30
34
"k8s.io/klog/v2"
35
+ "k8s.io/kubernetes/pkg/kubelet/events"
31
36
"k8s.io/kubernetes/pkg/volume/util/types"
32
37
)
33
38
34
39
const (
35
40
rwMask = os .FileMode (0660 )
36
41
roMask = os .FileMode (0440 )
37
42
execMask = os .FileMode (0110 )
43
+
44
+ progressReportDuration = 60 * time .Second
38
45
)
39
46
47
+ type VolumeOwnership struct {
48
+ mounter Mounter
49
+ dir string
50
+ fsGroup * int64
51
+ fsGroupChangePolicy * v1.PodFSGroupChangePolicy
52
+ completionCallback func (types.CompleteFuncParam )
53
+
54
+ // for monitoring progress of permission change operation
55
+ pod * v1.Pod
56
+ fileCounter atomic.Int64
57
+ recorder record.EventRecorder
58
+ }
59
+
60
+ func NewVolumeOwnership (mounter Mounter , dir string , fsGroup * int64 , fsGroupChangePolicy * v1.PodFSGroupChangePolicy , completeFunc func (types.CompleteFuncParam )) * VolumeOwnership {
61
+ vo := & VolumeOwnership {
62
+ mounter : mounter ,
63
+ dir : dir ,
64
+ fsGroup : fsGroup ,
65
+ fsGroupChangePolicy : fsGroupChangePolicy ,
66
+ completionCallback : completeFunc ,
67
+ }
68
+ vo .fileCounter .Store (0 )
69
+ return vo
70
+ }
71
+
72
+ func (vo * VolumeOwnership ) AddProgressNotifier (pod * v1.Pod , recorder record.EventRecorder ) * VolumeOwnership {
73
+ vo .pod = pod
74
+ vo .recorder = recorder
75
+ return vo
76
+ }
77
+
78
+ func (vo * VolumeOwnership ) ChangePermissions () error {
79
+ if vo .fsGroup == nil {
80
+ return nil
81
+ }
82
+
83
+ if skipPermissionChange (vo .mounter , vo .dir , vo .fsGroup , vo .fsGroupChangePolicy ) {
84
+ klog .V (3 ).InfoS ("Skipping permission and ownership change for volume" , "path" , vo .dir )
85
+ return nil
86
+ }
87
+
88
+ ctx , cancel := context .WithCancel (context .Background ())
89
+ defer cancel ()
90
+
91
+ timer := time .AfterFunc (30 * time .Second , func () {
92
+ vo .initiateProgressMonitor (ctx )
93
+ })
94
+ defer timer .Stop ()
95
+
96
+ return vo .changePermissionsRecursively ()
97
+ }
98
+
99
+ func (vo * VolumeOwnership ) initiateProgressMonitor (ctx context.Context ) {
100
+ klog .Warningf ("Setting volume ownership for %s and fsGroup set. If the volume has a lot of files then setting volume ownership could be slow, see https://github.com/kubernetes/kubernetes/issues/69699" , vo .dir )
101
+ if vo .pod != nil {
102
+ go vo .monitorProgress (ctx )
103
+ }
104
+ }
105
+
106
+ func (vo * VolumeOwnership ) changePermissionsRecursively () error {
107
+ err := walkDeep (vo .dir , func (path string , info os.FileInfo , err error ) error {
108
+ if err != nil {
109
+ return err
110
+ }
111
+ vo .fileCounter .Add (1 )
112
+ return changeFilePermission (path , vo .fsGroup , vo .mounter .GetAttributes ().ReadOnly , info )
113
+ })
114
+
115
+ if vo .completionCallback != nil {
116
+ vo .completionCallback (types.CompleteFuncParam {
117
+ Err : & err ,
118
+ })
119
+ }
120
+ return err
121
+ }
122
+
123
+ func (vo * VolumeOwnership ) monitorProgress (ctx context.Context ) {
124
+ ticker := time .NewTicker (progressReportDuration )
125
+ defer ticker .Stop ()
126
+ for {
127
+ select {
128
+ case <- ctx .Done ():
129
+ return
130
+ case <- ticker .C :
131
+ vo .logWarning ()
132
+ }
133
+ }
134
+ }
135
+
136
+ func (vo * VolumeOwnership ) logWarning () {
137
+ msg := fmt .Sprintf ("Setting volume ownership for %s, processed %d files" , vo .dir , vo .fileCounter .Load ())
138
+ klog .Warning (msg )
139
+ vo .recorder .Event (vo .pod , v1 .EventTypeWarning , events .VolumePermissionChangeInProgress , msg )
140
+ }
141
+
40
142
// SetVolumeOwnership modifies the given volume to be owned by
41
143
// fsGroup, and sets SetGid so that newly created files are owned by
42
144
// fsGroup. If fsGroup is nil nothing is done.
0 commit comments