Skip to content

Commit 7c2ce61

Browse files
authored
use invalid arg code for upload failures (#692)
1 parent 6004715 commit 7c2ce61

File tree

6 files changed

+27
-30
lines changed

6 files changed

+27
-30
lines changed

pkg/errors/errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func ErrParticipantNotFound(identity string) error {
143143
// This can have many reasons, some related to invalid parameters, other because of system failure.
144144
// Do not provide an error code until we have code to analyze the error from the underlying upload library further.
145145
func ErrUploadFailed(location string, err error) error {
146-
return psrpc.NewErrorf(psrpc.Unknown, "%s upload failed: %v", location, err)
146+
return psrpc.NewErrorf(psrpc.InvalidArgument, "%s upload failed: %v", location, err)
147147
}
148148

149149
func ErrCPUExhausted(usage float64) error {

pkg/pipeline/sink/uploader/alioss.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/aliyun/aliyun-oss-go-sdk/oss"
2222

23+
"github.com/livekit/egress/pkg/errors"
2324
"github.com/livekit/egress/pkg/types"
2425
"github.com/livekit/protocol/livekit"
2526
)
@@ -37,22 +38,22 @@ func newAliOSSUploader(conf *livekit.AliOSSUpload) (uploader, error) {
3738
func (u *AliOSSUploader) upload(localFilePath, requestedPath string, _ types.OutputType) (string, int64, error) {
3839
stat, err := os.Stat(localFilePath)
3940
if err != nil {
40-
return "", 0, wrap("AliOSS", err)
41+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
4142
}
4243

4344
client, err := oss.New(u.conf.Endpoint, u.conf.AccessKey, u.conf.Secret)
4445
if err != nil {
45-
return "", 0, wrap("AliOSS", err)
46+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
4647
}
4748

4849
bucket, err := client.Bucket(u.conf.Bucket)
4950
if err != nil {
50-
return "", 0, wrap("AliOSS", err)
51+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
5152
}
5253

5354
err = bucket.PutObjectFromFile(requestedPath, localFilePath)
5455
if err != nil {
55-
return "", 0, wrap("AliOSS", err)
56+
return "", 0, errors.ErrUploadFailed("AliOSS", err)
5657
}
5758

5859
return fmt.Sprintf("https://%s.%s/%s", u.conf.Bucket, u.conf.Endpoint, requestedPath), stat.Size(), nil

pkg/pipeline/sink/uploader/azure.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/Azure/azure-storage-blob-go/azblob"
2424

25+
"github.com/livekit/egress/pkg/errors"
2526
"github.com/livekit/egress/pkg/types"
2627
"github.com/livekit/protocol/livekit"
2728
)
@@ -44,12 +45,12 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
4445
u.conf.AccountKey,
4546
)
4647
if err != nil {
47-
return "", 0, wrap("Azure", err)
48+
return "", 0, errors.ErrUploadFailed("Azure", err)
4849
}
4950

5051
azUrl, err := url.Parse(u.container)
5152
if err != nil {
52-
return "", 0, wrap("Azure", err)
53+
return "", 0, errors.ErrUploadFailed("Azure", err)
5354
}
5455

5556
pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{
@@ -65,15 +66,15 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
6566

6667
file, err := os.Open(localFilepath)
6768
if err != nil {
68-
return "", 0, wrap("Azure", err)
69+
return "", 0, errors.ErrUploadFailed("Azure", err)
6970
}
7071
defer func() {
7172
_ = file.Close()
7273
}()
7374

7475
stat, err := file.Stat()
7576
if err != nil {
76-
return "", 0, wrap("Azure", err)
77+
return "", 0, errors.ErrUploadFailed("Azure", err)
7778
}
7879

7980
// upload blocks in parallel for optimal performance
@@ -84,7 +85,7 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
8485
Parallelism: 16,
8586
})
8687
if err != nil {
87-
return "", 0, wrap("Azure", err)
88+
return "", 0, errors.ErrUploadFailed("Azure", err)
8889
}
8990

9091
return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil

pkg/pipeline/sink/uploader/gcp.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/googleapis/gax-go/v2"
2828
"google.golang.org/api/option"
2929

30+
"github.com/livekit/egress/pkg/errors"
3031
"github.com/livekit/egress/pkg/types"
3132
"github.com/livekit/protocol/livekit"
3233
)
@@ -77,15 +78,15 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
7778
func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.OutputType) (string, int64, error) {
7879
file, err := os.Open(localFilepath)
7980
if err != nil {
80-
return "", 0, wrap("GCP", err)
81+
return "", 0, errors.ErrUploadFailed("GCP", err)
8182
}
8283
defer func() {
8384
_ = file.Close()
8485
}()
8586

8687
stat, err := file.Stat()
8788
if err != nil {
88-
return "", 0, wrap("GCP", err)
89+
return "", 0, errors.ErrUploadFailed("GCP", err)
8990
}
9091

9192
wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
@@ -100,11 +101,11 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp
100101
wc.ChunkRetryDeadline = 0
101102

102103
if _, err = io.Copy(wc, file); err != nil {
103-
return "", 0, wrap("GCP", err)
104+
return "", 0, errors.ErrUploadFailed("GCP", err)
104105
}
105106

106107
if err = wc.Close(); err != nil {
107-
return "", 0, wrap("GCP", err)
108+
return "", 0, errors.ErrUploadFailed("GCP", err)
108109
}
109110

110111
return fmt.Sprintf("https://%s.storage.googleapis.com/%s", u.conf.Bucket, storageFilepath), stat.Size(), nil

pkg/pipeline/sink/uploader/s3.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/aws/aws-sdk-go/service/s3/s3manager"
3131

3232
"github.com/livekit/egress/pkg/config"
33+
"github.com/livekit/egress/pkg/errors"
3334
"github.com/livekit/egress/pkg/types"
3435
"github.com/livekit/protocol/logger"
3536
"github.com/livekit/psrpc"
@@ -159,11 +160,11 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
159160
svc := s3.New(sess)
160161
resp, err := svc.GetBucketLocation(req)
161162
if err != nil {
162-
return "", psrpc.NewErrorf(psrpc.Unknown, "failed to retrieve upload bucket region: %v", err)
163+
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "failed to retrieve upload bucket region: %v", err)
163164
}
164165

