77
88import pyarrow
99import pytest
10+ from pyarrow ._flight import FlightError
1011
1112from influxdb_client_3 import InfluxDBClient3 , InfluxDBError , write_client_options , WriteOptions
1213
@@ -122,7 +123,6 @@ def error(conf, data, exception: InfluxDBError):
122123 database = self .database ,
123124 token = self .token ,
124125 write_client_options = wc_opts ) as w_client :
125-
126126 for i in range (0 , data_set_size ):
127127 w_client .write (f'{ measurement } ,location=harfa val={ i } i { now - (i * 1_000_000_000 )} ' )
128128
@@ -134,7 +134,6 @@ def error(conf, data, exception: InfluxDBError):
134134 database = self .database ,
135135 token = self .token ,
136136 write_client_options = wc_opts ) as r_client :
137-
138137 query = f"SELECT * FROM \" { measurement } \" WHERE time >= now() - interval '3 minute'"
139138 reader : pyarrow .Table = r_client .query (query )
140139 list_results = reader .to_pylist ()
@@ -165,7 +164,6 @@ def test_batch_write_closed(self):
165164 token = self .token ,
166165 write_client_options = wc_opts ,
167166 debug = True ) as w_client :
168-
169167 for i in range (0 , data_size ):
170168 w_client .write (f'{ measurement } ,location=harfa val={ i } i { now - (i * 1_000_000_000 )} ' )
171169
@@ -177,10 +175,77 @@ def test_batch_write_closed(self):
177175 database = self .database ,
178176 token = self .token ,
179177 write_client_options = wc_opts ) as r_client :
180-
181178 logging .info ("PREPARING QUERY" )
182179
183180 query = f"SELECT * FROM \" { measurement } \" WHERE time >= now() - interval '3 hours'"
184181 reader : pyarrow .Table = r_client .query (query , mode = "" )
185182 list_results = reader .to_pylist ()
186183 self .assertEqual (data_size , len (list_results ))
184+
185+ test_cert = """-----BEGIN CERTIFICATE-----
186+ MIIDUzCCAjugAwIBAgIUZB55ULutbc9gy6xLp1BkTQU7siowDQYJKoZIhvcNAQEL
187+ BQAwNjE0MDIGA1UEAwwraW5mbHV4ZGIzLWNsdXN0ZXJlZC1zd2FuLmJyYW1ib3Jh
188+ LnpvbmEtYi5ldTAeFw0yNTAyMTgxNTIyMTJaFw0yNjAyMTgxNTIyMTJaMDYxNDAy
189+ BgNVBAMMK2luZmx1eGRiMy1jbHVzdGVyZWQtc3dhbi5icmFtYm9yYS56b25hLWIu
190+ ZXUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCugeNrx0ZfyyP8H4e0
191+ zDSkKWnEXlVdjMi+ZSHhMbjvvqMkUQGLc/W59AEmMJ0Uiljka9d+F7jdu+oqDq9p
192+ 4kGPhO3Oh7zIG0IGbncj8AwIXMGDNkNyL8s7C1+LoYotlSWDpWwkEKXUeAzdqS63
193+ CSJFqSJM2dss8qe9BpM6zHWJAKS1I30QT3SXQFEsF5m2F62dXCEEI6pO7jlik8/w
194+ aI47dTM20QyimVzea48SC/ELO/T4AjbmMeBGlTyCm39KOElOKRTJvB4KESEWaL3r
195+ EvPZbTh+72PUyrjxiDa56+RmtDPo7EN3uxuRVFX/HWiNnFk7orQLKZg5Kr8wE46R
196+ KmVvAgMBAAGjWTBXMDYGA1UdEQQvMC2CK2luZmx1eGRiMy1jbHVzdGVyZWQtc3dh
197+ bi5icmFtYm9yYS56b25hLWIuZXUwHQYDVR0OBBYEFH8et6JCzGD7Ny84aNRtq5Nj
198+ hvS/MA0GCSqGSIb3DQEBCwUAA4IBAQCuDwARea/Xr3+hmte9A0H+XB8wMPAJ64e8
199+ QA0qi0oy0gGdLfQHhsBWWmKSYLv7HygTNzb+7uFOTtq1UPLt18F+POPeLIj74QZV
200+ z89Pbo1TwUMzQ2pgbu0yRvraXIpqXGrPm5GWYp5mopX0rBWKdimbmEMkhZA0sVeH
201+ IdKIRUY6EyIVG+Z/nbuVqUlgnIWOMp0yg4RRC91zHy3Xvykf3Vai25H/jQpa6cbU
202+ //MIodzUIqT8Tja5cHXE51bLdUkO1rtNKdM7TUdjzkZ+bAOpqKl+c0FlYZI+F7Ly
203+ +MdCcNgKFc8o8jGiyP6uyAJeg+tSICpFDw00LyuKmU62c7VKuyo7
204+ -----END CERTIFICATE-----"""
205+
206+ def create_test_cert (self , cert_file ):
207+ f = open (cert_file , "w" )
208+ f .write (self .test_cert )
209+ f .close ()
210+
211+ def remove_test_cert (self , cert_file ):
212+ os .remove (cert_file )
213+
214+ def test_queries_w_bad_cert (self ):
215+ cert_file = "test_cert.pem"
216+ self .create_test_cert (cert_file )
217+ with InfluxDBClient3 (host = self .host ,
218+ database = self .database ,
219+ token = self .token ,
220+ verify_ssl = True ,
221+ ssl_ca_cert = cert_file ,
222+ debug = True ) as client :
223+ try :
224+ query = "SELECT table_name FROM information_schema.tables"
225+ client .query (query , mode = "" )
226+ assert False , "query should throw SSL_ERROR"
227+ except FlightError as fe :
228+ assert str (fe ).__contains__ ('SSL_ERROR_SSL' )
229+ finally :
230+ self .remove_test_cert (cert_file )
231+
232+ def test_verify_ssl_false (self ):
233+ cert_file = "test_cert.pem"
234+ self .create_test_cert (cert_file )
235+ measurement = f'test{ random_hex (6 )} '
236+
237+ with InfluxDBClient3 (host = self .host ,
238+ database = self .database ,
239+ token = self .token ,
240+ verify_ssl = False ,
241+ ssl_ca_cert = cert_file ,
242+ debug = True ) as client :
243+ try :
244+ now = time .time_ns ()
245+ client .write (f'{ measurement } ,location=harfa val=42i { now - 1_000_000_000 } ' )
246+ query = f"SELECT * FROM \" { measurement } \" "
247+ reader : pyarrow .Table = client .query (query , mode = "" )
248+ list_results = reader .to_pylist ()
249+ assert len (list_results ) > 0
250+ finally :
251+ self .remove_test_cert (cert_file )
0 commit comments