@@ -124,7 +124,7 @@ def can_connect(self, server, client):
124
124
except zmq .Again :
125
125
warnings .warn ("client set POLLIN, but cannot recv" , RuntimeWarning )
126
126
else :
127
- self . assertEqual ( rcvd_msg , msg )
127
+ assert rcvd_msg == msg
128
128
result = True
129
129
return result
130
130
@@ -139,15 +139,15 @@ def test_null(self):
139
139
140
140
server = self .socket (zmq .PUSH )
141
141
client = self .socket (zmq .PULL )
142
- self .assertTrue ( self . can_connect (server , client ) )
142
+ assert self .can_connect (server , client )
143
143
144
144
# By setting a domain we switch on authentication for NULL sockets,
145
145
# though no policies are configured yet. The client connection
146
146
# should still be allowed.
147
147
server = self .socket (zmq .PUSH )
148
148
server .zap_domain = b'global'
149
149
client = self .socket (zmq .PULL )
150
- self .assertTrue ( self . can_connect (server , client ) )
150
+ assert self .can_connect (server , client )
151
151
152
152
def test_blacklist (self ):
153
153
"""threaded auth - Blacklist"""
@@ -158,7 +158,7 @@ def test_blacklist(self):
158
158
# though no policies are configured yet.
159
159
server .zap_domain = b'global'
160
160
client = self .socket (zmq .PULL )
161
- self .assertFalse ( self . can_connect (server , client ) )
161
+ assert not self .can_connect (server , client )
162
162
163
163
def test_whitelist (self ):
164
164
"""threaded auth - Whitelist"""
@@ -169,7 +169,7 @@ def test_whitelist(self):
169
169
# though no policies are configured yet.
170
170
server .zap_domain = b'global'
171
171
client = self .socket (zmq .PULL )
172
- self .assertTrue ( self . can_connect (server , client ) )
172
+ assert self .can_connect (server , client )
173
173
174
174
def test_plain (self ):
175
175
"""threaded auth - PLAIN"""
@@ -180,7 +180,7 @@ def test_plain(self):
180
180
client = self .socket (zmq .PULL )
181
181
client .plain_username = b'admin'
182
182
client .plain_password = b'Password'
183
- self .assertFalse ( self . can_connect (server , client ) )
183
+ assert not self .can_connect (server , client )
184
184
185
185
# Try PLAIN authentication - with server configured, connection should pass
186
186
server = self .socket (zmq .PUSH )
@@ -189,23 +189,23 @@ def test_plain(self):
189
189
client .plain_username = b'admin'
190
190
client .plain_password = b'Password'
191
191
self .auth .configure_plain (domain = '*' , passwords = {'admin' : 'Password' })
192
- self .assertTrue ( self . can_connect (server , client ) )
192
+ assert self .can_connect (server , client )
193
193
194
194
# Try PLAIN authentication - with bogus credentials, connection should fail
195
195
server = self .socket (zmq .PUSH )
196
196
server .plain_server = True
197
197
client = self .socket (zmq .PULL )
198
198
client .plain_username = b'admin'
199
199
client .plain_password = b'Bogus'
200
- self .assertFalse ( self . can_connect (server , client ) )
200
+ assert not self .can_connect (server , client )
201
201
202
202
# Remove authenticator and check that a normal connection works
203
203
self .auth .stop ()
204
204
self .auth = None
205
205
206
206
server = self .socket (zmq .PUSH )
207
207
client = self .socket (zmq .PULL )
208
- self .assertTrue ( self . can_connect (server , client ) )
208
+ assert self .can_connect (server , client )
209
209
client .close ()
210
210
server .close ()
211
211
@@ -224,7 +224,7 @@ def test_curve(self):
224
224
client .curve_publickey = client_public
225
225
client .curve_secretkey = client_secret
226
226
client .curve_serverkey = server_public
227
- self .assertFalse ( self . can_connect (server , client ) )
227
+ assert not self .can_connect (server , client )
228
228
229
229
# Try CURVE authentication - with server configured to CURVE_ALLOW_ANY, connection should pass
230
230
self .auth .configure_curve (domain = '*' , location = zmq .auth .CURVE_ALLOW_ANY )
@@ -236,7 +236,7 @@ def test_curve(self):
236
236
client .curve_publickey = client_public
237
237
client .curve_secretkey = client_secret
238
238
client .curve_serverkey = server_public
239
- self .assertTrue ( self . can_connect (server , client ) )
239
+ assert self .can_connect (server , client )
240
240
241
241
# Try CURVE authentication - with server configured, connection should pass
242
242
self .auth .configure_curve (domain = '*' , location = self .public_keys_dir )
@@ -257,7 +257,7 @@ def test_curve(self):
257
257
# Try connecting using NULL and no authentication enabled, connection should pass
258
258
server = self .socket (zmq .PUSH )
259
259
client = self .socket (zmq .PULL )
260
- self .assertTrue ( self . can_connect (server , client ) )
260
+ assert self .can_connect (server , client )
261
261
262
262
def test_curve_callback (self ):
263
263
"""threaded auth - CURVE with callback authentication"""
@@ -274,7 +274,7 @@ def test_curve_callback(self):
274
274
client .curve_publickey = client_public
275
275
client .curve_secretkey = client_secret
276
276
client .curve_serverkey = server_public
277
- self .assertFalse ( self . can_connect (server , client ) )
277
+ assert not self .can_connect (server , client )
278
278
279
279
# Try CURVE authentication - with callback authentication configured, connection should pass
280
280
@@ -298,7 +298,7 @@ def callback(self, domain, key):
298
298
client .curve_publickey = client_public
299
299
client .curve_secretkey = client_secret
300
300
client .curve_serverkey = server_public
301
- self .assertTrue ( self . can_connect (server , client ) )
301
+ assert self .can_connect (server , client )
302
302
303
303
# Try CURVE authentication - with callback authentication configured with wrong key, connection should not pass
304
304
@@ -322,7 +322,7 @@ def callback(self, domain, key):
322
322
client .curve_publickey = client_public
323
323
client .curve_secretkey = client_secret
324
324
client .curve_serverkey = server_public
325
- self .assertFalse ( self . can_connect (server , client ) )
325
+ assert not self .can_connect (server , client )
326
326
327
327
@skip_pypy
328
328
def test_curve_user_id (self ):
0 commit comments