Skip to content

Commit 1496ed5

Browse files
authored
Merge pull request #306 from tencentyun/feature_jojoliang_bbd92f3d
Feature jojoliang bbd92f3d
2 parents 9ab0722 + 7311ae1 commit 1496ed5

File tree

3 files changed

+408
-0
lines changed

3 files changed

+408
-0
lines changed

example/object/put_from_url.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/url"
7+
"os"
8+
9+
"net/http"
10+
11+
"github.com/tencentyun/cos-go-sdk-v5"
12+
"github.com/tencentyun/cos-go-sdk-v5/debug"
13+
)
14+
15+
func logStatus(err error) {
16+
if err == nil {
17+
return
18+
}
19+
if cos.IsNotFoundError(err) {
20+
// WARN
21+
fmt.Println("WARN: Resource is not existed")
22+
} else if e, ok := cos.IsCOSError(err); ok {
23+
fmt.Printf("ERROR: Code: %v\n", e.Code)
24+
fmt.Printf("ERROR: Message: %v\n", e.Message)
25+
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
26+
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
27+
// ERROR
28+
} else {
29+
fmt.Printf("ERROR: %v\n", err)
30+
// ERROR
31+
}
32+
}
33+
34+
func main() {
35+
// 存储桶名称,由bucketname-appid 组成,appid必须填入,可以在COS控制台查看存储桶名称。 https://console.cloud.tencent.com/cos5/bucket
36+
// 替换为用户的 region,存储桶region可以在COS控制台“存储桶概览”查看 https://console.cloud.tencent.com/ ,关于地域的详情见 https://cloud.tencent.com/document/product/436/6224 。
37+
u, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com")
38+
b := &cos.BaseURL{BucketURL: u}
39+
c := cos.NewClient(b, &http.Client{
40+
Transport: &cos.AuthorizationTransport{
41+
// 通过环境变量获取密钥
42+
// 环境变量 SECRETID 表示用户的 SecretId,登录访问管理控制台查看密钥,https://console.cloud.tencent.com/cam/capi
43+
SecretID: os.Getenv("SECRETID"),
44+
// 环境变量 SECRETKEY 表示用户的 SecretKey,登录访问管理控制台查看密钥,https://console.cloud.tencent.com/cam/capi
45+
SecretKey: os.Getenv("SECRETKEY"),
46+
// Debug 模式,把对应 请求头部、请求内容、响应头部、响应内容 输出到标准输出
47+
Transport: &debug.DebugRequestTransport{
48+
RequestHeader: true,
49+
// Notice when put a large file and set need the request body, might happend out of memory error.
50+
RequestBody: false,
51+
ResponseHeader: true,
52+
ResponseBody: true,
53+
},
54+
},
55+
})
56+
57+
// Case1 上传对象
58+
name := "example"
59+
opt := &cos.ObjectPutFromURLOptions{
60+
PartSize: 1,
61+
InitOptions: &cos.InitiateMultipartUploadOptions{
62+
ACLHeaderOptions: &cos.ACLHeaderOptions{
63+
XCosACL: "private",
64+
},
65+
},
66+
}
67+
_, _, err := c.Object.PutFromURL(context.Background(), name, "https://test-1259654469.cos.ap-guangzhou.myqcloud.com/pub", opt)
68+
logStatus(err)
69+
}

