Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ generate-v1beta1-serverless-client: install-openapi-generator ## Generate server
@echo "==> Generating serverless cdc client"
rm -rf pkg/tidbcloud/v1beta1/serverless/cdc
cd tools/openapi-generator && npx openapi-generator-cli generate --inline-schema-options RESOLVE_INLINE_ENUMS=true --additional-properties=withGoMod=false,enumClassPrefix=true,disallowAdditionalPropertiesIfNotPresent=false --global-property=apiTests=false,apiDocs=false,modelDocs=false,modelTests=false -i ../../pkg/tidbcloud/v1beta1/serverless/cdc.swagger.json -g go -o ../../pkg/tidbcloud/v1beta1/serverless/cdc --package-name cdc -c go/config.yaml
@echo "==> Generating serverless cdc client"
rm -rf pkg/tidbcloud/v1beta1/serverless/migration
cd tools/openapi-generator && npx openapi-generator-cli generate --inline-schema-options RESOLVE_INLINE_ENUMS=true --additional-properties=withGoMod=false,enumClassPrefix=true,disallowAdditionalPropertiesIfNotPresent=false --global-property=apiTests=false,apiDocs=false,modelDocs=false,modelTests=false -i ../../pkg/tidbcloud/v1beta1/serverless/dm.swagger.json -g go -o ../../pkg/tidbcloud/v1beta1/serverless/migration --package-name migration -c go/config.yaml
cd pkg && go fmt ./tidbcloud/v1beta1/serverless/... && goimports -w .
@echo "==> Generating serverless privatelink client"
rm -rf pkg/tidbcloud/v1beta1/serverless/privatelink
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24.0

require (
github.com/AlecAivazis/survey/v2 v2.3.6
github.com/AlekSi/pointer v1.2.0
github.com/aws/aws-sdk-go-v2 v1.27.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.23
github.com/charmbracelet/bubbles v0.17.1
Expand Down Expand Up @@ -31,6 +32,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a
github.com/tidbcloud/tidbcloud-cli/pkg v0.0.1
github.com/xo/usql v0.19.2
github.com/zalando/go-keyring v0.2.3
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XB
github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk=
github.com/AlecAivazis/survey/v2 v2.3.6 h1:NvTuVHISgTHEHeBFqt6BHOe4Ny/NwGZr7w+F8S9ziyw=
github.com/AlecAivazis/survey/v2 v2.3.6/go.mod h1:4AuI9b7RjAR+G7v9+C4YSlX/YL3K3cWNXgWXOhllqvI=
github.com/AlekSi/pointer v1.2.0 h1:glcy/gc4h8HnG2Z3ZECSzZ1IX1x2JxRVuDzaJwQE0+w=
github.com/AlekSi/pointer v1.2.0/go.mod h1:gZGfd3dpW4vEc/UlyfKKi1roIqcCgwOIvb0tSNSBle0=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 h1:FDif4R1+UUR+00q6wquyX90K7A8dN+R5E8GEadoP7sU=
Expand Down Expand Up @@ -641,6 +643,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a h1:a6TNDN9CgG+cYjaeN8l2mc4kSz2iMiCDQxPEyltUV/I=
github.com/tailscale/hujson v0.0.0-20250605163823-992244df8c5a/go.mod h1:EbW0wDK/qEUYI0A5bqq0C2kF8JTQwWONmGDBbzsxxHo=
github.com/thda/tds v0.1.7 h1:s29kbnJK0agL3ps85A/sb9XS2uxgKF5UJ6AZjbyqXX4=
github.com/thda/tds v0.1.7/go.mod h1:isLIF1oZdXfkqVMJM8RyNrsjlHPlTKnPlnsBs7ngZcM=
github.com/trinodb/trino-go-client v0.315.0 h1:9mU+42VGw9Hnp9R1hkhWlIrQp9o+V01Gx1KlHjTkM1c=
Expand Down
3 changes: 2 additions & 1 deletion internal/cli/serverless/changefeed/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package changefeed

import (
"fmt"
"time"

"github.com/juju/errors"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -138,7 +139,7 @@ func ListCmd(h *internal.Helper) *cobra.Command {
*item.DisplayName,
string(item.Sink.Type),
string(*item.State),
item.CreateTime.String(),
item.CreateTime.Format(time.RFC3339),
})
}
err := output.PrintHumanTable(h.IOStreams.Out, columns, rows)
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/serverless/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/changefeed"
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/dataimport"
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/export"
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/migration"
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/privatelink"
"github.com/tidbcloud/tidbcloud-cli/internal/cli/serverless/sqluser"

