Skip to content

Commit c4910cb

Browse files
committed
Added canaries for podgroup
1 parent 0dc379f commit c4910cb

File tree

8 files changed

+275
-16
lines changed

8 files changed

+275
-16
lines changed

apiserver/podgroup.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net/http"
66

77
"github.com/laincloud/deployd/engine"
8+
"github.com/laincloud/deployd/utils/util"
89
"github.com/mijia/sweb/form"
910
"github.com/mijia/sweb/log"
1011
"github.com/mijia/sweb/server"
@@ -26,12 +27,25 @@ func (rpg RestfulPodGroups) Post(ctx context.Context, r *http.Request) (int, int
2627
}
2728

2829
orcEngine := getEngine(ctx)
29-
if err := orcEngine.NewPodGroup(pgSpec); err != nil {
30-
switch err {
30+
var deployErr error
31+
if util.PodGroupType(pgSpec.Name) == engine.PGCanaryType {
32+
var canary engine.Canary
33+
if err := form.ParamBodyJson(r, &canary); err != nil {
34+
log.Warnf("Failed to decode canary, %s", err)
35+
return http.StatusBadRequest, fmt.Sprintf("Invalid Canary params format: %s", err)
36+
}
37+
canaryPgSpec := engine.CanaryPodsWithSpec{pgSpec, &canary}
38+
deployErr = orcEngine.NewCanary(canaryPgSpec)
39+
40+
} else {
41+
deployErr = orcEngine.NewPodGroup(pgSpec)
42+
}
43+
if deployErr != nil {
44+
switch deployErr {
3145
case engine.ErrNotEnoughResources, engine.ErrPodGroupExists, engine.ErrDependencyPodNotExists:
32-
return http.StatusMethodNotAllowed, err.Error()
46+
return http.StatusMethodNotAllowed, deployErr.Error()
3347
default:
34-
return http.StatusInternalServerError, err.Error()
48+
return http.StatusInternalServerError, deployErr.Error()
3549
}
3650
}
3751

engine/canaries.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package engine
2+
3+
import (
	"encoding/json"
	"strings"

	"github.com/laincloud/deployd/storage"
	"github.com/mijia/sweb/log"
)
9+
10+
// IStrategy is the read-only view of one canary strategy: a type name plus
// the list of rule payloads that belong to it.
type IStrategy interface {
	// Type returns the strategy's type name.
	Type() string
	// Rules returns the strategy's rule payloads.
	Rules() []interface{}
}

// DefaultStrategy is the concrete IStrategy used for JSON (de)serialization.
// It is encoded with the keys "Type" and "Rule".
type DefaultStrategy struct {
	DivType  string        `json:"Type"`
	DivDatas []interface{} `json:"Rule"`
}

// Type returns the value of DivType.
func (ds *DefaultStrategy) Type() string {
	return ds.DivType
}

// Rules returns the value of DivDatas.
func (ds *DefaultStrategy) Rules() []interface{} {
	return ds.DivDatas
}

// Canary carries the ordered list of strategies attached to a canary pod
// group.
type Canary struct {
	Strategies []IStrategy
}

// UnmarshalJSON decodes a Canary whose strategies are encoded as
// DefaultStrategy objects.
//
// encoding/json cannot decode a JSON object into a nil value of a non-empty
// interface type, so without this method decoding any Canary with at least
// one strategy fails. Every entry is therefore decoded as *DefaultStrategy
// and then stored behind the IStrategy interface.
//
// A JSON null document leaves the receiver untouched. Null entries inside
// the "Strategies" array are skipped so that no typed-nil strategy (which
// would panic on Type/Rules) ends up in the slice.
func (canary *Canary) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		return nil
	}
	var raw struct {
		Strategies []*DefaultStrategy
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if raw.Strategies == nil {
		canary.Strategies = nil
		return nil
	}
	strategies := make([]IStrategy, 0, len(raw.Strategies))
	for _, ds := range raw.Strategies {
		if ds == nil {
			continue
		}
		strategies = append(strategies, ds)
	}
	canary.Strategies = strategies
	return nil
}

// Equal reports whether both canaries hold the same number of strategies and
// the strategies at each position have the same Type. Rules are NOT compared.
//
// Two nil canaries are equal; a nil canary never equals a non-nil one. The
// same convention applies to nil strategy entries.
func (canary *Canary) Equal(nCanary *Canary) bool {
	if canary == nil || nCanary == nil {
		return canary == nil && nCanary == nil
	}
	if len(canary.Strategies) != len(nCanary.Strategies) {
		return false
	}
	for i, strategy := range canary.Strategies {
		other := nCanary.Strategies[i]
		if strategy == nil || other == nil {
			if strategy == nil && other == nil {
				continue
			}
			return false
		}
		if strategy.Type() != other.Type() {
			return false
		}
	}
	return true
}
43+
44+
type CanaryPodsWithSpec struct {
45+
PodGroupSpec
46+
Canary *Canary
47+
}
48+
49+
func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error {
50+
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
51+
if err := store.Set(key, spec.Canary, true); err != nil {
52+
log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err)
53+
return err
54+
}
55+
return nil
56+
}
57+
58+
func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error {
59+
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
60+
if err := store.Remove(key); err != nil {
61+
log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err)
62+
return err
63+
}
64+
return nil
65+
}
66+
67+
func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) {
68+
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
69+
var canary Canary
70+
if err := store.Get(key, &canary); err != nil {
71+
log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err)
72+
return nil, err
73+
}
74+
return &canary, nil
75+
}
76+
77+
func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error {
78+
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
79+
if err := store.Set(key, canary, true); err != nil {
80+
log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err)
81+
return err
82+
}
83+
return nil
84+
}

