@@ -3,8 +3,7 @@ require('chromedriver')
33const assert = require ( 'assert' )
44const { buildDriver, goToHome, captureScreensFor, teardown, doWhile, goToQueue, delay } = require ( '../utils' )
55const { createQueue, deleteQueue, getManagementUrl, basicAuthorization } = require ( '../mgt-api' )
6- const { open : openAmqp , once : onceAmqp , on : onAmqp , close : closeAmqp ,
7- openReceiver : openReceiver } = require ( '../amqp' )
6+ const { getAmqpUrl : getAmqpUrl } = require ( '../amqp' )
87const amqplib = require ( 'amqplib' ) ;
98
109const LoginPage = require ( '../pageobjects/LoginPage' )
@@ -13,12 +12,6 @@ const QueuesAndStreamsPage = require('../pageobjects/QueuesAndStreamsPage')
1312const QueuePage = require ( '../pageobjects/QueuePage' )
1413const StreamPage = require ( '../pageobjects/StreamPage' )
1514
16- var untilConnectionEstablished = new Promise ( ( resolve , reject ) => {
17- onAmqp ( 'connection_open' , function ( context ) {
18- console . log ( "Amqp connection opened" )
19- resolve ( )
20- } )
21- } )
2215
2316describe ( 'Given a quorum queue configured with SAC' , function ( ) {
2417 let login
@@ -44,7 +37,6 @@ describe('Given a quorum queue configured with SAC', function () {
4437 throw new Error ( 'Failed to login' )
4538 }
4639 await overview . selectRefreshOption ( "Do not refresh" )
47- await overview . clickOnQueuesTab ( )
4840 queueName = "test_" + Math . floor ( Math . random ( ) * 1000 )
4941
5042 createQueue ( getManagementUrl ( ) , basicAuthorization ( "management" , "guest" ) ,
@@ -77,16 +69,21 @@ describe('Given a quorum queue configured with SAC', function () {
7769 assert . equal ( "Consumers (0)" , await queuePage . getConsumersSectionTitle ( ) )
7870 } )
7971
80- describe ( "given there is a consumer attached to the queue" , function ( ) {
81- let amqp
72+ describe ( "given there is a consumer (without priority) attached to the queue" , function ( ) {
8273 let amqp091conn
74+ let ch1
75+ let ch1Consumer
76+ let ch2
77+ let ch2Consumer
8378
8479 before ( async function ( ) {
85- amqp = openAmqp ( queueName )
86- await untilConnectionEstablished
80+ let amqpUrl = getAmqpUrl ( ) + "?frameMax=0"
81+ amqp091conn = await amqplib . connect ( amqpUrl )
82+ ch1 = await amqp091conn . createChannel ( )
83+ ch1Consumer = ch1 . consume ( queueName , ( msg ) => { } , { consumerTag : "one" } )
8784 } )
8885
89- it ( 'it should have one consumer' , async function ( ) {
86+ it ( 'it should have one consumer as active ' , async function ( ) {
9087 await doWhile ( async function ( ) {
9188 await queuePage . refresh ( )
9289 await queuePage . isLoaded ( )
@@ -100,53 +97,146 @@ describe('Given a quorum queue configured with SAC', function () {
10097 let consumerTable = await doWhile ( async function ( ) {
10198 return queuePage . getConsumersTable ( )
10299 } , function ( table ) {
103- return table [ 0 ] [ 6 ] . localeCompare ( "single active" ) == 0
100+ return table [ 0 ] [ 6 ] . localeCompare ( "single active" ) == 0 &&
101+ table [ 0 ] [ 1 ] . localeCompare ( "one" ) == 0
104102 } )
105103 assert . equal ( "single active" , consumerTable [ 0 ] [ 6 ] )
104+ assert . equal ( "one" , consumerTable [ 0 ] [ 1 ] )
106105
107106 } )
108107
109- it ( 'it should have two consumers, after adding a second subscriber' , async function ( ) {
110- amqp091conn = await amqplib . connect ( 'amqp://guest:guest@localhost?frameMax=0' )
111- const ch1 = await amqp091conn . createChannel ( )
112- // Listener
113-
114- ch1 . consume ( queueName , ( msg ) => { } , { priority : 10 } )
108+ describe ( "given another consumer is added with priority" , function ( ) {
109+ before ( async function ( ) {
110+ ch2 = await amqp091conn . createChannel ( )
111+ ch2Consumer = ch2 . consume ( queueName , ( msg ) => { } , { consumerTag : "two" , priority : 10 } )
112+ } )
113+
114+ it ( 'the latter consumer should be active and the former waiting' , async function ( ) {
115+
116+ await doWhile ( async function ( ) {
117+ await queuePage . refresh ( )
118+ await queuePage . isLoaded ( )
119+ return queuePage . getConsumerCount ( )
120+ } , function ( count ) {
121+ return count . localeCompare ( "2" ) == 0
122+ } , 5000 )
123+
124+ assert . equal ( "2" , await queuePage . getConsumerCount ( ) )
125+ assert . equal ( "Consumers (2)" , await queuePage . getConsumersSectionTitle ( ) )
126+ await queuePage . clickOnConsumerSection ( )
127+ let consumerTable = await doWhile ( async function ( ) {
128+ return queuePage . getConsumersTable ( )
129+ } , function ( table ) {
130+ return table . length == 2 && table [ 0 ] [ 1 ] != "" && table [ 1 ] [ 1 ] != ""
131+ } , 5000 )
132+
133+ let activeConsumer = consumerTable [ 1 ] [ 6 ] . localeCompare ( "single active" ) == 0 ?
134+ 1 : 0
135+ let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
136+
137+ assert . equal ( "waiting" , consumerTable [ nonActiveConsumer ] [ 6 ] )
138+ assert . equal ( "one" , consumerTable [ nonActiveConsumer ] [ 1 ] )
139+ assert . equal ( "single active" , consumerTable [ activeConsumer ] [ 6 ] )
140+ assert . equal ( "two" , consumerTable [ activeConsumer ] [ 1 ] )
141+ await delay ( 5000 )
142+ } )
143+ } )
144+
145+ after ( async function ( ) {
146+ try {
147+ if ( amqp091conn != null ) {
148+ amqp091conn . close ( )
149+ }
150+ } catch ( error ) {
151+ error ( "Failed to close amqp091 connection due to " + error ) ;
152+ }
153+ // ensure there are no more consumers
154+ await doWhile ( async function ( ) {
155+ await queuePage . refresh ( )
156+ await queuePage . isLoaded ( )
157+ return queuePage . getConsumerCount ( )
158+ } , function ( count ) {
159+ return count . localeCompare ( "0" ) == 0
160+ } , 5000 )
161+
115162
163+ } )
164+ } )
165+
166+ describe ( "given there is a consumer (with priority) attached to the queue" , function ( ) {
167+ let amqp091conn
168+ let ch1
169+ let ch1Consumer
170+ let ch2
171+ let ch2Consumer
172+
173+ before ( async function ( ) {
174+ let amqpUrl = getAmqpUrl ( ) + "?frameMax=0"
175+ amqp091conn = await amqplib . connect ( amqpUrl )
176+ ch1 = await amqp091conn . createChannel ( )
177+ ch1Consumer = ch1 . consume ( queueName , ( msg ) => { } , { consumerTag : "one" , priority : 10 } )
178+ } )
179+
180+ it ( 'it should have one consumer as active' , async function ( ) {
116181 await doWhile ( async function ( ) {
117182 await queuePage . refresh ( )
118183 await queuePage . isLoaded ( )
119184 return queuePage . getConsumerCount ( )
120185 } , function ( count ) {
121- return count . localeCompare ( "2 " ) == 0
186+ return count . localeCompare ( "0 " ) == 1
122187 } , 5000 )
123-
124- assert . equal ( "2" , await queuePage . getConsumerCount ( ) )
125- assert . equal ( "Consumers (2)" , await queuePage . getConsumersSectionTitle ( ) )
188+ assert . equal ( "1" , await queuePage . getConsumerCount ( ) )
189+ assert . equal ( "Consumers (1)" , await queuePage . getConsumersSectionTitle ( ) )
126190 await queuePage . clickOnConsumerSection ( )
127191 let consumerTable = await doWhile ( async function ( ) {
128192 return queuePage . getConsumersTable ( )
129193 } , function ( table ) {
130- return table . length == 2
131- } , 5000 )
194+ return table [ 0 ] [ 6 ] . localeCompare ( "single active" ) == 0 &&
195+ table [ 0 ] [ 1 ] . localeCompare ( "one" ) == 0
196+ } )
197+ assert . equal ( "single active" , consumerTable [ 0 ] [ 6 ] )
198+ assert . equal ( "one" , consumerTable [ 0 ] [ 1 ] )
199+
200+ } )
201+
202+ describe ( "given another consumer is added without priority" , function ( ) {
203+ before ( async function ( ) {
204+ ch2 = await amqp091conn . createChannel ( )
205+ ch2Consumer = ch2 . consume ( queueName , ( msg ) => { } , { consumerTag : "two" } )
206+ } )
132207
133- let activeConsumer = consumerTable [ 1 ] [ 6 ] . localeCompare ( "single active" ) == 0 ?
134- 1 : 0
135- let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
208+ it ( 'the former consumer should still be active and the latter be waiting' , async function ( ) {
209+
210+ await doWhile ( async function ( ) {
211+ await queuePage . refresh ( )
212+ await queuePage . isLoaded ( )
213+ return queuePage . getConsumerCount ( )
214+ } , function ( count ) {
215+ return count . localeCompare ( "2" ) == 0
216+ } , 5000 )
217+
218+ assert . equal ( "2" , await queuePage . getConsumerCount ( ) )
219+ assert . equal ( "Consumers (2)" , await queuePage . getConsumersSectionTitle ( ) )
220+ await queuePage . clickOnConsumerSection ( )
221+ let consumerTable = await doWhile ( async function ( ) {
222+ return queuePage . getConsumersTable ( )
223+ } , function ( table ) {
224+ return table . length == 2 && table [ 0 ] [ 1 ] != "" && table [ 1 ] [ 1 ] != ""
225+ } , 5000 )
136226
137- assert . equal ( "waiting" , consumerTable [ nonActiveConsumer ] [ 6 ] )
138- assert . equal ( "single active" , consumerTable [ activeConsumer ] [ 6 ] )
139- await delay ( 5000 )
227+ let activeConsumer = consumerTable [ 1 ] [ 6 ] . localeCompare ( "single active" ) == 0 ?
228+ 1 : 0
229+ let nonActiveConsumer = activeConsumer == 1 ? 0 : 1
230+
231+ assert . equal ( "waiting" , consumerTable [ nonActiveConsumer ] [ 6 ] )
232+ assert . equal ( "two" , consumerTable [ nonActiveConsumer ] [ 1 ] )
233+ assert . equal ( "single active" , consumerTable [ activeConsumer ] [ 6 ] )
234+ assert . equal ( "one" , consumerTable [ activeConsumer ] [ 1 ] )
235+ await delay ( 5000 )
236+ } )
140237 } )
141238
142- after ( function ( ) {
143- try {
144- if ( amqp != null ) {
145- closeAmqp ( amqp . connection )
146- }
147- } catch ( error ) {
148- error ( "Failed to close amqp10 connection due to " + error ) ;
149- }
239+ after ( function ( ) {
150240 try {
151241 if ( amqp091conn != null ) {
152242 amqp091conn . close ( )
0 commit comments