Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (p *endpointPicker) checkHealth(ctx context.Context, endpoint string) {
Credentials: p.clientCredentials,
Timeout: &endpointPickerCheckTimeout,
},
NewNfsClientLog(nfs_client.LOG_DEBUG),
logging.GetLogger(ctx),
)
if err != nil {
p.markAsUnhealthy(ctx, endpoint)
Expand Down
87 changes: 1 addition & 86 deletions cloud/disk_manager/internal/pkg/clients/nfs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nfs

import (
"context"
"fmt"
"time"

"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/auth"
Expand All @@ -16,90 +15,6 @@ import (

////////////////////////////////////////////////////////////////////////////////

type errorLogger struct {
}

func (l *errorLogger) Print(ctx context.Context, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Error(ctx, fmt.Sprint(v...))
}

func (l *errorLogger) Printf(ctx context.Context, format string, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Error(ctx, fmt.Sprintf(format, v...))
}

////////////////////////////////////////////////////////////////////////////////

type warnLogger struct {
}

func (l *warnLogger) Print(ctx context.Context, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Warn(ctx, fmt.Sprint(v...))
}

func (l *warnLogger) Printf(ctx context.Context, format string, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Warn(ctx, fmt.Sprintf(format, v...))
}

////////////////////////////////////////////////////////////////////////////////

type infoLogger struct {
}

func (l *infoLogger) Print(ctx context.Context, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Info(ctx, fmt.Sprint(v...))
}

func (l *infoLogger) Printf(ctx context.Context, format string, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Info(ctx, fmt.Sprintf(format, v...))
}

////////////////////////////////////////////////////////////////////////////////

type debugLogger struct {
}

func (l *debugLogger) Print(ctx context.Context, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Debug(ctx, fmt.Sprint(v...))
}

func (l *debugLogger) Printf(ctx context.Context, format string, v ...interface{}) {
ctx = logging.AddCallerSkip(ctx, 1)
logging.Debug(ctx, fmt.Sprintf(format, v...))
}

////////////////////////////////////////////////////////////////////////////////

type nfsClientLogWrapper struct {
level nfs_client.LogLevel
loggers []nfs_client.Logger
}

func (w *nfsClientLogWrapper) Logger(level nfs_client.LogLevel) nfs_client.Logger {
if level <= w.level {
return w.loggers[level]
}
return nil
}

func NewNfsClientLog(level nfs_client.LogLevel) nfs_client.Log {
loggers := []nfs_client.Logger{
&errorLogger{},
&warnLogger{},
&infoLogger{},
&debugLogger{},
}
return &nfsClientLogWrapper{level, loggers}
}

////////////////////////////////////////////////////////////////////////////////

type factory struct {
config *nfs_config.ClientConfig
clientCredentials *nfs_client.ClientCredentials
Expand Down Expand Up @@ -159,7 +74,7 @@ func (f *factory) NewClient(
f.metrics.OnError(err)
},
},
NewNfsClientLog(nfs_client.LOG_DEBUG),
logging.GetLogger(ctx),
)
if err != nil {
return nil, errors.NewRetriableError(err)
Expand Down
12 changes: 6 additions & 6 deletions cloud/filestore/public/sdk/go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,15 +530,15 @@ func (client *EndpointClient) Ping(
func NewClient(
grpcOpts *GrpcClientOpts,
durableOpts *DurableClientOpts,
log Log,
logger Logger,
) (*Client, error) {

grpcClient, err := NewGrpcClient(grpcOpts, log)
grpcClient, err := NewGrpcClient(grpcOpts, logger)
if err != nil {
return nil, err
}

durableClient := NewDurableClient(grpcClient, durableOpts, log)
durableClient := NewDurableClient(grpcClient, durableOpts, logger)

return &Client{
durableClient,
Expand All @@ -548,15 +548,15 @@ func NewClient(
func NewEndpointClient(
grpcOpts *GrpcClientOpts,
durableOpts *DurableClientOpts,
log Log,
logger Logger,
) (*EndpointClient, error) {

grpcClient, err := NewGrpcEndpointClient(grpcOpts, log)
grpcClient, err := NewGrpcEndpointClient(grpcOpts, logger)
if err != nil {
return nil, err
}

durableClient := NewDurableEndpointClient(grpcClient, durableOpts, log)
durableClient := NewDurableEndpointClient(grpcClient, durableOpts, logger)

return &EndpointClient{
durableClient,
Expand Down
109 changes: 45 additions & 64 deletions cloud/filestore/public/sdk/go/client/durable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type durableClient struct {
timeout time.Duration
timeoutIncrement time.Duration
onError func(ClientError)
log Log
logger Logger
}

func (client *durableClient) executeRequest(
Expand All @@ -38,17 +38,13 @@ func (client *durableClient) executeRequest(
for delay := client.timeoutIncrement; ; delay += client.timeoutIncrement {
resp, err := call(ctx)
if err == nil && retryCount > 1 {
if logger := client.log.Logger(LOG_INFO); logger != nil {
duration := time.Since(started)
logger.Printf(
ctx,
"%s%s completed (retries: %d, duration: %v)",
requestName(req),
requestDetails(req),
retryCount,
duration,
)
}
client.logger.Infof(
"%s%s completed (retries: %d, duration: %v)",
requestName(req),
requestDetails(req),
retryCount,
time.Since(started),
)
}

cerr := GetClientError(err)
Expand All @@ -59,30 +55,24 @@ func (client *durableClient) executeRequest(
client.onError(cerr)

if !cerr.IsRetriable() {
if logger := client.log.Logger(LOG_ERROR); logger != nil {
logger.Printf(
ctx,
"%s%s request failed: %v",
requestName(req),
requestDetails(req),
err,
)
}
return resp, err
}

if logger := client.log.Logger(LOG_WARN); logger != nil {
logger.Printf(
ctx,
"%s%s retry request (retries: %d, timeout: %v, error: %v)",
client.logger.Errorf(
"%s%s request failed: %v",
requestName(req),
requestDetails(req),
retryCount,
client.timeout,
err,
)
return resp, err
}

client.logger.Warnf(
"%s%s retry request (retries: %d, timeout: %v, error: %v)",
requestName(req),
requestDetails(req),
retryCount,
client.timeout,
err,
)

retryCount++
select {
case <-ctx.Done():
Expand Down Expand Up @@ -327,7 +317,7 @@ type durableEndpointClient struct {
timeout time.Duration
timeoutIncrement time.Duration
onError func(ClientError)
log Log
logger Logger
}

func (client *durableEndpointClient) executeRequest(
Expand All @@ -344,17 +334,14 @@ func (client *durableEndpointClient) executeRequest(
for delay := client.timeoutIncrement; ; delay += client.timeoutIncrement {
resp, err := call(ctx)
if err == nil && retryCount > 1 {
if logger := client.log.Logger(LOG_INFO); logger != nil {
duration := time.Since(started)
logger.Printf(
ctx,
"%s%s completed (retries: %d, duration: %v)",
requestName(req),
requestDetails(req),
retryCount,
duration,
)
}
duration := time.Since(started)
client.logger.Infof(
"%s%s completed (retries: %d, duration: %v)",
requestName(req),
requestDetails(req),
retryCount,
duration,
)
}

cerr := GetClientError(err)
Expand All @@ -365,30 +352,24 @@ func (client *durableEndpointClient) executeRequest(
client.onError(cerr)

if !cerr.IsRetriable() {
if logger := client.log.Logger(LOG_ERROR); logger != nil {
logger.Printf(
ctx,
"%s%s request failed: %v",
requestName(req),
requestDetails(req),
err,
)
}
return resp, err
}

if logger := client.log.Logger(LOG_WARN); logger != nil {
logger.Printf(
ctx,
"%s%s retry request (retries: %d, timeout: %v, error: %v)",
client.logger.Errorf(
"%s%s request failed: %v",
requestName(req),
requestDetails(req),
retryCount,
client.timeout,
err,
)
return resp, err
}

client.logger.Warnf(
"%s%s retry request (retries: %d, timeout: %v, error: %v)",
requestName(req),
requestDetails(req),
retryCount,
client.timeout,
err,
)

retryCount++
select {
case <-ctx.Done():
Expand Down Expand Up @@ -493,7 +474,7 @@ type DurableClientOpts struct {
func NewDurableClient(
impl ClientIface,
opts *DurableClientOpts,
log Log,
logger Logger,
) ClientIface {

retryTimeout := defaultRetryTimeout
Expand All @@ -516,14 +497,14 @@ func NewDurableClient(
retryTimeout,
retryTimeoutIncrement,
onError,
log,
logger,
}
}

func NewDurableEndpointClient(
impl EndpointClientIface,
opts *DurableClientOpts,
log Log,
logger Logger,
) EndpointClientIface {

retryTimeout := defaultRetryTimeout
Expand All @@ -546,6 +527,6 @@ func NewDurableEndpointClient(
retryTimeout,
retryTimeoutIncrement,
onError,
log,
logger,
}
}
8 changes: 4 additions & 4 deletions cloud/filestore/public/sdk/go/client/durable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func TestRetryUndeliveredRequests(t *testing.T) {
TimeoutIncrement: &timeoutIncrement,
}

log := NewStderrLog(LOG_DEBUG)
durable := NewDurableClient(client, opts, log)
logger := NewStderrLog(LOG_DEBUG)
durable := NewDurableClient(client, opts, logger)

_, err := durable.Ping(context.TODO(), &protos.TPingRequest{})
if err != nil {
Expand Down Expand Up @@ -69,8 +69,8 @@ func TestNoRetryUnretriableRequests(t *testing.T) {
},
}

log := NewStderrLog(LOG_DEBUG)
durable := NewDurableClient(client, &DurableClientOpts{}, log)
logger := NewStderrLog(LOG_DEBUG)
durable := NewDurableClient(client, &DurableClientOpts{}, logger)

_, err := durable.Ping(context.TODO(), &protos.TPingRequest{})
if err == nil {
Expand Down
Loading
Loading