Skip to content

Commit a3cb65f

Browse files
authored
Merge branch 'main' into dependabot/go_modules/dot-github/workflows/dapr-bot/github.com/cloudflare/circl-1.6.1
2 parents a1bac59 + 6dd4349 commit a3cb65f

File tree

34 files changed

+912
-333
lines changed

34 files changed

+912
-333
lines changed

.github/workflows/validate_examples.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ jobs:
3838
CHECKOUT_REF: ${{ github.ref }}
3939
outputs:
4040
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
41-
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
41+
DAPR_CLI_VER: 1.16.0-rc.1
4242
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
43-
DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }}
43+
DAPR_RUNTIME_VER: 1.16.0-rc.3
4444
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
4545
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
4646
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
@@ -166,7 +166,7 @@ jobs:
166166
"configuration",
167167
"conversation",
168168
"crypto",
169-
"dist-scheduler",
169+
"jobs",
170170
"grpc-service",
171171
"hello-world",
172172
"pubsub",
@@ -175,6 +175,7 @@ jobs:
175175
"socket",
176176
"workflow",
177177
"workflow-parallel",
178+
"workflow-taskexecutionid"
178179
]
179180
steps:
180181
- name: Check out code onto GOPATH

client/client.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func NewClientWithAddressContext(ctx context.Context, address string) (client Cl
355355
return nil, fmt.Errorf("error parsing address '%s': %w", address, err)
356356
}
357357

358-
at := &authToken{}
358+
at := newAuthToken()
359359

