Skip to content

Commit 5512cfa

Browse files
committed
fix database initializing conditions
1 parent 073ed86 commit 5512cfa

File tree

4 files changed

+162
-87
lines changed

4 files changed

+162
-87
lines changed

internal/cms/operation.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package cms
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
9+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
10+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
11+
"github.com/ydb-platform/ydb-go-sdk/v3"
12+
"sigs.k8s.io/controller-runtime/pkg/log"
13+
14+
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
15+
)
16+
17+
const (
18+
GetOperationTimeoutSeconds = 10
19+
)
20+
21+
type Operation struct {
22+
StorageEndpoint string
23+
Domain string
24+
ID string
25+
}
26+
27+
func (op *Operation) GetOperation(
28+
ctx context.Context,
29+
opts ...ydb.Option,
30+
) (*Ydb_Operations.GetOperationResponse, error) {
31+
logger := log.FromContext(ctx)
32+
33+
endpoint := fmt.Sprintf("%s/%s", op.StorageEndpoint, op.Domain)
34+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
35+
defer ydbCtxCancel()
36+
conn, err := connection.Open(ydbCtx, endpoint, ydb.MergeOptions(opts...))
37+
if err != nil {
38+
return nil, fmt.Errorf("error connecting to YDB: %w", err)
39+
}
40+
defer func() {
41+
connection.Close(ydbCtx, conn)
42+
}()
43+
44+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, GetOperationTimeoutSeconds*time.Second)
45+
defer cmsCtxCancel()
46+
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
47+
request := &Ydb_Operations.GetOperationRequest{Id: op.ID}
48+
49+
logger.Info("CMS GetOperation", "endpoint", endpoint, "request", request)
50+
return client.GetOperation(cmsCtx, request)
51+
}
52+
53+
func (op *Operation) CheckGetOperationResponse(ctx context.Context, response *Ydb_Operations.GetOperationResponse) (bool, string, error) {
54+
logger := log.FromContext(ctx)
55+
56+
logger.Info("CMS GetOperation", "response", response)
57+
return CheckOperationStatus(response.GetOperation())
58+
}
59+
60+
func CheckOperationStatus(operation *Ydb_Operations.Operation) (bool, string, error) {
61+
if operation == nil {
62+
return false, "", ErrEmptyReplyFromStorage
63+
}
64+
65+
if !operation.GetReady() {
66+
return false, operation.Id, nil
67+
}
68+
69+
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
70+
return true, operation.Id, nil
71+
}
72+
73+
return true, operation.Id, fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
74+
}

internal/cms/tenant.go

Lines changed: 26 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/ydb-platform/ydb-go-genproto/Ydb_Cms_V1"
9-
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
10-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1110
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Cms"
12-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1311
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
1412
"sigs.k8s.io/controller-runtime/pkg/log"
1513

1614
ydbv1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
1715
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
1816
)
1917

18+
const (
19+
CreateDatabaseTimeoutSeconds = 10
20+
)
21+
2022
var ErrEmptyReplyFromStorage = errors.New("empty reply from storage")
2123

