Skip to content

Commit 9ad71c3

Browse files
authored
Test: Improve flakiness of integration tests (dapr#8661)
* dapr/kit: Update to main Updates dapr/kit to latest main to update runner manager closer taking a logger. Signed-off-by: joshvanl <[email protected]> * Move grpc server shutdown to inside the `Run` close Signed-off-by: joshvanl <[email protected]> * Remove pipe close on logline to ensure full process pipe is flushed Signed-off-by: joshvanl <[email protected]> * exec: remove cmd wait and add pipe close Signed-off-by: joshvanl <[email protected]> * Ensure tee writer is closed Signed-off-by: joshvanl <[email protected]> * Adds Scheduler client wrapper to retry client connection errors Signed-off-by: joshvanl <[email protected]> * Decrease actor num for move to 100 Signed-off-by: joshvanl <[email protected]> * Fix subscription flushing of streaming subscription Signed-off-by: joshvanl <[email protected]> * Increase time to connect to new scheduler Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 2f1781a commit 9ad71c3

File tree

20 files changed

+465
-328
lines changed

20 files changed

+465
-328
lines changed

pkg/actors/actors.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import (
4949
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
5050
"github.com/dapr/dapr/pkg/resiliency"
5151
"github.com/dapr/dapr/pkg/runtime/compstore"
52-
"github.com/dapr/dapr/pkg/runtime/scheduler/clients"
52+
schedclient "github.com/dapr/dapr/pkg/runtime/scheduler/client"
5353
"github.com/dapr/dapr/pkg/security"
5454
"github.com/dapr/kit/concurrency"
5555
"github.com/dapr/kit/events/queue"
@@ -76,10 +76,10 @@ type Options struct {
7676
}
7777

7878
type InitOptions struct {
79-
StateStoreName string
80-
Hostname string
81-
GRPC *manager.Manager
82-
SchedulerClients clients.Clients
79+
StateStoreName string
80+
Hostname string
81+
GRPC *manager.Manager
82+
SchedulerClient schedclient.Client
8383
}
8484

8585
// Interface is the main runtime for the actors subsystem.
@@ -613,7 +613,7 @@ func (a *actors) buildStateStore(opts InitOptions, apiLevel *apilevel.APILevel)
613613
a.reminderStore = scheduler.New(scheduler.Options{
614614
Namespace: a.namespace,
615615
AppID: a.appID,
616-
Clients: opts.SchedulerClients,
616+
Client: opts.SchedulerClient,
617617
StateReminder: a.stateReminders,
618618
Table: a.table,
619619
Healthz: a.healthz,

pkg/actors/internal/reminders/storage/scheduler/scheduler.go

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
apierrors "github.com/dapr/dapr/pkg/api/errors"
3030
"github.com/dapr/dapr/pkg/healthz"
3131
schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1"
32-
"github.com/dapr/dapr/pkg/runtime/scheduler/clients"
32+
"github.com/dapr/dapr/pkg/runtime/scheduler/client"
3333
"github.com/dapr/kit/logger"
3434
"github.com/dapr/kit/ptr"
3535
kittime "github.com/dapr/kit/time"
@@ -40,7 +40,7 @@ var log = logger.NewLogger("dapr.runtime.actor.reminders.scheduler")
4040
type Options struct {
4141
Namespace string
4242
AppID string
43-
Clients clients.Clients
43+
Client client.Client
4444
StateReminder storage.Interface
4545
Table table.Interface
4646
Healthz healthz.Healthz
@@ -50,7 +50,7 @@ type Options struct {
5050
type scheduler struct {
5151
namespace string
5252
appID string
53-
clients clients.Clients
53+
client client.Client
5454
table table.Interface
5555
stateReminder storage.Interface
5656
htarget healthz.Target
@@ -59,7 +59,7 @@ type scheduler struct {
5959
func New(opts Options) storage.Interface {
6060
log.Info("Using Scheduler service for reminders.")
6161
return &scheduler{
62-
clients: opts.Clients,
62+
client: opts.Client,
6363
namespace: opts.Namespace,
6464
appID: opts.AppID,
6565
stateReminder: opts.StateReminder,
@@ -135,16 +135,13 @@ func (s *scheduler) Create(ctx context.Context, reminder *api.CreateReminderRequ
135135
},
136136
}
137137

138-
client, err := s.clients.Next(ctx)
139-
if err != nil {
140-
return fmt.Errorf("error getting scheduler client: %w", err)
141-
}
142-
143-
_, err = client.ScheduleJob(ctx, internalScheduleJobReq)
138+
_, err = s.client.ScheduleJob(ctx, internalScheduleJobReq)
144139
if err != nil {
145140
log.Errorf("Error scheduling reminder job %s due to: %s", reminder.Name, err)
141+
return err
146142
}
147-
return err
143+
144+
return nil
148145
}
149146

150147
func scheduleFromPeriod(period string) (*string, *uint32, error) {
@@ -192,12 +189,7 @@ func (s *scheduler) Get(ctx context.Context, req *api.GetReminderRequest) (*api.
192189
},
193190
}
194191

195-
client, err := s.clients.Next(ctx)
196-
if err != nil {
197-
return nil, fmt.Errorf("error getting scheduler client: %w", err)
198-
}
199-
200-
job, err := client.GetJob(ctx, internalGetJobReq)
192+
job, err := s.client.GetJob(ctx, internalGetJobReq)
201193
if err != nil {
202194
errMetadata := map[string]string{
203195
"appID": s.appID,
@@ -241,25 +233,17 @@ func (s *scheduler) Delete(ctx context.Context, req *api.DeleteReminderRequest)
241233
},
242234
}
243235

244-
client, err := s.clients.Next(ctx)
245-
if err != nil {
246-
return fmt.Errorf("error getting scheduler client: %w", err)
247-
}
248-
249-
_, err = client.DeleteJob(ctx, internalDeleteJobReq)
236+
_, err := s.client.DeleteJob(ctx, internalDeleteJobReq)
250237
if err != nil {
251238
log.Errorf("Error deleting reminder job %s due to: %s", req.Name, err)
239+
return err
252240
}
253241

254-
return err
242+
return nil
255243
}
256244

257245
func (s *scheduler) List(ctx context.Context, req *api.ListRemindersRequest) ([]*api.Reminder, error) {
258-
client, err := s.clients.Next(ctx)
259-
if err != nil {
260-
return nil, err
261-
}
262-
resp, err := client.ListJobs(ctx, &schedulerv1pb.ListJobsRequest{
246+
resp, err := s.client.ListJobs(ctx, &schedulerv1pb.ListJobsRequest{
263247
Metadata: &schedulerv1pb.JobMetadata{
264248
AppId: s.appID,
265249
Namespace: s.namespace,

pkg/api/universal/jobs.go

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,7 @@ func (a *Universal) scheduleJob(ctx context.Context, job *runtimev1pb.Job) (*run
9696
schedCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
9797
defer cancel()
9898

99-
client, err := a.scheduler.Next(ctx)
100-
if err != nil {
101-
a.logger.Errorf("Error getting scheduler client: %s", err)
102-
return &runtimev1pb.ScheduleJobResponse{}, apierrors.SchedulerScheduleJob(errMetadata, err)
103-
}
104-
105-
_, err = client.ScheduleJob(schedCtx, internalScheduleJobReq, grpc.WaitForReady(true))
99+
_, err := a.scheduler.ScheduleJob(schedCtx, internalScheduleJobReq, grpc.WaitForReady(true))
106100
if err != nil {
107101
a.logger.Errorf("Error scheduling job %s due to: %s", job.GetName(), err)
108102
return &runtimev1pb.ScheduleJobResponse{}, apierrors.SchedulerScheduleJob(errMetadata, err)
@@ -138,13 +132,7 @@ func (a *Universal) DeleteJobAlpha1(ctx context.Context, inReq *runtimev1pb.Dele
138132
schedCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
139133
defer cancel()
140134

141-
client, err := a.scheduler.Next(ctx)
142-
if err != nil {
143-
a.logger.Errorf("Error getting scheduler client: %s", err)
144-
return &runtimev1pb.DeleteJobResponse{}, apierrors.SchedulerDeleteJob(errMetadata, err)
145-
}
146-
147-
_, err = client.DeleteJob(schedCtx, internalDeleteJobReq, grpc.WaitForReady(true))
135+
_, err := a.scheduler.DeleteJob(schedCtx, internalDeleteJobReq, grpc.WaitForReady(true))
148136
if err != nil {
149137
a.logger.Errorf("Error deleting job: %s due to: %s", inReq.GetName(), err)
150138
return &runtimev1pb.DeleteJobResponse{}, apierrors.SchedulerDeleteJob(errMetadata, err)
@@ -180,13 +168,7 @@ func (a *Universal) GetJobAlpha1(ctx context.Context, inReq *runtimev1pb.GetJobR
180168
schedCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
181169
defer cancel()
182170

183-
client, err := a.scheduler.Next(ctx)
184-
if err != nil {
185-
a.logger.Errorf("Error getting scheduler client: %s", err)
186-
return nil, apierrors.SchedulerGetJob(errMetadata, err)
187-
}
188-
189-
resp, err := client.GetJob(schedCtx, internalGetJobReq, grpc.WaitForReady(true))
171+
resp, err := a.scheduler.GetJob(schedCtx, internalGetJobReq, grpc.WaitForReady(true))
190172
if err != nil {
191173
a.logger.Errorf("Error getting job %s due to: %s", inReq.GetName(), err)
192174
return nil, apierrors.SchedulerGetJob(errMetadata, err)

pkg/api/universal/universal.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/dapr/dapr/pkg/config"
2828
"github.com/dapr/dapr/pkg/resiliency"
2929
"github.com/dapr/dapr/pkg/runtime/compstore"
30-
"github.com/dapr/dapr/pkg/runtime/scheduler/clients"
30+
"github.com/dapr/dapr/pkg/runtime/scheduler/client"
3131
"github.com/dapr/dapr/pkg/runtime/wfengine"
3232
"github.com/dapr/kit/logger"
3333
)
@@ -43,7 +43,7 @@ type Options struct {
4343
ExtendedMetadata map[string]string
4444
AppConnectionConfig config.AppConnectionConfig
4545
GlobalConfig *config.Configuration
46-
Scheduler clients.Clients
46+
Scheduler client.Client
4747
Actors actors.Interface
4848
WorkflowEngine wfengine.Interface
4949
}
@@ -61,7 +61,7 @@ type Universal struct {
6161
appConnectionConfig config.AppConnectionConfig
6262
globalConfig *config.Configuration
6363
workflowEngine wfengine.Interface
64-
scheduler clients.Clients
64+
scheduler client.Client
6565

6666
extendedMetadataLock sync.RWMutex
6767
actors actors.Interface

pkg/runtime/pubsub/streamer/streamer.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,18 @@ type streamer struct {
5050
tracingSpec *config.TracingSpec
5151
subscribers Subscribers
5252

53-
lock sync.RWMutex
53+
lock sync.RWMutex
54+
closeCh <-chan struct{}
5455
}
5556

5657
var log = logger.NewLogger("dapr.runtime.pubsub.streamer")
5758

58-
func New(opts Options) rtpubsub.AdapterStreamer {
59+
// TODO: @joshvanl: remove context after refactor.
60+
func New(ctx context.Context, opts Options) rtpubsub.AdapterStreamer {
5961
return &streamer{
6062
tracingSpec: opts.TracingSpec,
6163
subscribers: make(Subscribers),
64+
closeCh: ctx.Done(),
6265
}
6366
}
6467

@@ -96,7 +99,22 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
9699
s.lock.Unlock()
97100
}()
98101

102+
// TODO: @joshvanl: remove after pubsub refactor.
99103
errCh := make(chan error, 2)
104+
go func() {
105+
select {
106+
case <-s.closeCh:
107+
connection.lock.Lock()
108+
connection.closed.Store(true)
109+
if len(connection.publishResponses) == 0 {
110+
errCh <- errors.New("stream closed")
111+
}
112+
connection.lock.Unlock()
113+
case <-connection.closeCh:
114+
case <-stream.Context().Done():
115+
}
116+
}()
117+
100118
go func() {
101119
var err error
102120
select {
@@ -156,7 +174,7 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage)
156174
}
157175

158176
if connection.closed.Load() {
159-
return errors.New("connection is closed")
177+
return errors.New("subscription is closed")
160178
}
161179

162180
envelope, span, err := rtpubsub.GRPCEnvelopeFromSubscriptionMessage(ctx, msg, log, s.tracingSpec)

pkg/runtime/runtime.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ type DaprRuntime struct {
127127
clock clock.Clock
128128
reloader *hotreload.Reloader
129129

130+
grpcAPIServer grpc.Server
131+
grpcInternalServer grpc.Server
132+
130133
// Used for testing.
131134
initComplete chan struct{}
132135

@@ -211,7 +214,7 @@ func newDaprRuntime(ctx context.Context,
211214
Resiliency: resiliencyProvider,
212215
GetPubSubFn: compStore.GetPubSub,
213216
})
214-
pubsubAdapterStreamer := streamer.New(streamer.Options{
217+
pubsubAdapterStreamer := streamer.New(ctx, streamer.Options{
215218
TracingSpec: globalConfig.Spec.TracingSpec,
216219
})
217220
outbox := pubsub.NewOutbox(pubsub.OptionsOutbox{
@@ -376,6 +379,20 @@ func newDaprRuntime(ctx context.Context,
376379

377380
return nil
378381
},
382+
func(ctx context.Context) error {
383+
<-ctx.Done()
384+
if server := rt.grpcInternalServer; server != nil {
385+
return server.Close()
386+
}
387+
return nil
388+
},
389+
func(ctx context.Context) error {
390+
<-ctx.Done()
391+
if server := rt.grpcAPIServer; server != nil {
392+
return server.Close()
393+
}
394+
return nil
395+
},
379396
)
380397

381398
if err := rt.runnerCloser.AddCloser(
@@ -620,7 +637,7 @@ func (a *DaprRuntime) initRuntime(ctx context.Context) error {
620637
ShutdownFn: a.ShutdownWithWait,
621638
AppConnectionConfig: a.runtimeConfig.appConnectionConfig,
622639
GlobalConfig: a.globalConfig,
623-
Scheduler: a.jobsManager,
640+
Scheduler: a.jobsManager.Client(),
624641
Actors: a.actors,
625642
WorkflowEngine: a.wfengine,
626643
})
@@ -914,7 +931,7 @@ func (a *DaprRuntime) startHTTPServer() error {
914931
func (a *DaprRuntime) startGRPCInternalServer(api grpc.API) error {
915932
// Since GRPCInteralServer is encrypted & authenticated, it is safe to listen on *
916933
serverConf := a.getNewServerConfig([]string{a.runtimeConfig.internalGRPCListenAddress}, a.runtimeConfig.internalGRPCPort)
917-
server := grpc.NewInternalServer(grpc.OptionsInternal{
934+
a.grpcInternalServer = grpc.NewInternalServer(grpc.OptionsInternal{
918935
API: api,
919936
Config: serverConf,
920937
TracingSpec: a.globalConfig.GetTracingSpec(),
@@ -923,10 +940,8 @@ func (a *DaprRuntime) startGRPCInternalServer(api grpc.API) error {
923940
Proxy: a.proxy,
924941
Healthz: a.runtimeConfig.healthz,
925942
})
926-
if err := server.StartNonBlocking(); err != nil {
927-
return err
928-
}
929-
if err := a.runnerCloser.AddCloser(server); err != nil {
943+
944+
if err := a.grpcInternalServer.StartNonBlocking(); err != nil {
930945
return err
931946
}
932947

@@ -935,7 +950,7 @@ func (a *DaprRuntime) startGRPCInternalServer(api grpc.API) error {
935950

936951
func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
937952
serverConf := a.getNewServerConfig(a.runtimeConfig.apiListenAddresses, port)
938-
server := grpc.NewAPIServer(grpc.Options{
953+
a.grpcAPIServer = grpc.NewAPIServer(grpc.Options{
939954
API: api,
940955
Config: serverConf,
941956
TracingSpec: a.globalConfig.GetTracingSpec(),
@@ -945,10 +960,8 @@ func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
945960
WorkflowEngine: a.wfengine,
946961
Healthz: a.runtimeConfig.healthz,
947962
})
948-
if err := server.StartNonBlocking(); err != nil {
949-
return err
950-
}
951-
if err := a.runnerCloser.AddCloser(server); err != nil {
963+
964+
if err := a.grpcAPIServer.StartNonBlocking(); err != nil {
952965
return err
953966
}
954967

@@ -1067,10 +1080,10 @@ func (a *DaprRuntime) initActors(ctx context.Context) error {
10671080
}
10681081

10691082
if err := a.actors.Init(actors.InitOptions{
1070-
Hostname: hostAddress,
1071-
StateStoreName: actorStateStoreName,
1072-
GRPC: a.grpc,
1073-
SchedulerClients: a.jobsManager,
1083+
Hostname: hostAddress,
1084+
StateStoreName: actorStateStoreName,
1085+
GRPC: a.grpc,
1086+
SchedulerClient: a.jobsManager.Client(),
10741087
}); err != nil {
10751088
return err
10761089
}

pkg/runtime/scheduler/clients/clients.go renamed to pkg/runtime/scheduler/client/client.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,13 @@ See the License for the specific language governing permissions and
1111
limitations under the License.
1212
*/
1313

14-
package clients
14+
package client
1515

1616
import (
17-
"context"
18-
1917
schedulerv1pb "github.com/dapr/dapr/pkg/proto/scheduler/v1"
2018
)
2119

22-
type Clients interface {
23-
Next(context.Context) (schedulerv1pb.SchedulerClient, error)
24-
All(context.Context) ([]schedulerv1pb.SchedulerClient, error)
20+
type Client interface {
21+
schedulerv1pb.SchedulerClient
2522
Addresses() []string
2623
}

0 commit comments

Comments
 (0)