@@ -15,13 +15,13 @@ package stream
15
15
16
16
import (
17
17
"context"
18
+ "github.com/stretchr/testify/assert"
19
+ "github.com/stretchr/testify/require"
20
+ "sync"
18
21
"sync/atomic"
19
22
"testing"
20
23
"time"
21
24
22
- "github.com/stretchr/testify/assert"
23
- "github.com/stretchr/testify/require"
24
-
25
25
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
26
26
"github.com/dapr/dapr/tests/integration/framework"
27
27
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
@@ -61,95 +61,95 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
61
61
62
62
client := m .daprd .GRPCClient (t , ctx )
63
63
64
- // stream1, err := client.SubscribeTopicEventsAlpha1(ctx)
65
- // require.NoError(t, err)
66
- // require.NoError(t, stream1.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
67
- // SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest{
68
- // InitialRequest: &rtv1.SubscribeTopicEventsRequestInitialAlpha1{
69
- // PubsubName: "mypub", Topic: "a",
70
- // },
71
- // },
72
- // }))
73
- // resp, err := stream1.Recv()
74
- // require.NoError(t, err)
75
- // switch resp.GetSubscribeTopicEventsResponseType().(type) {
76
- // case *rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse:
77
- // default:
78
- // require.Failf(t, "unexpected response", "got (%T) %v", resp.GetSubscribeTopicEventsResponseType(), resp)
79
- // }
80
- //
81
- // stream2, err := client.SubscribeTopicEventsAlpha1(ctx)
82
- // require.NoError(t, err)
83
- // require.NoError(t, stream2.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
84
- // SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest{
85
- // InitialRequest: &rtv1.SubscribeTopicEventsRequestInitialAlpha1{
86
- // PubsubName: "mypub", Topic: "b",
87
- // },
88
- // },
89
- // }))
90
- // resp, err = stream2.Recv()
91
- // require.NoError(t, err)
92
- // switch resp.GetSubscribeTopicEventsResponseType().(type) {
93
- // case *rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse:
94
- // default:
95
- // require.Failf(t, "unexpected response", "got (%T) %v", resp.GetSubscribeTopicEventsResponseType(), resp)
96
- // }
97
- //
98
- // stream3, err := client.SubscribeTopicEventsAlpha1(ctx)
99
- // require.NoError(t, err)
100
- // require.NoError(t, stream3.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
101
- // SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest{
102
- // InitialRequest: &rtv1.SubscribeTopicEventsRequestInitialAlpha1{
103
- // PubsubName: "mypub", Topic: "c",
104
- // },
105
- // },
106
- // }))
107
- // resp, err = stream3.Recv()
108
- // require.NoError(t, err)
109
- // switch resp.GetSubscribeTopicEventsResponseType().(type) {
110
- // case *rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse:
111
- // default:
112
- // require.Failf(t, "unexpected response", "got (%T) %v", resp.GetSubscribeTopicEventsResponseType(), resp)
113
- // }
114
- //
115
- // t.Cleanup(func() {
116
- // require.NoError(t, stream1.CloseSend())
117
- // require.NoError(t, stream2.CloseSend())
118
- // require.NoError(t, stream3.CloseSend())
119
- // })
120
- //
121
- // var subsInMeta []daprd.MetadataResponsePubsubSubscription
122
- // assert.EventuallyWithT(t, func(c *assert.CollectT) {
123
- // subsInMeta = m.daprd.GetMetaSubscriptions(c, ctx)
124
- // assert.Len(c, subsInMeta, 3)
125
- // }, time.Second*5, time.Millisecond*10)
126
- // assert.ElementsMatch(t, []daprd.MetadataResponsePubsubSubscription{
127
- // {PubsubName: "mypub", Topic: "a", Rules: []daprd.MetadataResponsePubsubSubscriptionRule{{Path: "/"}}, Type: rtv1.PubsubSubscriptionType_STREAMING.String()},
128
- // {PubsubName: "mypub", Topic: "c", Rules: []daprd.MetadataResponsePubsubSubscriptionRule{{Path: "/"}}, Type: rtv1.PubsubSubscriptionType_STREAMING.String()},
129
- // {PubsubName: "mypub", Topic: "b", Rules: []daprd.MetadataResponsePubsubSubscriptionRule{{Path: "/"}}, Type: rtv1.PubsubSubscriptionType_STREAMING.String()},
130
- // },
131
- // subsInMeta,
132
- // )
133
- //
134
- // for _, topic := range []string{"a", "b", "c"} {
135
- // _, err = client.PublishEvent(ctx, &rtv1.PublishEventRequest{
136
- // PubsubName: "mypub", Topic: topic,
137
- // Data: []byte(`{"status": "completed"}`),
138
- // DataContentType: "application/json",
139
- // })
140
- // require.NoError(t, err)
141
- // }
142
- //
143
- // for stream, topic := range map[rtv1.Dapr_SubscribeTopicEventsAlpha1Client]string{
144
- // stream1: "a",
145
- // stream2: "b",
146
- // stream3: "c",
147
- // } {
148
- // event, err := stream.Recv()
149
- // require.NoError(t, err)
150
- // assert.Equal(t, topic, event.GetEventMessage().GetTopic())
151
- // }
152
- //
64
+ stream1 , err := client .SubscribeTopicEventsAlpha1 (ctx )
65
+ require .NoError (t , err )
66
+ require .NoError (t , stream1 .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
67
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest {
68
+ InitialRequest : & rtv1.SubscribeTopicEventsRequestInitialAlpha1 {
69
+ PubsubName : "mypub" , Topic : "a" ,
70
+ },
71
+ },
72
+ }))
73
+ resp , err := stream1 .Recv ()
74
+ require .NoError (t , err )
75
+ switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
76
+ case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
77
+ default :
78
+ require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
79
+ }
80
+
81
+ stream2 , err := client .SubscribeTopicEventsAlpha1 (ctx )
82
+ require .NoError (t , err )
83
+ require .NoError (t , stream2 .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
84
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest {
85
+ InitialRequest : & rtv1.SubscribeTopicEventsRequestInitialAlpha1 {
86
+ PubsubName : "mypub" , Topic : "b" ,
87
+ },
88
+ },
89
+ }))
90
+ resp , err = stream2 .Recv ()
91
+ require .NoError (t , err )
92
+ switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
93
+ case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
94
+ default :
95
+ require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
96
+ }
97
+
98
+ stream3 , err := client .SubscribeTopicEventsAlpha1 (ctx )
99
+ require .NoError (t , err )
100
+ require .NoError (t , stream3 .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
101
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest {
102
+ InitialRequest : & rtv1.SubscribeTopicEventsRequestInitialAlpha1 {
103
+ PubsubName : "mypub" , Topic : "c" ,
104
+ },
105
+ },
106
+ }))
107
+ resp , err = stream3 .Recv ()
108
+ require .NoError (t , err )
109
+ switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
110
+ case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
111
+ default :
112
+ require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
113
+ }
114
+
115
+ t .Cleanup (func () {
116
+ require .NoError (t , stream1 .CloseSend ())
117
+ require .NoError (t , stream2 .CloseSend ())
118
+ require .NoError (t , stream3 .CloseSend ())
119
+ })
120
+
121
+ var subsInMeta []daprd.MetadataResponsePubsubSubscription
122
+ assert .EventuallyWithT (t , func (c * assert.CollectT ) {
123
+ subsInMeta = m .daprd .GetMetaSubscriptions (c , ctx )
124
+ assert .Len (c , subsInMeta , 3 )
125
+ }, time .Second * 5 , time .Millisecond * 10 )
126
+ assert .ElementsMatch (t , []daprd.MetadataResponsePubsubSubscription {
127
+ {PubsubName : "mypub" , Topic : "a" , Rules : []daprd.MetadataResponsePubsubSubscriptionRule {{Path : "/" }}, Type : rtv1 .PubsubSubscriptionType_STREAMING .String ()},
128
+ {PubsubName : "mypub" , Topic : "c" , Rules : []daprd.MetadataResponsePubsubSubscriptionRule {{Path : "/" }}, Type : rtv1 .PubsubSubscriptionType_STREAMING .String ()},
129
+ {PubsubName : "mypub" , Topic : "b" , Rules : []daprd.MetadataResponsePubsubSubscriptionRule {{Path : "/" }}, Type : rtv1 .PubsubSubscriptionType_STREAMING .String ()},
130
+ },
131
+ subsInMeta ,
132
+ )
133
+
134
+ for _ , topic := range []string {"a" , "b" , "c" } {
135
+ _ , err = client .PublishEvent (ctx , & rtv1.PublishEventRequest {
136
+ PubsubName : "mypub" , Topic : topic ,
137
+ Data : []byte (`{"status": "completed"}` ),
138
+ DataContentType : "application/json" ,
139
+ })
140
+ require .NoError (t , err )
141
+ }
142
+
143
+ for stream , topic := range map [rtv1.Dapr_SubscribeTopicEventsAlpha1Client ]string {
144
+ stream1 : "a" ,
145
+ stream2 : "b" ,
146
+ stream3 : "c" ,
147
+ } {
148
+ event , err := stream .Recv ()
149
+ require .NoError (t , err )
150
+ assert .Equal (t , topic , event .GetEventMessage ().GetTopic ())
151
+ }
152
+
153
153
streamNew1 , err := client .SubscribeTopicEventsAlpha1 (ctx )
154
154
require .NoError (t , err )
155
155
@@ -162,73 +162,90 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
162
162
},
163
163
}))
164
164
165
- resp , err := streamNew1 .Recv ()
165
+ resp , err = streamNew1 .Recv ()
166
+ require .NoError (t , err )
167
+ switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
168
+ case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
169
+ default :
170
+ require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
171
+ }
172
+
173
+ streamNew2 , err := client .SubscribeTopicEventsAlpha1 (ctx )
174
+ require .NoError (t , err )
175
+ require .NoError (t , streamNew2 .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
176
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest {
177
+ InitialRequest : & rtv1.SubscribeTopicEventsRequestInitialAlpha1 {
178
+ PubsubName : "mypub" , Topic : singleTopicMultipleSubscribers ,
179
+ },
180
+ },
181
+ }))
182
+ resp , err = streamNew2 .Recv ()
183
+ require .NoError (t , err )
184
+ switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
185
+ case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
186
+ default :
187
+ require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
188
+ }
189
+
190
+ streamNew3 , err := client .SubscribeTopicEventsAlpha1 (ctx )
191
+ require .NoError (t , err )
192
+ require .NoError (t , streamNew3 .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
193
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest {
194
+ InitialRequest : & rtv1.SubscribeTopicEventsRequestInitialAlpha1 {
195
+ PubsubName : "mypub" , Topic : singleTopicMultipleSubscribers ,
196
+ },
197
+ },
198
+ }))
199
+ resp , err = streamNew3 .Recv ()
166
200
require .NoError (t , err )
167
201
switch resp .GetSubscribeTopicEventsResponseType ().(type ) {
168
202
case * rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse :
169
203
default :
170
204
require .Failf (t , "unexpected response" , "got (%T) %v" , resp .GetSubscribeTopicEventsResponseType (), resp )
171
205
}
172
206
173
- //streamNew2, err := client.SubscribeTopicEventsAlpha1(ctx)
174
- //require.NoError(t, err)
175
- //require.NoError(t, streamNew2.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
176
- // SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest{
177
- // InitialRequest: &rtv1.SubscribeTopicEventsRequestInitialAlpha1{
178
- // PubsubName: "mypub", Topic: singleTopicMultipleSubscribers,
179
- // },
180
- // },
181
- //}))
182
- //resp, err = streamNew2.Recv()
183
- //require.NoError(t, err)
184
- //switch resp.GetSubscribeTopicEventsResponseType().(type) {
185
- //case *rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse:
186
- //default:
187
- // require.Failf(t, "unexpected response", "got (%T) %v", resp.GetSubscribeTopicEventsResponseType(), resp)
188
- //}
189
-
190
- //streamNew3, err := client.SubscribeTopicEventsAlpha1(ctx)
191
- //require.NoError(t, err)
192
- //require.NoError(t, streamNew3.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
193
- // SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_InitialRequest{
194
- // InitialRequest: &rtv1.SubscribeTopicEventsRequestInitialAlpha1{
195
- // PubsubName: "mypub", Topic: singleTopicMultipleSubscribers,
196
- // },
197
- // },
198
- //}))
199
- //resp, err = streamNew3.Recv()
200
- //require.NoError(t, err)
201
- //switch resp.GetSubscribeTopicEventsResponseType().(type) {
202
- //case *rtv1.SubscribeTopicEventsResponseAlpha1_InitialResponse:
203
- //default:
204
- // require.Failf(t, "unexpected response", "got (%T) %v", resp.GetSubscribeTopicEventsResponseType(), resp)
205
- //}
206
-
207
207
var receivedTotal atomic.Int32
208
208
receivedTotal .Store (0 )
209
209
210
210
subscribers := []rtv1.Dapr_SubscribeTopicEventsAlpha1Client {
211
211
streamNew1 ,
212
- // streamNew2,
213
- // streamNew3,
212
+ streamNew2 ,
213
+ streamNew3 ,
214
214
}
215
+
216
+ messagesToSend := 100
217
+
218
+ var wg sync.WaitGroup
219
+ wg .Add (len (subscribers ))
220
+
215
221
for i , stream := range subscribers {
216
222
go func (stream rtv1.Dapr_SubscribeTopicEventsAlpha1Client , i int , c * testing.T ) {
217
- for {
218
- event , err := stream .Recv ()
219
- c .Log ("received event from stream" , i , "topic" , event .GetEventMessage ().GetTopic ())
220
- require .NoError (c , err )
223
+ defer wg .Done ()
224
+ expectedMessages := messagesToSend
225
+ receivedMessages := 0
226
+
227
+ for receivedMessages < expectedMessages {
228
+ event , recvErr := stream .Recv ()
229
+ require .NoError (c , recvErr )
221
230
assert .Equal (c , singleTopicMultipleSubscribers , event .GetEventMessage ().GetTopic ())
222
231
232
+ sendErr := stream .Send (& rtv1.SubscribeTopicEventsRequestAlpha1 {
233
+ SubscribeTopicEventsRequestType : & rtv1.SubscribeTopicEventsRequestAlpha1_EventProcessed {
234
+ EventProcessed : & rtv1.SubscribeTopicEventsRequestProcessedAlpha1 {
235
+ Id : event .GetEventMessage ().GetId (),
236
+ Status : & rtv1.TopicEventResponse {Status : rtv1 .TopicEventResponse_SUCCESS },
237
+ },
238
+ },
239
+ })
240
+ require .NoError (c , sendErr )
241
+
223
242
receivedTotal .Add (1 )
224
- time . Sleep ( 1 * time . Second )
243
+ receivedMessages ++
225
244
}
226
245
}(stream , i , t )
227
246
}
228
247
229
- messagesToSend := 2
230
- for i := range messagesToSend {
231
- t .Log ("publishing to topic" , singleTopicMultipleSubscribers , "from stream" , i )
248
+ for range messagesToSend {
232
249
_ , err = client .PublishEvent (ctx , & rtv1.PublishEventRequest {
233
250
PubsubName : "mypub" , Topic : singleTopicMultipleSubscribers ,
234
251
Data : []byte (`{"status": "completed"}` ),
@@ -238,6 +255,7 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
238
255
}
239
256
240
257
assert .Eventually (t , func () bool {
258
+ wg .Wait ()
241
259
return receivedTotal .Load () == int32 (messagesToSend * len (subscribers ))
242
260
}, time .Second * 10 , time .Millisecond * 10 )
243
261
}
0 commit comments