Skip to content

Commit 85c11fb

Browse files
committed
Add kibana upgrade assistant
1 parent c79fae6 commit 85c11fb

File tree

3 files changed

+1085
-9
lines changed

3 files changed

+1085
-9
lines changed

functionaltests/internal/kbclient/client.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"fmt"
2828
"io"
2929
"net/http"
30+
"slices"
3031
"time"
3132
)
3233

@@ -235,3 +236,108 @@ func (c *Client) EnableIntegrationsServer(ctx context.Context) error {
235236

236237
return nil
237238
}
239+
240+
func (c *Client) ResolveMigrationDeprecations(ctx context.Context) error {
241+
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
242+
defer cancel()
243+
244+
deprecations, err := c.QueryCriticalESDeprecations(ctx)
245+
if err != nil {
246+
return fmt.Errorf("failed to query critical deprecations: %w", err)
247+
}
248+
249+
for _, deprecation := range deprecations {
250+
switch deprecation.Type {
251+
case "index_settings":
252+
if err = c.markIndexAsReadOnly(ctx, deprecation.Name); err != nil {
253+
return fmt.Errorf("failed to mark index as read-only: %w", err)
254+
}
255+
case "data_streams":
256+
if err = c.markDataStreamAsReadOnly(
257+
ctx,
258+
deprecation.Name,
259+
deprecation.CorrectiveAction.Metadata.IndicesRequiringUpgrade,
260+
); err != nil {
261+
return fmt.Errorf("failed to mark data stream as read-only: %w", err)
262+
}
263+
default:
264+
return fmt.Errorf("unknown deprecation type: %s", deprecation.Type)
265+
}
266+
}
267+
268+
return nil
269+
}
270+
271+
type MigrationDeprecation struct {
272+
Name string `json:"index"`
273+
Type string `json:"type"`
274+
IsCritical bool `json:"isCritical"`
275+
CorrectiveAction struct {
276+
Type string `json:"type"`
277+
Metadata struct {
278+
IndicesRequiringUpgrade []string `json:"indicesRequiringUpgrade,omitempty"`
279+
}
280+
} `json:"correctiveAction"`
281+
}
282+
283+
type esDeprecationsResponse struct {
284+
MigrationDeprecations []MigrationDeprecation `json:"migrationsDeprecations"`
285+
}
286+
287+
// QueryCriticalESDeprecations retrieves the critical deprecation warnings for Elasticsearch.
288+
// It is essentially equivalent to `GET _migration/deprecations`, but through Kibana Upgrade
289+
// Assistant API.
290+
func (c *Client) QueryCriticalESDeprecations(ctx context.Context) ([]MigrationDeprecation, error) {
291+
path := "/api/upgrade_assistant/es_deprecations"
292+
b, err := c.sendRequest(ctx, http.MethodGet, path, nil, nil)
293+
if err != nil {
294+
return nil, err
295+
}
296+
297+
var esDeprecationsResp esDeprecationsResponse
298+
if err = json.Unmarshal(b, &esDeprecationsResp); err != nil {
299+
return nil, fmt.Errorf("cannot unmarshal response body: %w", err)
300+
}
301+
302+
// Remove all non-critical deprecation info.
303+
return slices.DeleteFunc(
304+
esDeprecationsResp.MigrationDeprecations,
305+
func(dep MigrationDeprecation) bool {
306+
return !dep.IsCritical
307+
},
308+
), nil
309+
}
310+
311+
type upgradeAssistUpdateIndexRequest struct {
312+
Operations []string `json:"operations"`
313+
}
314+
315+
// markIndexAsReadOnly updates the index to read-only through the Upgrade Assistant API:
316+
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
317+
func (c *Client) markIndexAsReadOnly(ctx context.Context, index string) error {
318+
path := fmt.Sprintf("/api/upgrade_assistant/update_index/%s", index)
319+
req := upgradeAssistUpdateIndexRequest{
320+
Operations: []string{"blockWrite", "unfreeze"},
321+
}
322+
323+
_, err := c.sendRequest(ctx, http.MethodPost, path, req, nil)
324+
return err
325+
}
326+
327+
type upgradeAssistMigrateDSRequest struct {
328+
Indices []string `json:"indices"`
329+
}
330+
331+
// markDataStreamAsReadOnly marks the backing indices of the data stream as read-only
332+
// through the Upgrade Assistant API:
333+
// https://www.elastic.co/guide/en/kibana/current/upgrade-assistant.html.
334+
func (c *Client) markDataStreamAsReadOnly(ctx context.Context, dataStream string, indices []string) error {
335+
// Data stream
336+
path := fmt.Sprintf("/api/upgrade_assistant/migrate_data_stream/%s/readonly", dataStream)
337+
req := upgradeAssistMigrateDSRequest{
338+
Indices: indices,
339+
}
340+
341+
_, err := c.sendRequest(ctx, http.MethodPost, path, req, nil)
342+
return err
343+
}

