77package main
88
99import (
10+ "bytes"
1011 "context"
12+ "encoding/json"
1113 "fmt"
1214 "io"
1315 "log/syslog"
16+ "net/http"
1417 "os"
1518 "path/filepath"
1619 "strings"
@@ -38,11 +41,12 @@ const (
3841type PluginConfig struct {
3942 Events []string `toml:"events"`
4043
41- ServerPath string `toml:"server_path"`
42- PersistDir string `toml:"persist_dir"`
43- Readable bool `toml:"readable"`
44- Timeout int `toml:"timeout"`
45- Overwrite bool `toml:"overwrite"`
44+ ServerPath string `toml:"server_path"`
45+ PersistDir string `toml:"persist_dir"`
46+ Readable bool `toml:"readable"`
47+ Timeout int `toml:"timeout"`
48+ Overwrite bool `toml:"overwrite"`
49+ PrefetchDistributionEndpoint string `toml:"prefetch_distribution_endpoint"`
4650}
4751
4852type PluginArgs struct {
@@ -104,6 +108,11 @@ func buildFlags(args *PluginArgs) []cli.Flag {
104108 Usage : "whether to overwrite the existed persistent files" ,
105109 Destination : & args .Config .Overwrite ,
106110 },
111+ & cli.StringFlag {
112+ Name : "prefetch-distribution-endpoint" ,
113+ Usage : "The service endpoint of prefetch distribution, for example: http://localhost:1323/api/v1/prefetch/upload" ,
114+ Destination : & args .Config .PrefetchDistributionEndpoint ,
115+ },
107116 }
108117}
109118
@@ -129,7 +138,8 @@ var (
129138)
130139
131140const (
132- imageNameLabel = "io.kubernetes.cri.image-name"
141+ imageNameLabel = "io.kubernetes.cri.image-name"
142+ containerNameLabel = "io.kubernetes.cri.container-name"
133143)
134144
135145func (p * plugin ) Configure (config , runtime , version string ) (stub.EventMask , error ) {
@@ -156,11 +166,26 @@ func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, err
156166 return p .mask , nil
157167}
158168
169+ type PrefetchFile struct {
170+ Path string
171+ }
172+
173+ type CacheItem struct {
174+ ImageName string
175+ ContainerName string
176+ PrefetchFiles []PrefetchFile
177+ }
178+
179+ type Cache struct {
180+ Items map [string ]* CacheItem
181+ }
182+
159183func (p * plugin ) StartContainer (_ * api.PodSandbox , container * api.Container ) error {
160- dir , imageName , err := GetImageName (container .Annotations )
184+ dir , imageName , imageRepo , err := GetImageName (container .Annotations )
161185 if err != nil {
162186 return err
163187 }
188+ containerName := container .Annotations [containerNameLabel ]
164189
165190 persistDir := filepath .Join (cfg .PersistDir , dir )
166191 if err := os .MkdirAll (persistDir , os .ModePerm ); err != nil {
@@ -172,37 +197,127 @@ func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) err
172197 persistFile = fmt .Sprintf ("%s.timeout%ds" , persistFile , cfg .Timeout )
173198 }
174199
175- fanotifyServer := fanotify .NewServer (cfg .ServerPath , container .Pid , imageName , persistFile , cfg .Readable , cfg .Overwrite , time .Duration (cfg .Timeout )* time .Second , logWriter )
200+ var hasSentPrefetchList = false
201+
202+ fanotifyServer := fanotify .NewServer (cfg .ServerPath , container .Pid , imageName , persistFile , cfg .Readable , cfg .Overwrite , time .Duration (cfg .Timeout )* time .Second , logWriter , containerName , hasSentPrefetchList )
176203
177204 if err := fanotifyServer .RunServer (); err != nil {
178205 return err
179206 }
180207
208+ go func () {
209+ time .Sleep (10 * time .Minute )
210+ fanotifyServer .Mu .Lock ()
211+ if ! fanotifyServer .IsSent {
212+ data , err := getPrefetchList (persistFile )
213+ if err != nil {
214+ log .WithError (err ).Error ("error reading file" )
215+ }
216+ if err = sendToServer (imageRepo , containerName , cfg .PrefetchDistributionEndpoint , data ); err != nil {
217+ log .WithError (err ).Error ("failed to send prefetch to http server" )
218+ }
219+ fanotifyServer .IsSent = true
220+ }
221+ fanotifyServer .Mu .Unlock ()
222+ }()
223+
181224 globalFanotifyServer [imageName ] = fanotifyServer
182225
183226 return nil
184227}
185228
229+ func sendToServer (imageName , containerName , serverURL string , data []byte ) error {
230+ filePaths := strings .Split (string (data ), "\n " )
231+
232+ var prefetchFiles []PrefetchFile
233+ for _ , path := range filePaths {
234+ if path != "" {
235+ prefetchFiles = append (prefetchFiles , PrefetchFile {Path : path })
236+ }
237+ }
238+
239+ item := CacheItem {
240+ ImageName : imageName ,
241+ ContainerName : containerName ,
242+ PrefetchFiles : prefetchFiles ,
243+ }
244+
245+ err := postRequest (item , serverURL )
246+ if err != nil {
247+ return errors .Wrap (err , "error uploading to server" )
248+ }
249+
250+ return nil
251+ }
252+
253+ func postRequest (item CacheItem , endpoint string ) error {
254+ data , err := json .Marshal (item )
255+ if err != nil {
256+ return err
257+ }
258+
259+ resp , err := http .Post (endpoint , "application/json" , bytes .NewBuffer (data ))
260+ if err != nil {
261+ return err
262+ }
263+ defer resp .Body .Close ()
264+
265+ if resp .StatusCode != http .StatusOK {
266+ return errors .Wrap (fmt .Errorf ("server returned a non-OK status code: %d" , resp .StatusCode ), "HTTP Status Error" )
267+ }
268+
269+ body , err := io .ReadAll (resp .Body )
270+ if err != nil {
271+ return errors .Wrap (err , "failed to read response body" )
272+ }
273+
274+ log .Info ("Server Response:" , string (body ))
275+
276+ return nil
277+ }
278+
279+ func getPrefetchList (prefetchListPath string ) ([]byte , error ) {
280+ data , err := os .ReadFile (prefetchListPath )
281+ if err != nil {
282+ return nil , err
283+ }
284+ return data , nil
285+ }
286+
186287func (p * plugin ) StopContainer (_ * api.PodSandbox , container * api.Container ) ([]* api.ContainerUpdate , error ) {
187288 var update = []* api.ContainerUpdate {}
188- _ , imageName , err := GetImageName (container .Annotations )
289+ _ , imageName , imageRepo , err := GetImageName (container .Annotations )
189290 if err != nil {
190291 return update , err
191292 }
293+
192294 if fanotifyServer , ok := globalFanotifyServer [imageName ]; ok {
193- fanotifyServer .StopServer ()
295+ fanotifyServer .Mu .Lock ()
296+ if ! fanotifyServer .IsSent {
297+ data , err := getPrefetchList (fanotifyServer .PersistFile )
298+ if err != nil {
299+ return update , err
300+ }
301+ if err = sendToServer (imageRepo , fanotifyServer .ContainerName , cfg .PrefetchDistributionEndpoint , data ); err != nil {
302+ log .WithError (err ).Error ("failed to send prefetch to http server" )
303+ }
304+ fanotifyServer .IsSent = true
305+
306+ fanotifyServer .StopServer ()
307+ }
308+ fanotifyServer .Mu .Unlock ()
194309 } else {
195310 return nil , errors .New ("can not find fanotify server for container image " + imageName )
196311 }
197-
198312 return update , nil
199313}
200314
201- func GetImageName (annotations map [string ]string ) (string , string , error ) {
315+ func GetImageName (annotations map [string ]string ) (string , string , string , error ) {
202316 named , err := docker .ParseDockerRef (annotations [imageNameLabel ])
203317 if err != nil {
204- return "" , "" , err
318+ return "" , "" , "" , err
205319 }
320+ imageRepo := docker .Named .String (named )
206321 nameTagged := named .(docker.NamedTagged )
207322 repo := docker .Path (nameTagged )
208323
@@ -211,7 +326,7 @@ func GetImageName(annotations map[string]string) (string, string, error) {
211326
212327 imageName := image + ":" + nameTagged .Tag ()
213328
214- return dir , imageName , nil
329+ return dir , imageName , imageRepo , nil
215330}
216331
217332func (p * plugin ) onClose () {
0 commit comments