Skip to content

Commit 49fe8e1

Browse files
authored
feat: TaskQ tuning for complete deployment lifecycle (#393)
1 parent 3cb776d commit 49fe8e1

File tree

16 files changed

+195
-108
lines changed

16 files changed

+195
-108
lines changed

api/internal/features/deploy/controller/handle_deploy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (c *DeployController) HandleDeploy(f fuego.ContextWithBody[types.CreateDepl
5151
}
5252
}
5353

54-
c.logger.Log(logger.Info, "attempting to create deployment", "name: "+data.Name+", user_id: "+user.ID.String())
54+
c.logger.Log(logger.Info, "attempting to create deployment", "name: "+data.Name+", user_id: "+user.ID.String())
5555

5656
application, err := c.taskService.CreateDeploymentTask(&data, user.ID, organizationID)
5757
if err != nil {

api/internal/features/deploy/tasks/create.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ func (t *TaskService) CreateDeploymentTask(deployment *types.CreateDeploymentReq
2424
return shared_types.Application{}, err
2525
}
2626

27+
TaskPayload.CorrelationID = uuid.NewString()
28+
2729
err = CreateDeploymentQueue.Add(TaskCreateDeployment.WithArgs(context.Background(), TaskPayload))
2830
if err != nil {
2931
fmt.Printf("error enqueuing create deployment: %v\n", err)
@@ -119,6 +121,8 @@ func (t *TaskService) ReDeployApplication(request *types.ReDeployApplicationRequ
119121
return shared_types.Application{}, err
120122
}
121123

124+
TaskPayload.CorrelationID = uuid.NewString()
125+
122126
err = ReDeployQueue.Add(TaskReDeploy.WithArgs(context.Background(), TaskPayload))
123127
if err != nil {
124128
fmt.Printf("error enqueuing redeploy: %v\n", err)

api/internal/features/deploy/tasks/init.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,24 @@ func (t *TaskService) SetupCreateDeploymentQueue() {
5656
Name: QUEUE_CREATE_DEPLOYMENT,
5757
ConsumerIdleTimeout: 10 * time.Minute,
5858
MinNumWorker: 1,
59-
MaxNumWorker: 10,
60-
ReservationSize: 10,
61-
ReservationTimeout: 10 * time.Second,
59+
MaxNumWorker: 4,
60+
ReservationSize: 1,
61+
ReservationTimeout: 15 * time.Minute,
6262
WaitTimeout: 5 * time.Second,
6363
BufferSize: 100,
6464
})
6565

6666
TaskCreateDeployment = taskq.RegisterTask(&taskq.TaskOptions{
67-
Name: TASK_CREATE_DEPLOYMENT,
67+
Name: TASK_CREATE_DEPLOYMENT,
68+
RetryLimit: 5,
6869
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
70+
fmt.Printf("[%s] start: correlation_id=%s\n", TASK_CREATE_DEPLOYMENT, data.CorrelationID)
6971
err := t.BuildPack(ctx, data)
7072
if err != nil {
73+
fmt.Print("error handling create deployment: ", err)
7174
return err
7275
}
76+
fmt.Printf("[%s] done: correlation_id=%s\n", TASK_CREATE_DEPLOYMENT, data.CorrelationID)
7377
return nil
7478
},
7579
})
@@ -78,15 +82,16 @@ func (t *TaskService) SetupCreateDeploymentQueue() {
7882
Name: QUEUE_UPDATE_DEPLOYMENT,
7983
ConsumerIdleTimeout: 10 * time.Minute,
8084
MinNumWorker: 1,
81-
MaxNumWorker: 10,
82-
ReservationSize: 10,
83-
ReservationTimeout: 10 * time.Second,
85+
MaxNumWorker: 4,
86+
ReservationSize: 1,
87+
ReservationTimeout: 15 * time.Minute,
8488
WaitTimeout: 5 * time.Second,
8589
BufferSize: 100,
8690
})
8791

8892
TaskUpdateDeployment = taskq.RegisterTask(&taskq.TaskOptions{
89-
Name: TASK_UPDATE_DEPLOYMENT,
93+
Name: TASK_UPDATE_DEPLOYMENT,
94+
RetryLimit: 5,
9095
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
9196
fmt.Println("Updating deployment")
9297
err := t.HandleUpdateDeployment(ctx, data)
@@ -102,15 +107,16 @@ func (t *TaskService) SetupCreateDeploymentQueue() {
102107
Name: QUEUE_REDEPLOYMENT,
103108
ConsumerIdleTimeout: 10 * time.Minute,
104109
MinNumWorker: 1,
105-
MaxNumWorker: 10,
106-
ReservationSize: 10,
107-
ReservationTimeout: 10 * time.Second,
110+
MaxNumWorker: 4,
111+
ReservationSize: 1,
112+
ReservationTimeout: 15 * time.Minute,
108113
WaitTimeout: 5 * time.Second,
109114
BufferSize: 100,
110115
})
111116

112117
TaskReDeploy = taskq.RegisterTask(&taskq.TaskOptions{
113-
Name: TASK_REDEPLOYMENT,
118+
Name: TASK_REDEPLOYMENT,
119+
RetryLimit: 5,
114120
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
115121
fmt.Println("Redeploying application")
116122
err := t.HandleReDeploy(ctx, data)
@@ -126,15 +132,16 @@ func (t *TaskService) SetupCreateDeploymentQueue() {
126132
Name: QUEUE_ROLLBACK,
127133
ConsumerIdleTimeout: 10 * time.Minute,
128134
MinNumWorker: 1,
129-
MaxNumWorker: 10,
130-
ReservationSize: 10,
131-
ReservationTimeout: 10 * time.Second,
135+
MaxNumWorker: 4,
136+
ReservationSize: 1,
137+
ReservationTimeout: 15 * time.Minute,
132138
WaitTimeout: 5 * time.Second,
133139
BufferSize: 100,
134140
})
135141

136142
TaskRollback = taskq.RegisterTask(&taskq.TaskOptions{
137-
Name: TASK_ROLLBACK,
143+
Name: TASK_ROLLBACK,
144+
RetryLimit: 10,
138145
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
139146
fmt.Println("Rolling back deployment")
140147
err := t.HandleRollback(ctx, data)
@@ -150,15 +157,16 @@ func (t *TaskService) SetupCreateDeploymentQueue() {
150157
Name: QUEUE_RESTART,
151158
ConsumerIdleTimeout: 10 * time.Minute,
152159
MinNumWorker: 1,
153-
MaxNumWorker: 10,
154-
ReservationSize: 10,
155-
ReservationTimeout: 10 * time.Second,
160+
MaxNumWorker: 4,
161+
ReservationSize: 1,
162+
ReservationTimeout: 15 * time.Minute,
156163
WaitTimeout: 5 * time.Second,
157164
BufferSize: 100,
158165
})
159166

160167
TaskRestart = taskq.RegisterTask(&taskq.TaskOptions{
161-
Name: TASK_RESTART,
168+
Name: TASK_RESTART,
169+
RetryLimit: 5,
162170
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
163171
fmt.Println("Restarting deployment")
164172
err := t.HandleRestart(ctx, data)

api/internal/features/deploy/tasks/restart.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func (t *TaskService) RestartDeployment(request *types.RestartDeploymentRequest,
3535
return err
3636
}
3737

38+
payload.CorrelationID = uuid.NewString()
39+
3840
return RestartQueue.Add(TaskRestart.WithArgs(context.Background(), payload))
3941
}
4042

@@ -66,4 +68,3 @@ func (s *TaskService) HandleRestart(ctx context.Context, TaskPayload shared_type
6668
taskCtx.LogAndUpdateStatus("Application containers restarted", shared_types.Running)
6769
return nil
6870
}
69-

api/internal/features/deploy/tasks/rollback.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func (t *TaskService) RollbackDeployment(request *types.RollbackDeploymentReques
3434
return err
3535
}
3636

37+
payload.CorrelationID = uuid.NewString()
38+
3739
return RollbackQueue.Add(TaskRollback.WithArgs(context.Background(), payload))
3840
}
3941

@@ -82,4 +84,3 @@ func (s *TaskService) HandleRollback(ctx context.Context, TaskPayload shared_typ
8284

8385
return nil
8486
}
85-

api/internal/features/deploy/tasks/update.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package tasks
22

33
import (
4-
"context"
5-
"fmt"
4+
"context"
5+
"fmt"
66

7-
"github.com/google/uuid"
7+
"github.com/google/uuid"
88
"github.com/raghavyuva/nixopus-api/internal/features/deploy/types"
99
shared_types "github.com/raghavyuva/nixopus-api/internal/types"
1010
)
@@ -30,6 +30,8 @@ func (s *TaskService) UpdateDeployment(deployment *types.UpdateDeploymentRequest
3030
return shared_types.Application{}, err
3131
}
3232

33+
TaskPayload.CorrelationID = uuid.NewString()
34+
3335
err = UpdateDeploymentQueue.Add(TaskUpdateDeployment.WithArgs(context.Background(), TaskPayload))
3436
if err != nil {
3537
fmt.Printf("error enqueuing update deployment: %v\n", err)
@@ -80,4 +82,4 @@ func (s *TaskService) HandleUpdateDeployment(ctx context.Context, TaskPayload sh
8082
taskCtx.LogAndUpdateStatus("Deployment completed successfully", shared_types.Deployed)
8183

8284
return nil
83-
}
85+
}

api/internal/features/github-connector/controller/get_github_repositories.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"net/http"
5+
"strconv"
56

67
"github.com/go-fuego/fuego"
78
"github.com/raghavyuva/nixopus-api/internal/features/logger"
@@ -21,7 +22,22 @@ func (c *GithubConnectorController) GetGithubRepositories(f fuego.ContextNoBody)
2122
}
2223
}
2324

24-
repositories, err := c.service.GetGithubRepositories(user.ID.String())
25+
q := r.URL.Query()
26+
page := 1
27+
pageSize := 10
28+
29+
if v := q.Get("page"); v != "" {
30+
if p, err := strconv.Atoi(v); err == nil && p > 0 {
31+
page = p
32+
}
33+
}
34+
if v := q.Get("page_size"); v != "" {
35+
if ps, err := strconv.Atoi(v); err == nil && ps > 0 {
36+
pageSize = ps
37+
}
38+
}
39+
40+
repositories, totalCount, err := c.service.GetGithubRepositoriesPaginated(user.ID.String(), page, pageSize)
2541
if err != nil {
2642
c.logger.Log(logger.Error, err.Error(), "")
2743
return nil, fuego.HTTPError{
@@ -33,6 +49,11 @@ func (c *GithubConnectorController) GetGithubRepositories(f fuego.ContextNoBody)
3349
return &shared_types.Response{
3450
Status: "success",
3551
Message: "Repositories fetched successfully",
36-
Data: repositories,
52+
Data: map[string]interface{}{
53+
"total_count": totalCount,
54+
"repositories": repositories,
55+
"page": page,
56+
"page_size": pageSize,
57+
},
3758
}, nil
3859
}

api/internal/features/github-connector/service/clone_repository.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ type CloneRepositoryConfig struct {
3131
// If any errors occur during the process, the method logs the error and
3232
// returns the error.
3333
func (s *GithubConnectorService) CloneRepository(c CloneRepositoryConfig, commitHash *string) (string, error) {
34-
// we have to optimize the code here
35-
_, repo_url, err := s.GetRepositoryDetailsFromId(c.RepoID, c.UserID)
34+
// Fetch repository directly by ID based on installation
35+
repo, err := s.GetGithubRepositoryByID(c.UserID, c.RepoID)
3636
if err != nil {
3737
s.logger.Log(logger.Error, fmt.Sprintf("Failed to get repository details: %s", err.Error()), "")
3838
return "", err
3939
}
40+
repo_url := repo.CloneURL
4041

4142
s.logger.Log(logger.Info, fmt.Sprintf("Cloning repository %s", repo_url), c.UserID)
4243

api/internal/features/github-connector/service/get_github_repositories.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,34 @@ import (
1010
shared_types "github.com/raghavyuva/nixopus-api/internal/types"
1111
)
1212

13-
func (c *GithubConnectorService) GetGithubRepositories(user_id string) ([]shared_types.GithubRepository, error) {
14-
connectors, err := c.storage.GetAllConnectors(user_id)
13+
// GetGithubRepositoriesPaginated fetches repositories for the user's GitHub installation with pagination.
14+
func (c *GithubConnectorService) GetGithubRepositoriesPaginated(userID string, page int, pageSize int) ([]shared_types.GithubRepository, int, error) {
15+
connectors, err := c.storage.GetAllConnectors(userID)
1516
if err != nil {
1617
c.logger.Log(logger.Error, err.Error(), "")
17-
return nil, err
18+
return nil, 0, err
1819
}
1920

2021
if len(connectors) == 0 {
21-
c.logger.Log(logger.Error, "No connectors found for user", user_id)
22-
return nil, nil
22+
c.logger.Log(logger.Error, "No connectors found for user", userID)
23+
return []shared_types.GithubRepository{}, 0, nil
2324
}
2425

2526
installation_id := connectors[0].InstallationID
26-
2727
jwt := GenerateJwt(&connectors[0])
2828

2929
accessToken, err := c.getInstallationToken(jwt, installation_id)
3030
if err != nil {
3131
c.logger.Log(logger.Error, fmt.Sprintf("Failed to get installation token: %s", err.Error()), "")
32-
return nil, err
32+
return nil, 0, err
3333
}
3434

3535
client := &http.Client{}
36-
req, err := http.NewRequest("GET", "https://api.github.com/installation/repositories?per_page=500", nil)
36+
url := fmt.Sprintf("https://api.github.com/installation/repositories?per_page=%d&page=%d", pageSize, page)
37+
req, err := http.NewRequest("GET", url, nil)
3738
if err != nil {
3839
c.logger.Log(logger.Error, err.Error(), "")
39-
return nil, err
40+
return nil, 0, err
4041
}
4142

4243
req.Header.Set("Authorization", fmt.Sprintf("token %s", accessToken))
@@ -46,25 +47,25 @@ func (c *GithubConnectorService) GetGithubRepositories(user_id string) ([]shared
4647
resp, err := client.Do(req)
4748
if err != nil {
4849
c.logger.Log(logger.Error, err.Error(), "")
49-
return nil, err
50+
return nil, 0, err
5051
}
5152
defer resp.Body.Close()
5253

5354
if resp.StatusCode != http.StatusOK {
5455
bodyBytes, _ := io.ReadAll(resp.Body)
5556
c.logger.Log(logger.Error, fmt.Sprintf("GitHub API error: %s - %s", resp.Status, string(bodyBytes)), "")
56-
return nil, fmt.Errorf("GitHub API error: %s", resp.Status)
57+
return nil, 0, fmt.Errorf("GitHub API error: %s", resp.Status)
5758
}
5859

5960
var response struct {
61+
TotalCount int `json:"total_count"`
6062
Repositories []shared_types.GithubRepository `json:"repositories"`
6163
}
6264

63-
err = json.NewDecoder(resp.Body).Decode(&response)
64-
if err != nil {
65+
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
6566
c.logger.Log(logger.Error, err.Error(), "")
66-
return nil, err
67+
return nil, 0, err
6768
}
6869

69-
return response.Repositories, nil
70+
return response.Repositories, response.TotalCount, nil
7071
}

0 commit comments

Comments
 (0)