Expand Down Expand Up @@ -54,6 +55,7 @@ func Cmd(h *internal.Helper) *cobra.Command {
serverlessCmd.AddCommand(authorizednetwork.AuthorizedNetworkCmd(h))
serverlessCmd.AddCommand(changefeed.ChangefeedCmd(h))
serverlessCmd.AddCommand(privatelink.PrivateLinkConnectionCmd(h))
serverlessCmd.AddCommand(migration.MigrationCmd(h))

return serverlessCmd
}
303 changes: 303 additions & 0 deletions internal/cli/serverless/migration/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package migration

import (
"context"
"encoding/json"
"fmt"
"os"
"slices"
"strings"
"time"

"github.com/AlekSi/pointer"
aws "github.com/aws/aws-sdk-go-v2/aws"
"github.com/fatih/color"
"github.com/juju/errors"
"github.com/spf13/cobra"
"github.com/tailscale/hujson"

"github.com/tidbcloud/tidbcloud-cli/internal"
"github.com/tidbcloud/tidbcloud-cli/internal/config"
"github.com/tidbcloud/tidbcloud-cli/internal/flag"
"github.com/tidbcloud/tidbcloud-cli/internal/output"
"github.com/tidbcloud/tidbcloud-cli/internal/service/cloud"
pkgmigration "github.com/tidbcloud/tidbcloud-cli/pkg/tidbcloud/v1beta1/serverless/migration"
)

func CreateCmd(h *internal.Helper) *cobra.Command {
var cmd = &cobra.Command{
Use: "create",
Short: "Create a migration",
Args: cobra.NoArgs,
Example: fmt.Sprintf(` Create a migration:
$ %[1]s serverless migration create -c <cluster-id> --display-name <name> --config-file <file-path> --dry-run
$ %[1]s serverless migration create -c <cluster-id> --display-name <name> --config-file <file-path>
`, config.CliName),
PreRunE: func(cmd *cobra.Command, args []string) error {
return markCreateMigrationRequiredFlags(cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
d, err := h.Client()
if err != nil {
return err
}
ctx := cmd.Context()

dryRun, err := cmd.Flags().GetBool(flag.DryRun)
if err != nil {
return errors.Trace(err)
}
clusterID, err := cmd.Flags().GetString(flag.ClusterID)
if err != nil {
return errors.Trace(err)
}
name, err := cmd.Flags().GetString(flag.DisplayName)
if err != nil {
return errors.Trace(err)
}
if strings.TrimSpace(name) == "" {
return errors.New("display name is required")
}
configPath, err := cmd.Flags().GetString(flag.MigrationConfigFile)
if err != nil {
return errors.Trace(err)
}
configPath = strings.TrimSpace(configPath)
if configPath == "" {
return errors.New("config file path is required")
}
definitionBytes, err := os.ReadFile(configPath)
if err != nil {
return errors.Annotatef(err, "failed to read config file %q", configPath)
}
definitionStr := string(definitionBytes)

sources, target, mode, err := parseMigrationDefinition(definitionStr)
if err != nil {
return err
}

if dryRun {
precheckBody := &pkgmigration.MigrationServicePrecheckBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
}
return runMigrationPrecheck(ctx, d, clusterID, precheckBody, h)
}

createBody := &pkgmigration.MigrationServiceCreateMigrationBody{
DisplayName: name,
Sources: sources,
Target: target,
Mode: mode,
}

resp, err := d.CreateMigration(ctx, clusterID, createBody)
if err != nil {
return errors.Trace(err)
}

migrationID := aws.ToString(resp.MigrationId)
fmt.Fprintln(h.IOStreams.Out, color.GreenString("migration %s(%s) created", name, migrationID))
return nil
},
}

cmd.Flags().StringP(flag.ClusterID, flag.ClusterIDShort, "", "The ID of the target cluster.")
cmd.Flags().StringP(flag.DisplayName, flag.DisplayNameShort, "", "Display name for the migration.")
cmd.Flags().String(flag.MigrationConfigFile, "", "Path to a migration config JSON file. Use \"ticloud serverless migration template --mode <mode>\" to print templates.")
cmd.Flags().Bool(flag.DryRun, false, "Run a migration precheck (dry run) with the provided inputs without creating a migration.")

return cmd
}