2224
type Tenant struct {
@@ -28,31 +30,37 @@ type Tenant struct {
2830
SharedDatabasePath string
2931
}
3032

31-
func (t *Tenant) Create(
33+
func (t *Tenant) CreateDatabase(
3234
ctx context.Context,
3335
opts ...ydb.Option,
34-
) (string, error) {
36+
) (*Ydb_Cms.CreateDatabaseResponse, error) {
3537
logger := log.FromContext(ctx)
36-
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
37-
conn, err := connection.Open(ctx, url, opts...)
38+
39+
endpoint := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
40+
ydbCtx, ydbCtxCancel := context.WithTimeout(ctx, time.Second)
41+
defer ydbCtxCancel()
42+
conn, err := connection.Open(ydbCtx, endpoint, opts...)
3843
if err != nil {
39-
logger.Error(err, "Error connecting to YDB storage")
40-
return "", err
44+
logger.Error(err, "Error connecting to YDB")
45+
return nil, err
4146
}
4247
defer func() {
43-
connection.Close(ctx, conn)
48+
connection.Close(ydbCtx, conn)
4449
}()
4550

51+
cmsCtx, cmsCtxCancel := context.WithTimeout(ctx, CreateDatabaseTimeoutSeconds*time.Second)
52+
defer cmsCtxCancel()
4653
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(conn))
47-
logger.Info(fmt.Sprintf("creating tenant, url: %s", url))
4854
request := t.makeCreateDatabaseRequest()
49-
logger.Info(fmt.Sprintf("creating tenant, request: %s", request))
50-
response, err := client.CreateDatabase(ctx, request)
51-
if err != nil {
52-
return "", err
53-
}
54-
logger.Info(fmt.Sprintf("creating tenant, response: %s", response))
55-
return processDatabaseCreationOperation(response.Operation)
55+
logger.Info("CMS CreateDatabase", "endpoint", endpoint, "request", request)
56+
return client.CreateDatabase(cmsCtx, request)
57+
}
58+
59+
func (t *Tenant) CheckCreateDatabaseResponse(ctx context.Context, response *Ydb_Cms.CreateDatabaseResponse) (bool, string, error) {
60+
logger := log.FromContext(ctx)
61+
62+
logger.Info("CMS CreateDatabase", "response", response)
63+
return CheckOperationStatus(response.GetOperation())
5664
}
5765

5866
func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
@@ -87,47 +95,3 @@ func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
8795
}
8896
return request
8997
}
90-
91-
func processDatabaseCreationOperation(operation *Ydb_Operations.Operation) (string, error) {
92-
if operation == nil {
93-
return "", ErrEmptyReplyFromStorage
94-
}
95-
if !operation.Ready {
96-
return operation.Id, nil
97-
}
98-
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
99-
return "", nil
100-
}
101-
return "", fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
102-
}
103-
104-
func (t *Tenant) CheckCreateOperation(
105-
ctx context.Context,
106-
operationID string,
107-
opts ...ydb.Option,
108-
) (bool, error, error) {
109-
logger := log.FromContext(ctx)
110-
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
111-
conn, err := connection.Open(ctx, url, opts...)
112-
if err != nil {
113-
logger.Error(err, "Error connecting to YDB storage")
114-
return false, nil, err
115-
}
116-
defer func() {
117-
connection.Close(ctx, conn)
118-
}()
119-
120-
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
121-
request := &Ydb_Operations.GetOperationRequest{Id: operationID}
122-
logger.Info(fmt.Sprintf("checking operation, url: %s, operationId: %s, request: %s", url, operationID, request))
123-
response, err := client.GetOperation(ctx, request)
124-
if err != nil {
125-
return false, nil, err
126-
}
127-
logger.Info(fmt.Sprintf("checking operation, response: %s", response))
128-
if response.Operation == nil {
129-
return false, nil, ErrEmptyReplyFromStorage
130-
}
131-
oid, err := processDatabaseCreationOperation(response.Operation)
132-
return len(oid) == 0, err, nil
133-
}

