Skip to content

Commit 0aec16a

Browse files
authored
Merge pull request #425 from raghavyuva/feat/deploy_to_cluster
feat: cluster based deployment, rollback, restart across services
2 parents c23e725 + 4b090cf commit 0aec16a

File tree

6 files changed

+553
-305
lines changed

6 files changed

+553
-305
lines changed
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package docker
2+
3+
import (
4+
"github.com/docker/docker/api/types"
5+
"github.com/docker/docker/api/types/events"
6+
"github.com/docker/docker/api/types/filters"
7+
"github.com/docker/docker/api/types/network"
8+
"github.com/docker/docker/api/types/swarm"
9+
"github.com/docker/docker/api/types/volume"
10+
"github.com/raghavyuva/nixopus-api/internal/config"
11+
)
12+
13+
func (s *DockerService) InitCluster() error {
14+
config := config.AppConfig
15+
16+
// Use localhost as default advertise address if SSH host is not configured
17+
// Useful during development
18+
advertiseAddr := "127.0.0.1:2377"
19+
if config.SSH.Host != "" {
20+
advertiseAddr = config.SSH.Host + ":2377"
21+
}
22+
23+
_, err := s.Cli.SwarmInit(s.Ctx, swarm.InitRequest{
24+
ListenAddr: "0.0.0.0:2377",
25+
// Address that Hosts can use to reach the master node
26+
AdvertiseAddr: advertiseAddr,
27+
})
28+
if err != nil {
29+
return err
30+
}
31+
32+
return nil
33+
}
34+
35+
func (s *DockerService) JoinCluster() error {
36+
config := config.AppConfig
37+
38+
// Use localhost as default advertise address if SSH host is not configured
39+
advertiseAddr := "127.0.0.1:2377"
40+
if config.SSH.Host != "" {
41+
advertiseAddr = config.SSH.Host + ":2377"
42+
}
43+
44+
err := s.Cli.SwarmJoin(s.Ctx, swarm.JoinRequest{
45+
ListenAddr: "0.0.0.0:2377",
46+
AdvertiseAddr: advertiseAddr,
47+
})
48+
return err
49+
}
50+
51+
func (s *DockerService) LeaveCluster(force bool) error {
52+
err := s.Cli.SwarmLeave(s.Ctx, force)
53+
return err
54+
}
55+
56+
func (s *DockerService) GetClusterInfo() (swarm.ClusterInfo, error) {
57+
clusterInfo, err := s.Cli.SwarmInspect(s.Ctx)
58+
return clusterInfo.ClusterInfo, err
59+
}
60+
61+
func (s *DockerService) GetClusterNodes() ([]swarm.Node, error) {
62+
nodes, err := s.Cli.NodeList(s.Ctx, types.NodeListOptions{})
63+
return nodes, err
64+
}
65+
66+
func (s *DockerService) GetClusterServices() ([]swarm.Service, error) {
67+
services, err := s.Cli.ServiceList(s.Ctx, types.ServiceListOptions{})
68+
return services, err
69+
}
70+
71+
func (s *DockerService) GetClusterTasks() ([]swarm.Task, error) {
72+
tasks, err := s.Cli.TaskList(s.Ctx, types.TaskListOptions{})
73+
return tasks, err
74+
}
75+
76+
func (s *DockerService) GetClusterSecrets() ([]swarm.Secret, error) {
77+
secrets, err := s.Cli.SecretList(s.Ctx, types.SecretListOptions{})
78+
return secrets, err
79+
}
80+
81+
func (s *DockerService) GetClusterConfigs() ([]swarm.Config, error) {
82+
configs, err := s.Cli.ConfigList(s.Ctx, types.ConfigListOptions{})
83+
return configs, err
84+
}
85+
86+
func (s *DockerService) GetClusterVolumes() ([]*volume.Volume, error) {
87+
volumes, err := s.Cli.VolumeList(s.Ctx, volume.ListOptions{})
88+
return volumes.Volumes, err
89+
}
90+
91+
func (s *DockerService) GetClusterNetworks() ([]network.Summary, error) {
92+
networks, err := s.Cli.NetworkList(s.Ctx, network.ListOptions{})
93+
return networks, err
94+
}
95+
96+
func (s *DockerService) UpdateNodeAvailability(nodeID string, availability swarm.NodeAvailability) error {
97+
node, _, err := s.Cli.NodeInspectWithRaw(s.Ctx, nodeID)
98+
if err != nil {
99+
return err
100+
}
101+
spec := node.Spec
102+
spec.Availability = availability
103+
return s.Cli.NodeUpdate(s.Ctx, nodeID, node.Version, spec)
104+
}
105+
106+
func (s *DockerService) ListenEvents(opts events.ListOptions) (<-chan events.Message, <-chan error) {
107+
return s.Cli.Events(s.Ctx, opts)
108+
}
109+
110+
func (s *DockerService) ScaleService(serviceID string, replicas uint64, rollback string) error {
111+
svc, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{})
112+
if err != nil {
113+
return err
114+
}
115+
spec := svc.Spec
116+
spec.Mode.Replicated.Replicas = &replicas
117+
_, err = s.Cli.ServiceUpdate(s.Ctx, serviceID, svc.Version, spec, types.ServiceUpdateOptions{
118+
Rollback: rollback,
119+
})
120+
return err
121+
}
122+
123+
func (s *DockerService) GetServiceHealth(service swarm.Service) (int, int, error) {
124+
tasks, err := s.Cli.TaskList(s.Ctx, types.TaskListOptions{
125+
Filters: filters.NewArgs(
126+
filters.Arg("service", service.ID),
127+
),
128+
})
129+
if err != nil {
130+
return 0, 0, err
131+
}
132+
133+
running := 0
134+
for _, t := range tasks {
135+
if t.Status.State == swarm.TaskStateRunning {
136+
running++
137+
}
138+
}
139+
desired := 0
140+
if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
141+
desired = int(*service.Spec.Mode.Replicated.Replicas)
142+
}
143+
return running, desired, nil
144+
}
145+
146+
func (s *DockerService) GetTaskHealth(task swarm.Task) swarm.TaskState {
147+
if task.Status.State != "" {
148+
return task.Status.State
149+
}
150+
return swarm.TaskState("")
151+
}
152+
153+
func (s *DockerService) CreateService(service swarm.Service) error {
154+
_, err := s.Cli.ServiceCreate(s.Ctx, service.Spec, types.ServiceCreateOptions{})
155+
if err != nil {
156+
return err
157+
}
158+
return nil
159+
}
160+
161+
func (s *DockerService) UpdateService(serviceID string, serviceSpec swarm.ServiceSpec, rollback string) error {
162+
svc, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{})
163+
if err != nil {
164+
return err
165+
}
166+
167+
_, err = s.Cli.ServiceUpdate(s.Ctx, serviceID, svc.Version, serviceSpec, types.ServiceUpdateOptions{
168+
Rollback: rollback,
169+
})
170+
return err
171+
}
172+
173+
func (s *DockerService) DeleteService(serviceID string) error {
174+
err := s.Cli.ServiceRemove(s.Ctx, serviceID)
175+
return err
176+
}
177+
178+
func (s *DockerService) RollbackService(serviceID string) error {
179+
_, err := s.Cli.ServiceUpdate(s.Ctx, serviceID, swarm.Version{}, swarm.ServiceSpec{}, types.ServiceUpdateOptions{
180+
Rollback: "previous",
181+
})
182+
return err
183+
}
184+
185+
func (s *DockerService) GetServiceByID(serviceID string) (swarm.Service, error) {
186+
service, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{})
187+
return service, err
188+
}