func markCreateMigrationRequiredFlags(cmd *cobra.Command) error {
for _, fn := range []string{flag.ClusterID, flag.DisplayName, flag.MigrationConfigFile} {
if err := cmd.MarkFlagRequired(fn); err != nil {
return err
}
}
return nil
}

const (
precheckPollInterval = 5 * time.Second
precheckPollTimeout = 2 * time.Minute
)

func runMigrationPrecheck(ctx context.Context, client cloud.TiDBCloudClient, clusterID string, body *pkgmigration.MigrationServicePrecheckBody, h *internal.Helper) error {
resp, err := client.CreateMigrationPrecheck(ctx, clusterID, body)
if err != nil {
return errors.Trace(err)
}
if resp.PrecheckId == nil || *resp.PrecheckId == "" {
return errors.New("precheck created but ID is empty")
}
precheckID := *resp.PrecheckId
fmt.Fprintf(h.IOStreams.Out, "migration precheck %s created, polling results...\n", precheckID)

ticker := time.NewTicker(precheckPollInterval)
defer ticker.Stop()
pollCtx, cancel := context.WithTimeout(ctx, precheckPollTimeout)
defer cancel()

// Poll precheck status until it finishes or the overall timeout is hit.
for {
select {
case <-pollCtx.Done():
if pollCtx.Err() == context.DeadlineExceeded {
return errors.Errorf("migration precheck polling timed out after %s", precheckPollTimeout)
}
return pollCtx.Err()
case <-ticker.C:
result, err := client.GetMigrationPrecheck(pollCtx, clusterID, precheckID)
if err != nil {
return errors.Trace(err)
}
finished, err := printPrecheckSummary(result, h)
if err != nil {
return err
}
if !finished {
continue
}
if result.GetStatus() == pkgmigration.MIGRATIONPRECHECKSTATUS_FAILED {
fmt.Fprintln(h.IOStreams.Out, color.RedString("migration precheck %s failed", precheckID))
return errors.New("migration precheck failed")
}
fmt.Fprintln(h.IOStreams.Out, color.GreenString("migration precheck %s passed", precheckID))
return nil
}
}
}

func isPrecheckUnfinished(status pkgmigration.MigrationPrecheckStatus) bool {
switch status {
case pkgmigration.MIGRATIONPRECHECKSTATUS_PENDING,
pkgmigration.MIGRATIONPRECHECKSTATUS_RUNNING:
return true
default:
return false
}
}

