Skip to content

Commit 4ab0153

Browse files
committed
using resoruce id as the work metadata name
Signed-off-by: Wei Liu <liuweixa@redhat.com>
1 parent 7fea9a3 commit 4ab0153

File tree

10 files changed

+833
-58
lines changed

10 files changed

+833
-58
lines changed

cmd/maestro/server/grpc_broker.go

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
ce "github.com/cloudevents/sdk-go/v2"
13-
cetypes "github.com/cloudevents/sdk-go/v2/types"
1413
"google.golang.org/grpc"
1514
"google.golang.org/grpc/credentials"
1615
"google.golang.org/grpc/keepalive"
@@ -24,11 +23,14 @@ import (
2423
servergrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
2524

2625
"github.com/openshift-online/maestro/pkg/api"
26+
"github.com/openshift-online/maestro/pkg/client/cloudevents"
2727
"github.com/openshift-online/maestro/pkg/dao"
2828
"github.com/openshift-online/maestro/pkg/event"
2929
"github.com/openshift-online/maestro/pkg/services"
3030
)
3131

32+
const source = "maestro"
33+
3234
var _ EventServer = &GRPCBroker{}
3335

3436
type GRPCBrokerService struct {
@@ -290,63 +292,18 @@ func (s *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool,
290292

291293
// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
292294
func decodeResourceStatus(evt *ce.Event) (*api.Resource, error) {
293-
evtExtensions := evt.Context.GetExtensions()
294-
295-
clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
296-
if err != nil {
297-
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
298-
}
299-
300-
resourceID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID])
301-
if err != nil {
302-
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
303-
}
304-
305-
resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
306-
if err != nil {
307-
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
308-
}
309-
310-
status, err := api.CloudEventToJSONMap(evt)
311-
if err != nil {
312-
return nil, fmt.Errorf("failed to convert cloudevent to resource status: %v", err)
313-
}
314-
315-
resource := &api.Resource{
316-
Source: evt.Source(),
317-
ConsumerName: clusterName,
318-
Version: resourceVersion,
319-
Meta: api.Meta{
320-
ID: resourceID,
321-
},
322-
Status: status,
323-
}
324-
325-
return resource, nil
295+
codec := cloudevents.NewCodec(source)
296+
return codec.Decode(evt)
326297
}
327298

328299
// encodeResourceSpec translates a resource spec JSON map into a CloudEvent.
329300
func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
330-
evt, err := api.JSONMAPToCloudEvent(resource.Payload)
331-
if err != nil {
332-
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
333-
}
334-
335301
eventType := types.CloudEventsType{
336302
CloudEventsDataType: workpayload.ManifestBundleEventDataType,
337303
SubResource: types.SubResourceSpec,
338304
Action: types.EventAction("create_request"),
339305
}
340-
evt.SetType(eventType.String())
341-
evt.SetSource("maestro")
342-
// TODO set resource.Source with a new extension attribute if the agent needs
343-
evt.SetExtension(types.ExtensionResourceID, resource.ID)
344-
evt.SetExtension(types.ExtensionResourceVersion, int64(resource.Version))
345-
evt.SetExtension(types.ExtensionClusterName, resource.ConsumerName)
346-
347-
if !resource.GetDeletionTimestamp().IsZero() {
348-
evt.SetExtension(types.ExtensionDeletionTimestamp, resource.GetDeletionTimestamp().Time)
349-
}
350306

351-
return evt, nil
307+
codec := cloudevents.NewCodec(source)
308+
return codec.Encode(source, eventType, resource)
352309
}

docs/maestro.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ The `grpcClientTokenFile` stores the token for the corresponding service account
186186

187187
4. After persisting the record, the Maestro server publishes a [CloudEvent](./resources/cloudevents.spec.maestro.json) representing the stored Resource to Maestro agent.
188188

189+
- Maestro server uses the `Resource` record ID as the `ManifestWork` name.
190+
189191
5. The Maestro agent receives this CloudEvent, converts it back into a `ManifestWork`, and applies it to the target Kubernetes cluster. After applied, there is a corresponding [appliedmanifestwork](./resources/appliedmanifestwork.json) created on the agent side.
190192

191193
### Maestro Subscribe Resource Status with MQTT

