Skip to content

Commit c3f9ffd

Browse files
authored
Merge pull request #1 from opengovern/feat-make-task
fix: use tasks service to run describer
2 parents 2e497e9 + b3a205a commit c3f9ffd

File tree

9 files changed

+757
-115
lines changed

9 files changed

+757
-115
lines changed

discovery/envs/envs.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package envs
2+
3+
import (
4+
"github.com/opengovern/opensecurity/services/tasks/worker/consts"
5+
"os"
6+
)
7+
8+
var (
9+
NatsURL = os.Getenv(consts.NatsURLEnv)
10+
NatsConsumer = os.Getenv(consts.NatsConsumerEnv)
11+
StreamName = os.Getenv(consts.NatsStreamNameEnv)
12+
TopicName = os.Getenv(consts.NatsTopicNameEnv)
13+
ResultTopicName = os.Getenv(consts.NatsResultTopicNameEnv)
14+
15+
ESAddress = os.Getenv(consts.ElasticSearchAddressEnv)
16+
ESUsername = os.Getenv(consts.ElasticSearchUsernameEnv)
17+
ESPassword = os.Getenv(consts.ElasticSearchPasswordEnv)
18+
ESIsOnAks = os.Getenv(consts.ElasticSearchIsOnAksNameEnv)
19+
ESIsOpenSearch = os.Getenv(consts.ElasticSearchIsOpenSearch)
20+
ESAwsRegion = os.Getenv(consts.ElasticSearchAwsRegionEnv)
21+
ESAssumeRoleArn = os.Getenv(consts.ElasticSearchAssumeRoleArnEnv)
22+
23+
InventoryServiceEndpoint = os.Getenv(consts.InventoryBaseURL)
24+
)

discovery/main.go

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"github.com/opengovern/og-describer-kubernetes/discovery/pkg"
7-
"github.com/spf13/cobra"
8-
"go.uber.org/zap"
6+
"github.com/opengovern/og-describer-kubernetes/discovery/worker"
97
"os"
108
"os/signal"
119
"syscall"
@@ -30,33 +28,8 @@ func main() {
3028
}
3129
}()
3230

33-
if err := WorkerCommand().ExecuteContext(ctx); err != nil {
31+
if err := worker.WorkerCommand().ExecuteContext(ctx); err != nil {
3432
fmt.Printf("Error: %v\n", err)
3533
os.Exit(1)
3634
}
3735
}
38-
39-
func WorkerCommand() *cobra.Command {
40-
cmd := &cobra.Command{
41-
RunE: func(cmd *cobra.Command, args []string) error {
42-
ctx := cmd.Context()
43-
cmd.SilenceUsage = true
44-
logger, err := zap.NewProduction()
45-
if err != nil {
46-
return err
47-
}
48-
49-
w, err := pkg.NewWorker(
50-
logger,
51-
cmd.Context(),
52-
)
53-
if err != nil {
54-
return err
55-
}
56-
57-
return w.Run(ctx)
58-
},
59-
}
60-
61-
return cmd
62-
}

discovery/pkg/orchestrator/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ func Do(ctx context.Context,
7272
}
7373
// logger.Info("decrypted config", zap.Any("config", config))
7474

75-
return doDescribe(ctx, logger, job, params, config, grpcEndpoint, ingestionPipelineEndpoint, describeDeliverToken, useOpenSearch)
75+
return Describe(ctx, logger, job, params, config, grpcEndpoint, ingestionPipelineEndpoint, describeDeliverToken, useOpenSearch)
7676
}
7777