engine/canaries_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package engine
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestEngineCanaries(t *testing.T) {
10+
etcdAddr := "http://127.0.0.1:2379"
11+
ConfigPortsManager(etcdAddr)
12+
c, store, err := initClusterAndStore()
13+
if err != nil {
14+
t.Fatalf("Cannot create the cluster and storage, %s", err)
15+
}
16+
17+
engine, err := New(c, store)
18+
if err != nil {
19+
t.Fatalf("Cannot create the orc engine, %s", err)
20+
}
21+
22+
namespace := "hello"
23+
name := "hello.canary.web"
24+
pgSpec := createPodGroupSpec(namespace, name, 1)
25+
data := map[string]interface{}{"suffix": 1, "upstream": "beta1"}
26+
strategies := []IStrategy{&DefaultStrategy{"uidsuffix", []interface{}{data}}}
27+
canary := &Canary{strategies}
28+
canarySpec := CanaryPodsWithSpec{pgSpec, canary}
29+
if err := engine.NewCanary(canarySpec); err != nil {
30+
t.Fatalf("Should not return error, %s", err)
31+
}
32+
if err := engine.NewCanary(canarySpec); err == nil {
33+
t.Errorf("Should return exists error, but we got no problem")
34+
}
35+
36+
time.Sleep(20 * time.Second)
37+
if pg, ok := engine.InspectPodGroup(name); !ok {
38+
t.Errorf("We should have the pod group, but we don't get it")
39+
} else if pg.State != RunStateSuccess {
40+
t.Errorf("We should have the pod deployed and running")
41+
}
42+
43+
engine.RescheduleInstance(name, 3)
44+
time.Sleep(20 * time.Second)
45+
if pg, ok := engine.InspectPodGroup(name); !ok {
46+
t.Errorf("We should have the pod group, but we don't get it")
47+
} else if len(pg.Pods) != 3 {
48+
t.Errorf("We should have 3 instance of the pods")
49+
}
50+
51+
engine.RescheduleInstance(name, 1)
52+
time.Sleep(30 * time.Second)
53+
if pg, ok := engine.InspectPodGroup(name); !ok {
54+
t.Errorf("We should have the pod group, but we don't get it")
55+
} else if len(pg.Pods) != 1 {
56+
bytes, err := json.Marshal(pg.Pods)
57+
pods := ""
58+
if err == nil {
59+
pods = string(bytes)
60+
}
61+
t.Errorf("We should have 1 instance of the pods : %v", pods)
62+
}
63+
64+
podSpec := createPodSpec(namespace, name)
65+
podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024
66+
engine.RescheduleSpec(name, podSpec)
67+
time.Sleep(40 * time.Second)
68+
if pg, ok := engine.InspectPodGroup(name); !ok {
69+
t.Errorf("We should have the pod group, but we don't get it")
70+
} else if pg.Spec.Version != 1 {
71+
t.Errorf("We should have version 1 of the pods")
72+
}
73+
74+
if nCanary := engine.FetchCanaryInfos(name); nCanary == nil {
75+
t.Fatalf("Should return canary!")
76+
} else {
77+
if !nCanary.Equal(canary) {
78+
t.Fatalf("Canary info should not changed")
79+
}
80+
}
81+
82+
if err := engine.RemovePodGroup(name); err != nil {
83+
t.Errorf("We should be able to remove the pod group, %s", err)
84+
} else if err := engine.NewPodGroup(pgSpec); err == nil {
85+
t.Errorf("We should not be able to deploy pod group again in short time we remove it")
86+
}
87+
time.Sleep(20 * time.Second)
88+
89+
if nCanary := engine.FetchCanaryInfos(name); nCanary != nil {
90+
t.Fatalf("Should not return nCanary")
91+
}
92+
}

