Skip to content

Commit 823791f

Browse files
feat(maintenance): use proto field descriptor (#3531)
1 parent d84c359 commit 823791f

File tree

3 files changed

+55
-13
lines changed

3 files changed

+55
-13
lines changed

flow/activities/maintenance_activity.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"go.temporal.io/api/workflowservice/v1"
1616
"go.temporal.io/sdk/activity"
1717
"go.temporal.io/sdk/client"
18+
proto2 "google.golang.org/protobuf/proto"
19+
"google.golang.org/protobuf/reflect/protoreflect"
1820
"google.golang.org/protobuf/types/known/timestamppb"
1921

2022
"github.com/PeerDB-io/peerdb/flow/alerting"
@@ -112,13 +114,30 @@ func (a *MaintenanceActivity) WaitForRunningSnapshotsAndIntermediateStates(
112114
return mirrors, nil
113115
}
114116

115-
var waitStatuses map[protos.FlowStatus]struct{} = map[protos.FlowStatus]struct{}{
116-
protos.FlowStatus_STATUS_SNAPSHOT: {},
117-
protos.FlowStatus_STATUS_SETUP: {},
118-
protos.FlowStatus_STATUS_RESYNC: {},
119-
protos.FlowStatus_STATUS_UNKNOWN: {},
120-
protos.FlowStatus_STATUS_PAUSING: {},
121-
protos.FlowStatus_STATUS_MODIFYING: {},
117+
var waitStatuses = buildWaitStatuses()
118+
119+
func buildWaitStatuses() map[protos.FlowStatus]struct{} {
120+
waitStatuses := make(map[protos.FlowStatus]struct{})
121+
122+
for index := range protos.FlowStatus_name {
123+
flowStatus := protos.FlowStatus(index)
124+
125+
// Get the enum value descriptor
126+
enumValueDesc := flowStatus.Descriptor().Values().ByNumber(protoreflect.EnumNumber(index))
127+
if enumValueDesc == nil {
128+
continue
129+
}
130+
131+
// Get the extension value from the enum value options
132+
if proto2.HasExtension(enumValueDesc.Options(), protos.E_PeerdbMaintenanceWait) {
133+
waitValue := proto2.GetExtension(enumValueDesc.Options(), protos.E_PeerdbMaintenanceWait)
134+
if boolVal, ok := waitValue.(bool); ok && boolVal {
135+
waitStatuses[flowStatus] = struct{}{}
136+
}
137+
}
138+
}
139+
140+
return waitStatuses
122141
}
123142

124143
func (a *MaintenanceActivity) checkAndWaitIfNeeded(
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package activities
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
8+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
9+
)
10+
11+
func TestMaintenanceStatusesAreCorrectlyPopulated(t *testing.T) {
12+
statuses := buildWaitStatuses()
13+
t.Logf("statuses: %v", statuses)
14+
assert.Contains(t, statuses, protos.FlowStatus_STATUS_SNAPSHOT, "Snapshot should be in statuses")
15+
assert.NotContains(t, statuses, protos.FlowStatus_STATUS_RUNNING, "Running should not be in statuses")
16+
}

protos/flow.proto

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
syntax = "proto3";
22

33
import "google/protobuf/timestamp.proto";
4+
import "google/protobuf/descriptor.proto";
45
import "peers.proto";
56

7+
extend google.protobuf.EnumValueOptions {
8+
// If the flow is in this state, then maintenance should wait for it to transition to another non-wait state
9+
// This is to make sure we have only either terminal states or running state which we can pause
10+
optional bool peerdb_maintenance_wait = 16551843;
11+
}
12+
613
package peerdb_flow;
714

815
message AlertInput {
@@ -400,20 +407,20 @@ message GetOpenConnectionsForUserResult {
400407
// UI reads current workflow status and also requests status changes using same enum
401408
// see flow/cmd/handler.go FlowStateChange
402409
enum FlowStatus {
403-
STATUS_UNKNOWN = 0;
410+
STATUS_UNKNOWN = 0 [(peerdb_maintenance_wait) = true];
404411
STATUS_RUNNING = 1;
405412
STATUS_PAUSED = 2;
406-
STATUS_PAUSING = 3;
413+
STATUS_PAUSING = 3 [(peerdb_maintenance_wait) = true];;
407414
// not reachable in QRep mirrors
408-
STATUS_SETUP = 4;
415+
STATUS_SETUP = 4 [(peerdb_maintenance_wait) = true];
409416
// not reachable in QRep mirrors
410-
STATUS_SNAPSHOT = 5;
417+
STATUS_SNAPSHOT = 5 [(peerdb_maintenance_wait) = true];;
411418
STATUS_TERMINATING = 6;
412419
STATUS_TERMINATED = 7;
413420
STATUS_COMPLETED = 8;
414-
STATUS_RESYNC = 9;
421+
STATUS_RESYNC = 9 [(peerdb_maintenance_wait) = true];;
415422
STATUS_FAILED = 10;
416-
STATUS_MODIFYING = 11;
423+
STATUS_MODIFYING = 11 [(peerdb_maintenance_wait) = true];;
417424
}
418425

419426
message CDCFlowConfigUpdate {

0 commit comments

Comments
 (0)