internal/controllers/constants/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const (
4545
ReasonInProgress = "InProgress"
4646
ReasonNotRequired = "NotRequired"
4747
ReasonCompleted = "Completed"
48+
ReasonFailed = "Failed"
4849

4950
DefaultRequeueDelay = 10 * time.Second
5051
StatusUpdateRequeueDelay = 1 * time.Second

internal/controllers/database/init.go

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/ydb-platform/ydb-go-sdk/v3"
7+
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
88
corev1 "k8s.io/api/core/v1"
99
apierrors "k8s.io/apimachinery/pkg/api/errors"
1010
"k8s.io/apimachinery/pkg/api/meta"
@@ -72,62 +72,85 @@ func (r *Reconciler) setInitDatabaseCompleted(
7272
return r.updateStatus(ctx, database, StatusUpdateRequeueDelay)
7373
}
7474

75-
func (r *Reconciler) checkCreateTenantOperation(
75+
func (r *Reconciler) checkCreateDatabaseOperation(
7676
ctx context.Context,
7777
database *resources.DatabaseBuilder,
7878
tenant *cms.Tenant,
7979
ydbOptions ydb.Option,
8080
) (bool, ctrl.Result, error) {
8181
condition := meta.FindStatusCondition(database.Status.Conditions, CreateDatabaseOperationCondition)
82-
if condition == nil || len(condition.Message) == 0 {
82+
if len(condition.Message) == 0 {
8383
// Something is wrong with the condition where we save operation id
8484
// retry create tenant
85+
message := fmt.Sprintf("Something is wrong with the condition, retry creating tenant %s", tenant.Path)
86+
r.Recorder.Event(
87+
database,
88+
corev1.EventTypeWarning,
89+
"InitializingFailed",
90+
message,
91+
)
8592
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
86-
Type: CreateDatabaseOperationCondition,
87-
Status: metav1.ConditionTrue,
88-
Reason: ReasonNotRequired,
93+
Type: CreateDatabaseOperationCondition,
94+
Status: metav1.ConditionFalse,
95+
Reason: ReasonFailed,
96+
Message: message,
8997
})
9098
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
9199
}
92-
operationID := condition.Message
93-
finished, operationErr, err := tenant.CheckCreateOperation(ctx, operationID, ydbOptions)
100+
101+
operation := &cms.Operation{
102+
StorageEndpoint: tenant.StorageEndpoint,
103+
Domain: tenant.Domain,
104+
ID: condition.Message,
105+
}
106+
response, err := operation.GetOperation(ctx, ydbOptions)
94107
if err != nil {
95108
r.Recorder.Event(
96109
database,
97110
corev1.EventTypeWarning,
98111
"InitializingFailed",
99-
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
112+
fmt.Sprintf("Failed to check creation operation, operationID %s: %s", operation.ID, err),
100113
)
101114
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
102115
}
103-
if operationErr != nil {
104-
// Creation operation failed - retry Create Tenant
116+
117+
finished, operationID, err := operation.CheckGetOperationResponse(ctx, response)
118+
if err != nil {
105119
r.Recorder.Event(
106120
database,
107121
corev1.EventTypeWarning,
108122
"InitializingFailed",
109-
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, operationErr),
123+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
110124
)
111125
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
112-
Type: CreateDatabaseOperationCondition,
113-
Status: metav1.ConditionTrue,
114-
Reason: ReasonNotRequired,
126+
Type: CreateDatabaseOperationCondition,
127+
Status: metav1.ConditionFalse,
128+
Reason: ReasonFailed,
129+
Message: fmt.Sprintf("Failed to create tenant %s", tenant.Path),
115130
})
116131
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
117132
}
133+
118134
if !finished {
119135
r.Recorder.Event(
120136
database,
121137
corev1.EventTypeWarning,
122-
"Pending",
138+
string(DatabaseInitializing),
123139
fmt.Sprintf("Tenant creation operation is not completed, operationID: %s", operationID),
124140
)
125-
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, nil
141+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
142+
Type: CreateDatabaseOperationCondition,
143+
Status: metav1.ConditionUnknown,
144+
Reason: ReasonInProgress,
145+
Message: operationID,
146+
})
147+
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
126148
}
149+
127150
r.Recorder.Event(
128151
database,
129152
corev1.EventTypeNormal,
130-
"Initialized",
153+
string(DatabaseInitializing),
131154
fmt.Sprintf("Tenant %s created", tenant.Path),
132155
)
133156
return r.setInitDatabaseCompleted(ctx, database, "Database initialized successfully")
@@ -238,10 +261,11 @@ func (r *Reconciler) initializeTenant(
238261
}
239262
ydbOpts := ydb.MergeOptions(ydb.WithCredentials(creds), tlsOptions)
240263

241-
if meta.IsStatusConditionFalse(database.Status.Conditions, CreateDatabaseOperationCondition) {
242-
return r.checkCreateTenantOperation(ctx, database, tenant, ydbOpts)
264+
if meta.IsStatusConditionPresentAndEqual(database.Status.Conditions, CreateDatabaseOperationCondition, metav1.ConditionUnknown) {
265+
return r.checkCreateDatabaseOperation(ctx, database, tenant, ydbOpts)
243266
}
244-
operationID, err := tenant.Create(ctx, ydb.WithCredentials(creds), tlsOptions)
267+
268+
response, err := tenant.CreateDatabase(ctx, ydbOpts)
245269
if err != nil {
246270
r.Recorder.Event(
247271
database,
@@ -251,16 +275,28 @@ func (r *Reconciler) initializeTenant(
251275
)
252276
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
253277
}
254-
if len(operationID) > 0 {
278+
279+
finished, operationID, err := tenant.CheckCreateDatabaseResponse(ctx, response)
280+
if err != nil {
281+
r.Recorder.Event(
282+
database,
283+
corev1.EventTypeWarning,
284+
"InitializingFailed",
285+
fmt.Sprintf("Failed %s: %s", tenant.Path, err),
286+
)
287+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
288+
}
289+
290+
if !finished {
255291
r.Recorder.Event(
256292
database,
257293
corev1.EventTypeWarning,
258-
"Pending",
294+
string(DatabaseInitializing),
259295
fmt.Sprintf("Tenant creation operation in progress, operationID: %s", operationID),
260296
)
261297
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
262298
Type: CreateDatabaseOperationCondition,
263-
Status: metav1.ConditionFalse,
299+
Status: metav1.ConditionUnknown,
264300
Reason: ReasonInProgress,
265301
Message: operationID,
266302
})
@@ -269,7 +305,7 @@ func (r *Reconciler) initializeTenant(
269305
r.Recorder.Event(
270306
database,
271307
corev1.EventTypeNormal,
272-
"Initialized",
308+
string(DatabaseInitializing),
273309
fmt.Sprintf("Tenant %s created", tenant.Path),
274310
)
275311

0 commit comments

Comments
 (0)