1
1
from . import S3_CONN_INFO
2
-
2
+ import time
3
3
from minio import Minio
4
+ from minio .objectlockconfig import ObjectLockConfig
5
+ from minio .commonconfig import COMPLIANCE , ENABLED , GOVERNANCE
4
6
import urllib3
5
7
import certifi
6
- from nose .tools import assert_true
8
+ from nose .tools import assert_true , raises
7
9
10
+ from .schema_external import schema , SimpleRemote
11
+ from datajoint .errors import DataJointError
8
12
9
13
class TestS3 :
10
14
@@ -29,7 +33,7 @@ def test_connection():
29
33
http_client = http_client )
30
34
31
35
assert_true (minio_client .bucket_exists (S3_CONN_INFO ['bucket' ]))
32
-
36
+
33
37
@staticmethod
34
38
def test_connection_secure ():
35
39
@@ -52,6 +56,31 @@ def test_connection_secure():
52
56
53
57
assert_true (minio_client .bucket_exists (S3_CONN_INFO ['bucket' ]))
54
58
@staticmethod
55
- def test_remove_object ():
59
+ @raises (DataJointError )
60
+ def test_remove_object_exception ():
56
61
#https://github.com/datajoint/datajoint-python/issues/952
57
- raise Exception ("test" )
62
+
63
+ # Initialize httpClient with relevant timeout.
64
+ http_client = urllib3 .PoolManager (
65
+ timeout = 30 , cert_reqs = 'CERT_REQUIRED' ,
66
+ ca_certs = certifi .where (),
67
+ retries = urllib3 .Retry (total = 3 , backoff_factor = 0.2 ,
68
+ status_forcelist = [
69
+ 500 , 502 ,
70
+ 503 , 504 ]))
71
+
72
+ # Initialize minioClient with an endpoint and access/secret keys.
73
+ minio_client = Minio (
74
+ S3_CONN_INFO ['endpoint' ],
75
+ access_key = S3_CONN_INFO ['access_key' ],
76
+ secret_key = S3_CONN_INFO ['secret_key' ],
77
+ secure = True ,
78
+ http_client = http_client )
79
+
80
+
81
+ schema .external ['share' ].s3 .bucket = ''
82
+ schema .external ['share' ].s3 .remove_object ('test' )
83
+
84
+
85
+
86
+
0 commit comments