engine/engine.go

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/laincloud/deployd/cluster"
1212
"github.com/laincloud/deployd/storage"
13+
"github.com/laincloud/deployd/utils/util"
1314
"github.com/mijia/adoc"
1415
"github.com/mijia/sweb/log"
1516
)
@@ -43,18 +44,18 @@ type EngineConfig struct {
4344
type OrcEngine struct {
4445
sync.RWMutex
4546

46-
config EngineConfig
47-
cluster cluster.Cluster
48-
store storage.Store
49-
eagleView *RuntimeEagleView
50-
pgCtrls map[string]*podGroupController
51-
rmPgCtrls map[string]*podGroupController
52-
dependsCtrls map[string]*dependsController
53-
rmDepCtrls map[string]*dependsController
54-
opsChan chan orcOperation
47+
config EngineConfig
48+
cluster cluster.Cluster
49+
store storage.Store
50+
eagleView *RuntimeEagleView
51+
pgCtrls map[string]*podGroupController
52+
rmPgCtrls map[string]*podGroupController
53+
dependsCtrls map[string]*dependsController
54+
rmDepCtrls map[string]*dependsController
55+
opsChan chan orcOperation
5556
refreshAllChan chan bool
56-
stop chan struct{}
57-
clstrFailCnt int
57+
stop chan struct{}
58+
clstrFailCnt int
5859
}
5960

6061
const (
@@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error {
236237
return err
237238
}
238239
log.Infof("start delete %v\n", name)
240+
// if is canary podgroup remove canary strategy
241+
if util.PodGroupType(name) == PGCanaryType {
242+
// remove key
243+
pgCtrl.spec.RemoveCanary(engine.store)
244+
}
239245
engine.opsChan <- orcOperRemove{pgCtrl}
240246
delete(engine.pgCtrls, name)
241247
engine.rmPgCtrls[name] = pgCtrl
242248
go engine.checkPodGroupRemoveResult(name, pgCtrl)
249+
243250
return nil
244251
}
245252
}
@@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error {
284291
pgCtrl.opsChan <- pgOperSaveStore{true}
285292
pgCtrl.opsChan <- pgOperOver{}
286293
}
287-
288294
return nil
289295
}
290296
}
@@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error {
329335
return nil
330336
}
331337

338+
func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error {
339+
if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil {
340+
return err
341+
}
342+
// gen canary strategy
343+
spec.SaveCanary(engine.store)
344+
return nil
345+
}
346+
347+
func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary {
348+
engine.RLock()
349+
defer engine.RUnlock()
350+
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
351+
return nil
352+
} else {
353+
if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil {
354+
return nil
355+
} else {
356+
return canary
357+
}
358+
}
359+
return nil
360+
}
361+
362+
func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error {
363+
engine.RLock()
364+
defer engine.RUnlock()
365+
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
366+
return ErrPodGroupNotExists
367+
} else {
368+
return pgCtrl.spec.UpdateCanary(engine.store, canary)
369+
}
370+
}
371+
332372
func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool {
333373
if resources, err := engine.cluster.GetResources(); err != nil {
334374
return false

engine/podgroup.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ type PodGroupWithSpec struct {
2424
PodGroup
2525
}
2626

27+
// type PodGroupController interface {
28+
// Deploy()
29+
// Remove()
30+
// Inspect() PodGroupWithSpec
31+
// RescheduleSpec(podSpec PodSpec)
32+
// RescheduleInstance(numInstances int, restartPolicy ...RestartPolicy)
33+
// RescheduleDrift(fromNode, toNode string, instanceNo int, force bool)
34+
// ChangeState(op string, instance int)
35+
// }
36+
2737
type podGroupController struct {
2838
Publisher
2939

engine/specs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const (
2222
kLainNodesKey = "nodes"
2323
kLainLastPodSpecKey = "last_spec"
2424
kLainPgOpingKey = "operating"
25+
kLainCanaryKey = "canaries"
2526

2627
kLainLabelPrefix = "cc.bdp.lain.deployd"
2728
kLainLogVolumePath = "/lain/logs"
@@ -31,6 +32,8 @@ const (
3132

3233
MinPodKillTimeout = 10
3334
MaxPodKillTimeout = 120
35+
36+
PGCanaryType = "canary"
3437
)
3538

3639
var (

utils/util/util.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ func ParseContainerName(containerName string) (string, int, int, int, error) {
4848
return g.Group(1), version, instance, driftCount, nil
4949
}
5050

51+
func PodGroupType(procName string) string {
52+
p := regex.MustCompile(`.*\.(.*)\..*`)
53+
g := p.Match(procName)
54+
if g == nil {
55+
return ""
56+
}
57+
return g.Group(1)
58+
}
59+
5160
func IpConflictErrorMatch(err string) string {
5261
p := regex.MustCompile(`IP assignment error, data: {IP:([0-9.]+) HandleID:(.*)}: Address already assigned in block`)
5362
g := p.Match(err)

utils/util/util_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,10 @@ func Test_timeMarshal(t *testing.T) {
5454
data, _ := json.Marshal(tt)
5555
fmt.Println("t:", string(data))
5656
}
57+
58+
func Test_PGType(t *testing.T) {
59+
pgName := "hello.canary.web"
60+
if pgType := PodGroupType(pgName); pgType != "canary" {
61+
t.Fatalf("pgtype %s should be canary", pgType)
62+
}
63+
}

0 commit comments

Comments
 (0)