Skip to content

Commit 88d869f

Browse files
committed
changes
1 parent 0c1b326 commit 88d869f

File tree

7 files changed

+360
-303
lines changed

7 files changed

+360
-303
lines changed

api/v1alpha1/configuration.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func generateHosts(cr *Storage) []schema.Host {
6464

6565
func generateKeyConfig(cr *Storage, crDB *Database) *schema.KeyConfig {
6666
var keyConfig *schema.KeyConfig
67-
6867
if crDB != nil && crDB.Spec.Encryption != nil && crDB.Spec.Encryption.Enabled {
6968
keyConfig = &schema.KeyConfig{
7069
Keys: []schema.Key{
@@ -152,12 +151,12 @@ func TryParseDynconfig(rawYamlConfiguration string) (bool, schema.Dynconfig, err
152151
var dynconfig schema.Dynconfig
153152
err := dec.Decode(&dynconfig)
154153
if err != nil {
155-
return false, schema.Dynconfig{}, err
154+
return false, schema.Dynconfig{}, fmt.Errorf("error unmarshal yaml to dynconfig: %w", err)
156155
}
157156

158157
err = validateDynconfig(dynconfig)
159158
if err != nil {
160-
return true, dynconfig, err
159+
return true, dynconfig, fmt.Errorf("error validate dynconfig: %w", err)
161160
}
162161

163162
return true, dynconfig, nil
@@ -183,9 +182,7 @@ func validateDynconfig(dynConfig schema.Dynconfig) error {
183182
return nil
184183
}
185184

186-
func GetConfigForCMS(rawYamlConfiguration string) ([]byte, error) {
187-
_, dynconfig, _ := TryParseDynconfig(rawYamlConfiguration)
188-
185+
func GetConfigForCMS(dynconfig schema.Dynconfig) ([]byte, error) {
189186
delete(dynconfig.Config, "static_erasure")
190187
delete(dynconfig.Config, "host_configs")
191188
delete(dynconfig.Config, "nameservice_config")

internal/cms/dynconfig.go

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,104 +9,106 @@ import (
99
"github.com/ydb-platform/ydb-go-genproto/draft/protos/Ydb_DynamicConfig"
1010
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1111
"github.com/ydb-platform/ydb-go-sdk/v3"
12-
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1312
"google.golang.org/protobuf/types/known/durationpb"
1413
"sigs.k8s.io/controller-runtime/pkg/log"
1514

16-
"github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
1715
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
18-
"github.com/ydb-platform/ydb-kubernetes-operator/internal/resources"
1916
)
2017

2118
const (
2219
GetConfigTimeoutSeconds = 10
2320
ReplaceConfigTimeoutSeconds = 30
2421
)
2522

26-
func GetConfig(
23+
type Config struct {
24+
StorageEndpoint string
25+
Domain string
26+
Config string
27+
Version uint64
28+
DryRun bool
29+
AllowUnknownFields bool
30+
}
31+
32+
func (c *Config) GetConfig(
2733
ctx context.Context,
28-
storage *resources.StorageClusterBuilder,
29-
creds credentials.Credentials,
3034
opts ...ydb.Option,
3135
) (*Ydb_DynamicConfig.GetConfigResponse, error) {
32-
endpoint := fmt.Sprintf(
33-
"%s/%s",
34-
storage.GetStorageEndpointWithProto(),
35-
storage.Spec.Domain,
36-
)
37-
conn, err := connection.Open(ctx,
38-
endpoint,
39-
ydb.WithCredentials(creds),
40-
ydb.MergeOptions(opts...),
41-
)
36+
logger := log.FromContext(ctx)
37+
38+
endpoint := fmt.Sprintf("%s/%s", c.StorageEndpoint, c.Domain)
39+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
40+
defer ydbCtxCancel()
41+
conn, err := connection.Open(ydbCtx, endpoint, ydb.MergeOptions(opts...))
4242
if err != nil {
4343
return nil, fmt.Errorf("error connecting to YDB: %w", err)
4444
}
4545
defer func() {
46-
connection.Close(ctx, conn)
46+
connection.Close(ydbCtx, conn)
4747
}()
4848

49-
cmsCtx, cancel := context.WithTimeout(ctx, GetConfigTimeoutSeconds*time.Second)
50-
defer cancel()
49+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, GetConfigTimeoutSeconds*time.Second)
50+
defer cmsCtxCancel()
5151
client := Ydb_DynamicConfig_V1.NewDynamicConfigServiceClient(ydb.GRPCConn(conn))
52-
request := &Ydb_DynamicConfig.GetConfigRequest{
53-
OperationParams: &Ydb_Operations.OperationParams{
54-
OperationMode: Ydb_Operations.OperationParams_SYNC,
55-
OperationTimeout: &durationpb.Duration{Seconds: GetConfigTimeoutSeconds},
56-
},
57-
}
52+
request := c.makeGetConfigRequest()
53+
54+
logger.Info("CMS GetConfig", "endpoint", endpoint, "request", request)
5855
return client.GetConfig(cmsCtx, request)
5956
}
6057

61-
func GetConfigResult(
62-
response *Ydb_DynamicConfig.GetConfigResponse,
63-
) (*Ydb_DynamicConfig.GetConfigResult, error) {
58+
func (c *Config) ProcessConfigResponse(response *Ydb_DynamicConfig.GetConfigResponse) error {
6459
configResult := &Ydb_DynamicConfig.GetConfigResult{}
6560
err := response.GetOperation().GetResult().UnmarshalTo(configResult)
6661
if err != nil {
67-
return nil, err
62+
return err
6863
}
69-
return configResult, nil
64+
65+
c.Config = configResult.GetConfig()
66+
c.Version = configResult.GetIdentity().GetVersion()
67+
return nil
7068
}
7169

72-
func ReplaceConfig(
70+
func (c *Config) ReplaceConfig(
7371
ctx context.Context,
74-
storage *resources.StorageClusterBuilder,
75-
dryRun bool,
76-
creds credentials.Credentials,
7772
opts ...ydb.Option,
7873
) (*Ydb_DynamicConfig.ReplaceConfigResponse, error) {
7974
logger := log.FromContext(ctx)
80-
endpoint := fmt.Sprintf(
81-
"%s/%s",
82-
storage.GetStorageEndpointWithProto(),
83-
storage.Spec.Domain,
84-
)
85-
conn, err := connection.Open(ctx,
86-
endpoint,
87-
ydb.WithCredentials(creds),
88-
ydb.MergeOptions(opts...),
89-
)
75+
76+
endpoint := fmt.Sprintf("%s/%s", c.StorageEndpoint, c.Domain)
77+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
78+
defer ydbCtxCancel()
79+
conn, err := connection.Open(ydbCtx, endpoint, ydb.MergeOptions(opts...))
9080
if err != nil {
9181
return nil, fmt.Errorf("error connecting to YDB: %w", err)
9282
}
9383
defer func() {
94-
connection.Close(ctx, conn)
84+
connection.Close(ydbCtx, conn)
9585
}()
9686

97-
config, err := v1alpha1.GetConfigForCMS(storage.Spec.Configuration)
98-
if err != nil {
99-
return nil, err
100-
}
101-
102-
cmsCtx, cancel := context.WithTimeout(ctx, ReplaceConfigTimeoutSeconds*time.Second)
103-
defer cancel()
87+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, ReplaceConfigTimeoutSeconds*time.Second)
88+
defer cmsCtxCancel()
10489
client := Ydb_DynamicConfig_V1.NewDynamicConfigServiceClient(ydb.GRPCConn(conn))
10590
request := &Ydb_DynamicConfig.ReplaceConfigRequest{
106-
Config: string(config),
107-
DryRun: dryRun,
108-
AllowUnknownFields: true,
91+
Config: c.Config,
92+
DryRun: c.DryRun,
93+
AllowUnknownFields: c.AllowUnknownFields,
10994
}
110-
logger.Info("CMS ReplaceConfig", "request", request)
95+
96+
logger.Info("CMS ReplaceConfig", "endpoint", endpoint, "request", request)
11197
return client.ReplaceConfig(cmsCtx, request)
11298
}
99+
100+
func (c *Config) CheckReplaceConfigResponse(ctx context.Context, response *Ydb_DynamicConfig.ReplaceConfigResponse) (bool, string, error) {
101+
logger := log.FromContext(ctx)
102+
103+
logger.Info("CMS ReplaceConfig response", "response", response)
104+
return CheckOperationStatus(response.GetOperation())
105+
}
106+
107+
func (c *Config) makeGetConfigRequest() *Ydb_DynamicConfig.GetConfigRequest {
108+
request := &Ydb_DynamicConfig.GetConfigRequest{}
109+
request.OperationParams = &Ydb_Operations.OperationParams{
110+
OperationTimeout: &durationpb.Duration{Seconds: GetConfigTimeoutSeconds},
111+
}
112+
113+
return request
114+
}

internal/cms/operation.go

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,59 +3,72 @@ package cms
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
89
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
910
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1011
"github.com/ydb-platform/ydb-go-sdk/v3"
11-
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1212
"sigs.k8s.io/controller-runtime/pkg/log"
1313

1414
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
15-
"github.com/ydb-platform/ydb-kubernetes-operator/internal/resources"
1615
)
1716

18-
func GetOperation(
17+
const (
18+
GetOperationTimeoutSeconds = 10
19+
)
20+
21+
type Operation struct {
22+
StorageEndpoint string
23+
Domain string
24+
Id string
25+
}
26+
27+
func (op *Operation) GetOperation(
1928
ctx context.Context,
20-
storage *resources.StorageClusterBuilder,
21-
operationID string,
22-
creds credentials.Credentials,
2329
opts ...ydb.Option,
2430
) (*Ydb_Operations.GetOperationResponse, error) {
2531
logger := log.FromContext(ctx)
26-
endpoint := fmt.Sprintf(
27-
"%s/%s",
28-
storage.GetStorageEndpointWithProto(),
29-
storage.Spec.Domain,
30-
)
31-
conn, err := connection.Open(ctx,
32-
endpoint,
33-
ydb.WithCredentials(creds),
34-
ydb.MergeOptions(opts...),
35-
)
32+
33+
endpoint := fmt.Sprintf("%s/%s", op.StorageEndpoint, op.Domain)
34+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
35+
defer ydbCtxCancel()
36+
conn, err := connection.Open(ydbCtx, endpoint, ydb.MergeOptions(opts...))
3637
if err != nil {
3738
return nil, fmt.Errorf("error connecting to YDB: %w", err)
3839
}
3940
defer func() {
40-
connection.Close(ctx, conn)
41+
connection.Close(ydbCtx, conn)
4142
}()
4243

44+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, GetOperationTimeoutSeconds*time.Second)
45+
defer cmsCtxCancel()
4346
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
44-
request := &Ydb_Operations.GetOperationRequest{
45-
Id: operationID,
46-
}
47-
logger.Info("CMS GetOperation", "request", request)
48-
return client.GetOperation(ctx, request)
47+
request := &Ydb_Operations.GetOperationRequest{Id: op.Id}
48+
49+
logger.Info("CMS GetOperation request", "endpoint", endpoint, "request", request)
50+
return client.GetOperation(cmsCtx, request)
4951
}
5052

51-
func CheckOperationSuccess(operation *Ydb_Operations.Operation) error {
52-
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
53-
return nil
53+
func (op *Operation) CheckOperationResponse(ctx context.Context, response *Ydb_Operations.GetOperationResponse) (bool, string, error) {
54+
logger := log.FromContext(ctx)
55+
56+
logger.Info("CMS GetOperation response", "response", response)
57+
return CheckOperationStatus(response.GetOperation())
58+
}
59+
60+
func CheckOperationStatus(operation *Ydb_Operations.Operation) (bool, string, error) {
61+
if operation == nil {
62+
return false, "", ErrEmptyReplyFromStorage
5463
}
5564

56-
if operation.Status == Ydb.StatusIds_STATUS_CODE_UNSPECIFIED && len(operation.Issues) == 0 {
57-
return nil
65+
if !operation.GetReady() {
66+
return false, operation.Id, nil
67+
}
68+
69+
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
70+
return true, operation.Id, nil
5871
}
5972

60-
return fmt.Errorf("operation status is %v: %v", operation.Status, operation.Issues)
73+
return true, operation.Id, fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
6174
}

0 commit comments

Comments
 (0)