@@ -27,9 +27,7 @@ import (
27
27
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
28
28
"github.com/containerd/containerd/contrib/snapshotservice"
29
29
"github.com/containerd/containerd/log"
30
- "github.com/containerd/containerd/snapshots"
31
30
"github.com/firecracker-microvm/firecracker-go-sdk/vsock"
32
- "github.com/sirupsen/logrus"
33
31
"golang.org/x/sync/errgroup"
34
32
"google.golang.org/grpc"
35
33
@@ -55,52 +53,46 @@ func Run(config config.Config) error {
55
53
56
54
group , ctx := errgroup .WithContext (ctx )
57
55
58
- cache := cache .NewSnapshotterCache ()
59
-
60
56
var (
61
57
monitor * metrics.Monitor
62
58
serviceDiscovery * discovery.ServiceDiscovery
63
59
)
60
+
64
61
if config .Snapshotter .Metrics .Enable {
65
- sdHost := config .Snapshotter .Metrics .Host
66
- sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
67
- serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
68
62
var err error
69
63
monitor , err = initMetricsProxyMonitor (config .Snapshotter .Metrics .PortRange )
70
64
if err != nil {
71
- log .G (ctx ).WithError (err ).Fatal ("failed creating metrics proxy monitor" )
72
- return err
65
+ return fmt .Errorf ("failed creating metrics proxy monitor: %w" , err )
73
66
}
74
- group .Go (func () error {
75
- return serviceDiscovery .Serve ()
76
- })
77
67
group .Go (func () error {
78
68
return monitor .Start ()
79
69
})
80
70
}
81
71
82
- snapshotter , err := initSnapshotter ( ctx , config , cache , monitor )
72
+ cache , err := initCache ( config , monitor )
83
73
if err != nil {
84
- log .G (ctx ).WithFields (
85
- logrus.Fields {"resolver" : config .Snapshotter .Proxy .Address .Resolver .Type },
86
- ).WithError (err ).Fatal ("failed creating socket resolver" )
87
- return err
74
+ return fmt .Errorf ("failed initializing cache: %w" , err )
75
+ }
76
+
77
+ if config .Snapshotter .Metrics .Enable {
78
+ sdHost := config .Snapshotter .Metrics .Host
79
+ sdPort := config .Snapshotter .Metrics .ServiceDiscoveryPort
80
+ serviceDiscovery = discovery .NewServiceDiscovery (sdHost , sdPort , cache )
81
+ group .Go (func () error {
82
+ return serviceDiscovery .Serve ()
83
+ })
88
84
}
89
85
86
+ snapshotter := demux .NewSnapshotter (cache )
87
+
90
88
grpcServer := grpc .NewServer ()
91
89
service := snapshotservice .FromSnapshotter (snapshotter )
92
90
snapshotsapi .RegisterSnapshotsServer (grpcServer , service )
93
91
94
92
listenerConfig := config .Snapshotter .Listener
95
93
listener , err := net .Listen (listenerConfig .Network , listenerConfig .Address )
96
94
if err != nil {
97
- log .G (ctx ).WithFields (
98
- logrus.Fields {
99
- "network" : listenerConfig .Network ,
100
- "address" : listenerConfig .Address ,
101
- },
102
- ).WithError (err ).Fatal ("failed creating listener" )
103
- return err
95
+ return fmt .Errorf ("failed creating service listener{network: %s, address: %s}: %w" , listenerConfig .Network , listenerConfig .Address , err )
104
96
}
105
97
106
98
group .Go (func () error {
@@ -138,8 +130,7 @@ func Run(config config.Config) error {
138
130
})
139
131
140
132
if err := group .Wait (); err != nil {
141
- log .G (ctx ).WithError (err ).Error ("demux snapshotter error" )
142
- return err
133
+ return fmt .Errorf ("demux snapshotter error: %w" , err )
143
134
}
144
135
145
136
log .G (ctx ).Info ("done" )
@@ -159,15 +150,24 @@ func initResolver(config config.Config) (proxyaddress.Resolver, error) {
159
150
const base10 = 10
160
151
const bits32 = 32
161
152
162
- func initSnapshotter ( ctx context. Context , config config.Config , cache cache. Cache , monitor * metrics.Monitor ) (snapshots. Snapshotter , error ) {
153
+ func initCache ( config config.Config , monitor * metrics.Monitor ) (* cache. RemoteSnapshotterCache , error ) {
163
154
resolver , err := initResolver (config )
164
155
if err != nil {
165
156
return nil , err
166
157
}
167
158
168
- newRemoteSnapshotterFunc := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
159
+ // TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
160
+ snapshotterDialer := func (ctx context.Context , host string , port uint64 ) (net.Conn , error ) {
161
+ return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
162
+ vsock .WithAckMsgTimeout (2 * time .Second ),
163
+ vsock .WithRetryInterval (500 * time .Millisecond ),
164
+ )
165
+ }
166
+
167
+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
169
168
r := resolver
170
169
response , err := r .Get (namespace )
170
+
171
171
if err != nil {
172
172
return nil , err
173
173
}
@@ -177,12 +177,23 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
177
177
return nil , err
178
178
}
179
179
180
- // TODO: https://github.com/firecracker-microvm/firecracker-containerd/issues/689
181
- snapshotterDialer := func (ctx context.Context , namespace string ) (net.Conn , error ) {
182
- return vsock .DialContext (ctx , host , uint32 (port ), vsock .WithLogger (log .G (ctx )),
183
- vsock .WithAckMsgTimeout (2 * time .Second ),
184
- vsock .WithRetryInterval (200 * time .Millisecond ),
185
- )
180
+ return snapshotterDialer (ctx , host , port )
181
+ }
182
+
183
+ fetch := func (ctx context.Context , namespace string ) (* proxy.RemoteSnapshotter , error ) {
184
+ r := resolver
185
+ response , err := r .Get (namespace )
186
+ if err != nil {
187
+ return nil , err
188
+ }
189
+ host := response .Address
190
+ port , err := strconv .ParseUint (response .SnapshotterPort , base10 , bits32 )
191
+ if err != nil {
192
+ return nil , err
193
+ }
194
+
195
+ dial := func (ctx context.Context , namespace string ) (net.Conn , error ) {
196
+ return snapshotterDialer (ctx , host , port )
186
197
}
187
198
188
199
var metricsProxy * metrics.Proxy
@@ -193,10 +204,20 @@ func initSnapshotter(ctx context.Context, config config.Config, cache cache.Cach
193
204
}
194
205
}
195
206
196
- return proxy .NewRemoteSnapshotter (ctx , host , snapshotterDialer , metricsProxy )
207
+ return proxy .NewRemoteSnapshotter (ctx , host , dial , metricsProxy )
208
+ }
209
+
210
+ opts := make ([]cache.SnapshotterCacheOption , 0 )
211
+
212
+ if config .Snapshotter .Cache .EvictOnConnectionFailure {
213
+ cachePollFrequency , err := time .ParseDuration (config .Snapshotter .Cache .PollConnectionFrequency )
214
+ if err != nil {
215
+ return nil , fmt .Errorf ("invalid cache evict poll connection frequency: %w" , err )
216
+ }
217
+ opts = append (opts , cache .EvictOnConnectionFailure (dial , cachePollFrequency , nil ))
197
218
}
198
219
199
- return demux . NewSnapshotter ( cache , newRemoteSnapshotterFunc ), nil
220
+ return cache . NewRemoteSnapshotterCache ( fetch , opts ... ), nil
200
221
}
201
222
202
223
func initMetricsProxyMonitor (portRange string ) (* metrics.Monitor , error ) {
0 commit comments