360360
opts := []grpc.DialOption{
361361
grpc.WithUserAgent(userAgent()),
@@ -404,7 +404,7 @@ func NewClientWithSocket(socket string) (client Client, err error) {
404404
if socket == "" {
405405
return nil, errors.New("nil socket")
406406
}
407-
at := &authToken{}
407+
at := newAuthToken()
408408
logger.Printf("dapr client initializing for: %s", socket)
409409
addr := "unix://" + socket
410410
conn, err := grpc.Dial( //nolint:staticcheck
@@ -421,11 +421,6 @@ func NewClientWithSocket(socket string) (client Client, err error) {
421421
}
422422

423423
func newClientWithConnection(conn *grpc.ClientConn, authToken *authToken) Client {
424-
apiToken := os.Getenv(apiTokenEnvVarName)
425-
if apiToken != "" {
426-
logger.Println("client uses API token")
427-
authToken.set(apiToken)
428-
}
429424
return &GRPCClient{
430425
connection: conn,
431426
protoClient: pb.NewDaprClient(conn),
@@ -435,14 +430,26 @@ func newClientWithConnection(conn *grpc.ClientConn, authToken *authToken) Client
435430

436431
// NewClientWithConnection instantiates Dapr client using specific connection.
437432
func NewClientWithConnection(conn *grpc.ClientConn) Client {
438-
return newClientWithConnection(conn, &authToken{})
433+
return newClientWithConnection(conn, newAuthToken())
439434
}
440435

436+
// NOTE: authToken must be created using newAuthToken()
437+
// it is crucial to correctly initialize the dapr client with the API token from the environment variable
441438
type authToken struct {
442439
mu sync.RWMutex
443440
authToken string
444441
}
445442

443+
func newAuthToken() *authToken {
444+
apiToken := os.Getenv(apiTokenEnvVarName)
445+
if apiToken != "" {
446+
logger.Println("API Token loaded from the environment variable")
447+
}
448+
return &authToken{
449+
authToken: apiToken,
450+
}
451+
}
452+
446453
func (a *authToken) get() string {
447454
a.mu.RLock()
448455
defer a.mu.RUnlock()

client/client_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"google.golang.org/grpc/credentials/insecure"
3333
"google.golang.org/grpc/test/bufconn"
3434
"google.golang.org/protobuf/types/known/anypb"
35+
"google.golang.org/protobuf/types/known/durationpb"
3536
"google.golang.org/protobuf/types/known/emptypb"
3637

3738
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
@@ -563,10 +564,11 @@ func (s *testDaprServer) ScheduleJobAlpha1(ctx context.Context, in *pb.ScheduleJ
563564

564565
func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest) (*pb.GetJobResponse, error) {
565566
var (
566-
schedule = "@every 10s"
567-
dueTime = "10s"
568-
repeats uint32 = 4
569-
ttl = "10s"
567+
schedule = "@every 10s"
568+
dueTime = "10s"
569+
repeats uint32 = 4
570+
ttl = "10s"
571+
maxRetries uint32 = 4
570572
)
571573
return &pb.GetJobResponse{
572574
Job: &pb.Job{
@@ -576,6 +578,14 @@ func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest)
576578
DueTime: &dueTime,
577579
Ttl: &ttl,
578580
Data: nil,
581+
FailurePolicy: &commonv1pb.JobFailurePolicy{
582+
Policy: &commonv1pb.JobFailurePolicy_Constant{
583+
Constant: &commonv1pb.JobFailurePolicyConstant{
584+
MaxRetries: &maxRetries,
585+
Interval: &durationpb.Duration{Seconds: 10},
586+
},
587+
},
588+
},
579589
},
580590
}, nil
581591
}

client/conversation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ func WithTemperature(temp float64) conversationRequestOption {
101101

102102
// ConverseAlpha1 can invoke an LLM given a request created by the NewConversationRequest function.
103103
func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest, options ...conversationRequestOption) (*ConversationResponse, error) {
104+
//nolint:staticcheck
104105
cinputs := make([]*runtimev1pb.ConversationInput, len(req.inputs))
105106
for i, in := range req.inputs {
107+
//nolint:staticcheck
106108
cinputs[i] = &runtimev1pb.ConversationInput{
107109
Content: in.Content,
108110
Role: in.Role,
@@ -115,7 +117,7 @@ func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest
115117
opt(&req)
116118
}
117119
}
118-
120+
//nolint:staticcheck
119121
request := runtimev1pb.ConversationRequest{
120122
Name: req.name,
121123
ContextID: req.ContextID,

client/jobs.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"errors"
19+
"time"
20+
21+
"google.golang.org/protobuf/types/known/anypb"
22+
"google.golang.org/protobuf/types/known/durationpb"
23+
24+
commonpb "github.com/dapr/dapr/pkg/proto/common/v1"
25+
runtimepb "github.com/dapr/dapr/pkg/proto/runtime/v1"
26+
"github.com/dapr/kit/ptr"
27+
)
28+
29+
type FailurePolicy interface {
30+
GetPBFailurePolicy() *commonpb.JobFailurePolicy
31+
}
32+
33+
type JobFailurePolicyConstant struct {
34+
MaxRetries *uint32
35+
Interval *time.Duration
36+
}
37+
38+
func (f *JobFailurePolicyConstant) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
39+
constantfp := &commonpb.JobFailurePolicyConstant{}
40+
if f.MaxRetries != nil {
41+
constantfp.MaxRetries = f.MaxRetries
42+
}
43+
if f.Interval != nil {
44+
constantfp.Interval = durationpb.New(*f.Interval)
45+
}
46+
return &commonpb.JobFailurePolicy{
47+
Policy: &commonpb.JobFailurePolicy_Constant{
48+
Constant: constantfp,
49+
},
50+
}
51+
}
52+
53+
type JobFailurePolicyDrop struct{}
54+
55+
func (f *JobFailurePolicyDrop) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
56+
return &commonpb.JobFailurePolicy{
57+
Policy: &commonpb.JobFailurePolicy_Drop{
58+
Drop: &commonpb.JobFailurePolicyDrop{},
59+
},
60+
}
61+
}
62+
63+
type Job struct {
64+
Name string
65+
Schedule *string
66+
Repeats *uint32
67+
DueTime *string
68+
TTL *string
69+
Data *anypb.Any
70+
FailurePolicy FailurePolicy
71+
Overwrite bool
72+
}
73+
74+
type JobOption func(*Job)
75+
76+
func NewJob(name string, opts ...JobOption) *Job {
77+
job := &Job{
78+
Name: name,
79+
}
80+
for _, opt := range opts {
81+
opt(job)
82+
}
83+
return job
84+
}
85+
86+
func WithJobSchedule(schedule string) JobOption {
87+
return func(job *Job) {
88+
job.Schedule = &schedule
89+
}
90+
}
91+
92+
func WithJobRepeats(repeats uint32) JobOption {
93+
return func(job *Job) {
94+
job.Repeats = &repeats
95+
}
96+
}
97+
98+
func WithJobDueTime(dueTime string) JobOption {
99+
return func(job *Job) {
100+
job.DueTime = &dueTime
101+
}
102+
}
103+
104+
func WithJobTTL(ttl string) JobOption {
105+
return func(job *Job) {
106+
job.TTL = &ttl
107+
}
108+
}
109+
110+
func WithJobData(data *anypb.Any) JobOption {
111+
return func(job *Job) {
112+
job.Data = data
113+
}
114+
}
115+
116+
func WithJobConstantFailurePolicy() JobOption {
117+
return func(job *Job) {
118+
job.FailurePolicy = &JobFailurePolicyConstant{}
119+
}
120+
}
121+
122+
func WithJobConstantFailurePolicyMaxRetries(maxRetries uint32) JobOption {
123+
return func(job *Job) {
124+
if job.FailurePolicy == nil {
125+
job.FailurePolicy = &JobFailurePolicyConstant{}
126+
}
127+
if constantPolicy, ok := job.FailurePolicy.(*JobFailurePolicyConstant); ok {
128+
constantPolicy.MaxRetries = &maxRetries
129+
} else {
130+
job.FailurePolicy = &JobFailurePolicyConstant{
131+
MaxRetries: &maxRetries,
132+
}
133+
}
134+
}
135+
}
136+
137+
func WithJobConstantFailurePolicyInterval(interval time.Duration) JobOption {
138+
return func(job *Job) {
139+
if job.FailurePolicy == nil {
140+
job.FailurePolicy = &JobFailurePolicyConstant{}
141+
}
142+
if constantPolicy, ok := job.FailurePolicy.(*JobFailurePolicyConstant); ok {
143+
constantPolicy.Interval = &interval
144+
} else {
145+
job.FailurePolicy = &JobFailurePolicyConstant{
146+
Interval: &interval,
147+
}
148+
}
149+
}
150+
}
151+
152+
func WithJobDropFailurePolicy() JobOption {
153+
return func(job *Job) {
154+
job.FailurePolicy = &JobFailurePolicyDrop{}
155+
}
156+
}
157+
158+
// ScheduleJobAlpha1 raises and schedules a job.
159+
func (c *GRPCClient) ScheduleJobAlpha1(ctx context.Context, job *Job) error {
160+
if job.Name == "" {
161+
return errors.New("job name is required")
162+
}
163+
if job.Data == nil {
164+
return errors.New("job data is required")
165+
}
166+
167+
jobRequest := &runtimepb.Job{
168+
Name: job.Name,
169+
Data: job.Data,
170+
Schedule: job.Schedule,
171+
Repeats: job.Repeats,
172+
DueTime: job.DueTime,
173+
Ttl: job.TTL,
174+
}
175+
176+
if job.FailurePolicy != nil {
177+
jobRequest.FailurePolicy = job.FailurePolicy.GetPBFailurePolicy()
178+
}
179+
_, err := c.protoClient.ScheduleJobAlpha1(ctx, &runtimepb.ScheduleJobRequest{
180+
Job: jobRequest,
181+
Overwrite: job.Overwrite,
182+
})
183+
return err
184+
}
185+
186+
// GetJobAlpha1 retrieves a scheduled job.
187+
func (c *GRPCClient) GetJobAlpha1(ctx context.Context, name string) (*Job, error) {
188+
if name == "" {
189+
return nil, errors.New("job name is required")
190+
}
191+
192+
resp, err := c.protoClient.GetJobAlpha1(ctx, &runtimepb.GetJobRequest{
193+
Name: name,
194+
})
195+
if err != nil {
196+
return nil, err
197+
}
198+
199+
var failurePolicy FailurePolicy
200+
switch policy := resp.GetJob().GetFailurePolicy().GetPolicy().(type) {
201+
case *commonpb.JobFailurePolicy_Constant:
202+
interval := time.Duration(policy.Constant.GetInterval().GetSeconds()) * time.Second
203+
failurePolicy = &JobFailurePolicyConstant{
204+
MaxRetries: ptr.Of(policy.Constant.GetMaxRetries()),
205+
Interval: &interval,
206+
}
207+
case *commonpb.JobFailurePolicy_Drop:
208+
failurePolicy = &JobFailurePolicyDrop{}
209+
}
210+
211+
return &Job{
212+
Name: resp.GetJob().GetName(),
213+
Schedule: ptr.Of(resp.GetJob().GetSchedule()),
214+
Repeats: ptr.Of(resp.GetJob().GetRepeats()),
215+
DueTime: ptr.Of(resp.GetJob().GetDueTime()),
216+
TTL: ptr.Of(resp.GetJob().GetTtl()),
217+
Data: resp.GetJob().GetData(),
218+
FailurePolicy: failurePolicy,
219+
}, nil
220+
}
221+
222+
// DeleteJobAlpha1 deletes a scheduled job.
223+
func (c *GRPCClient) DeleteJobAlpha1(ctx context.Context, name string) error {
224+
if name == "" {
225+
return errors.New("job name is required")
226+
}
227+
228+
_, err := c.protoClient.DeleteJobAlpha1(ctx, &runtimepb.DeleteJobRequest{
229+
Name: name,
230+
})
231+
return err
232+
}

0 commit comments

Comments
 (0)