func printPrecheckSummary(result *pkgmigration.MigrationPrecheck, h *internal.Helper) (bool, error) {
if isPrecheckUnfinished(result.GetStatus()) {
fmt.Fprintf(h.IOStreams.Out, "precheck %s summary (status %s)\n", result.GetPrecheckId(), result.GetStatus())
fmt.Fprintf(h.IOStreams.Out, "Total: %d, Success: %d, Warn: %d, Failed: %d\n",
aws.ToInt32(result.Total), aws.ToInt32(result.SuccessCnt), aws.ToInt32(result.WarnCnt), aws.ToInt32(result.FailedCnt))
return false, nil
}

fmt.Fprintf(h.IOStreams.Out, "precheck %s finished with status %s\n", result.GetPrecheckId(), result.GetStatus())
fmt.Fprintf(h.IOStreams.Out, "Total: %d, Success: %d, Warn: %d, Failed: %d\n",
aws.ToInt32(result.Total), aws.ToInt32(result.SuccessCnt), aws.ToInt32(result.WarnCnt), aws.ToInt32(result.FailedCnt))
if len(result.Items) == 0 {
return true, nil
}
columns := []output.Column{"Type", "Status", "Description", "Reason", "Solution"}
rows := make([]output.Row, 0, len(result.Items))
for _, item := range result.Items {
if !shouldPrintPrecheckItem(item.Status) {
continue
}
rows = append(rows, output.Row{
string(pointer.Get(item.Type)),
string(pointer.Get(item.Status)),
pointer.Get(item.Description),
pointer.Get(item.Reason),
pointer.Get(item.Solution),
})
}
if len(rows) == 0 {
return true, nil
}
return true, output.PrintHumanTable(h.IOStreams.Out, columns, rows)
}

// shouldPrintPrecheckItem reports whether a precheck item should be shown to users.
// Currently only WARNING and FAILED statuses surface because SUCCESS does not
// provide actionable information.
func shouldPrintPrecheckItem(status *pkgmigration.PrecheckItemStatus) bool {
if status == nil {
return false
}
switch *status {
case pkgmigration.PRECHECKITEMSTATUS_WARNING,
pkgmigration.PRECHECKITEMSTATUS_FAILED:
return true
default:
return false
}
}

func parseMigrationDefinition(value string) ([]pkgmigration.Source, pkgmigration.Target, pkgmigration.TaskMode, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return nil, pkgmigration.Target{}, "", errors.New("migration config is required; use --config-file")
}
var payload struct {
Sources []pkgmigration.Source `json:"sources"`
Target *pkgmigration.Target `json:"target"`
Mode string `json:"mode"`
}
stdJson, err := standardizeJSON([]byte(trimmed))
if err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
}
if err := json.Unmarshal(stdJson, &payload); err != nil {
return nil, pkgmigration.Target{}, "", errors.Annotate(err, "invalid migration definition JSON")
}
if len(payload.Sources) == 0 {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include at least one source")
}
if payload.Target == nil {
return nil, pkgmigration.Target{}, "", errors.New("migration definition must include the target block")
}
mode, err := parseMigrationMode(payload.Mode)
if err != nil {
return nil, pkgmigration.Target{}, "", err
}
return payload.Sources, *payload.Target, mode, nil
}

func parseMigrationMode(value string) (pkgmigration.TaskMode, error) {
trimmed := strings.TrimSpace(value)
if trimmed == "" {
return "", errors.New("empty config file")
}
normalized := strings.ToUpper(trimmed)
mode := pkgmigration.TaskMode(normalized)
if slices.Contains(pkgmigration.AllowedTaskModeEnumValues, mode) {
return mode, nil
}
return "", errors.Errorf("invalid mode %q, allowed values: %s", value, pkgmigration.AllowedTaskModeEnumValues)
}

// standardizeJSON accepts JSON With Commas and Comments(JWCC) see
// https://nigeltao.github.io/blog/2021/json-with-commas-comments.html) and
// returns a standard JSON byte slice ready for json.Unmarshal.
func standardizeJSON(b []byte) ([]byte, error) {
ast, err := hujson.Parse(b)
if err != nil {
return b, err
}
ast.Standardize()
return ast.Pack(), nil
}
Loading
Loading