functionaltests/internal/kbclient/client_test.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func getHttpClient(t *testing.T) (*recorder.Recorder, *http.Client) {
108108
return rec, hc
109109
}
110110

111-
func TestClient_GetPackagePolicyByID(t *testing.T) {
111+
func newRecordedTestClient(t *testing.T) *kbclient.Client {
112112
kibanaURL := os.Getenv("KIBANA_URL")
113113
username := os.Getenv("KIBANA_USERNAME")
114114
password := os.Getenv("KIBANA_PASSWORD")
@@ -117,23 +117,23 @@ func TestClient_GetPackagePolicyByID(t *testing.T) {
117117
_, httpc := getHttpClient(t)
118118
kbc.Client = *httpc
119119

120+
return kbc
121+
}
122+
123+
func TestClient_GetPackagePolicyByID(t *testing.T) {
124+
kbc := newRecordedTestClient(t)
125+
120126
policy, err := kbc.GetPackagePolicyByID(context.Background(), "elastic-cloud-apm")
121127
require.NoError(t, err)
122128
assert.Equal(t, "Elastic APM", policy.Name)
123129
}
124130

125131
func TestClient_UpdatePackagePolicyByID(t *testing.T) {
126-
kibanaURL := os.Getenv("KIBANA_URL")
127-
username := os.Getenv("KIBANA_USERNAME")
128-
password := os.Getenv("KIBANA_PASSWORD")
129-
kbc, err := kbclient.New(kibanaURL, username, password)
130-
require.NoError(t, err)
131-
_, httpc := getHttpClient(t)
132-
kbc.Client = *httpc
132+
kbc := newRecordedTestClient(t)
133133

134134
ctx := context.Background()
135135
policyID := "elastic-cloud-apm"
136-
err = kbc.UpdatePackagePolicyByID(ctx, policyID,
136+
err := kbc.UpdatePackagePolicyByID(ctx, policyID,
137137
kbclient.PackagePolicy{
138138
Name: "Elastic APM",
139139
Description: "Hello World",
@@ -149,3 +149,25 @@ func TestClient_UpdatePackagePolicyByID(t *testing.T) {
149149
require.NoError(t, err)
150150
assert.Equal(t, "Hello World", policy.Description)
151151
}
152+
153+
func TestClient_ResolveMigrationDeprecations(t *testing.T) {
154+
kbc := newRecordedTestClient(t)
155+
156+
ctx := context.Background()
157+
// Check that there are some critical deprecation warnings.
158+
deprecations, err := kbc.QueryCriticalESDeprecations(ctx)
159+
require.NoError(t, err)
160+
require.Greater(t, len(deprecations), 0)
161+
for _, deprecation := range deprecations {
162+
require.True(t, deprecation.IsCritical)
163+
}
164+
165+
// Resolve them.
166+
err = kbc.ResolveMigrationDeprecations(ctx)
167+
require.NoError(t, err)
168+
169+
// Check that there are no more.
170+
deprecations, err = kbc.QueryCriticalESDeprecations(ctx)
171+
require.NoError(t, err)
172+
assert.Len(t, deprecations, 0)
173+
}

0 commit comments

Comments
 (0)