Skip to content

Commit 96346c9

Browse files
authored
Merge pull request dapr#8710 from JoshVanL/test-integration-stream-order
Integration: clean up stream ordering test to prevent panic
2 parents 5a400b1 + d45c946 commit 96346c9

File tree

1 file changed

+35
-82
lines changed
  • tests/integration/suite/daprd/subscriptions/stream

1 file changed

+35
-82
lines changed

tests/integration/suite/daprd/subscriptions/stream/ordering.go

Lines changed: 35 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ package stream
1515

1616
import (
1717
"context"
18-
"fmt"
18+
serrors "errors"
1919
"strconv"
20-
"sync/atomic"
2120
"testing"
2221
"time"
2322

@@ -88,90 +87,44 @@ func (o *ordering) Run(t *testing.T, ctx context.Context) {
8887
assert.Len(c, subsInMeta, 1)
8988
}, time.Second*10, time.Millisecond*10)
9089

91-
messageCount := 100000
92-
sendCount := atomic.Int32{}
93-
sendCount.Store(0)
94-
receivedCount := atomic.Int32{}
95-
receivedCount.Store(0)
96-
97-
sentMessages := make(chan int, messageCount)
98-
receivedMessages := make(chan int, messageCount)
99-
100-
go func(c *testing.T) {
101-
for msgID := range messageCount {
102-
select {
103-
case <-ctx.Done():
104-
c.Errorf("Context canceled or deadline exceeded, stop publishing, %+v", ctx.Err())
105-
return
106-
default:
107-
_, publishErr := client.PublishEvent(ctx, &rtv1.PublishEventRequest{
108-
PubsubName: "mypub",
109-
Topic: "ordered",
110-
Data: []byte(strconv.Itoa(msgID)),
111-
DataContentType: "text/plain",
112-
})
113-
114-
if publishErr != nil {
115-
if ctx.Err() == nil {
116-
// This is a real error, not related to context deadline or cancellation
117-
c.Errorf("Failed to publish message: %v", publishErr)
118-
}
119-
// Either way, stop publishing more messages
120-
return
121-
}
122-
123-
sendCount.Add(1)
124-
sentMessages <- msgID
125-
}
126-
}
127-
}(t)
128-
129-
errCh := make(chan error)
130-
131-
go func(c *testing.T) {
132-
for range messageCount {
133-
event, recvErr := stream.Recv()
134-
if recvErr != nil {
135-
errCh <- fmt.Errorf("failed to receive message: %w", recvErr)
136-
return
137-
}
138-
139-
data := string(event.GetEventMessage().GetData())
140-
msgID, err := strconv.Atoi(data)
141-
if err != nil {
142-
errCh <- fmt.Errorf("failed to parse message ID from data: %w", err)
143-
return
144-
}
145-
146-
sendErr := stream.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
147-
SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_EventProcessed{
148-
EventProcessed: &rtv1.SubscribeTopicEventsRequestProcessedAlpha1{
149-
Id: event.GetEventMessage().GetId(),
150-
Status: &rtv1.TopicEventResponse{Status: rtv1.TopicEventResponse_SUCCESS},
151-
},
152-
},
90+
n := 10000
91+
errs := make([]error, n)
92+
done := make(chan struct{})
93+
go func() {
94+
defer close(done)
95+
for i := range n {
96+
_, err := client.PublishEvent(ctx, &rtv1.PublishEventRequest{
97+
PubsubName: "mypub",
98+
Topic: "ordered",
99+
Data: []byte(strconv.Itoa(i)),
100+
DataContentType: "text/plain",
153101
})
154-
if sendErr != nil {
155-
errCh <- fmt.Errorf("failed to send message: %w", sendErr)
156-
return
157-
}
158-
receivedCount.Add(1)
159-
receivedMessages <- msgID
102+
errs[i] = err
160103
}
161-
errCh <- nil
162-
}(t)
163-
164-
assert.EventuallyWithT(t, func(c *assert.CollectT) {
165-
assert.EqualValues(c, messageCount, sendCount.Load())
166-
assert.EqualValues(c, messageCount, receivedCount.Load())
167-
}, time.Second*60, time.Millisecond*100)
168-
169-
for i := range messageCount {
170-
assert.Equal(t, i, <-sentMessages)
171-
assert.Equal(t, i, <-receivedMessages)
104+
}()
105+
106+
for i := range n {
107+
event, err := stream.Recv()
108+
require.NoError(t, err)
109+
assert.Equal(t, "ordered", event.GetEventMessage().GetTopic())
110+
assert.Equal(t, []byte(strconv.Itoa(i)), event.GetEventMessage().GetData())
111+
112+
require.NoError(t, stream.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
113+
SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_EventProcessed{
114+
EventProcessed: &rtv1.SubscribeTopicEventsRequestProcessedAlpha1{
115+
Id: event.GetEventMessage().GetId(),
116+
Status: &rtv1.TopicEventResponse{Status: rtv1.TopicEventResponse_SUCCESS},
117+
},
118+
},
119+
}))
172120
}
173121

174-
require.NoError(t, <-errCh)
122+
select {
123+
case <-done:
124+
case <-time.After(time.Second * 10):
125+
require.Fail(t, "timeout waiting for publish to finish")
126+
}
175127

128+
require.NoError(t, serrors.Join(errs...))
176129
require.NoError(t, stream.CloseSend())
177130
}

0 commit comments

Comments
 (0)