Skip to content

Commit 6d7e538

Browse files
authored
Merge pull request #6589 from devtron-labs/go-routines-wrapped
misc: go routines wrapped into panic safe function
2 parents 0f53272 + dce8b0f commit 6d7e538

File tree

18 files changed

+172
-85
lines changed

18 files changed

+172
-85
lines changed

api/restHandler/BatchOperationRestHandler.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"net/http"
2424

25+
"github.com/devtron-labs/common-lib/async"
2526
"github.com/devtron-labs/devtron/api/restHandler/common"
2627
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1"
2728
"github.com/devtron-labs/devtron/pkg/apis/devtron/v1/validation"
@@ -44,17 +45,19 @@ type BatchOperationRestHandlerImpl struct {
4445
teamService team.TeamService
4546
logger *zap.SugaredLogger
4647
enforcerUtil rbac.EnforcerUtil
48+
asyncRunnable *async.Runnable
4749
}
4850

4951
func NewBatchOperationRestHandlerImpl(userAuthService user.UserService, enforcer casbin.Enforcer, workflowAction batch.WorkflowAction,
50-
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil) *BatchOperationRestHandlerImpl {
52+
teamService team.TeamService, logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil, asyncRunnable *async.Runnable) *BatchOperationRestHandlerImpl {
5153
return &BatchOperationRestHandlerImpl{
5254
userAuthService: userAuthService,
5355
enforcer: enforcer,
5456
workflowAction: workflowAction,
5557
teamService: teamService,
5658
logger: logger,
5759
enforcerUtil: enforcerUtil,
60+
asyncRunnable: asyncRunnable,
5861
}
5962
}
6063

