@@ -170,6 +170,162 @@ func TestParsePulsarMetadataSubscriptionInitialPosition(t *testing.T) {
170170 }
171171}
172172
173+ func TestParsePulsarMetadataSubscriptionMode (t * testing.T ) {
174+ tt := []struct {
175+ name string
176+ subscribeMode string
177+ expected string
178+ err bool
179+ }{
180+ {
181+ name : "test valid subscribe mode - durable" ,
182+ subscribeMode : "durable" ,
183+ expected : "durable" ,
184+ err : false ,
185+ },
186+ {
187+ name : "test valid subscribe mode - non_durable" ,
188+ subscribeMode : "non_durable" ,
189+ expected : "non_durable" ,
190+ err : false ,
191+ },
192+ {
193+ name : "test valid subscribe mode - empty" ,
194+ subscribeMode : "" ,
195+ expected : "durable" ,
196+ err : false ,
197+ },
198+ {
199+ name : "test invalid subscribe mode" ,
200+ subscribeMode : "invalid" ,
201+ err : true ,
202+ },
203+ }
204+ for _ , tc := range tt {
205+ t .Run (tc .name , func (t * testing.T ) {
206+ m := pubsub.Metadata {}
207+
208+ m .Properties = map [string ]string {
209+ "host" : "a" ,
210+ "subscribeMode" : tc .subscribeMode ,
211+ }
212+ meta , err := parsePulsarMetadata (m )
213+
214+ if tc .err {
215+ require .Error (t , err )
216+ assert .Nil (t , meta )
217+ return
218+ }
219+
220+ require .NoError (t , err )
221+ assert .Equal (t , tc .expected , meta .SubscriptionMode )
222+ })
223+ }
224+ }
225+
226+ func TestParsePulsarMetadataSubscriptionCombination (t * testing.T ) {
227+ tt := []struct {
228+ name string
229+ subscribeType string
230+ subscribeInitialPosition string
231+ subscribeMode string
232+ expectedType string
233+ expectedInitialPosition string
234+ expectedMode string
235+ err bool
236+ }{
237+ {
238+ name : "test valid subscribe - default" ,
239+ subscribeType : "" ,
240+ subscribeInitialPosition : "" ,
241+ subscribeMode : "" ,
242+ expectedType : "shared" ,
243+ expectedInitialPosition : "latest" ,
244+ expectedMode : "durable" ,
245+ err : false ,
246+ },
247+ {
248+ name : "test valid subscribe - pass case 1" ,
249+ subscribeType : "key_shared" ,
250+ subscribeInitialPosition : "earliest" ,
251+ subscribeMode : "non_durable" ,
252+ expectedType : "key_shared" ,
253+ expectedInitialPosition : "earliest" ,
254+ expectedMode : "non_durable" ,
255+ err : false ,
256+ },
257+ {
258+ name : "test valid subscribe - pass case 2" ,
259+ subscribeType : "exclusive" ,
260+ subscribeInitialPosition : "latest" ,
261+ subscribeMode : "durable" ,
262+ expectedType : "exclusive" ,
263+ expectedInitialPosition : "latest" ,
264+ expectedMode : "durable" ,
265+ err : false ,
266+ },
267+ {
268+ name : "test valid subscribe - pass case 3" ,
269+ subscribeType : "failover" ,
270+ subscribeInitialPosition : "earliest" ,
271+ subscribeMode : "durable" ,
272+ expectedType : "failover" ,
273+ expectedInitialPosition : "earliest" ,
274+ expectedMode : "durable" ,
275+ err : false ,
276+ },
277+ {
278+ name : "test valid subscribe - pass case 4" ,
279+ subscribeType : "shared" ,
280+ subscribeInitialPosition : "latest" ,
281+ subscribeMode : "non_durable" ,
282+ expectedType : "shared" ,
283+ expectedInitialPosition : "latest" ,
284+ expectedMode : "non_durable" ,
285+ err : false ,
286+ },
287+ {
288+ name : "test valid subscribe - fail case 1" ,
289+ subscribeType : "invalid" ,
290+ err : true ,
291+ },
292+ {
293+ name : "test valid subscribe - fail case 2" ,
294+ subscribeInitialPosition : "invalid" ,
295+ err : true ,
296+ },
297+ {
298+ name : "test valid subscribe - fail case 3" ,
299+ subscribeMode : "invalid" ,
300+ err : true ,
301+ },
302+ }
303+ for _ , tc := range tt {
304+ t .Run (tc .name , func (t * testing.T ) {
305+ m := pubsub.Metadata {}
306+
307+ m .Properties = map [string ]string {
308+ "host" : "a" ,
309+ "subscribeType" : tc .subscribeType ,
310+ "subscribeInitialPosition" : tc .subscribeInitialPosition ,
311+ "subscribeMode" : tc .subscribeMode ,
312+ }
313+ meta , err := parsePulsarMetadata (m )
314+
315+ if tc .err {
316+ require .Error (t , err )
317+ assert .Nil (t , meta )
318+ return
319+ }
320+
321+ require .NoError (t , err )
322+ assert .Equal (t , tc .expectedType , meta .SubscriptionType )
323+ assert .Equal (t , tc .expectedInitialPosition , meta .SubscriptionInitialPosition )
324+ assert .Equal (t , tc .expectedMode , meta .SubscriptionMode )
325+ })
326+ }
327+ }
328+
173329func TestParsePulsarSchemaMetadata (t * testing.T ) {
174330 t .Run ("test json" , func (t * testing.T ) {
175331 m := pubsub.Metadata {}
0 commit comments