Skip to content

Commit edbfc46

Browse files
committed
Implement atomicTrigger AckEvent
1 parent f718f3b commit edbfc46

File tree

3 files changed

+22
-6
lines changed

3 files changed

+22
-6
lines changed

pkg/capabilities/registry/base.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,14 @@ func (a *atomicTriggerCapability) GetState() connectivity.State {
244244
return connectivity.State(-1) // unknown
245245
}
246246

247+
func (a *atomicTriggerCapability) AckEvent(ctx context.Context, eventId string) error {
248+
c := a.Load()
249+
if c == nil {
250+
return errors.New("capability unavailable")
251+
}
252+
return (*c).AckEvent(ctx, eventId)
253+
}
254+
247255
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
248256
c := a.Load()
249257
if c == nil {
@@ -360,6 +368,14 @@ func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
360368
return connectivity.State(-1) // unknown
361369
}
362370

371+
func (a *atomicExecuteAndTriggerCapability) AckEvent(ctx context.Context, eventId string) error {
372+
c := a.Load()
373+
if c == nil {
374+
return errors.New("capability unavailable")
375+
}
376+
return (*c).AckEvent(ctx, eventId)
377+
}
378+
363379
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
364380
c := a.Load()
365381
if c == nil {

pkg/capabilities/triggers/mercury_trigger.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ type MercuryTriggerService struct {
3737
metaOverride datastreams.Metadata // usually empty, but set to a value in mock trigger
3838
}
3939

40-
func (o *MercuryTriggerService) AckEvent(ctx context.Context, eventId string) error {
41-
//TODO Use BaseTriggerCapability?
42-
return nil
43-
}
44-
4540
var _ capabilities.TriggerCapability = (*MercuryTriggerService)(nil)
4641
var _ services.Service = &MercuryTriggerService{}
4742

@@ -96,6 +91,11 @@ func (o *MercuryTriggerService) ProcessReport(reports []datastreams.FeedReport)
9691
return nil
9792
}
9893

94+
func (o *MercuryTriggerService) AckEvent(ctx context.Context, eventId string) error {
95+
// TODO Implement?
96+
return nil
97+
}
98+
9999
func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
100100
wid := req.Metadata.WorkflowID
101101

pkg/capabilities/triggers/on_demand_trigger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (o *OnDemand) SendEvent(ctx context.Context, wid string, event capabilities
7373
}
7474

7575
func (o *OnDemand) AckEvent(ctx context.Context, eventId string) error {
76-
//TODO Should this trigger type use BaseTriggerCapability?
76+
//TODO Implement?
7777
return nil
7878
}
7979

0 commit comments

Comments
 (0)