Skip to content

Commit ad04e60

Browse files
authored
Merge pull request #54 from opengovern/feat-make-task
fix: create task for describer
2 parents 6c6f355 + f68c973 commit ad04e60

File tree

13 files changed

+820
-347
lines changed

13 files changed

+820
-347
lines changed

discovery/envs/envs.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package envs
2+
3+
import (
4+
"github.com/opengovern/opensecurity/services/tasks/worker/consts"
5+
"os"
6+
)
7+
8+
var (
9+
NatsURL = os.Getenv(consts.NatsURLEnv)
10+
NatsConsumer = os.Getenv(consts.NatsConsumerEnv)
11+
StreamName = os.Getenv(consts.NatsStreamNameEnv)
12+
TopicName = os.Getenv(consts.NatsTopicNameEnv)
13+
ResultTopicName = os.Getenv(consts.NatsResultTopicNameEnv)
14+
15+
ESAddress = os.Getenv(consts.ElasticSearchAddressEnv)
16+
ESUsername = os.Getenv(consts.ElasticSearchUsernameEnv)
17+
ESPassword = os.Getenv(consts.ElasticSearchPasswordEnv)
18+
ESIsOnAks = os.Getenv(consts.ElasticSearchIsOnAksNameEnv)
19+
ESIsOpenSearch = os.Getenv(consts.ElasticSearchIsOpenSearch)
20+
ESAwsRegion = os.Getenv(consts.ElasticSearchAwsRegionEnv)
21+
ESAssumeRoleArn = os.Getenv(consts.ElasticSearchAssumeRoleArnEnv)
22+
23+
InventoryServiceEndpoint = os.Getenv(consts.InventoryBaseURL)
24+
)

discovery/main.go

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"github.com/opengovern/og-describer-github/discovery/pkg"
7-
"github.com/spf13/cobra"
8-
"go.uber.org/zap"
6+
"github.com/opengovern/og-describer-github/discovery/pkg/worker"
97
"os"
108
"os/signal"
119
"syscall"
@@ -30,33 +28,8 @@ func main() {
3028
}
3129
}()
3230

33-
if err := WorkerCommand().ExecuteContext(ctx); err != nil {
31+
if err := worker.WorkerCommand().ExecuteContext(ctx); err != nil {
3432
fmt.Printf("Error: %v\n", err)
3533
os.Exit(1)
3634
}
3735
}
38-
39-
func WorkerCommand() *cobra.Command {
40-
cmd := &cobra.Command{
41-
RunE: func(cmd *cobra.Command, args []string) error {
42-
ctx := cmd.Context()
43-
cmd.SilenceUsage = true
44-
logger, err := zap.NewProduction()
45-
if err != nil {
46-
return err
47-
}
48-
49-
w, err := pkg.NewWorker(
50-
logger,
51-
cmd.Context(),
52-
)
53-
if err != nil {
54-
return err
55-
}
56-
57-
return w.Run(ctx)
58-
},
59-
}
60-
61-
return cmd
62-
}

discovery/pkg/orchestrator/handler.go

Lines changed: 0 additions & 211 deletions
This file was deleted.

discovery/pkg/orchestrator/worker.go

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/go-errors/errors"
87
model "github.com/opengovern/og-describer-github/discovery/pkg/models"
98
"github.com/opengovern/og-describer-github/discovery/provider"
109
"github.com/opengovern/og-describer-github/global"
1110
describe2 "github.com/opengovern/og-util/pkg/describe"
1211
"github.com/opengovern/og-util/pkg/es"
13-
"github.com/opengovern/og-util/pkg/vault"
1412
"go.uber.org/zap"
1513
strconv "strconv"
1614
"strings"
@@ -46,35 +44,7 @@ func trimJsonFromEmptyObjects(input []byte) ([]byte, error) {
4644
return json.Marshal(unknownData)
4745
}
4846

49-
func Do(ctx context.Context,
50-
vlt vault.VaultSourceConfig,
51-
logger *zap.Logger,
52-
job describe2.DescribeJob,
53-
params map[string]string,
54-
grpcEndpoint string,
55-
describeDeliverToken string,
56-
ingestionPipelineEndpoint string,
57-
useOpenSearch bool) (resourceIDs []string, err error) {
58-
defer func() {
59-
if r := recover(); r != nil {
60-
err = fmt.Errorf("paniced with error: %v", r)
61-
logger.Error("paniced with error", zap.Error(err), zap.String("stackTrace", errors.Wrap(r, 2).ErrorStack()))
62-
}
63-
}()
64-
65-
ctx, cancel := context.WithCancel(ctx)
66-
defer cancel()
67-
68-
config, err := vlt.Decrypt(ctx, job.CipherText)
69-
if err != nil {
70-
return nil, fmt.Errorf("decrypt error: %w", err)
71-
}
72-
// logger.Info("decrypted config", zap.Any("config", config))
73-
74-
return doDescribe(ctx, logger, job, params, config, grpcEndpoint, ingestionPipelineEndpoint, describeDeliverToken, useOpenSearch)
75-
}
76-
77-
func doDescribe(
47+
func Describe(
7848
ctx context.Context,
7949
logger *zap.Logger,
8050
job describe2.DescribeJob,
@@ -84,7 +54,7 @@ func doDescribe(
8454
describeToken string,
8555
useOpenSearch bool) ([]string, error) {
8656
logger.Info("Making New Resource Sender")
87-
rs, err := NewResourceSender(grpcEndpoint, ingestionPipelineEndpoint, describeToken, job.JobID, params,useOpenSearch, logger)
57+
rs, err := NewResourceSender(grpcEndpoint, ingestionPipelineEndpoint, describeToken, job.JobID, params, useOpenSearch, logger)
8858
if err != nil {
8959
return nil, fmt.Errorf("failed to connect to resource sender: %w", err)
9060
}

discovery/pkg/task/retry.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package task
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
const (
9+
maxRetries = 10
10+
retryDelay = 10 * time.Second
11+
connectionRefused = "connect: connection refused"
12+
)
13+
14+
func retryWithBackoff[T any](operationName string, fetchFunc func() ([]T, error)) ([]T, error) {
15+
var result []T
16+
var err error
17+
18+
for attempt := 1; attempt <= maxRetries; attempt++ {
19+
result, err = fetchFunc()
20+
if err == nil {
21+
return result, nil
22+
}
23+
24+
time.Sleep(retryDelay)
25+
}
26+
27+
return nil, fmt.Errorf("failed to complete %s after %d attempts: %w", operationName, maxRetries, err)
28+
}

0 commit comments

Comments
 (0)