object.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1991,3 +1991,192 @@ func (s *ObjectService) GetSymlink(ctx context.Context, name string, opt *Object
19911991
}
19921992
return resp.Header.Get("x-cos-symlink-target"), resp, err
19931993
}
1994+
1995+
type ObjectPutFromURLOptions struct {
1996+
PartSize int
1997+
QueueSize int
1998+
InitOptions *InitiateMultipartUploadOptions
1999+
}
2000+
2001+
func (s *ObjectService) PutFromURL(ctx context.Context, name string, downloadURL string, opt *ObjectPutFromURLOptions) (*CompleteMultipartUploadResult, *Response, error) {
2002+
if opt == nil {
2003+
opt = &ObjectPutFromURLOptions{}
2004+
}
2005+
// init
2006+
v, resp, err := s.InitiateMultipartUpload(ctx, name, opt.InitOptions)
2007+
if err != nil {
2008+
return nil, resp, err
2009+
}
2010+
uploadId := v.UploadID
2011+
var isErr bool
2012+
defer func() {
2013+
if isErr {
2014+
s.AbortMultipartUpload(ctx, name, uploadId, nil)
2015+
}
2016+
}()
2017+
// request from url
2018+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil)
2019+
if err != nil {
2020+
isErr = true
2021+
return nil, nil, err
2022+
}
2023+
rsp, err := http.DefaultClient.Do(req)
2024+
if err != nil || rsp == nil {
2025+
isErr = true
2026+
return nil, nil, err
2027+
}
2028+
defer rsp.Body.Close()
2029+
if rsp.StatusCode > 299 {
2030+
isErr = true
2031+
return nil, &Response{rsp}, fmt.Errorf("the status code of downloadURL response is failed: %d", rsp.StatusCode)
2032+
}
2033+
factory := newPartFactory(opt.PartSize, opt.QueueSize)
2034+
partChannel, errChannel := factory.Produce(rsp.Body)
2035+
defer factory.Close()
2036+
2037+
comOpt := &CompleteMultipartUploadOptions{}
2038+
var partNumber int
2039+
for {
2040+
select {
2041+
case part, ok := <-partChannel:
2042+
if !ok {
2043+
partChannel = nil
2044+
break
2045+
}
2046+
partNumber++
2047+
resp, err := s.UploadPart(ctx, name, uploadId, partNumber, part, nil)
2048+
if err != nil {
2049+
isErr = true
2050+
return nil, resp, err
2051+
}
2052+
comOpt.Parts = append(comOpt.Parts, Object{
2053+
PartNumber: partNumber,
2054+
ETag: resp.Header.Get("ETag"),
2055+
})
2056+
case err, ok := <-errChannel:
2057+
if !ok {
2058+
errChannel = nil
2059+
break
2060+
}
2061+
if err != nil {
2062+
isErr = true
2063+
return nil, nil, err
2064+
}
2065+
}
2066+
if partChannel == nil && errChannel == nil {
2067+
break
2068+
}
2069+
}
2070+
res, resp, err := s.CompleteMultipartUpload(ctx, name, uploadId, comOpt)
2071+
if err != nil {
2072+
isErr = true
2073+
}
2074+
return res, resp, err
2075+
}
2076+
2077+
type partFactory struct {
2078+
partSize int
2079+
queueSize int
2080+
current *bytes.Buffer
2081+
partChannel chan *bytes.Buffer
2082+
errChannel chan error
2083+
cancelChannel chan struct{}
2084+
}
2085+
2086+
const CHUNK_SIZE = 1024 * 1024
2087+
2088+
func newPartFactory(partSize int, queueSize int) *partFactory {
2089+
if partSize <= 0 {
2090+
partSize = 8
2091+
}
2092+
if queueSize <= 0 {
2093+
queueSize = 10
2094+
}
2095+
return &partFactory{
2096+
partSize: partSize * 1024 * 1024,
2097+
queueSize: queueSize,
2098+
current: bytes.NewBuffer(nil),
2099+
}
2100+
}
2101+
2102+
func (pf *partFactory) Produce(reader io.ReadCloser) (<-chan *bytes.Buffer, <-chan error) {
2103+
pf.cancelChannel = make(chan struct{}, 1)
2104+
pf.partChannel = make(chan *bytes.Buffer, pf.queueSize)
2105+
pf.errChannel = make(chan error, 1)
2106+
2107+
go pf.Run(reader)
2108+
return pf.partChannel, pf.errChannel
2109+
}
2110+
2111+
func (pf *partFactory) Close() {
2112+
pf.cancelChannel <- struct{}{}
2113+
}
2114+
2115+
func (pf *partFactory) Run(reader io.ReadCloser) {
2116+
var total, parts int
2117+
defer func() {
2118+
close(pf.errChannel)
2119+
close(pf.partChannel)
2120+
}()
2121+
buf := make([]byte, CHUNK_SIZE)
2122+
for {
2123+
select {
2124+
case <-pf.cancelChannel:
2125+
return
2126+
default:
2127+
n, err := reader.Read(buf)
2128+
total += n
2129+
if n > 0 {
2130+
part, e := pf.Write(buf[:n])
2131+
if e != nil {
2132+
pf.errChannel <- e
2133+
return
2134+
}
2135+
if part != nil {
2136+
parts++
2137+
select {
2138+
case pf.partChannel <- part:
2139+
case <-pf.cancelChannel:
2140+
return
2141+
}
2142+
}
2143+
}
2144+
if err != nil && err != io.EOF {
2145+
pf.errChannel <- err
2146+
return
2147+
}
2148+
if err == io.EOF || n == 0 {
2149+
if pf.current.Len() > 0 {
2150+
parts++
2151+
select {
2152+
case pf.partChannel <- pf.current:
2153+
case <-pf.cancelChannel:
2154+
return
2155+
}
2156+
}
2157+
return
2158+
}
2159+
}
2160+
}
2161+
}
2162+
2163+
func (pf *partFactory) Write(p []byte) (*bytes.Buffer, error) {
2164+
var res *bytes.Buffer
2165+
for nwrite := 0; nwrite < len(p); {
2166+
if pf.current.Len() == pf.partSize {
2167+
res = pf.current
2168+
pf.current = bytes.NewBuffer(nil)
2169+
}
2170+
end := len(p)
2171+
// 大于缓存区大小
2172+
if pf.current.Len()+end-nwrite > pf.partSize {
2173+
end = nwrite + pf.partSize - pf.current.Len()
2174+
}
2175+
nr, err := pf.current.Write(p[nwrite:end])
2176+
if err != nil {
2177+
return res, err
2178+
}
2179+
nwrite += nr
2180+
}
2181+
return res, nil
2182+
}

0 commit comments

Comments
 (0)