@@ -901,7 +901,9 @@ def test_set_passwd_cb(self, tmpfile: bytes) -> None:
901
901
pemFile = self ._write_encrypted_pem (passphrase , tmpfile )
902
902
calledWith = []
903
903
904
- def passphraseCallback (maxlen , verify , extra ):
904
+ def passphraseCallback (
905
+ maxlen : int , verify : bool , extra : None
906
+ ) -> bytes :
905
907
calledWith .append ((maxlen , verify , extra ))
906
908
return passphrase
907
909
@@ -920,7 +922,9 @@ def test_passwd_callback_exception(self, tmpfile: bytes) -> None:
920
922
"""
921
923
pemFile = self ._write_encrypted_pem (b"monkeys are nice" , tmpfile )
922
924
923
- def passphraseCallback (maxlen , verify , extra ):
925
+ def passphraseCallback (
926
+ maxlen : int , verify : bool , extra : None
927
+ ) -> bytes :
924
928
raise RuntimeError ("Sorry, I am a fail." )
925
929
926
930
context = Context (SSLv23_METHOD )
@@ -935,7 +939,9 @@ def test_passwd_callback_false(self, tmpfile: bytes) -> None:
935
939
"""
936
940
pemFile = self ._write_encrypted_pem (b"monkeys are nice" , tmpfile )
937
941
938
- def passphraseCallback (maxlen , verify , extra ):
942
+ def passphraseCallback (
943
+ maxlen : int , verify : bool , extra : None
944
+ ) -> bytes :
939
945
return b""
940
946
941
947
context = Context (SSLv23_METHOD )
@@ -950,11 +956,11 @@ def test_passwd_callback_non_string(self, tmpfile: bytes) -> None:
950
956
"""
951
957
pemFile = self ._write_encrypted_pem (b"monkeys are nice" , tmpfile )
952
958
953
- def passphraseCallback (maxlen , verify , extra ) :
959
+ def passphraseCallback (maxlen : int , verify : bool , extra : None ) -> int :
954
960
return 10
955
961
956
962
context = Context (SSLv23_METHOD )
957
- context .set_passwd_cb (passphraseCallback )
963
+ context .set_passwd_cb (passphraseCallback ) # type: ignore[arg-type]
958
964
# TODO: Surely this is the wrong error?
959
965
with pytest .raises (ValueError ):
960
966
context .use_privatekey_file (pemFile )
@@ -968,7 +974,9 @@ def test_passwd_callback_too_long(self, tmpfile: bytes) -> None:
968
974
passphrase = b"x" * 1024
969
975
pemFile = self ._write_encrypted_pem (passphrase , tmpfile )
970
976
971
- def passphraseCallback (maxlen , verify , extra ):
977
+ def passphraseCallback (
978
+ maxlen : int , verify : bool , extra : None
979
+ ) -> bytes :
972
980
assert maxlen == 1024
973
981
return passphrase + b"y"
974
982
@@ -990,7 +998,7 @@ def test_set_info_callback(self) -> None:
990
998
991
999
called = []
992
1000
993
- def info (conn , where , ret ) :
1001
+ def info (conn : Connection , where : int , ret : int ) -> None :
994
1002
called .append ((conn , where , ret ))
995
1003
996
1004
context = Context (SSLv23_METHOD )
@@ -1028,7 +1036,7 @@ def test_set_keylog_callback(self) -> None:
1028
1036
"""
1029
1037
called = []
1030
1038
1031
- def keylog (conn , line ) :
1039
+ def keylog (conn : Connection , line : bytes ) -> None :
1032
1040
called .append ((conn , line ))
1033
1041
1034
1042
server_context = Context (TLSv1_2_METHOD )
@@ -1385,9 +1393,9 @@ def test_set_verify_callback_connection_argument(self) -> None:
1385
1393
serverConnection = Connection (serverContext , None )
1386
1394
1387
1395
class VerifyCallback :
1388
- def callback (self , connection , * args ):
1396
+ def callback (self , connection : Connection , * args ) -> bool :
1389
1397
self .connection = connection
1390
- return 1
1398
+ return True
1391
1399
1392
1400
verify = VerifyCallback ()
1393
1401
clientContext = Context (SSLv23_METHOD )
@@ -1415,9 +1423,11 @@ def test_x509_in_verify_works(self) -> None:
1415
1423
)
1416
1424
serverConnection = Connection (serverContext , None )
1417
1425
1418
- def verify_cb_get_subject (conn , cert , errnum , depth , ok ):
1426
+ def verify_cb_get_subject (
1427
+ conn : Connection , cert : X509 , errnum : int , depth : int , ok : int
1428
+ ) -> bool :
1419
1429
assert cert .get_subject ()
1420
- return 1
1430
+ return True
1421
1431
1422
1432
clientContext = Context (SSLv23_METHOD )
1423
1433
clientContext .set_verify (VERIFY_PEER , verify_cb_get_subject )
@@ -1817,10 +1827,10 @@ def test_old_callback_forgotten(self) -> None:
1817
1827
a new callback, the one it replaces is dereferenced.
1818
1828
"""
1819
1829
1820
- def callback (connection ) : # pragma: no cover
1830
+ def callback (connection : Connection ) -> None : # pragma: no cover
1821
1831
pass
1822
1832
1823
- def replacement (connection ) : # pragma: no cover
1833
+ def replacement (connection : Connection ) -> None : # pragma: no cover
1824
1834
pass
1825
1835
1826
1836
context = Context (SSLv23_METHOD )
@@ -1851,7 +1861,7 @@ def test_no_servername(self) -> None:
1851
1861
"""
1852
1862
args = []
1853
1863
1854
- def servername (conn ) :
1864
+ def servername (conn : Connection ) -> None :
1855
1865
args .append ((conn , conn .get_servername ()))
1856
1866
1857
1867
context = Context (SSLv23_METHOD )
@@ -1888,7 +1898,7 @@ def test_servername(self) -> None:
1888
1898
"""
1889
1899
args = []
1890
1900
1891
- def servername (conn ) :
1901
+ def servername (conn : Connection ) -> None :
1892
1902
args .append ((conn , conn .get_servername ()))
1893
1903
1894
1904
context = Context (SSLv23_METHOD )
@@ -1926,7 +1936,7 @@ def test_alpn_success(self) -> None:
1926
1936
"""
1927
1937
select_args = []
1928
1938
1929
- def select (conn , options ) :
1939
+ def select (conn : Connection , options : list [ bytes ]) -> bytes :
1930
1940
select_args .append ((conn , options ))
1931
1941
return b"spdy/2"
1932
1942
@@ -1974,7 +1984,7 @@ def test_alpn_set_on_connection(self) -> None:
1974
1984
"""
1975
1985
select_args = []
1976
1986
1977
- def select (conn , options ) :
1987
+ def select (conn : Connection , options : list [ bytes ]) -> bytes :
1978
1988
select_args .append ((conn , options ))
1979
1989
return b"spdy/2"
1980
1990
@@ -2015,7 +2025,7 @@ def test_alpn_server_fail(self) -> None:
2015
2025
"""
2016
2026
select_args = []
2017
2027
2018
- def select (conn , options ) :
2028
+ def select (conn : Connection , options : list [ bytes ]) -> bytes :
2019
2029
select_args .append ((conn , options ))
2020
2030
return b""
2021
2031
@@ -2054,7 +2064,7 @@ def test_alpn_no_server_overlap(self) -> None:
2054
2064
"""
2055
2065
refusal_args = []
2056
2066
2057
- def refusal (conn , options ):
2067
+ def refusal (conn : Connection , options : list [ bytes ] ):
2058
2068
refusal_args .append ((conn , options ))
2059
2069
return NO_OVERLAPPING_PROTOCOLS
2060
2070
@@ -2094,15 +2104,15 @@ def test_alpn_select_cb_returns_invalid_value(self) -> None:
2094
2104
"""
2095
2105
invalid_cb_args = []
2096
2106
2097
- def invalid_cb (conn , options ) :
2107
+ def invalid_cb (conn : Connection , options : list [ bytes ]) -> str :
2098
2108
invalid_cb_args .append ((conn , options ))
2099
2109
return "can't return unicode"
2100
2110
2101
2111
client_context = Context (SSLv23_METHOD )
2102
2112
client_context .set_alpn_protos ([b"http/1.1" , b"spdy/2" ])
2103
2113
2104
2114
server_context = Context (SSLv23_METHOD )
2105
- server_context .set_alpn_select_callback (invalid_cb )
2115
+ server_context .set_alpn_select_callback (invalid_cb ) # type: ignore[arg-type]
2106
2116
2107
2117
# Necessary to actually accept the connection
2108
2118
server_context .use_privatekey (
@@ -2163,7 +2173,7 @@ def test_alpn_callback_exception(self) -> None:
2163
2173
"""
2164
2174
select_args = []
2165
2175
2166
- def select (conn , options ) :
2176
+ def select (conn : Connection , options : list [ bytes ]) -> bytes :
2167
2177
select_args .append ((conn , options ))
2168
2178
raise TypeError ()
2169
2179
@@ -2790,8 +2800,10 @@ def test_set_verify_callback_reference(self) -> None:
2790
2800
the context and all connections created by it do not use it anymore.
2791
2801
"""
2792
2802
2793
- def callback (conn , cert , errnum , depth , ok ): # pragma: no cover
2794
- return ok
2803
+ def callback (
2804
+ conn : Connection , cert : X509 , errnum : int , depth : int , ok : int
2805
+ ) -> bool : # pragma: no cover
2806
+ return bool (ok )
2795
2807
2796
2808
tracker = ref (callback )
2797
2809
@@ -2872,7 +2884,7 @@ def test_client_set_session(self) -> None:
2872
2884
ctx .use_certificate (cert )
2873
2885
ctx .set_session_id (b"unity-test" )
2874
2886
2875
- def makeServer (socket ) :
2887
+ def makeServer (socket : socket ) -> Connection :
2876
2888
server = Connection (ctx , socket )
2877
2889
server .set_accept_state ()
2878
2890
return server
@@ -2881,7 +2893,7 @@ def makeServer(socket):
2881
2893
originalSession = originalClient .get_session ()
2882
2894
assert originalSession is not None
2883
2895
2884
- def makeClient (socket ) :
2896
+ def makeClient (socket : socket ) -> Connection :
2885
2897
client = loopback_client_factory (socket )
2886
2898
client .set_session (originalSession )
2887
2899
return client
@@ -2914,12 +2926,12 @@ def test_set_session_wrong_method(self) -> None:
2914
2926
ctx .use_certificate (cert )
2915
2927
ctx .set_session_id (b"unity-test" )
2916
2928
2917
- def makeServer (socket ) :
2929
+ def makeServer (socket : socket ) -> Connection :
2918
2930
server = Connection (ctx , socket )
2919
2931
server .set_accept_state ()
2920
2932
return server
2921
2933
2922
- def makeOriginalClient (socket ) :
2934
+ def makeOriginalClient (socket : socket ) -> Connection :
2923
2935
client = Connection (Context (v1 ), socket )
2924
2936
client .set_connect_state ()
2925
2937
return client
@@ -2930,7 +2942,7 @@ def makeOriginalClient(socket):
2930
2942
originalSession = originalClient .get_session ()
2931
2943
assert originalSession is not None
2932
2944
2933
- def makeClient (socket ) :
2945
+ def makeClient (socket : socket ) -> Connection :
2934
2946
# Intentionally use a different, incompatible method here.
2935
2947
client = Connection (Context (v2 ), socket )
2936
2948
client .set_connect_state ()
@@ -3193,7 +3205,7 @@ class VeryLarge(bytes):
3193
3205
Mock object so that we don't have to allocate 2**31 bytes
3194
3206
"""
3195
3207
3196
- def __len__ (self ):
3208
+ def __len__ (self ) -> int :
3197
3209
return 2 ** 31
3198
3210
3199
3211
@@ -3275,7 +3287,7 @@ def test_buf_too_large(self) -> None:
3275
3287
exc_info .match (r"Cannot send more than .+ bytes at once" )
3276
3288
3277
3289
3278
- def _make_memoryview (size ) :
3290
+ def _make_memoryview (size : int ) -> memoryview :
3279
3291
"""
3280
3292
Create a new ``memoryview`` wrapped around a ``bytearray`` of the given
3281
3293
size.
@@ -3933,7 +3945,7 @@ def test_set_empty_ca_list(self) -> None:
3933
3945
after the connection is set up.
3934
3946
"""
3935
3947
3936
- def no_ca (ctx ) :
3948
+ def no_ca (ctx : Context ) -> list [ X509Name ] :
3937
3949
ctx .set_client_ca_list ([])
3938
3950
return []
3939
3951
@@ -3950,7 +3962,7 @@ def test_set_one_ca_list(self) -> None:
3950
3962
cacert = load_certificate (FILETYPE_PEM , root_cert_pem )
3951
3963
cadesc = cacert .get_subject ()
3952
3964
3953
- def single_ca (ctx ) :
3965
+ def single_ca (ctx : Context ) -> list [ X509Name ] :
3954
3966
ctx .set_client_ca_list ([cadesc ])
3955
3967
return [cadesc ]
3956
3968
@@ -3970,7 +3982,7 @@ def test_set_multiple_ca_list(self) -> None:
3970
3982
sedesc = secert .get_subject ()
3971
3983
cldesc = clcert .get_subject ()
3972
3984
3973
- def multiple_ca (ctx ) :
3985
+ def multiple_ca (ctx : Context ) -> list [ X509Name ] :
3974
3986
L = [sedesc , cldesc ]
3975
3987
ctx .set_client_ca_list (L )
3976
3988
return L
@@ -3991,7 +4003,7 @@ def test_reset_ca_list(self) -> None:
3991
4003
sedesc = secert .get_subject ()
3992
4004
cldesc = clcert .get_subject ()
3993
4005
3994
- def changed_ca (ctx ) :
4006
+ def changed_ca (ctx : Context ) -> list [ X509Name ] :
3995
4007
ctx .set_client_ca_list ([sedesc , cldesc ])
3996
4008
ctx .set_client_ca_list ([cadesc ])
3997
4009
return [cadesc ]
@@ -4010,7 +4022,7 @@ def test_mutated_ca_list(self) -> None:
4010
4022
cadesc = cacert .get_subject ()
4011
4023
sedesc = secert .get_subject ()
4012
4024
4013
- def mutated_ca (ctx ) :
4025
+ def mutated_ca (ctx : Context ) -> list [ X509Name ] :
4014
4026
L = [cadesc ]
4015
4027
ctx .set_client_ca_list ([cadesc ])
4016
4028
L .append (sedesc )
@@ -4035,7 +4047,7 @@ def test_one_add_client_ca(self) -> None:
4035
4047
cacert = load_certificate (FILETYPE_PEM , root_cert_pem )
4036
4048
cadesc = cacert .get_subject ()
4037
4049
4038
- def single_ca (ctx ) :
4050
+ def single_ca (ctx : Context ) -> list [ X509Name ] :
4039
4051
ctx .add_client_ca (cacert )
4040
4052
return [cadesc ]
4041
4053
@@ -4052,7 +4064,7 @@ def test_multiple_add_client_ca(self) -> None:
4052
4064
cadesc = cacert .get_subject ()
4053
4065
sedesc = secert .get_subject ()
4054
4066
4055
- def multiple_ca (ctx ) :
4067
+ def multiple_ca (ctx : Context ) -> list [ X509Name ] :
4056
4068
ctx .add_client_ca (cacert )
4057
4069
ctx .add_client_ca (secert .to_cryptography ())
4058
4070
return [cadesc , sedesc ]
@@ -4073,7 +4085,7 @@ def test_set_and_add_client_ca(self) -> None:
4073
4085
sedesc = secert .get_subject ()
4074
4086
cldesc = clcert .get_subject ()
4075
4087
4076
- def mixed_set_add_ca (ctx ) :
4088
+ def mixed_set_add_ca (ctx : Context ) -> list [ X509Name ] :
4077
4089
ctx .set_client_ca_list ([cadesc , sedesc ])
4078
4090
ctx .add_client_ca (clcert )
4079
4091
return [cadesc , sedesc , cldesc ]
@@ -4093,7 +4105,7 @@ def test_set_after_add_client_ca(self) -> None:
4093
4105
cadesc = cacert .get_subject ()
4094
4106
sedesc = secert .get_subject ()
4095
4107
4096
- def set_replaces_add_ca (ctx ) :
4108
+ def set_replaces_add_ca (ctx : Context ) -> list [ X509Name ] :
4097
4109
ctx .add_client_ca (clcert .to_cryptography ())
4098
4110
ctx .set_client_ca_list ([cadesc ])
4099
4111
ctx .add_client_ca (secert )
@@ -4253,7 +4265,9 @@ def test_client_negotiates_without_server(self) -> None:
4253
4265
"""
4254
4266
called = []
4255
4267
4256
- def ocsp_callback (conn , ocsp_data , ignored ):
4268
+ def ocsp_callback (
4269
+ conn : Connection , ocsp_data : bytes , ignored : None
4270
+ ) -> bool :
4257
4271
called .append (ocsp_data )
4258
4272
return True
4259
4273
@@ -4273,7 +4287,9 @@ def test_client_receives_servers_data(self) -> None:
4273
4287
def server_callback (* args , ** kwargs ):
4274
4288
return self .sample_ocsp_data
4275
4289
4276
- def client_callback (conn , ocsp_data , ignored ):
4290
+ def client_callback (
4291
+ conn : Connection , ocsp_data : bytes , ignored : None
4292
+ ) -> bool :
4277
4293
calls .append (ocsp_data )
4278
4294
return True
4279
4295
@@ -4347,7 +4363,9 @@ def test_server_returns_empty_string(self) -> None:
4347
4363
def server_callback (* args ):
4348
4364
return b""
4349
4365
4350
- def client_callback (conn , ocsp_data , ignored ):
4366
+ def client_callback (
4367
+ conn : Connection , ocsp_data : bytes , ignored : None
4368
+ ) -> bool :
4351
4369
client_calls .append (ocsp_data )
4352
4370
return True
4353
4371
@@ -4509,10 +4527,10 @@ class TestDTLS:
4509
4527
def _test_handshake_and_data (self , srtp_profile : bytes | None ) -> None :
4510
4528
s_ctx = Context (DTLS_METHOD )
4511
4529
4512
- def generate_cookie (ssl ) :
4530
+ def generate_cookie (ssl : Connection ) -> bytes :
4513
4531
return b"xyzzy"
4514
4532
4515
- def verify_cookie (ssl , cookie ) :
4533
+ def verify_cookie (ssl : Connection , cookie : bytes ) -> bool :
4516
4534
return cookie == b"xyzzy"
4517
4535
4518
4536
s_ctx .set_cookie_generate_callback (generate_cookie )
0 commit comments