Skip to content

Commit 41d1c99

Browse files
committed
Review comments
Signed-off-by: joshvanl <[email protected]>
1 parent afcfd29 commit 41d1c99

File tree

3 files changed

+27
-23
lines changed

3 files changed

+27
-23
lines changed

pkg/scheduler/exportimport.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ func Import(ctx context.Context, opts ExportImportOptions) error {
118118
ops := make([]clientv3.Op, 0, len(in.Jobs)+len(in.Counters))
119119

120120
for key, b := range in.Jobs {
121-
// Optional: verify bytes are valid before writing
122121
var j stored.Job
123122
if err := proto.Unmarshal(b, &j); err != nil {
124123
return fmt.Errorf("unmarshal job %q: %w", key, err)
@@ -134,19 +133,21 @@ func Import(ctx context.Context, opts ExportImportOptions) error {
134133
ops = append(ops, clientv3.OpPut(key, string(b)))
135134
}
136135

136+
var end int
137137
for i := 0; i < len(ops); i += 128 {
138138
txn := client.Txn(ctx)
139-
end := i + 128
139+
end = i + 128
140140
if end > len(ops) {
141141
end = len(ops)
142142
}
143143
txn.Then(ops[i:end]...)
144144
if _, err := txn.Commit(); err != nil {
145+
print.FailureStatusEvent(os.Stderr, "Incomplete import with %d items.", end)
145146
return fmt.Errorf("commit transaction: %w", err)
146147
}
147-
148-
print.InfoStatusEvent(os.Stdout, "Imported %d items.", end)
149148
}
150149

150+
print.InfoStatusEvent(os.Stdout, "Imported %d items.", end)
151+
151152
return nil
152153
}

pkg/scheduler/get.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,34 +99,37 @@ func getSingle(ctx context.Context, cl *clientv3.Client, key string, opts GetOpt
9999
}
100100

101101
func pathsFromJobKey(jobKey *jobKey, namespace string) [2]string {
102+
const reminderPath = "dapr/jobs/actorreminder"
103+
const reminderCounterPath = "dapr/counters/actorreminder"
104+
102105
var paths [2]string
103106
switch {
104107
case jobKey.actorType != nil:
105-
paths[0] = fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||%s||%s",
106-
namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name,
108+
paths[0] = fmt.Sprintf("%s||%s||%s||%s||%s",
109+
reminderPath, namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name,
107110
)
108-
paths[1] = fmt.Sprintf("dapr/counters/actorreminder||%s||%s||%s||%s",
109-
namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name,
111+
paths[1] = fmt.Sprintf("%s||%s||%s||%s||%s",
112+
reminderCounterPath, namespace, *jobKey.actorType, *jobKey.actorID, jobKey.name,
110113
)
111114

112115
case jobKey.activity:
113116
actorType := fmt.Sprintf("dapr.internal.%s.%s.activity", namespace, *jobKey.appID)
114117
actorID := jobKey.name
115-
paths[0] = fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||%s||run-activity",
116-
namespace, actorType, actorID,
118+
paths[0] = fmt.Sprintf("%s||%s||%s||%s||run-activity",
119+
reminderPath, namespace, actorType, actorID,
117120
)
118-
paths[1] = fmt.Sprintf("dapr/counters/actorreminder||%s||%s||%s||run-activity",
119-
namespace, actorType, actorID,
121+
paths[1] = fmt.Sprintf("%s||%s||%s||%s||run-activity",
122+
reminderCounterPath, namespace, actorType, actorID,
120123
)
121124

122125
case jobKey.instanceID != nil:
123126
actorType := fmt.Sprintf("dapr.internal.%s.%s.workflow", namespace, *jobKey.appID)
124127
actorID := *jobKey.instanceID
125-
paths[0] = fmt.Sprintf("dapr/jobs/actorreminder||%s||%s||%s||%s",
126-
namespace, actorType, actorID, jobKey.name,
128+
paths[0] = fmt.Sprintf("%s||%s||%s||%s||%s",
129+
reminderPath, namespace, actorType, actorID, jobKey.name,
127130
)
128-
paths[1] = fmt.Sprintf("dapr/counters/actorreminder||%s||%s||%s||%s",
129-
namespace, actorType, actorID, jobKey.name,
131+
paths[1] = fmt.Sprintf("%s||%s||%s||%s||%s",
132+
reminderCounterPath, namespace, actorType, actorID, jobKey.name,
130133
)
131134

132135
default:

pkg/scheduler/scheduler.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func parseJobKey(key string) (*jobKey, error) {
148148
}
149149

150150
switch split[0] {
151-
case "app":
151+
case FilterApp:
152152
if len(split) != 3 {
153153
return nil, fmt.Errorf("expecting job key to be in format 'app/{app ID}/{job name}', got '%s'", key)
154154
}
@@ -157,7 +157,7 @@ func parseJobKey(key string) (*jobKey, error) {
157157
name: split[2],
158158
}, nil
159159

160-
case "actor":
160+
case FilterActor:
161161
if len(split) != 4 {
162162
return nil, fmt.Errorf("expecting actor reminder key to be in format 'actor/{actor type}/{actor id}/{name}', got '%s'", key)
163163
}
@@ -167,19 +167,19 @@ func parseJobKey(key string) (*jobKey, error) {
167167
name: split[3],
168168
}, nil
169169

170-
case "workflow":
170+
case FilterWorkflow:
171171
if len(split) != 4 {
172-
return nil, fmt.Errorf("expecting worklow key to be in format 'workflow/{app ID}/{instance ID}/{name}', got '%s'", key)
172+
return nil, fmt.Errorf("expecting workflow key to be in format 'workflow/{app ID}/{instance ID}/{name}', got '%s'", key)
173173
}
174174
return &jobKey{
175175
appID: &split[1],
176176
instanceID: &split[2],
177177
name: split[3],
178178
}, nil
179179

180-
case "activity":
180+
case FilterActivity:
181181
if len(split) != 3 {
182-
return nil, fmt.Errorf("expecting activity key to be in format 'activity/{app ID}/{activity ID}/', got '%s'", key)
182+
return nil, fmt.Errorf("expecting activity key to be in format 'activity/{app ID}/{activity ID}', got '%s'", key)
183183
}
184184
return &jobKey{
185185
appID: &split[1],
@@ -188,7 +188,7 @@ func parseJobKey(key string) (*jobKey, error) {
188188
}, nil
189189

190190
default:
191-
return nil, fmt.Errorf("unsupported job type '%s', accepts 'app', 'actor', or 'workflow'", split[0])
191+
return nil, fmt.Errorf("unsupported job type '%s', accepts 'app', 'actor', 'workflow', or 'activity'", split[0])
192192
}
193193
}
194194

0 commit comments

Comments
 (0)