78-
func doDescribe(
78+
func Describe(
7979
ctx context.Context,
8080
logger *zap.Logger,
8181
job describe2.DescribeJob,

discovery/task/retry.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package task
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
const (
9+
maxRetries = 10
10+
retryDelay = 10 * time.Second
11+
connectionRefused = "connect: connection refused"
12+
)
13+
14+
func retryWithBackoff[T any](operationName string, fetchFunc func() ([]T, error)) ([]T, error) {
15+
var result []T
16+
var err error
17+
18+
for attempt := 1; attempt <= maxRetries; attempt++ {
19+
result, err = fetchFunc()
20+
if err == nil {
21+
return result, nil
22+
}
23+
24+
time.Sleep(retryDelay)
25+
}
26+
27+
return nil, fmt.Errorf("failed to complete %s after %d attempts: %w", operationName, maxRetries, err)
28+
}

discovery/task/run-task.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
package task
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/opengovern/og-describer-kubernetes/discovery/pkg/orchestrator"
8+
authApi "github.com/opengovern/og-util/pkg/api"
9+
"github.com/opengovern/og-util/pkg/describe"
10+
"github.com/opengovern/og-util/pkg/httpclient"
11+
"github.com/opengovern/og-util/pkg/integration"
12+
"github.com/opengovern/og-util/pkg/jq"
13+
"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
14+
"github.com/opengovern/og-util/pkg/tasks"
15+
"github.com/opengovern/og-util/pkg/vault"
16+
coreApi "github.com/opengovern/opensecurity/services/core/api"
17+
coreClient "github.com/opengovern/opensecurity/services/core/client"
18+
"github.com/opengovern/opensecurity/services/tasks/scheduler"
19+
"go.uber.org/zap"
20+
"time"
21+
)
22+
23+
type TaskRunner struct {
24+
vaultSrc vault.VaultSourceConfig
25+
jq *jq.JobQueue
26+
coreServiceEndpoint string
27+
describeToken string
28+
esClient opengovernance.Client
29+
logger *zap.Logger
30+
request tasks.TaskRequest
31+
response *scheduler.TaskResponse
32+
}
33+
34+
func NewTaskRunner(ctx context.Context, jq *jq.JobQueue, coreServiceEndpoint string, describeToken string, esClient opengovernance.Client,
35+
logger *zap.Logger, request tasks.TaskRequest, response *scheduler.TaskResponse) (*TaskRunner, error) {
36+
37+
vaultSc, err := vault.NewHashiCorpVaultClient(ctx, logger, request.VaultConfig.HashiCorp, request.VaultConfig.KeyId)
38+
if err != nil {
39+
return nil, fmt.Errorf("failed to initialize HashiCorp vault: %w", err)
40+
}
41+
42+
logger.Info("Vault setup complete")
43+
44+
return &TaskRunner{
45+
vaultSrc: vaultSc,
46+
jq: jq,
47+
coreServiceEndpoint: coreServiceEndpoint,
48+
describeToken: describeToken,
49+
esClient: esClient,
50+
logger: logger,
51+
request: request,
52+
response: response,
53+
}, nil
54+
}
55+
56+
type TaskResult struct {
57+
}
58+
59+
type ResourceType struct {
60+
Name string
61+
}
62+
63+
type Integration struct {
64+
IntegrationID string
65+
ProviderID string
66+
IntegrationType string
67+
Secret string
68+
Labels map[string]string
69+
Annotations map[string]string
70+
}
71+
72+
func (tr *TaskRunner) RunTask(ctx context.Context) error {
73+
74+
taskResult := &TaskResult{}
75+
var err error
76+
var integrations []Integration
77+
78+
inventoryClient := coreClient.NewCoreServiceClient(tr.coreServiceEndpoint)
79+
if _, ok := tr.request.TaskDefinition.Params["integrations_query"]; ok {
80+
integrations, err = retryWithBackoff("GetIntegrations", func() ([]Integration, error) {
81+
return GetIntegrationsFromQuery(inventoryClient, tr.request.TaskDefinition.Params)
82+
})
83+
if err != nil {
84+
tr.logger.Error("Error fetching integrations", zap.Error(err))
85+
return err
86+
}
87+
}
88+
89+
for _, i := range integrations {
90+
err = tr.describeIntegrationResourceTypes(ctx, i)
91+
if err != nil {
92+
tr.logger.Error("Error describing integrations", zap.Error(err))
93+
return err
94+
}
95+
}
96+
97+
jsonBytes, err := json.Marshal(taskResult)
98+
if err != nil {
99+
err = fmt.Errorf("failed Marshaling task result: %s", err.Error())
100+
return err
101+
}
102+
tr.response.Result = jsonBytes
103+
104+
return nil
105+
}
106+
107+
func (tr *TaskRunner) describeIntegrationResourceTypes(ctx context.Context, i Integration) error {
108+
var resourceTypes []ResourceType
109+
var err error
110+
111+
config, err := tr.vaultSrc.Decrypt(ctx, i.Secret)
112+
if err != nil {
113+
return fmt.Errorf("decrypt error: %w", err)
114+
}
115+
116+
inventoryClient := coreClient.NewCoreServiceClient(tr.coreServiceEndpoint)
117+
if _, ok := tr.request.TaskDefinition.Params["resource_types_query"]; ok {
118+
resourceTypes, err = retryWithBackoff("GetResourceTypes", func() ([]ResourceType, error) {
119+
return GetResourceTypesFromQuery(inventoryClient, tr.request.TaskDefinition.Params)
120+
})
121+
if err != nil {
122+
tr.logger.Error("Error fetching integrations", zap.Error(err))
123+
return err
124+
}
125+
}
126+
127+
for _, rt := range resourceTypes {
128+
params := make(map[string]string)
129+
for key, value := range tr.request.TaskDefinition.Params {
130+
params[key] = fmt.Sprintf("%v", value)
131+
}
132+
for k, v := range params {
133+
ctx = context.WithValue(ctx, k, v)
134+
}
135+
136+
job := describe.DescribeJob{
137+
JobID: 0,
138+
ResourceType: rt.Name,
139+
IntegrationID: i.IntegrationID,
140+
ProviderID: i.ProviderID,
141+
DescribedAt: time.Now().Unix(),
142+
IntegrationType: integration.Type(i.IntegrationType),
143+
CipherText: i.Secret,
144+
IntegrationLabels: i.Labels,
145+
IntegrationAnnotations: i.Annotations,
146+
}
147+
_, err = orchestrator.Describe(ctx, tr.logger, job, params, config, tr.request.EsDeliverEndpoint,
148+
tr.request.IngestionPipelineEndpoint, tr.describeToken, tr.request.UseOpenSearch)
149+
if err != nil {
150+
tr.logger.Error("Error describing job", zap.Error(err))
151+
return err
152+
}
153+
}
154+
155+
return nil
156+
}
157+
158+
func GetIntegrationsFromQuery(coreServiceClient coreClient.CoreServiceClient, params map[string]any) ([]Integration, error) {
159+
if v, ok := params["integrations_query"]; ok {
160+
if vv, ok := v.(string); !ok {
161+
return nil, fmt.Errorf("query id should be a string")
162+
} else {
163+
queryResponse, err := coreServiceClient.RunQuery(&httpclient.Context{UserRole: authApi.AdminRole}, coreApi.RunQueryRequest{
164+
Query: &vv,
165+
Page: coreApi.Page{
166+
No: 1,
167+
Size: 1000,
168+
},
169+
})
170+
if err != nil {
171+
return nil, err
172+
}
173+
var integrations []Integration
174+
for _, r := range queryResponse.Result {
175+
integ := Integration{}
176+
for i, rc := range r {
177+
switch queryResponse.Headers[i] {
178+
case "integration_id":
179+
integ.IntegrationID = rc.(string)
180+
case "provider_id":
181+
integ.ProviderID = rc.(string)
182+
case "integration_type":
183+
integ.IntegrationType = rc.(string)
184+
case "secret":
185+
integ.Secret = rc.(string)
186+
case "annotations":
187+
if rc != nil {
188+
if jsonStr, ok := rc.(string); ok {
189+
var ann map[string]string
190+
if err := json.Unmarshal([]byte(jsonStr), &ann); err == nil {
191+
integ.Annotations = ann
192+
}
193+
}
194+
}
195+
case "labels":
196+
if rc != nil {
197+
if jsonStr, ok := rc.(string); ok {
198+
var lbl map[string]string
199+
if err := json.Unmarshal([]byte(jsonStr), &lbl); err == nil {
200+
integ.Labels = lbl
201+
}
202+
}
203+
}
204+
}
205+
}
206+
integrations = append(integrations, integ)
207+
}
208+
return integrations, nil
209+
}
210+
} else {
211+
return nil, fmt.Errorf("query id should be a string")
212+
}
213+
}
214+
215+
func GetResourceTypesFromQuery(coreServiceClient coreClient.CoreServiceClient, params map[string]any) ([]ResourceType, error) {
216+
if v, ok := params["integrations_query"]; ok {
217+
if vv, ok := v.(string); !ok {
218+
return nil, fmt.Errorf("query id should be a string")
219+
} else {
220+
queryResponse, err := coreServiceClient.RunQuery(&httpclient.Context{UserRole: authApi.AdminRole}, coreApi.RunQueryRequest{
221+
Query: &vv,
222+
Page: coreApi.Page{
223+
No: 1,
224+
Size: 1000,
225+
},
226+
})
227+
if err != nil {
228+
return nil, err
229+
}
230+
var resourceTypes []ResourceType
231+
for _, r := range queryResponse.Result {
232+
resourceType := ResourceType{}
233+
for i, rc := range r {
234+
if queryResponse.Headers[i] == "resource_type" {
235+
resourceType.Name = rc.(string)
236+
}
237+
}
238+
resourceTypes = append(resourceTypes, resourceType)
239+
}
240+
return resourceTypes, nil
241+
}
242+
} else {
243+
return nil, fmt.Errorf("query id should be a string")
244+
}
245+
}

discovery/worker/service.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package worker
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
"go.uber.org/zap"
6+
)
7+
8+
func WorkerCommand() *cobra.Command {
9+
cmd := &cobra.Command{
10+
RunE: func(cmd *cobra.Command, args []string) error {
11+
ctx := cmd.Context()
12+
cmd.SilenceUsage = true
13+
logger, err := zap.NewProduction()
14+
if err != nil {
15+
return err
16+
}
17+
18+
w, err := NewWorker(
19+
logger,
20+
cmd.Context(),
21+
)
22+
if err != nil {
23+
return err
24+
}
25+
26+
return w.Run(ctx)
27+
},
28+
}
29+
30+
return cmd
31+
}

0 commit comments

Comments
 (0)