Skip to content

Commit daeaa10

Browse files
authored
Merge branch 'main' into cdc
2 parents 2833ea6 + 71cfb79 commit daeaa10

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1248
-971
lines changed

internal/cli/serverless/export/create.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (c CreateOpts) NonInteractiveFlags() []string {
7373
flag.OSSURI,
7474
flag.OSSAccessKeyID,
7575
flag.OSSAccessKeySecret,
76+
flag.Partitions,
7677
}
7778
}
7879

@@ -161,6 +162,8 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
161162
// oss
162163
var ossURI, ossAccessKeyID, ossAccessKeySecret string
163164
var displayName string
165+
// partitions
166+
var partitions []string
164167

165168
if opts.interactive {
166169
if !h.IOStreams.CanPrompt {
@@ -303,8 +306,8 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
303306
return errors.New("sql is empty")
304307
}
305308
case FilterTable:
306-
fmt.Fprintln(h.IOStreams.Out, color.BlueString("Please input the following options, require at least one field"))
307-
inputs := []string{flag.TableFilter, flag.TableWhere}
309+
fmt.Fprintln(h.IOStreams.Out, color.BlueString("Please input the following options"))
310+
inputs := []string{flag.TableFilter, flag.TableWhere, flag.Partitions}
308311
textInput, err := ui.InitialInputModel(inputs, inputDescription)
309312
if err != nil {
310313
return err
@@ -314,9 +317,16 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
314317
if err != nil {
315318
return err
316319
}
320+
if len(patterns) == 0 {
321+
return errors.New("table filters are required, if want to export whole cluster, type *.*")
322+
}
317323
where = textInput.Inputs[1].Value()
318-
if len(patterns) == 0 && where == "" {
319-
return errors.New("both patterns and where are empty, require at least one field")
324+
partitionString := textInput.Inputs[2].Value()
325+
if partitionString != "" {
326+
partitions, err = util.StringSliceConv(partitionString)
327+
if err != nil {
328+
return err
329+
}
320330
}
321331
}
322332

@@ -573,6 +583,10 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
573583
if err != nil {
574584
return errors.Trace(err)
575585
}
586+
partitions, err = cmd.Flags().GetStringSlice(flag.Partitions)
587+
if err != nil {
588+
return errors.Trace(err)
589+
}
576590
}
577591

578592
if !opts.interactive && sql == "" && len(patterns) == 0 && !force {
@@ -669,11 +683,12 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
669683
Sql: &sql,
670684
}
671685
}
672-
if len(patterns) > 0 || where != "" {
686+
if len(patterns) > 0 || where != "" || len(partitions) > 0 {
673687
params.ExportOptions.Filter = &export.ExportOptionsFilter{
674688
Table: &export.ExportOptionsFilterTable{
675-
Where: &where,
676-
Patterns: patterns,
689+
Where: &where,
690+
Patterns: patterns,
691+
Partitions: partitions,
677692
},
678693
}
679694
}
@@ -729,9 +744,11 @@ func CreateCmd(h *internal.Helper) *cobra.Command {
729744
createCmd.Flags().String(flag.OSSAccessKeySecret, "", "The access key secret of the OSS.")
730745
createCmd.Flags().String(flag.ParquetCompression, "ZSTD", fmt.Sprintf("The parquet compression algorithm. One of %q.", export.AllowedExportParquetCompressionTypeEnumEnumValues))
731746
createCmd.Flags().String(flag.DisplayName, "", "The display name of the export. (default \"SNAPSHOT_<snapshot_time>\")")
747+
createCmd.Flags().StringSlice(flag.Partitions, nil, "Filter the exported partition table(s) with specified partition(s).")
732748

733749
createCmd.MarkFlagsMutuallyExclusive(flag.TableFilter, flag.SQL)
734750
createCmd.MarkFlagsMutuallyExclusive(flag.TableWhere, flag.SQL)
751+
createCmd.MarkFlagsMutuallyExclusive(flag.Partitions, flag.SQL)
735752
createCmd.MarkFlagsMutuallyExclusive(flag.S3RoleArn, flag.S3AccessKeyID)
736753
createCmd.MarkFlagsMutuallyExclusive(flag.S3RoleArn, flag.S3SecretAccessKey)
737754
return createCmd

internal/cli/serverless/export/create_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,9 @@ func (suite *CreateExportSuite) TestCreateExportWithTableFilter() {
348348
body := getDefaultCreateExportBody()
349349
body.ExportOptions.Filter = &export.ExportOptionsFilter{
350350
Table: &export.ExportOptionsFilterTable{
351-
Patterns: []string{pattern1, pattern2},
352-
Where: &where,
351+
Patterns: []string{pattern1, pattern2},
352+
Where: &where,
353+
Partitions: []string{},
353354
},
354355
}
355356

internal/cli/serverless/export/ui.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ var inputDescription = map[string]string{
3636
flag.GCSURI: "Input your GCS URI in gs://<bucket>/<path> format",
3737
flag.GCSServiceAccountKey: "Input your base64 encoded GCS service account key",
3838
flag.SQL: "Input the SELECT SQL statement",
39-
flag.TableFilter: "Input the table filter patterns (comma separated). Example: database.table,database.*,`database-1`.`table-1`",
40-
flag.TableWhere: "Input the where clause which will apply to all filtered tables. Example: id > 10",
39+
flag.TableFilter: "Required, input the table filter patterns (comma separated). Example: database.table,database.*,`database-1`.`table-1`",
40+
flag.TableWhere: "Optional, input the where clause which will apply to all filtered tables. Example: id > 10",
4141
flag.CSVSeparator: "Input the CSV separator: separator of each value in CSV files, skip to use default value (,)",
4242
flag.CSVDelimiter: "Input the CSV delimiter: delimiter of string type variables in CSV files, skip to use default value (\"). If you want to set empty string, please use non-interactive mode",
4343
flag.CSVNullValue: "Input the CSV null value: representation of null values in CSV files, skip to use default value (\\N). If you want to set empty string, please use non-interactive mode",
@@ -46,6 +46,7 @@ var inputDescription = map[string]string{
4646
flag.OSSURI: "Input your OSS URI in oss://<bucket>/<path> format",
4747
flag.OSSAccessKeyID: "Input your OSS access key id",
4848
flag.OSSAccessKeySecret: "Input your OSS access key secret",
49+
flag.Partitions: "Optional, input the partitions (comma separated) which will apply to all filtered tables. Example: p1,p2. It is useful when you want to export specific partitions from a partition table.",
4950
}
5051

5152
func GetSelectedParquetCompression() (export.ExportParquetCompressionTypeEnum, error) {

internal/flag/flag.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ const (
9898
ParquetCompression string = "parquet.compression"
9999
StartDate string = "start-date"
100100
EndDate string = "end-date"
101+
Partitions string = "partitions"
101102

102103
AuditLogUnRedacted string = "unredacted"
103104
AuditLogFilterRuleID string = "filter-rule-id"

internal/service/cloud/api_client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,17 +405,17 @@ func (d *ClientDelegate) CancelUpload(ctx context.Context, clusterId string, upl
405405
}
406406

407407
func (d *ClientDelegate) GetExport(ctx context.Context, clusterId string, exportId string) (*export.Export, error) {
408-
res, h, err := d.ec.ExportServiceAPI.ExportServiceGetExport(ctx, clusterId, exportId).Execute()
408+
res, h, err := d.ec.ExportAPI.ExportServiceGetExport(ctx, clusterId, exportId).Execute()
409409
return res, parseError(err, h)
410410
}
411411

412412
func (d *ClientDelegate) CancelExport(ctx context.Context, clusterId string, exportId string) (*export.Export, error) {
413-
res, h, err := d.ec.ExportServiceAPI.ExportServiceCancelExport(ctx, clusterId, exportId).Execute()
413+
res, h, err := d.ec.ExportAPI.ExportServiceCancelExport(ctx, clusterId, exportId).Execute()
414414
return res, parseError(err, h)
415415
}
416416

417417
func (d *ClientDelegate) CreateExport(ctx context.Context, clusterId string, body *export.ExportServiceCreateExportBody) (*export.Export, error) {
418-
r := d.ec.ExportServiceAPI.ExportServiceCreateExport(ctx, clusterId)
418+
r := d.ec.ExportAPI.ExportServiceCreateExport(ctx, clusterId)
419419
if body != nil {
420420
r = r.Body(*body)
421421
}
@@ -424,12 +424,12 @@ func (d *ClientDelegate) CreateExport(ctx context.Context, clusterId string, bod
424424
}
425425

426426
func (d *ClientDelegate) DeleteExport(ctx context.Context, clusterId string, exportId string) (*export.Export, error) {
427-
res, h, err := d.ec.ExportServiceAPI.ExportServiceDeleteExport(ctx, clusterId, exportId).Execute()
427+
res, h, err := d.ec.ExportAPI.ExportServiceDeleteExport(ctx, clusterId, exportId).Execute()
428428
return res, parseError(err, h)
429429
}
430430

431431
func (d *ClientDelegate) ListExports(ctx context.Context, clusterId string, pageSize *int32, pageToken *string, orderBy *string) (*export.ListExportsResponse, error) {
432-
r := d.ec.ExportServiceAPI.ExportServiceListExports(ctx, clusterId)
432+
r := d.ec.ExportAPI.ExportServiceListExports(ctx, clusterId)
433433
if pageSize != nil {
434434
r = r.PageSize(*pageSize)
435435
}

0 commit comments

Comments
 (0)