Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 7 additions & 50 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

ce "github.com/cloudevents/sdk-go/v2"
cetypes "github.com/cloudevents/sdk-go/v2/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
Expand All @@ -24,11 +23,14 @@ import (
servergrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/event"
"github.com/openshift-online/maestro/pkg/services"
)

const source = "maestro"

var _ EventServer = &GRPCBroker{}

type GRPCBrokerService struct {
Expand Down Expand Up @@ -290,63 +292,18 @@ func (s *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool,

// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
func decodeResourceStatus(evt *ce.Event) (*api.Resource, error) {
evtExtensions := evt.Context.GetExtensions()

clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
if err != nil {
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
}

resourceID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID])
if err != nil {
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
}

resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
if err != nil {
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
}

status, err := api.CloudEventToJSONMap(evt)
if err != nil {
return nil, fmt.Errorf("failed to convert cloudevent to resource status: %v", err)
}

resource := &api.Resource{
Source: evt.Source(),
ConsumerName: clusterName,
Version: resourceVersion,
Meta: api.Meta{
ID: resourceID,
},
Status: status,
}

return resource, nil
codec := cloudevents.NewCodec(source)
return codec.Decode(evt)
}

// encodeResourceSpec translates a resource spec JSON map into a CloudEvent.
func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
evt, err := api.JSONMAPToCloudEvent(resource.Payload)
if err != nil {
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
}

eventType := types.CloudEventsType{
CloudEventsDataType: workpayload.ManifestBundleEventDataType,
SubResource: types.SubResourceSpec,
Action: types.EventAction("create_request"),
}
evt.SetType(eventType.String())
evt.SetSource("maestro")
// TODO set resource.Source with a new extension attribute if the agent needs
evt.SetExtension(types.ExtensionResourceID, resource.ID)
evt.SetExtension(types.ExtensionResourceVersion, int64(resource.Version))
evt.SetExtension(types.ExtensionClusterName, resource.ConsumerName)

if !resource.GetDeletionTimestamp().IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, resource.GetDeletionTimestamp().Time)
}

return evt, nil
codec := cloudevents.NewCodec(source)
return codec.Encode(source, eventType, resource)
}
2 changes: 2 additions & 0 deletions docs/maestro.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ The `grpcClientTokenFile` stores the token for the corresponding service account

4. After persisting the record, the Maestro server publishes a [CloudEvent](./resources/cloudevents.spec.maestro.json) representing the stored Resource to Maestro agent.

- Maestro server uses the `Resource` record ID as the `ManifestWork` name.

5. The Maestro agent receives this CloudEvent, converts it back into a `ManifestWork`, and applies it to the target Kubernetes cluster. After applied, there is a corresponding [appliedmanifestwork](./resources/appliedmanifestwork.json) created on the agent side.

### Maestro Subscribe Resource Status with MQTT
Expand Down
4 changes: 2 additions & 2 deletions docs/resources/appliedmanifestwork.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
"cloudevents.open-cluster-management.io/originalsource": "maestro",
"maestro.resource.type": "2d18a7dc-c731-5753-8aa9-6b2f22e3b12b"
},
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-e44ec579-9646-549a-b679-db8d19d6da37",
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
"resourceVersion": "1552",
"uid": "432b3ff3-702e-4363-89e6-ad399c75608c"
},
"spec": {
"agentID": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
"hubHash": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
"manifestWorkName": "e44ec579-9646-549a-b679-db8d19d6da37"
"manifestWorkName": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb"
},
"status": {
"appliedResources": [
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/cloudevents.spec.maestro.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
},
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
"logtracing": "{}",
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
"originalsource": "",
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
"resourceversion": 1
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/cloudevents.status.maestro.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
},
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
"logtracing": "{}",
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
"originalsource": "maestro",
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
"resourceversion": 1,
Expand Down
25 changes: 25 additions & 0 deletions pkg/client/cloudevents/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func (codec *Codec) EventDataType() cetypes.CloudEventsDataType {
}

func (codec *Codec) Encode(source string, eventType cetypes.CloudEventsType, res *api.Resource) (*cloudevents.Event, error) {
// use resource id as the resource payload metadata name to ensure the payload metadata name is
// unique on the agent side
if err := resetPayloadMetadataNameWithResID(res); err != nil {
return nil, err
}

// converts a resource payload to a CloudEvent
// If the resource payload has metadata the event will have the metadata extension
evt, err := api.JSONMAPToCloudEvent(res.Payload)
if err != nil {
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
Expand Down Expand Up @@ -107,3 +115,20 @@ func (codec *Codec) Decode(evt *cloudevents.Event) (*api.Resource, error) {

return resource, nil
}

func resetPayloadMetadataNameWithResID(res *api.Resource) error {
metadata, ok := res.Payload[cetypes.ExtensionWorkMeta]
if !ok {
// the resource payload does not have metadata, do nothing
// the agent will use resource id as the payload metadata name
return nil
}

metaObj, ok := metadata.(map[string]interface{})
if !ok {
return fmt.Errorf("unsupported object type %T", metadata)
}
metaObj["name"] = res.ID
res.Payload[cetypes.ExtensionWorkMeta] = metaObj
return nil
}
Loading
Loading