@@ -5,17 +5,22 @@ package recovery
55
66import (
77 "context"
8+ "encoding/json"
89 "fmt"
10+ "net/url"
911 "os"
1012 "path"
1113 "strings"
1214
1315 "github.com/kubernetes-incubator/bootkube/pkg/asset"
1416
17+ "github.com/coreos/etcd-operator/pkg/spec"
1518 "github.com/coreos/etcd/clientv3"
19+ "github.com/pborman/uuid"
1620 "k8s.io/apimachinery/pkg/api/meta"
1721 "k8s.io/apimachinery/pkg/runtime"
1822 "k8s.io/client-go/pkg/api"
23+ "k8s.io/client-go/pkg/apis/extensions/v1beta1"
1924)
2025
2126// etcdBackend is a backend that extracts a controlPlane from an etcd instance.
@@ -57,6 +62,7 @@ func (s *etcdBackend) read(ctx context.Context) (*controlPlane, error) {
5762 return nil , err
5863 }
5964 }
65+
6066 return cp , nil
6167}
6268
@@ -78,6 +84,19 @@ func (s *etcdBackend) get(ctx context.Context, key string, out runtime.Object, i
7884 return decode (s .decoder , kv .Value , out )
7985}
8086
87+ func (s * etcdBackend ) getBytes (ctx context.Context , key string ) ([]byte , error ) {
88+ key = path .Join (s .pathPrefix , key )
89+ getResp , err := s .client .KV .Get (ctx , key )
90+ if err != nil {
91+ return nil , err
92+ }
93+
94+ if len (getResp .Kvs ) == 0 {
95+ return nil , fmt .Errorf ("key not found: %s" , key )
96+ }
97+ return getResp .Kvs [0 ].Value , nil
98+ }
99+
81100// list fetches a list runtime.Object from etcd located at key prefix `key`.
82101func (s * etcdBackend ) list (ctx context.Context , key string , listObj runtime.Object ) error {
83102 listPtr , err := meta .GetItemsPtr (listObj )
@@ -100,27 +119,173 @@ func (s *etcdBackend) list(ctx context.Context, key string, listObj runtime.Obje
100119 return decodeList (elems , listPtr , s .decoder )
101120}
102121
103- const assetPathRecoveryEtcd = "recovery-etcd.yaml"
122+ const (
123+ assetPathRecoveryEtcd = "recovery-etcd.yaml"
124+ etcdTPRKey = "ThirdPartyResourceData/etcd.coreos.com/clusters/kube-system/kube-etcd"
125+ etcdMemberPodPrefix = "pods/kube-system/kube-etcd-"
126+ RecoveryEtcdClientAddr = "http://localhost:52379"
127+ )
128+
129+ type etcdSelfhostedBackend struct {
130+ * etcdBackend
131+
132+ backupPath string
133+ }
134+
135+ // NewSelfHostedEtcdBackend constructs a new etcdBackend for the given client and pathPrefix, and backup file.
136+ func NewSelfHostedEtcdBackend (client * clientv3.Client , pathPrefix , backupPath string ) Backend {
137+ eb := & etcdBackend {
138+ client : client ,
139+ decoder : api .Codecs .UniversalDecoder (),
140+ pathPrefix : pathPrefix ,
141+ }
142+
143+ return & etcdSelfhostedBackend {
144+ etcdBackend : eb ,
145+ backupPath : backupPath ,
146+ }
147+ }
148+
149+ // read implements Backend.read().
150+ func (s * etcdSelfhostedBackend ) read (ctx context.Context ) (* controlPlane , error ) {
151+ cp , err := s .etcdBackend .read (ctx )
152+ if err != nil {
153+ return nil , err
154+ }
155+
156+ d , err := s .getBytes (ctx , etcdTPRKey )
157+ if err != nil {
158+ return nil , err
159+ }
160+
161+ var tpr v1beta1.ThirdPartyResourceData
162+ err = decode (s .decoder , d , & tpr )
163+ if err != nil {
164+ return nil , err
165+ }
104166
167+ var kubeetcd spec.Cluster
168+ err = json .Unmarshal (tpr .Data , & kubeetcd )
169+ if err != nil {
170+ return nil , err
171+ }
172+
173+ etpr , err := createEtcdTPRAsset (kubeetcd )
174+ if err != nil {
175+ return nil , err
176+ }
177+ cp .tpr = etpr
178+
179+ serviceIP , err := getServiceIPFromClusterSpec (kubeetcd .Spec )
180+ if err != nil {
181+ return nil , err
182+ }
183+ eas := createBootEtcdAsset (s .pathPrefix , s .backupPath , serviceIP )
184+ esas := createBootEtcdServiceAsset (serviceIP )
185+ cp .bootEtcd = & eas
186+ cp .bootEtcdService = & esas
187+
188+ return cp , nil
189+ }
190+
191+ // StartRecoveryEtcdForBackup starts a recovery etcd container using given backup.
192+ // The started etcd server listens on RecoveryEtcdClientAddr.
105193func StartRecoveryEtcdForBackup (p , backupPath string ) error {
106194 d , f := path .Split (backupPath )
107195
108196 config := struct {
109197 Image string
110198 BackupFile string
111199 BackupDir string
200+ ClientAddr string
112201 }{
113- // TODO: this already exists in bootkube/cmd.
114- // do not duplicate this!
115- Image : "quay.io/coreos/etcd:v3.1.6" ,
202+ Image : asset .DefaultImages .Etcd ,
116203 BackupFile : f ,
117204 BackupDir : d ,
205+ ClientAddr : RecoveryEtcdClientAddr ,
118206 }
119207
120- as := asset .MustCreateAssetFromTemplate (assetPathRecoveryEtcd , RecoveryEtcdTemplate , config )
208+ as := asset .MustCreateAssetFromTemplate (assetPathRecoveryEtcd , recoveryEtcdTemplate , config )
121209 return as .WriteFile (p )
122210}
123211
212+ // CleanRecoveryEtcd removes the recovery etcd static pod manifest and stops the recovery
213+ // etcd container.
124214func CleanRecoveryEtcd (p string ) error {
125215 return os .Remove (path .Join (p , assetPathRecoveryEtcd ))
126216}
217+
218+ func createBootEtcdAsset (pathPrefix , backupPath , serviceIP string ) asset.Asset {
219+ d , f := path .Split (backupPath )
220+
221+ config := struct {
222+ Image string
223+ BackupFile string
224+ BackupDir string
225+ BootEtcdServiceIP string
226+ TPRKey string
227+ MemberPodPrefix string
228+ ClusterToken string
229+ }{
230+ Image : asset .DefaultImages .Etcd ,
231+ BackupFile : f ,
232+ BackupDir : d ,
233+ BootEtcdServiceIP : serviceIP ,
234+ TPRKey : path .Join (pathPrefix , etcdTPRKey ),
235+ MemberPodPrefix : path .Join (pathPrefix , etcdMemberPodPrefix ),
236+ ClusterToken : "bootkube-recovery-" + uuid .New (),
237+ }
238+
239+ return asset .MustCreateAssetFromTemplate (asset .AssetPathBootstrapEtcd , bootFromBackupEtcdTemplate , config )
240+ }
241+
242+ func createBootEtcdServiceAsset (serviceIP string ) asset.Asset {
243+ config := struct { BootEtcdServiceIP string }{BootEtcdServiceIP : serviceIP }
244+
245+ return asset .MustCreateAssetFromTemplate (asset .AssetPathBootstrapEtcdService , recoveryEtcdSvcTemplate , config )
246+ }
247+
248+ func createEtcdTPRAsset (s spec.Cluster ) (* asset.Asset , error ) {
249+ clone := cloneEtcdClusterTPR (s )
250+
251+ data , err := json .Marshal (clone )
252+ if err != nil {
253+ return nil , err
254+ }
255+
256+ return & asset.Asset {
257+ Name : asset .AssetPathMigrateEtcdCluster ,
258+ Data : data ,
259+ }, nil
260+ }
261+
262+ func getServiceIPFromClusterSpec (s spec.ClusterSpec ) (string , error ) {
263+ ep := s .SelfHosted .BootMemberClientEndpoint
264+ u , err := url .Parse (ep )
265+ if err != nil {
266+ return "" , err
267+ }
268+ return stripPort (u .Host ), nil
269+ }
270+
271+ func cloneEtcdClusterTPR (s spec.Cluster ) spec.Cluster {
272+ var clone spec.Cluster
273+ clone .Spec = s .Spec
274+ clone .Metadata .SetName (s .Metadata .GetName ())
275+ clone .Metadata .SetNamespace (s .Metadata .GetNamespace ())
276+ clone .APIVersion = s .APIVersion
277+ clone .Kind = s .Kind
278+
279+ return clone
280+ }
281+
282+ func stripPort (hostport string ) string {
283+ colon := strings .IndexByte (hostport , ':' )
284+ if colon == - 1 {
285+ return hostport
286+ }
287+ if i := strings .IndexByte (hostport , ']' ); i != - 1 {
288+ return strings .TrimPrefix (hostport [:i ], "[" )
289+ }
290+ return hostport [:colon ]
291+ }
0 commit comments