api/internal/features/deploy/docker/init.go

Lines changed: 67 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ import (
77

88
"github.com/docker/docker/api/types"
99
"github.com/docker/docker/api/types/container"
10+
"github.com/docker/docker/api/types/events"
1011
"github.com/docker/docker/api/types/filters"
1112
"github.com/docker/docker/api/types/image"
1213
"github.com/docker/docker/api/types/network"
14+
"github.com/docker/docker/api/types/swarm"
15+
"github.com/docker/docker/api/types/volume"
1316
"github.com/docker/docker/client"
1417
v1 "github.com/opencontainers/image-spec/specs-go/v1"
1518
"github.com/raghavyuva/nixopus-api/internal/features/logger"
@@ -23,9 +26,9 @@ type DockerService struct {
2326
}
2427

2528
type DockerRepository interface {
26-
ListAllContainers() ([]container.Summary, error)
27-
ListContainers(opts container.ListOptions) ([]container.Summary, error)
28-
ListAllImages(opts image.ListOptions) []image.Summary
29+
ListAllContainers() ([]container.Summary, error)
30+
ListContainers(opts container.ListOptions) ([]container.Summary, error)
31+
ListAllImages(opts image.ListOptions) []image.Summary
2932

3033
StopContainer(containerID string, opts container.StopOptions) error
3134
RemoveContainer(containerID string, opts container.RemoveOptions) error
@@ -46,6 +49,28 @@ type DockerRepository interface {
4649
RemoveImage(imageName string, opts image.RemoveOptions) error
4750
PruneBuildCache(opts types.BuildCachePruneOptions) error
4851
PruneImages(opts filters.Args) (image.PruneReport, error)
52+
53+
InitCluster() error
54+
JoinCluster() error
55+
LeaveCluster(force bool) error
56+
GetClusterInfo() (swarm.ClusterInfo, error)
57+
GetClusterNodes() ([]swarm.Node, error)
58+
GetClusterServices() ([]swarm.Service, error)
59+
GetClusterTasks() ([]swarm.Task, error)
60+
GetClusterSecrets() ([]swarm.Secret, error)
61+
GetClusterConfigs() ([]swarm.Config, error)
62+
GetClusterVolumes() ([]*volume.Volume, error)
63+
GetClusterNetworks() ([]network.Summary, error)
64+
UpdateNodeAvailability(nodeID string, availability swarm.NodeAvailability) error
65+
ScaleService(serviceID string, replicas uint64, rollback string) error
66+
ListenEvents(opts events.ListOptions) (<-chan events.Message, <-chan error)
67+
GetServiceHealth(service swarm.Service) (int, int, error)
68+
GetTaskHealth(task swarm.Task) swarm.TaskState
69+
CreateService(service swarm.Service) error
70+
UpdateService(serviceID string, serviceSpec swarm.ServiceSpec, rollback string) error
71+
DeleteService(serviceID string) error
72+
RollbackService(serviceID string) error
73+
GetServiceByID(serviceID string) (swarm.Service, error)
4974
}
5075

5176
type DockerClient struct {
@@ -54,11 +79,35 @@ type DockerClient struct {
5479

5580
// NewDockerService creates a new instance of DockerService using the default docker client.
5681
func NewDockerService() *DockerService {
57-
return &DockerService{
58-
Cli: NewDockerClient(),
82+
client := NewDockerClient()
83+
service := &DockerService{
84+
Cli: client,
5985
Ctx: context.Background(),
6086
logger: logger.NewLogger(),
6187
}
88+
89+
// Initialize cluster if not already initialized, this should be run on master node only
90+
// TODO: Add a check to see if the node is the master node
91+
// WARNING: This should be thought again during multi-server architecture feature
92+
if !isClusterInitialized(client) {
93+
if err := service.InitCluster(); err != nil {
94+
service.logger.Log(logger.Warning, "Failed to initialize cluster", err.Error())
95+
} else {
96+
service.logger.Log(logger.Info, "Cluster initialized successfully", "")
97+
}
98+
} else {
99+
service.logger.Log(logger.Info, "Cluster already initialized", "")
100+
}
101+
102+
return service
103+
}
104+
105+
func isClusterInitialized(cli *client.Client) bool {
106+
info, err := cli.Info(context.Background())
107+
if err != nil {
108+
return false
109+
}
110+
return info.Swarm.LocalNodeState == swarm.LocalNodeStateActive
62111
}
63112

64113
func NewDockerServiceWithClient(cli *client.Client, ctx context.Context, logger logger.Logger) *DockerService {
@@ -101,24 +150,24 @@ func NewDockerClient() *client.Client {
101150
//
102151
// If an error occurs while listing the containers, it returns the error (no panic).
103152
func (s *DockerService) ListAllContainers() ([]container.Summary, error) {
104-
containers, err := s.Cli.ContainerList(s.Ctx, container.ListOptions{
105-
All: true,
106-
})
107-
if err != nil {
108-
return nil, err
109-
}
110-
111-
return containers, nil
153+
containers, err := s.Cli.ContainerList(s.Ctx, container.ListOptions{
154+
All: true,
155+
})
156+
if err != nil {
157+
return nil, err
158+
}
159+
160+
return containers, nil
112161
}
113162

114163
// ListContainers returns containers using the provided docker list options
115164
// (including native filters like name/status/ancestor and optional limits).
116165
func (s *DockerService) ListContainers(opts container.ListOptions) ([]container.Summary, error) {
117-
containers, err := s.Cli.ContainerList(s.Ctx, opts)
118-
if err != nil {
119-
return nil, err
120-
}
121-
return containers, nil
166+
containers, err := s.Cli.ContainerList(s.Ctx, opts)
167+
if err != nil {
168+
return nil, err
169+
}
170+
return containers, nil
122171
}
123172

124173
// StopContainer stops the container with the given ID. If the container does not exist,

api/internal/features/deploy/tasks/delete.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@ import (
55
"os"
66
"path/filepath"
77

8-
"github.com/docker/docker/api/types/container"
98
"github.com/docker/docker/api/types/image"
109
"github.com/google/uuid"
1110
"github.com/raghavyuva/nixopus-api/internal/features/deploy/types"
1211
"github.com/raghavyuva/nixopus-api/internal/features/logger"
1312
)
1413

1514
// DeleteDeployment deletes a deployment and its associated resources.
16-
// It stops and removes the container, image, and repository.
15+
// It stops and removes the service, image, and repository.
1716
// It returns an error if any operation fails.
1817
func (s *TaskService) DeleteDeployment(deployment *types.DeleteDeploymentRequest, userID uuid.UUID, organizationID uuid.UUID) error {
1918
application, err := s.Storage.GetApplicationById(deployment.ID.String(), organizationID)
@@ -23,23 +22,28 @@ func (s *TaskService) DeleteDeployment(deployment *types.DeleteDeploymentRequest
2322

2423
domain := application.Domain
2524

26-
deployments, err := s.Storage.GetApplicationDeployments(application.ID)
25+
services, err := s.DockerRepo.GetClusterServices()
2726
if err != nil {
28-
s.Logger.Log(logger.Error, "Failed to get application deployments", err.Error())
27+
s.Logger.Log(logger.Error, "Failed to get services", err.Error())
2928
} else {
30-
for _, dep := range deployments {
31-
if dep.ContainerID != "" {
32-
s.Logger.Log(logger.Info, "Stopping container", dep.ContainerID)
33-
if err := s.DockerRepo.StopContainer(dep.ContainerID, container.StopOptions{}); err != nil {
34-
s.Logger.Log(logger.Error, "Failed to stop container", err.Error())
35-
}
36-
37-
s.Logger.Log(logger.Info, "Removing container", dep.ContainerID)
38-
if err := s.DockerRepo.RemoveContainer(dep.ContainerID, container.RemoveOptions{Force: true}); err != nil {
39-
s.Logger.Log(logger.Error, "Failed to remove container", err.Error())
29+
for _, service := range services {
30+
if service.Spec.Annotations.Name == application.Name {
31+
s.Logger.Log(logger.Info, "Deleting service", service.ID)
32+
if err := s.DockerRepo.DeleteService(service.ID); err != nil {
33+
s.Logger.Log(logger.Error, "Failed to delete service", err.Error())
34+
} else {
35+
s.Logger.Log(logger.Info, "Service deleted successfully", service.ID)
4036
}
37+
break
4138
}
39+
}
40+
}
4241

42+
deployments, err := s.Storage.GetApplicationDeployments(application.ID)
43+
if err != nil {
44+
s.Logger.Log(logger.Error, "Failed to get application deployments", err.Error())
45+
} else {
46+
for _, dep := range deployments {
4347
if dep.ContainerImage != "" {
4448
s.Logger.Log(logger.Info, "Removing image", dep.ContainerImage)
4549
if err := s.DockerRepo.RemoveImage(dep.ContainerImage, image.RemoveOptions{Force: true}); err != nil {

0 commit comments

Comments
 (0)