44 "context"
55 "encoding/json"
66 "fmt"
7+ "github.com/opengovern/og-describer-kubernetes/discovery/envs"
78 "github.com/opengovern/og-describer-kubernetes/discovery/pkg/orchestrator"
89 authApi "github.com/opengovern/og-util/pkg/api"
910 "github.com/opengovern/og-util/pkg/describe"
@@ -54,6 +55,24 @@ func NewTaskRunner(ctx context.Context, jq *jq.JobQueue, coreServiceEndpoint str
5455}
5556
5657type TaskResult struct {
58+ AllIntegrations []string `json:"all_integrations"`
59+ AllIntegrationsCount int `json:"all_integrations_count"`
60+ ProgressedIntegrations map [string ]* IntegrationResult `json:"proposed_integrations"`
61+ ProgressedIntegrationsCount int `json:"proposed_integrations_count"`
62+ }
63+
64+ type IntegrationResult struct {
65+ IntegrationID string `json:"integration_id"`
66+ AllResourceTypes []string `json:"all_resource_types"`
67+ AllResourceTypesCount int `json:"all_resource_types_count"`
68+ ResourceTypeResults []ResourceTypeResult `json:"resource_type_results"`
69+ FinishedResourceTypesCount int `json:"finished_resource_types_count"`
70+ }
71+
72+ type ResourceTypeResult struct {
73+ ResourceType string `json:"resource_type"`
74+ Error string `json:"error"`
75+ ResourceCount int `json:"resource_count"`
5776}
5877
5978type ResourceType struct {
@@ -86,8 +105,15 @@ func (tr *TaskRunner) RunTask(ctx context.Context) error {
86105 }
87106 }
88107
108+ taskResult .AllIntegrations = make ([]string , len (integrations ))
109+ for _ , i := range integrations {
110+ taskResult .AllIntegrations = append (taskResult .AllIntegrations , i .IntegrationID )
111+ }
112+ taskResult .AllIntegrationsCount = len (integrations )
113+ taskResult .ProgressedIntegrations = make (map [string ]* IntegrationResult )
114+
89115 for _ , i := range integrations {
90- err = tr .describeIntegrationResourceTypes (ctx , i )
116+ err = tr .describeIntegrationResourceTypes (ctx , i , taskResult )
91117 if err != nil {
92118 tr .logger .Error ("Error describing integrations" , zap .Error (err ))
93119 return err
@@ -104,7 +130,12 @@ func (tr *TaskRunner) RunTask(ctx context.Context) error {
104130 return nil
105131}
106132
107- func (tr * TaskRunner ) describeIntegrationResourceTypes (ctx context.Context , i Integration ) error {
133+ func (tr * TaskRunner ) describeIntegrationResourceTypes (ctx context.Context , i Integration , taskResult * TaskResult ) error {
134+ taskResult .ProgressedIntegrations [i .IntegrationID ] = & IntegrationResult {
135+ IntegrationID : i .IntegrationID ,
136+ }
137+ taskResult .ProgressedIntegrationsCount ++
138+
108139 var resourceTypes []ResourceType
109140 var err error
110141
@@ -124,6 +155,13 @@ func (tr *TaskRunner) describeIntegrationResourceTypes(ctx context.Context, i In
124155 }
125156 }
126157
158+ taskResult .ProgressedIntegrations [i .IntegrationID ].AllResourceTypes = make ([]string , len (resourceTypes ))
159+ for _ , rt := range resourceTypes {
160+ taskResult .ProgressedIntegrations [i .IntegrationID ].AllResourceTypes = append (taskResult .ProgressedIntegrations [i .IntegrationID ].AllResourceTypes , rt .Name )
161+ }
162+ taskResult .ProgressedIntegrations [i .IntegrationID ].AllResourceTypesCount = len (resourceTypes )
163+ taskResult .ProgressedIntegrations [i .IntegrationID ].ResourceTypeResults = make ([]ResourceTypeResult , len (resourceTypes ))
164+
127165 for _ , rt := range resourceTypes {
128166 params := make (map [string ]string )
129167 for key , value := range tr .request .TaskDefinition .Params {
@@ -144,10 +182,36 @@ func (tr *TaskRunner) describeIntegrationResourceTypes(ctx context.Context, i In
144182 IntegrationLabels : i .Labels ,
145183 IntegrationAnnotations : i .Annotations ,
146184 }
147- _ , err = orchestrator .Describe (ctx , tr .logger , job , params , config , tr .request .EsDeliverEndpoint ,
185+ resources , err : = orchestrator .Describe (ctx , tr .logger , job , params , config , tr .request .EsDeliverEndpoint ,
148186 tr .request .IngestionPipelineEndpoint , tr .describeToken , tr .request .UseOpenSearch )
187+ errMsg := ""
149188 if err != nil {
150189 tr .logger .Error ("Error describing job" , zap .Error (err ))
190+ errMsg = err .Error ()
191+ }
192+ taskResult .ProgressedIntegrations [i .IntegrationID ].ResourceTypeResults = append (
193+ taskResult .ProgressedIntegrations [i .IntegrationID ].ResourceTypeResults ,
194+ ResourceTypeResult {
195+ ResourceType : rt .Name ,
196+ Error : errMsg ,
197+ ResourceCount : len (resources ),
198+ })
199+
200+ taskResult .ProgressedIntegrations [i .IntegrationID ].FinishedResourceTypesCount = len (taskResult .ProgressedIntegrations [i .IntegrationID ].ResourceTypeResults )
201+ jsonBytes , err := json .Marshal (taskResult )
202+ if err != nil {
203+ err = fmt .Errorf ("failed Marshaling task result: %s" , err .Error ())
204+ return err
205+ }
206+ tr .response .Result = jsonBytes
207+ responseJson , marshalErr := json .Marshal (tr .response )
208+ if marshalErr != nil {
209+ tr .logger .Error ("failed to create final job result json" , zap .Error (marshalErr ))
210+ return marshalErr
211+ }
212+ msgId := fmt .Sprintf ("task-run-result-%d" , tr .request .TaskDefinition .RunID )
213+ if _ , err = tr .jq .Produce (ctx , envs .ResultTopicName , responseJson , msgId ); err != nil { // Use original ctx
214+ tr .logger .Error ("failed to publish initial InProgress job status" , zap .String ("response" , string (responseJson )), zap .Error (err ))
151215 return err
152216 }
153217 }
0 commit comments