165166
if resp.LocationConstraint == nil {
166-
return "", psrpc.NewErrorf(psrpc.MalformedResponse, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
167+
return "", psrpc.NewErrorf(psrpc.InvalidArgument, "invalid upload bucket region returned by provider. Try specifying the region manually in the request")
167168
}
168169

169170
return *resp.LocationConstraint, nil
@@ -172,20 +173,20 @@ func (u *S3Uploader) getBucketLocation() (string, error) {
172173
func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType types.OutputType) (string, int64, error) {
173174
sess, err := session.NewSession(u.awsConfig)
174175
if err != nil {
175-
return "", 0, wrap("S3", err)
176+
return "", 0, errors.ErrUploadFailed("S3", err)
176177
}
177178

178179
file, err := os.Open(localFilepath)
179180
if err != nil {
180-
return "", 0, wrap("S3", err)
181+
return "", 0, errors.ErrUploadFailed("S3", err)
181182
}
182183
defer func() {
183184
_ = file.Close()
184185
}()
185186

186187
stat, err := file.Stat()
187188
if err != nil {
188-
return "", 0, wrap("S3", err)
189+
return "", 0, errors.ErrUploadFailed("S3", err)
189190
}
190191

191192
_, err = s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
@@ -198,7 +199,7 @@ func (u *S3Uploader) upload(localFilepath, storageFilepath string, outputType ty
198199
ContentDisposition: u.contentDisposition,
199200
})
200201
if err != nil {
201-
return "", 0, wrap("S3", err)
202+
return "", 0, errors.ErrUploadFailed("S3", err)
202203
}
203204

204205
endpoint := "s3.amazonaws.com"

pkg/pipeline/sink/uploader/uploader.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,10 @@
1515
package uploader
1616

1717
import (
18-
"fmt"
1918
"os"
2019
"path"
2120
"time"
2221

23-
"github.com/pkg/errors"
24-
2522
"github.com/livekit/egress/pkg/config"
2623
"github.com/livekit/egress/pkg/stats"
2724
"github.com/livekit/egress/pkg/types"
@@ -82,11 +79,11 @@ type remoteUploader struct {
8279

8380
func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputType types.OutputType, deleteAfterUpload bool, fileType string) (string, int64, error) {
8481
start := time.Now()
85-
location, size, err := u.upload(localFilepath, storageFilepath, outputType)
82+
location, size, uploadErr := u.upload(localFilepath, storageFilepath, outputType)
8683
elapsed := time.Since(start)
8784

8885
// success
89-
if err == nil {
86+
if uploadErr == nil {
9087
u.monitor.IncUploadCountSuccess(fileType, float64(elapsed.Milliseconds()))
9188
if deleteAfterUpload {
9289
_ = os.Remove(localFilepath)
@@ -117,7 +114,7 @@ func (u *remoteUploader) Upload(localFilepath, storageFilepath string, outputTyp
117114
return backupFilepath, stat.Size(), nil
118115
}
119116

120-
return "", 0, err
117+
return "", 0, uploadErr
121118
}
122119

123120
type localUploader struct{}
@@ -130,7 +127,3 @@ func (u *localUploader) Upload(localFilepath, _ string, _ types.OutputType, _ bo
130127

131128
return localFilepath, stat.Size(), nil
132129
}
133-
134-
func wrap(name string, err error) error {
135-
return errors.Wrap(err, fmt.Sprintf("%s upload failed", name))
136-
}

0 commit comments

Comments
 (0)