Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions re/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,41 @@ func disableRuleEndpoint(s re.Service) endpoint.Endpoint {
return updateRuleStatusRes{Rule: rule}, err
}
}

func abortRuleExecutionEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request any) (any, error) {
session, ok := ctx.Value(authn.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}

req := request.(abortRuleExecutionReq)
if err := req.validate(); err != nil {
return abortRuleExecutionRes{}, err
}
err := s.AbortRuleExecution(ctx, session, req.id)
if err != nil {
return abortRuleExecutionRes{}, err
}
return abortRuleExecutionRes{}, nil
}
}

func getRuleExecutionStatusEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request any) (any, error) {
session, ok := ctx.Value(authn.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}

req := request.(getRuleExecutionStatusReq)
if err := req.validate(); err != nil {
return getRuleExecutionStatusRes{}, err
}
status, err := s.GetRuleExecutionStatus(ctx, session, req.id)
if err != nil {
return getRuleExecutionStatusRes{}, err
}
return getRuleExecutionStatusRes{RuleExecutionStatus: status}, nil
}
}
24 changes: 24 additions & 0 deletions re/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,27 @@ func (req deleteRuleReq) validate() error {

return nil
}

type abortRuleExecutionReq struct {
id string
}

func (req abortRuleExecutionReq) validate() error {
if req.id == "" {
return apiutil.ErrMissingID
}

return nil
}

type getRuleExecutionStatusReq struct {
id string
}

func (req getRuleExecutionStatusReq) validate() error {
if req.id == "" {
return apiutil.ErrMissingID
}

return nil
}
32 changes: 32 additions & 0 deletions re/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ var (
_ supermq.Response = (*rulesPageRes)(nil)
_ supermq.Response = (*updateRuleRes)(nil)
_ supermq.Response = (*deleteRuleRes)(nil)
_ supermq.Response = (*abortRuleExecutionRes)(nil)
_ supermq.Response = (*getRuleExecutionStatusRes)(nil)
)

type pageRes struct {
Expand Down Expand Up @@ -136,3 +138,33 @@ func (res deleteRuleRes) Headers() map[string]string {
func (res deleteRuleRes) Empty() bool {
return true
}

type abortRuleExecutionRes struct{}

func (res abortRuleExecutionRes) Code() int {
return http.StatusAccepted
}

func (res abortRuleExecutionRes) Headers() map[string]string {
return map[string]string{}
}

func (res abortRuleExecutionRes) Empty() bool {
return true
}

type getRuleExecutionStatusRes struct {
re.RuleExecutionStatus `json:",inline"`
}

func (res getRuleExecutionStatusRes) Code() int {
return http.StatusOK
}

func (res getRuleExecutionStatusRes) Headers() map[string]string {
return map[string]string{}
}

func (res getRuleExecutionStatusRes) Empty() bool {
return false
}
56 changes: 46 additions & 10 deletions re/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

const (
ruleIdKey = "ruleID"
inputChannelKey = "input_channel"
ruleIdKey = "ruleID"
inputChannelKey = "input_channel"
lastRunStatusKey = "last_run_status"
)

// MakeHandler creates an HTTP handler for the service endpoints.
Expand Down Expand Up @@ -99,6 +100,20 @@ func MakeHandler(svc re.Service, authn smqauthn.AuthNMiddleware, mux *chi.Mux, l
api.EncodeResponse,
opts...,
), "disable_rule").ServeHTTP)

r.Post("/abort", otelhttp.NewHandler(kithttp.NewServer(
abortRuleExecutionEndpoint(svc),
decodeAbortRuleExecutionRequest,
api.EncodeResponse,
opts...,
), "abort_rule_execution").ServeHTTP)

r.Get("/execution-status", otelhttp.NewHandler(kithttp.NewServer(
getRuleExecutionStatusEndpoint(svc),
decodeGetRuleExecutionStatusRequest,
api.EncodeResponse,
opts...,
), "get_rule_execution_status").ServeHTTP)
})
})
})
Expand Down Expand Up @@ -198,6 +213,10 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (any, error) {
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
lrs, err := apiutil.ReadStringQuery(r, lastRunStatusKey, re.NeverRun)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
dir, err := apiutil.ReadStringQuery(r, api.DirKey, "desc")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
Expand All @@ -210,21 +229,26 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (any, error) {
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
lrst, err := re.ToExecutionStatus(lrs)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
tag, err := apiutil.ReadStringQuery(r, api.TagKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}

return listRulesReq{
PageMeta: re.PageMeta{
Offset: offset,
Limit: limit,
Name: name,
InputChannel: ic,
Status: st,
Dir: dir,
Order: order,
Tag: tag,
Offset: offset,
Limit: limit,
Name: name,
InputChannel: ic,
Status: st,
LastRunStatus: lrst,
Dir: dir,
Order: order,
Tag: tag,
},
}, nil
}
Expand All @@ -234,3 +258,15 @@ func decodeDeleteRuleRequest(_ context.Context, r *http.Request) (any, error) {

return deleteRuleReq{id: id}, nil
}

func decodeAbortRuleExecutionRequest(_ context.Context, r *http.Request) (any, error) {
id := chi.URLParam(r, ruleIdKey)

return abortRuleExecutionReq{id: id}, nil
}

func decodeGetRuleExecutionStatusRequest(_ context.Context, r *http.Request) (any, error) {
id := chi.URLParam(r, ruleIdKey)

return getRuleExecutionStatusReq{id: id}, nil
}
14 changes: 14 additions & 0 deletions re/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
ruleUpdateSchedule = rulePrefix + "update_schedule"
ruleEnable = rulePrefix + "enable"
ruleDisable = rulePrefix + "disable"
ruleAbort = rulePrefix + "abort"
ruleRemove = rulePrefix + "remove"
)