docs/resources/appliedmanifestwork.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
"cloudevents.open-cluster-management.io/originalsource": "maestro",
1212
"maestro.resource.type": "2d18a7dc-c731-5753-8aa9-6b2f22e3b12b"
1313
},
14-
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-e44ec579-9646-549a-b679-db8d19d6da37",
14+
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
1515
"resourceVersion": "1552",
1616
"uid": "432b3ff3-702e-4363-89e6-ad399c75608c"
1717
},
1818
"spec": {
1919
"agentID": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
2020
"hubHash": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
21-
"manifestWorkName": "e44ec579-9646-549a-b679-db8d19d6da37"
21+
"manifestWorkName": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb"
2222
},
2323
"status": {
2424
"appliedResources": [

docs/resources/cloudevents.spec.maestro.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
},
7575
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
7676
"logtracing": "{}",
77-
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
77+
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
7878
"originalsource": "",
7979
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
8080
"resourceversion": 1

docs/resources/cloudevents.status.maestro.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@
141141
},
142142
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
143143
"logtracing": "{}",
144-
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
144+
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
145145
"originalsource": "maestro",
146146
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
147147
"resourceversion": 1,

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -833,8 +833,6 @@ open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c h1:LWZ+5dwVS
833833
open-cluster-management.io/api v1.1.1-0.20251215032811-ee922fbb996c/go.mod h1:Hk/3c114t6Ba5qhpqw+RoA93yEbE2CosG+JzzBZ6aCo=
834834
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5 h1:02XVQtUt8v/fw6n3Rbf7Tu4FHJVVW2MiQLSpThWLwro=
835835
open-cluster-management.io/ocm v1.1.1-0.20251211014758-deb61b0a60d5/go.mod h1:kaFjiXsxtUUwWChYIy/ojANeMMrDW1KUrliuKA55z/Q=
836-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac h1:Wt7rzenZqrtyYI58+lpe9tmf9e5Ft8Wwd0MyDwuJ4ck=
837-
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU=
838836
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74 h1:Sf+w+8ZzJgUQez/ADk2FLhoVDl1hD4rJqLQfvCFuM1o=
839837
open-cluster-management.io/sdk-go v1.1.1-0.20251218031856-08bb1caedf74/go.mod h1:3xQf3gISaZ3377vFnwjH3QH8EF2UNaf8D9igLPUBChk=
840838
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=

pkg/client/cloudevents/codec.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
cloudevents "github.com/cloudevents/sdk-go/v2"
88
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
99
"github.com/google/uuid"
10+
1011
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
1112
cegeneric "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
1213
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -31,6 +32,14 @@ func (codec *Codec) EventDataType() cetypes.CloudEventsDataType {
3132
}
3233

3334
func (codec *Codec) Encode(source string, eventType cetypes.CloudEventsType, res *api.Resource) (*cloudevents.Event, error) {
35+
// use resource id as the resource payload metadata name to ensure the payload metadata name is
36+
// unique on the agent side
37+
if err := resetPayloadMetadataNameWithResID(res); err != nil {
38+
return nil, err
39+
}
40+
41+
// converts a resource payload to a CloudEvent
42+
// If the resource payload has metadata the event will have the metadata extension
3443
evt, err := api.JSONMAPToCloudEvent(res.Payload)
3544
if err != nil {
3645
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
@@ -107,3 +116,20 @@ func (codec *Codec) Decode(evt *cloudevents.Event) (*api.Resource, error) {
107116

108117
return resource, nil
109118
}
119+
120+
func resetPayloadMetadataNameWithResID(res *api.Resource) error {
121+
metadata, ok := res.Payload[cetypes.ExtensionWorkMeta]
122+
if !ok {
123+
// the resource payload does not have metadata, do nothing
124+
// the agent will using resource id as the payload metadata name
125+
return nil
126+
}
127+
128+
metaObj, ok := metadata.(map[string]interface{})
129+
if !ok {
130+
return fmt.Errorf("unsupported object type %T", metadata)
131+
}
132+
metaObj["name"] = res.ID
133+
res.Payload[cetypes.ExtensionWorkMeta] = metaObj
134+
return nil
135+
}

0 commit comments

Comments
 (0)