|
| 1 | +package kitexconnect |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "sync" |
| 6 | + "time" |
| 7 | + |
| 8 | + "github.com/cloudwego/kitex/client" |
| 9 | + "github.com/cloudwego/kitex/pkg/retry" |
| 10 | + "github.com/cloudwego/kitex/pkg/rpcinfo" |
| 11 | +) |
| 12 | + |
| 13 | +const ( |
| 14 | + // Default RPC timeout value, equal to kitex default value. |
| 15 | + defaultRPCTimeout = time.Duration(0) |
| 16 | + // Default connection timeout value (50 milliseconds), equal to kitex default value. |
| 17 | + defaultConnectTimeout = time.Millisecond * 50 |
| 18 | + // Default read/write timeout value (5 seconds), equal to kitex default value. |
| 19 | + defaultReadWriteTimeout = time.Second * 5 |
| 20 | + |
| 21 | + // Default maximum retry times for the RPC calls, equal to kitex default value. |
| 22 | + defaultMaxRetryTimes = 2 |
| 23 | +) |
| 24 | + |
| 25 | +// connectOptions holds configurations for establishing connections, including timeouts and retry policies. |
| 26 | +type connectOptions struct { |
| 27 | + connectTimeout time.Duration // Timeout for establishing a connection |
| 28 | + rpcTimeout time.Duration // Timeout for the RPC call |
| 29 | + rwTimeout time.Duration // Timeout for read/write operations |
| 30 | + |
| 31 | + retryTimes *int // Pointer to the maximum retry times |
| 32 | + |
| 33 | + // Optional retry functions |
| 34 | + errorRetry func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool // Function to determine if an error should trigger a retry |
| 35 | + respRetry func(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool // Function to determine if a response should trigger a retry |
| 36 | +} |
| 37 | + |
| 38 | +// NewConnectOptions creates and returns a new connectOptions instance with default values. |
| 39 | +func NewConnectOptions() *connectOptions { |
| 40 | + return &connectOptions{ |
| 41 | + connectTimeout: defaultConnectTimeout, |
| 42 | + rpcTimeout: defaultRPCTimeout, |
| 43 | + rwTimeout: defaultReadWriteTimeout, |
| 44 | + } |
| 45 | +} |
| 46 | + |
| 47 | +// SetConnectTimeout sets the connection timeout and returns the updated connectOptions for chaining. |
| 48 | +func (co *connectOptions) SetConnectTimeout(timeout time.Duration) *connectOptions { |
| 49 | + co.connectTimeout = timeout |
| 50 | + return co |
| 51 | +} |
| 52 | + |
| 53 | +// SetRPCTimeout sets the RPC timeout and returns the updated connectOptions for chaining. |
| 54 | +func (co *connectOptions) SetRPCTimeout(timeout time.Duration) *connectOptions { |
| 55 | + co.rpcTimeout = timeout |
| 56 | + return co |
| 57 | +} |
| 58 | + |
| 59 | +// SetReadWriteTimeout sets the read/write timeout and returns the updated connectOptions for chaining. |
| 60 | +func (co *connectOptions) SetReadWriteTimeout(timeout time.Duration) *connectOptions { |
| 61 | + co.rwTimeout = timeout |
| 62 | + return co |
| 63 | +} |
| 64 | + |
| 65 | +// SetMaxRetryTimes sets the maximum retry times and returns the updated connectOptions for chaining. |
| 66 | +func (co *connectOptions) SetMaxRetryTimes(retryTimes int) *connectOptions { |
| 67 | + co.retryTimes = &retryTimes |
| 68 | + return co |
| 69 | +} |
| 70 | + |
| 71 | +// SetErrorRetry sets the error retry function and returns the updated connectOptions for chaining. |
| 72 | +func (co *connectOptions) SetErrorRetry(errorRetry func(ctx context.Context, err error, ri rpcinfo.RPCInfo) bool) *connectOptions { |
| 73 | + co.errorRetry = errorRetry |
| 74 | + return co |
| 75 | +} |
| 76 | + |
| 77 | +// SetResponseRetry sets the response retry function and returns the updated connectOptions for chaining. |
| 78 | +func (co *connectOptions) SetResponseRetry(respRetry func(ctx context.Context, resp interface{}, ri rpcinfo.RPCInfo) bool) *connectOptions { |
| 79 | + co.respRetry = respRetry |
| 80 | + return co |
| 81 | +} |
| 82 | + |
| 83 | +// RPCTimeout returns the RPC timeout value. |
| 84 | +func (co *connectOptions) RPCTimeout() time.Duration { |
| 85 | + return co.rpcTimeout |
| 86 | +} |
| 87 | + |
| 88 | +// ConnectTimeout returns the connection timeout value. |
| 89 | +func (co *connectOptions) ConnectTimeout() time.Duration { |
| 90 | + return co.connectTimeout |
| 91 | +} |
| 92 | + |
| 93 | +// ReadWriteTimeout returns the read/write timeout value. |
| 94 | +func (co *connectOptions) ReadWriteTimeout() time.Duration { |
| 95 | + return co.rwTimeout |
| 96 | +} |
| 97 | + |
| 98 | +// connectPolicy manages connection options for RPC methods, ensuring safe access with mutex. |
| 99 | +type connectPolicy struct { |
| 100 | + mu sync.Mutex // Mutex for thread safety |
| 101 | + registry map[string]*connectOptions // Map to store connection options for each method |
| 102 | +} |
| 103 | + |
| 104 | +// NewConnectPolicy creates and returns a new connectPolicy instance with an initialized registry. |
| 105 | +func NewConnectPolicy() *connectPolicy { |
| 106 | + return &connectPolicy{ |
| 107 | + registry: make(map[string]*connectOptions, 0), |
| 108 | + } |
| 109 | +} |
| 110 | + |
| 111 | +// M retrieves the connectOptions for a given method, creating a new one if it does not exist. |
| 112 | +func (cp *connectPolicy) M(method string) *connectOptions { |
| 113 | + cp.mu.Lock() |
| 114 | + defer cp.mu.Unlock() |
| 115 | + |
| 116 | + // Create new options if none found. |
| 117 | + co, ok := cp.registry[method] |
| 118 | + if !ok { |
| 119 | + co = NewConnectOptions() |
| 120 | + cp.registry[method] = co |
| 121 | + } |
| 122 | + |
| 123 | + return co |
| 124 | +} |
| 125 | + |
| 126 | +// Timeouts retrieves the timeout settings for a given RPCInfo. |
| 127 | +func (cp *connectPolicy) Timeouts(ri rpcinfo.RPCInfo) rpcinfo.Timeouts { |
| 128 | + timeouts, ok := cp.registry[ri.To().Method()] |
| 129 | + if ok { |
| 130 | + return timeouts |
| 131 | + } |
| 132 | + |
| 133 | + return NewConnectOptions() |
| 134 | +} |
| 135 | + |
| 136 | +// Build constructs client options based on the defined connectPolicy. |
| 137 | +func (cp *connectPolicy) Build() []client.Option { |
| 138 | + kitexClientOptions := make([]client.Option, 0) |
| 139 | + |
| 140 | + policies := make(map[string]retry.Policy) |
| 141 | + for method, co := range cp.registry { |
| 142 | + // Create a new failure policy with default values. |
| 143 | + fp := retry.NewFailurePolicy() |
| 144 | + // Set maximum retry times if setted. |
| 145 | + if co.retryTimes != nil { |
| 146 | + fp.WithMaxRetryTimes(*co.retryTimes) |
| 147 | + } |
| 148 | + |
| 149 | + fp.ShouldResultRetry = new(retry.ShouldResultRetry) |
| 150 | + |
| 151 | + if co.errorRetry != nil { |
| 152 | + fp.ShouldResultRetry.ErrorRetryWithCtx = co.errorRetry |
| 153 | + } |
| 154 | + |
| 155 | + if co.respRetry != nil { |
| 156 | + fp.ShouldResultRetry.RespRetryWithCtx = co.respRetry |
| 157 | + } |
| 158 | + // Build and add the retry policy for the method |
| 159 | + policies[method] = retry.BuildFailurePolicy(fp) |
| 160 | + } |
| 161 | + |
| 162 | + // Add retry policies to client options. |
| 163 | + kitexClientOptions = append(kitexClientOptions, client.WithRetryMethodPolicies(policies)) |
| 164 | + // Register timeout provider. |
| 165 | + kitexClientOptions = append(kitexClientOptions, client.WithTimeoutProvider(cp)) |
| 166 | + |
| 167 | + return kitexClientOptions |
| 168 | +} |
0 commit comments