Expand All @@ -33,6 +34,7 @@ var (
_ events.Event = (*updateRuleScheduleEvent)(nil)
_ events.Event = (*enableRuleEvent)(nil)
_ events.Event = (*disableRuleEvent)(nil)
_ events.Event = (*abortRuleExecutionEvent)(nil)
_ events.Event = (*removeRuleEvent)(nil)
)

Expand Down Expand Up @@ -187,3 +189,15 @@ func (rre removeRuleEvent) Encode() (map[string]any, error) {
val["operation"] = ruleRemove
return val, nil
}

type abortRuleExecutionEvent struct {
id string
baseRuleEvent
}

func (aree abortRuleExecutionEvent) Encode() (map[string]any, error) {
val := aree.baseRuleEvent.Encode()
val["id"] = aree.id
val["operation"] = ruleAbort
return val, nil
}
20 changes: 20 additions & 0 deletions re/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
UpdateScheduleStream = supermqPrefix + ruleUpdateSchedule
EnableStream = supermqPrefix + ruleEnable
DisableStream = supermqPrefix + ruleDisable
AbortStream = supermqPrefix + ruleAbort
RemoveStream = supermqPrefix + ruleRemove
)

Expand Down Expand Up @@ -183,6 +184,25 @@ func (es *eventStore) DisableRule(ctx context.Context, session authn.Session, id
return rule, nil
}

func (es *eventStore) AbortRuleExecution(ctx context.Context, session authn.Session, id string) error {
err := es.svc.AbortRuleExecution(ctx, session, id)
if err != nil {
return err
}
event := abortRuleExecutionEvent{
id: id,
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
}
if err := es.Publish(ctx, AbortStream, event); err != nil {
return err
}
return nil
}

func (es *eventStore) GetRuleExecutionStatus(ctx context.Context, session authn.Session, id string) (re.RuleExecutionStatus, error) {
return es.svc.GetRuleExecutionStatus(ctx, session, id)
}

func (es *eventStore) StartScheduler(ctx context.Context) error {
return es.svc.StartScheduler(ctx)
}
Expand Down
101 changes: 101 additions & 0 deletions re/execution_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package re

import (
"encoding/json"
"strings"

svcerr "github.com/absmach/supermq/pkg/errors/service"
)

// ExecutionStatus represents the last run status of a rule execution.
type ExecutionStatus uint8

// Possible execution status values.
const (
// NeverRunStatus represents a rule that has never been executed.
NeverRunStatus ExecutionStatus = iota
// SuccessStatus represents a successful rule execution.
SuccessStatus
// FailureStatus represents a failed rule execution.
FailureStatus
// PartialSuccessStatus represents a rule execution with partial success.
PartialSuccessStatus
// QueuedStatus represents a rule that is queued for execution.
QueuedStatus
// InProgressStatus represents a rule that is currently being executed.
InProgressStatus
// AbortedStatus represents a rule execution that was aborted.
AbortedStatus
// UnknownExecutionStatus represents an unknown execution status.
UnknownExecutionStatus
)

// String representation of the possible execution status values.
const (
NeverRun = "never_run"
Success = "success"
Failure = "failure"
PartialSuccess = "partial_success"
Queued = "queued"
InProgress = "in_progress"
Aborted = "aborted"
UnknownExecution = "unknown"
)

func (es ExecutionStatus) String() string {
switch es {
case NeverRunStatus:
return NeverRun
case SuccessStatus:
return Success
case FailureStatus:
return Failure
case PartialSuccessStatus:
return PartialSuccess
case QueuedStatus:
return Queued
case InProgressStatus:
return InProgress
case AbortedStatus:
return Aborted
default:
return UnknownExecution
}
}

// ToExecutionStatus converts string value to a valid execution status.
func ToExecutionStatus(status string) (ExecutionStatus, error) {
switch status {
case NeverRun:
return NeverRunStatus, nil
case Success:
return SuccessStatus, nil
case Failure:
return FailureStatus, nil
case PartialSuccess:
return PartialSuccessStatus, nil
case Queued:
return QueuedStatus, nil
case InProgress:
return InProgressStatus, nil
case Aborted:
return AbortedStatus, nil
case "", UnknownExecution:
return UnknownExecutionStatus, nil
}
return UnknownExecutionStatus, svcerr.ErrInvalidStatus
}

func (es ExecutionStatus) MarshalJSON() ([]byte, error) {
return json.Marshal(es.String())
}

func (es *ExecutionStatus) UnmarshalJSON(data []byte) error {
str := strings.Trim(string(data), "\"")
val, err := ToExecutionStatus(str)
*es = val
return err
}
Loading
Loading