diff --git a/oss2/api.py b/oss2/api.py index 8562bfdf..f639ce96 100644 --- a/oss2/api.py +++ b/oss2/api.py @@ -119,7 +119,6 @@ def progress_callback(bytes_consumed, total_bytes): from .models import * from .compat import urlquote, urlparse, to_unicode, to_string -import time import shutil import oss2.utils @@ -247,6 +246,8 @@ class Bucket(_Base): STATUS = 'status' VOD = 'vod' SYMLINK = 'symlink' + QOS = 'qos' + def __init__(self, auth, endpoint, bucket_name, is_cname=False, @@ -828,6 +829,37 @@ def delete_bucket(self): resp = self.__do_bucket('DELETE') return RequestResult(resp) + def head_bucket(self): + """探测一个Bucket是否存在。 + + :return: :class:`HeadBucketResult ` + """ + resp = self.__do_bucket('HEAD') + return HeadBucketResult(resp) + + def put_bucket_storage_capacity(self, num_giga_bytes): + """设置Bucket容量限额。 + + :param int num_giga_bytes: 容量限制,以GB为单位;-1表示没有容量限制。 + + :return: :class:`RequestResult ` + """ + input = BucketQos(storage_capacity=num_giga_bytes) + data = self.__convert_data(BucketQos, xml_utils.to_bucket_qos, input) + + resp = self.__do_bucket('PUT', data=data, params={Bucket.QOS: ''}) + return RequestResult(resp) + + def get_bucket_storage_capacity(self): + """获取Bucket容量限额。 + + :return: 返回当前Bucket容量限额,以GB为单位。如果没有限制则返回-1。 + """ + resp = self.__do_bucket('GET', params={Bucket.QOS: ''}) + bucket_qos = self._parse_result(resp, xml_utils.parse_get_bucket_qos, GetBucketQosResult) + + return bucket_qos.storage_capacity + def put_bucket_acl(self, permission): """设置Bucket的ACL。 diff --git a/oss2/models.py b/oss2/models.py index 3c617ac3..6c3229c2 100644 --- a/oss2/models.py +++ b/oss2/models.py @@ -310,6 +310,13 @@ def __init__(self, resp): BUCKET_ACL_PUBLIC_READ_WRITE = 'public-read-write' +class HeadBucketResult(RequestResult): + def __init__(self, resp): + super(HeadBucketResult, self).__init__(resp) + + self.location = _hget(self.headers, 'x-oss-bucket-region') + + class GetBucketAclResult(RequestResult): def __init__(self, resp): super(GetBucketAclResult, self).__init__(resp) @@ -430,6 +437,22 @@ def __init__(self, resp): BucketLifecycle.__init__(self) +class BucketQos(object): + """Bucket QoS设置。 + + :param int storage_capacity: 容量限额,单位为GB。 + """ + def __init__(self, storage_capacity=-1): + self.storage_capacity = storage_capacity + + +class GetBucketQosResult(RequestResult): + def __init__(self, resp): + super(GetBucketQosResult, self).__init__(resp) + + self.storage_capacity = -1 + + class CorsRule(object): """CORS(跨域资源共享)规则。 @@ -701,6 +724,7 @@ def __init__(self, resp): RequestResult.__init__(self, resp) LiveChannelStat.__init__(self) + class GetLiveChannelHistoryResult(RequestResult, LiveChannelHistory): def __init__(self, resp): RequestResult.__init__(self, resp) diff --git a/oss2/xml_utils.py b/oss2/xml_utils.py index 54b0f184..f12da252 100644 --- a/oss2/xml_utils.py +++ b/oss2/xml_utils.py @@ -386,6 +386,14 @@ def parse_get_bucket_cors(result, body): return result +def parse_get_bucket_qos(result, body): + root = ElementTree.fromstring(body) + + result.storage_capacity = _find_int(root, 'StorageCapacity') + + return result + + def to_complete_upload_request(parts): root = ElementTree.Element('CompleteMultipartUpload') for p in parts: @@ -479,6 +487,7 @@ def to_put_bucket_cors(bucket_cors): return _node_to_string(root) + def to_create_live_channel(live_channel): root = ElementTree.Element('LiveChannelConfiguration') @@ -492,3 +501,11 @@ def to_create_live_channel(live_channel): _add_text_child(target_node, 'PlaylistName', str(live_channel.target.playlist_name)) return _node_to_string(root) + + +def to_bucket_qos(bucket_qos): + root = ElementTree.Element('BucketUserQos') + + _add_text_child(root, 'StorageCapacity', str(bucket_qos.storage_capacity)) + + return _node_to_string(root) \ No newline at end of file diff --git a/tests/test_bucket.py b/tests/test_bucket.py index bf7c1410..cfe03ade 100644 --- a/tests/test_bucket.py +++ b/tests/test_bucket.py @@ -245,6 +245,29 @@ def test_xml_input_output(self): self.assertEqual(result.allow_empty_referer, True) self.assertEqual(result.referers[0], to_string(u'阿里云')) + def test_head_bucket_normal(self): + head_result = self.bucket.head_bucket() + get_result = self.bucket.get_bucket_location() + + self.assertEqual(head_result.location, get_result.location) + + def test_head_bucket_not_exist(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket = oss2.Bucket(auth, OSS_ENDPOINT, random_string(63).lower()) + + self.assertRaises(oss2.exceptions.NotFound, bucket.head_bucket) + + def test_bucket_storage_capacity(self): + old_capacity = self.bucket.get_bucket_storage_capacity() + + if old_capacity == -1: + new_capacity = 100 + else: + new_capacity = old_capacity + 100 + + self.bucket.put_bucket_storage_capacity(new_capacity) + self.assertEqual(new_capacity, self.bucket.get_bucket_storage_capacity()) + if __name__ == '__main__': unittest.main()