@@ -104,13 +107,14 @@ func (handler BatchOperationRestHandlerImpl) Operate(w http.ResponseWriter, r *h
104107

105108
ctx, cancel := context.WithCancel(r.Context())
106109
if cn, ok := w.(http.CloseNotifier); ok {
107-
go func(done <-chan struct{}, closed <-chan bool) {
110+
runnableFunc := func(done <-chan struct{}, closed <-chan bool) {
108111
select {
109112
case <-done:
110113
case <-closed:
111114
cancel()
112115
}
113-
}(ctx.Done(), cn.CloseNotify())
116+
}
117+
handler.asyncRunnable.Execute(func() { runnableFunc(ctx.Done(), cn.CloseNotify()) })
114118
}
115119
err = handler.workflowAction.Execute(&workflow, emptyProps, r.Context())
116120
if err != nil {

client/argocdServer/connection/Connection.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"github.com/argoproj/argo-cd/v2/pkg/apiclient/account"
2323
"github.com/argoproj/argo-cd/v2/util/settings"
24+
"github.com/devtron-labs/common-lib/async"
2425
"github.com/devtron-labs/common-lib/utils/k8s"
2526
"github.com/devtron-labs/devtron/client/argocdServer/bean"
2627
config2 "github.com/devtron-labs/devtron/client/argocdServer/config"
@@ -81,6 +82,7 @@ type ArgoCDConnectionManagerImpl struct {
8182
gitOpsConfigReadService config.GitOpsConfigReadService
8283
runTimeConfig *k8s.RuntimeConfig
8384
argoCDConfigGetter config2.ArgoCDConfigGetter
85+
asyncRunnable *async.Runnable
8486
}
8587

8688
func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
@@ -92,7 +94,8 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
9294
versionService version.VersionService,
9395
gitOpsConfigReadService config.GitOpsConfigReadService,
9496
runTimeConfig *k8s.RuntimeConfig,
95-
argoCDConfigGetter config2.ArgoCDConfigGetter) (*ArgoCDConnectionManagerImpl, error) {
97+
argoCDConfigGetter config2.ArgoCDConfigGetter,
98+
asyncRunnable *async.Runnable) (*ArgoCDConnectionManagerImpl, error) {
9699
argoUserServiceImpl := &ArgoCDConnectionManagerImpl{
97100
logger: Logger,
98101
settingsManager: settingsManager,
@@ -105,13 +108,17 @@ func NewArgoCDConnectionManagerImpl(Logger *zap.SugaredLogger,
105108
gitOpsConfigReadService: gitOpsConfigReadService,
106109
runTimeConfig: runTimeConfig,
107110
argoCDConfigGetter: argoCDConfigGetter,
111+
asyncRunnable: asyncRunnable,
108112
}
109113
if !runTimeConfig.LocalDevMode {
110114
grpcConfig, err := argoCDConfigGetter.GetGRPCConfig()
111115
if err != nil {
112116
Logger.Errorw("error in GetAllGRPCConfigs", "error", err)
113117
}
114-
go argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig)
118+
runnableFunc := func() {
119+
argoUserServiceImpl.ValidateGitOpsAndGetOrUpdateArgoCdUserDetail(grpcConfig)
120+
}
121+
asyncRunnable.Execute(runnableFunc)
115122
}
116123
return argoUserServiceImpl, nil
117124
}

cmd/external-app/wire_gen.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/app/AppService.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"net/url"
24+
"strconv"
25+
"time"
26+
2327
health2 "github.com/argoproj/gitops-engine/pkg/health"
2428
argoApplication "github.com/devtron-labs/devtron/client/argocdServer/bean"
2529
"github.com/devtron-labs/devtron/internal/sql/models"
@@ -40,12 +44,10 @@ import (
4044
"github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate/read"
4145
bean4 "github.com/devtron-labs/devtron/pkg/deployment/trigger/devtronApps/bean"
4246
"github.com/devtron-labs/devtron/pkg/workflow/cd"
43-
"net/url"
44-
"strconv"
45-
"time"
4647

4748
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
4849
"github.com/caarlos0/env"
50+
"github.com/devtron-labs/common-lib/async"
4951
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
5052
"github.com/devtron-labs/common-lib/utils/k8s/health"
5153
"github.com/devtron-labs/devtron/api/bean"
@@ -124,6 +126,7 @@ type AppServiceImpl struct {
124126
deploymentConfigService common2.DeploymentConfigService
125127
envConfigOverrideReadService read.EnvConfigOverrideService
126128
cdWorkflowRunnerService cd.CdWorkflowRunnerService
129+
asyncRunnable *async.Runnable
127130
}
128131

129132
type AppService interface {
@@ -164,7 +167,8 @@ func NewAppService(
164167
appListingService AppListingService,
165168
deploymentConfigService common2.DeploymentConfigService,
166169
envConfigOverrideReadService read.EnvConfigOverrideService,
167-
cdWorkflowRunnerService cd.CdWorkflowRunnerService) *AppServiceImpl {
170+
cdWorkflowRunnerService cd.CdWorkflowRunnerService,
171+
asyncRunnable *async.Runnable) *AppServiceImpl {
168172
appServiceImpl := &AppServiceImpl{
169173
mergeUtil: mergeUtil,
170174
pipelineOverrideRepository: pipelineOverrideRepository,
@@ -195,6 +199,7 @@ func NewAppService(
195199
deploymentConfigService: deploymentConfigService,
196200
envConfigOverrideReadService: envConfigOverrideReadService,
197201
cdWorkflowRunnerService: cdWorkflowRunnerService,
202+
asyncRunnable: asyncRunnable,
198203
}
199204
return appServiceImpl
200205
}
@@ -320,7 +325,7 @@ func (impl *AppServiceImpl) UpdateDeploymentStatusForGitOpsPipelines(app *v1alph
320325
}
321326
if isSucceeded {
322327
impl.logger.Infow("writing cd success event", "gitHash", gitHash, "pipelineOverride", pipelineOverride)
323-
go impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride)
328+
impl.asyncRunnable.Execute(func() { impl.WriteCDSuccessEvent(cdPipeline.AppId, cdPipeline.EnvironmentId, pipelineOverride) })
324329
}
325330
} else {
326331
impl.logger.Debugw("event received for older triggered revision", "gitHash", gitHash)

pkg/argoApplication/ArgoApplicationServiceExtended.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
application2 "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
2424
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
25+
"github.com/devtron-labs/common-lib/async"
2526
"github.com/devtron-labs/common-lib/utils/k8s"
2627
openapi "github.com/devtron-labs/devtron/api/helm-app/openapiClient"
2728
"github.com/devtron-labs/devtron/client/argocdServer"
@@ -46,6 +47,7 @@ type ArgoApplicationServiceExtendedImpl struct {
4647
argoApplicationReadService read.ArgoApplicationReadService
4748
clusterService cluster.ClusterService
4849
acdClientWrapper argocdServer.ArgoClientWrapperService
50+
asyncRunnable *async.Runnable
4951
}
5052

5153
func NewArgoApplicationServiceExtendedServiceImpl(
@@ -54,13 +56,15 @@ func NewArgoApplicationServiceExtendedServiceImpl(
5456
acdClientWrapper argocdServer.ArgoClientWrapperService,
5557
argoApplicationReadService read.ArgoApplicationReadService,
5658
clusterService cluster.ClusterService,
59+
asyncRunnable *async.Runnable,
5760
) *ArgoApplicationServiceExtendedImpl {
5861
return &ArgoApplicationServiceExtendedImpl{
5962
aCDAuthConfig: aCDAuthConfig,
6063
clusterService: clusterService,
6164
argoApplicationReadService: argoApplicationReadService,
6265
acdClientWrapper: acdClientWrapper,
6366
ArgoApplicationServiceImpl: argoApplicationServiceImpl,
67+
asyncRunnable: asyncRunnable,
6468
}
6569
}
6670

@@ -328,7 +332,7 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
328332
for _, node := range queryNodes {
329333
rQuery := transform(node, query.ApplicationName)
330334
qCount++
331-
go func(request application2.ApplicationResourceRequest) {
335+
runnableFunc := func(request application2.ApplicationResourceRequest) {
332336
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
333337
defer cancel()
334338
startTime := time.Now()
@@ -343,7 +347,8 @@ func (c *ArgoApplicationServiceExtendedImpl) parseResult(resp *v1alpha1.Applicat
343347
} else {
344348
response <- argoApplication.Result{Response: nil, Error: fmt.Errorf("connection closed by client"), Request: &request}
345349
}
346-
}(*rQuery)
350+
}
351+
c.asyncRunnable.Execute(func() { runnableFunc(*rQuery) })
347352
}
348353

349354
if qCount == 0 {

pkg/build/trigger/HandlerService.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
99
"github.com/caarlos0/env"
10+
"github.com/devtron-labs/common-lib/async"
1011
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
1112
"github.com/devtron-labs/common-lib/utils"
1213
bean4 "github.com/devtron-labs/common-lib/utils/bean"
@@ -116,6 +117,7 @@ type HandlerServiceImpl struct {
116117
clusterService cluster.ClusterService
117118
envService environment.EnvironmentService
118119
K8sUtil *k8s.K8sServiceImpl
120+
asyncRunnable *async.Runnable
119121
}
120122

121123
func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.WorkflowService,
@@ -141,6 +143,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W
141143
clusterService cluster.ClusterService,
142144
envService environment.EnvironmentService,
143145
K8sUtil *k8s.K8sServiceImpl,
146+
asyncRunnable *async.Runnable,
144147
) *HandlerServiceImpl {
145148
buildxCacheFlags := &BuildxCacheFlags{}
146149
err := env.Parse(buildxCacheFlags)
@@ -174,6 +177,7 @@ func NewHandlerServiceImpl(Logger *zap.SugaredLogger, workflowService executor.W
174177
clusterService: clusterService,
175178
envService: envService,
176179
K8sUtil: K8sUtil,
180+
asyncRunnable: asyncRunnable,
177181
}
178182
config, err := types.GetCiConfig()
179183
if err != nil {
@@ -628,7 +632,12 @@ func (impl *HandlerServiceImpl) triggerCiPipeline(trigger types.Trigger) (int, e
628632
}
629633

630634
middleware.CiTriggerCounter.WithLabelValues(pipeline.App.AppName, pipeline.Name).Inc()
631-
go impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest)
635+
636+
runnableFunc := func() {
637+
impl.ciService.WriteCITriggerEvent(trigger, pipeline, workflowRequest)
638+
}
639+
impl.asyncRunnable.Execute(runnableFunc)
640+
632641
return savedCiWf.Id, err
633642
}
634643

pkg/cluster/ClusterService.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"github.com/devtron-labs/common-lib/async"
2324
informerBean "github.com/devtron-labs/common-lib/informer"
2425
"github.com/devtron-labs/common-lib/utils/k8s/commonBean"
2526
"github.com/devtron-labs/devtron/pkg/cluster/adapter"
@@ -97,6 +98,7 @@ type ClusterServiceImpl struct {
9798
userRepository repository3.UserRepository
9899
roleGroupRepository repository3.RoleGroupRepository
99100
clusterReadService read.ClusterReadService
101+
asyncRunnable *async.Runnable
100102
}
101103

102104
func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.SugaredLogger,
@@ -105,7 +107,8 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
105107
roleGroupRepository repository3.RoleGroupRepository,
106108
envVariables *globalUtil.EnvironmentVariables,
107109
cronLogger *cronUtil.CronLoggerImpl,
108-
clusterReadService read.ClusterReadService) (*ClusterServiceImpl, error) {
110+
clusterReadService read.ClusterReadService,
111+
asyncRunnable *async.Runnable) (*ClusterServiceImpl, error) {
109112
clusterService := &ClusterServiceImpl{
110113
clusterRepository: repository,
111114
logger: logger,
@@ -115,6 +118,7 @@ func NewClusterServiceImpl(repository repository.ClusterRepository, logger *zap.
115118
userRepository: userRepository,
116119
roleGroupRepository: roleGroupRepository,
117120
clusterReadService: clusterReadService,
121+
asyncRunnable: asyncRunnable,
118122
}
119123
// initialise cron
120124
newCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
@@ -782,7 +786,7 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
782786
continue
783787
}
784788
wg.Add(1)
785-
go func(idx int, cluster *bean.ClusterBean) {
789+
runnableFunc := func(idx int, cluster *bean.ClusterBean) {
786790
defer wg.Done()
787791
clusterConfig := cluster.GetClusterConfig()
788792
_, _, k8sClientSet, err := impl.K8sUtil.GetK8sConfigAndClients(clusterConfig)
@@ -796,7 +800,8 @@ func (impl *ClusterServiceImpl) ConnectClustersInBatch(clusters []*bean.ClusterB
796800
id = idx
797801
}
798802
impl.GetAndUpdateConnectionStatusForOneCluster(k8sClientSet, id, respMap)
799-
}(idx, cluster)
803+
}
804+
impl.asyncRunnable.Execute(func() { runnableFunc(idx, cluster) })
800805
}
801806

802807
wg.Wait()

pkg/clusterTerminalAccess/UserTerminalAccessService.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,13 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24+
"strconv"
25+
"strings"
26+
"sync"
27+
"time"
28+
2429
"github.com/caarlos0/env/v6"
30+
"github.com/devtron-labs/common-lib/async"
2531
k8s2 "github.com/devtron-labs/common-lib/utils/k8s"
2632
"github.com/devtron-labs/devtron/api/helm-app/service/bean"
2733
"github.com/devtron-labs/devtron/internal/sql/models"
@@ -43,10 +49,6 @@ import (
4349
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4450
"k8s.io/apimachinery/pkg/runtime"
4551
"k8s.io/apimachinery/pkg/runtime/schema"
46-
"strconv"
47-
"strings"
48-
"sync"
49-
"time"
5052
)
5153

5254
type UserTerminalAccessService interface {
@@ -75,6 +77,7 @@ type UserTerminalAccessServiceImpl struct {
7577
terminalSessionHandler terminal.TerminalSessionHandler
7678
K8sCapacityService capacity.K8sCapacityService
7779
k8sUtil *k8s2.K8sServiceImpl
80+
asyncRunnable *async.Runnable
7881
}
7982

8083
type UserTerminalAccessSessionData struct {
@@ -98,7 +101,12 @@ func GetTerminalAccessConfig() (*models.UserTerminalSessionConfig, error) {
98101
return config, err
99102
}
100103

101-
func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessRepository repository.TerminalAccessRepository, config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService, terminalSessionHandler terminal.TerminalSessionHandler, K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl, cronLogger *cron3.CronLoggerImpl) (*UserTerminalAccessServiceImpl, error) {
104+
func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger,
105+
terminalAccessRepository repository.TerminalAccessRepository,
106+
config *models.UserTerminalSessionConfig, k8sCommonService k8s.K8sCommonService,
107+
terminalSessionHandler terminal.TerminalSessionHandler,
108+
K8sCapacityService capacity.K8sCapacityService, k8sUtil *k8s2.K8sServiceImpl,
109+
cronLogger *cron3.CronLoggerImpl, asyncRunnable *async.Runnable) (*UserTerminalAccessServiceImpl, error) {
102110
//fetches all running and starting entities from db and start SyncStatus
103111
podStatusSyncCron := cron.New(cron.WithChain(cron.Recover(cronLogger)))
104112
terminalAccessDataArrayMutex := &sync.RWMutex{}
@@ -114,14 +122,15 @@ func NewUserTerminalAccessServiceImpl(logger *zap.SugaredLogger, terminalAccessR
114122
terminalSessionHandler: terminalSessionHandler,
115123
K8sCapacityService: K8sCapacityService,
116124
k8sUtil: k8sUtil,
125+
asyncRunnable: asyncRunnable,
117126
}
118127
podStatusSyncCron.Start()
119128
_, err := podStatusSyncCron.AddFunc(fmt.Sprintf("@every %ds", config.TerminalPodStatusSyncTimeInSecs), accessServiceImpl.SyncPodStatus)
120129
if err != nil {
121130
logger.Errorw("error occurred while starting cron job", "time in secs", config.TerminalPodStatusSyncTimeInSecs)
122131
return nil, err
123132
}
124-
go accessServiceImpl.SyncRunningInstances()
133+
accessServiceImpl.asyncRunnable.Execute(func() { accessServiceImpl.SyncRunningInstances() })
125134
return accessServiceImpl, err
126135
}
127136
func (impl *UserTerminalAccessServiceImpl) ValidateShell(podName, namespace, shellName, containerName string, clusterId int) (bool, string, error) {

0 commit comments

Comments
 (0)