Skip to content

Commit 979f00d

Browse files
feat(lib/backend): add functionality to close clients (#451)
* feat(lib/backend): add functionality to close clients What? This PR aims to add functionality to close the clients and it's resources in case the resources have closers. Why? There are use cases in for gcsbacknd using `storage.transfermanager` to close the single downloader it creates. If we don't close it, there's a possibility of leaking resources. How? Add `Close` to client interface and implement the method for each client.\ Then `backend.Manager` will use them to expose a `Close()` method that can be used by build-index and origin to close them. * add close methods to tests for proper cleanups * resolved comments
1 parent 384bc49 commit 979f00d

File tree

25 files changed

+223
-25
lines changed

25 files changed

+223
-25
lines changed

build-index/cmd/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ func Run(flags *Flags, opts ...Option) {
144144
if err != nil {
145145
log.Fatalf("Error creating backend manager: %s", err)
146146
}
147+
defer closers.Close(backends)
147148

148149
tls, err := config.TLS.BuildClient()
149150
if err != nil {

lib/backend/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,8 @@ type Client interface {
6565

6666
// List lists entries whose names start with prefix.
6767
List(prefix string, opts ...ListOption) (*ListResult, error)
68+
69+
// Close can be used to close any resources held by the client.
70+
// This operation should be idempotent.
71+
Close() error
6872
}

lib/backend/gcsbackend/client.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,11 @@ func (f *factory) Create(
7070

7171
// Client implements a backend.Client for GCS.
7272
type Client struct {
73-
config Config
74-
pather namepath.Pather
75-
stats tally.Scope
76-
gcs GCS
73+
config Config
74+
pather namepath.Pather
75+
stats tally.Scope
76+
gcs GCS
77+
sClient *storage.Client
7778
}
7879

7980
// Option allows setting optional Client parameters.
@@ -111,7 +112,13 @@ func NewClient(
111112

112113
if len(opts) > 0 {
113114
// For mock.
114-
client := &Client{config, pather, stats, nil}
115+
client := &Client{
116+
config: config,
117+
pather: pather,
118+
stats: stats,
119+
gcs: nil,
120+
sClient: nil,
121+
}
115122
for _, opt := range opts {
116123
opt(client)
117124
}
@@ -125,8 +132,13 @@ func NewClient(
125132
return nil, fmt.Errorf("invalid gcs credentials: %s", err)
126133
}
127134

128-
client := &Client{config, pather, stats,
129-
NewGCS(ctx, sClient.Bucket(config.Bucket), &config)}
135+
client := &Client{
136+
config: config,
137+
pather: pather,
138+
stats: stats,
139+
gcs: NewGCS(ctx, sClient.Bucket(config.Bucket), &config),
140+
sClient: sClient,
141+
}
130142

131143
log.Infof("Initalized GCS backend with config: %s", config)
132144
return client, nil
@@ -216,6 +228,14 @@ func (c *Client) List(prefix string, opts ...backend.ListOption) (*backend.ListR
216228
return result, nil
217229
}
218230

231+
// Close closes the storage client
232+
func (c *Client) Close() error {
233+
if c.sClient == nil {
234+
return nil
235+
}
236+
return c.sClient.Close()
237+
}
238+
219239
// isObjectNotFound is helper function for identify non-existing object error.
220240
func isObjectNotFound(err error) bool {
221241
return err == storage.ErrObjectNotExist || err == storage.ErrBucketNotExist

lib/backend/gcsbackend/client_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/uber/kraken/core"
2626
"github.com/uber/kraken/lib/backend"
2727
mockgcsbackend "github.com/uber/kraken/mocks/lib/backend/gcsbackend"
28+
"github.com/uber/kraken/utils/closers"
2829
"github.com/uber/kraken/utils/mockutil"
2930
"github.com/uber/kraken/utils/randutil"
3031
"github.com/uber/kraken/utils/rwutil"
@@ -98,6 +99,7 @@ func TestClientStat(t *testing.T) {
9899
defer cleanup()
99100

100101
client := mocks.new()
102+
defer closers.Close(client)
101103

102104
var objectAttrs storage.ObjectAttrs
103105
objectAttrs.Size = 100
@@ -116,6 +118,7 @@ func TestClientDownload(t *testing.T) {
116118
defer cleanup()
117119

118120
client := mocks.new()
121+
defer closers.Close(client)
119122
data := randutil.Text(32)
120123

121124
mocks.gcs.EXPECT().Download(
@@ -194,6 +197,7 @@ func TestClientList(t *testing.T) {
194197
defer cleanup()
195198

196199
client := mocks.new()
200+
defer closers.Close(client)
197201

198202
contToken := ""
199203
mocks.gcs.EXPECT().GetObjectIterator(

lib/backend/hdfsbackend/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,9 @@ func (c *Client) List(prefix string, opts ...backend.ListOption) (*backend.ListR
274274
Names: files,
275275
}, nil
276276
}
277+
278+
// Close closes the client and releases any held resources.
279+
func (c *Client) Close() error {
280+
// No resources to close for HDFS client
281+
return nil
282+
}

lib/backend/hdfsbackend/client_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/uber/kraken/core"
2424
"github.com/uber/kraken/lib/backend/hdfsbackend/webhdfs"
2525
mockwebhdfs "github.com/uber/kraken/mocks/lib/backend/hdfsbackend/webhdfs"
26+
"github.com/uber/kraken/utils/closers"
2627
"github.com/uber/kraken/utils/mockutil"
2728
"github.com/uber/kraken/utils/randutil"
2829

@@ -76,6 +77,7 @@ func TestClientStat(t *testing.T) {
7677
defer cleanup()
7778

7879
client := mocks.new()
80+
defer closers.Close(client)
7981

8082
mocks.webhdfs.EXPECT().GetFileStatus("/root/test").Return(webhdfs.FileStatus{Length: 32}, nil)
8183

@@ -149,6 +151,7 @@ func TestClientList(t *testing.T) {
149151
defer cleanup()
150152

151153
client := mocks.new()
154+
defer closers.Close(client)
152155

153156
mocks.webhdfs.EXPECT().ListFileStatus("/root").Return([]webhdfs.FileStatus{{
154157
PathSuffix: "foo",
@@ -214,6 +217,7 @@ func TestClientListErrorDoesNotLeakGoroutines(t *testing.T) {
214217
defer cleanup()
215218

216219
client := mocks.new()
220+
defer closers.Close(client)
217221

218222
initDirectoryTree(mocks, "/root", 10, 3) // 1000 nodes.
219223

lib/backend/httpbackend/http.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,9 @@ func (c *Client) Upload(namespace, name string, src io.Reader) error {
123123
func (c *Client) List(prefix string, opts ...backend.ListOption) (*backend.ListResult, error) {
124124
return nil, errors.New("not supported")
125125
}
126+
127+
// Close closes the client and releases any held resources.
128+
func (c *Client) Close() error {
129+
// No resources to close for http client
130+
return nil
131+
}

lib/backend/httpbackend/http_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/uber-go/tally"
2323
"github.com/uber/kraken/core"
2424
"github.com/uber/kraken/lib/backend/backenderrors"
25+
"github.com/uber/kraken/utils/closers"
2526
"github.com/uber/kraken/utils/memsize"
2627
"github.com/uber/kraken/utils/randutil"
2728
"github.com/uber/kraken/utils/testutil"
@@ -56,6 +57,7 @@ func TestHttpDownloadSuccess(t *testing.T) {
5657
config := Config{DownloadURL: "http://" + addr + "/data/%s"}
5758
client, err := NewClient(config, tally.NoopScope)
5859
require.NoError(err)
60+
defer closers.Close(client)
5961

6062
var b bytes.Buffer
6163
require.NoError(client.Download(core.NamespaceFixture(), "data", &b))
@@ -77,6 +79,7 @@ func TestHttpDownloadFileNotFound(t *testing.T) {
7779
config := Config{DownloadURL: "http://" + addr + "/data/%s"}
7880
client, err := NewClient(config, tally.NoopScope)
7981
require.NoError(err)
82+
defer closers.Close(client)
8083

8184
var b bytes.Buffer
8285
require.Equal(backenderrors.ErrBlobNotFound, client.Download(core.NamespaceFixture(), "data", &b))
@@ -92,6 +95,7 @@ func TestDownloadMalformedURLThrowsError(t *testing.T) {
9295
config := Config{DownloadURL: "http://" + addr + "/data"}
9396
client, err := NewClient(config, tally.NoopScope)
9497
require.NoError(err)
98+
defer closers.Close(client)
9599

96100
var b bytes.Buffer
97101
require.Error(client.Download(core.NamespaceFixture(), "data", &b))

lib/backend/manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,16 @@ func (m *Manager) CheckReadiness() error {
168168
}
169169
return nil
170170
}
171+
172+
func (m *Manager) Close() error {
173+
totalErrors := make([]error, 0)
174+
for _, b := range m.backends {
175+
if err := b.client.Close(); err != nil {
176+
totalErrors = append(totalErrors, fmt.Errorf("closing backend for namespace '%s': %s", b.regexp.String(), err))
177+
}
178+
}
179+
if len(totalErrors) > 0 {
180+
return errors.Join(totalErrors...)
181+
}
182+
return nil
183+
}

lib/backend/noop.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@ func (c NoopClient) Download(namespace, name string, dst io.Writer) error {
4848
func (c NoopClient) List(prefix string, opts ...ListOption) (*ListResult, error) {
4949
return nil, nil
5050
}
51+
52+
// Close closes the client and releases any held resources.
53+
func (c NoopClient) Close() error {
54+
// No resources to close for noop client
55+
return nil
56+
}

0 commit comments

Comments
 (0)