-
Notifications
You must be signed in to change notification settings - Fork 408
Cp command #9628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Cp command #9628
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,145 @@ | ||||||||||||||||||||||||||
| package cmd | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||
| "github.com/spf13/cobra" | ||||||||||||||||||||||||||
| "github.com/treeverse/lakefs/pkg/api/apigen" | ||||||||||||||||||||||||||
| "github.com/treeverse/lakefs/pkg/api/apiutil" | ||||||||||||||||||||||||||
| "github.com/treeverse/lakefs/pkg/uri" | ||||||||||||||||||||||||||
| "net/http" | ||||||||||||||||||||||||||
| "os" | ||||||||||||||||||||||||||
| "path" | ||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||
| "sync" | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| var fsCpCmd = &cobra.Command{ | ||||||||||||||||||||||||||
| Use: "cp <source URI> <destination URI>", | ||||||||||||||||||||||||||
| Short: "Copy object", | ||||||||||||||||||||||||||
| Args: cobra.ExactArgs(2), | ||||||||||||||||||||||||||
| Run: func(cmd *cobra.Command, args []string) { | ||||||||||||||||||||||||||
| recursive := Must(cmd.Flags().GetBool(recursiveFlagName)) | ||||||||||||||||||||||||||
| concurrency := Must(cmd.Flags().GetInt("concurrency")) | ||||||||||||||||||||||||||
| sourceURI := MustParsePathURI("source", args[0]) | ||||||||||||||||||||||||||
| destinationURI := MustParsePathURI("destination", args[1]) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if sourceURI.Repository != destinationURI.Repository { | ||||||||||||||||||||||||||
| Die("Can only copy files in the same repo", 1) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| client := getClient() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| ctx := cmd.Context() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if !recursive { | ||||||||||||||||||||||||||
| err := copyObject(ctx, client, sourceURI, destinationURI) | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| DieErr(err) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| success := true | ||||||||||||||||||||||||||
| var errorsWg sync.WaitGroup | ||||||||||||||||||||||||||
| errors := make(chan error) | ||||||||||||||||||||||||||
| errorsWg.Add(1) | ||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||
| defer errorsWg.Done() | ||||||||||||||||||||||||||
| for err := range errors { | ||||||||||||||||||||||||||
| _, _ = fmt.Fprintln(os.Stderr, err) | ||||||||||||||||||||||||||
| success = false | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||
|
Comment on lines
+47
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In other places (for instance, fs_upload.go) we use a control flow which is inverted compared to this one:
This has the advantage of avoiding a waitgroup (the main goroutine can just count errors or nil returns; or if using errgroup it can just ask for the errors directly) |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| var copyWg sync.WaitGroup | ||||||||||||||||||||||||||
| paths := make(chan string) | ||||||||||||||||||||||||||
| copyWg.Add(concurrency) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for i := 0; i < concurrency; i++ { | ||||||||||||||||||||||||||
| go copyObjectWorker(ctx, client, sourceURI, destinationURI, paths, errors, ©Wg) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| prefix := *sourceURI.Path | ||||||||||||||||||||||||||
| var paramsDelimiter apigen.PaginationDelimiter = "" | ||||||||||||||||||||||||||
| var from string | ||||||||||||||||||||||||||
| pfx := apigen.PaginationPrefix(prefix) | ||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||
| params := &apigen.ListObjectsParams{ | ||||||||||||||||||||||||||
| Prefix: &pfx, | ||||||||||||||||||||||||||
| After: apiutil.Ptr(apigen.PaginationAfter(from)), | ||||||||||||||||||||||||||
| Delimiter: ¶msDelimiter, | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| resp, err := client.ListObjectsWithResponse(cmd.Context(), sourceURI.Repository, sourceURI.Ref, params) | ||||||||||||||||||||||||||
| DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK) | ||||||||||||||||||||||||||
| if resp.JSON200 == nil { | ||||||||||||||||||||||||||
| Die("Bad response from server", 1) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| results := resp.JSON200.Results | ||||||||||||||||||||||||||
| for i := range results { | ||||||||||||||||||||||||||
| paths <- results[i].Path | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| pagination := resp.JSON200.Pagination | ||||||||||||||||||||||||||
| if !pagination.HasMore { | ||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| from = pagination.NextOffset | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| close(paths) | ||||||||||||||||||||||||||
| copyWg.Wait() | ||||||||||||||||||||||||||
| close(errors) | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This races with the error-handling goroutine above: once all copiers are done, if the error-handling goroutine is waiting then this line may close errors before that goroutine reads again. That results in reading on a closed channel, which is a race: the errors goroutine will end up discovering an error, but the code below on l. 95 will see Refactoring to load from a goroutine and unload from the main thread of execution helps avoid this race. |
||||||||||||||||||||||||||
| errorsWg.Wait() | ||||||||||||||||||||||||||
| if !success { | ||||||||||||||||||||||||||
| os.Exit(1) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func copyObjectWorker(ctx context.Context, client apigen.ClientWithResponsesInterface, sourceURI, destinationURI *uri.URI, paths <-chan string, errors chan<- error, wg *sync.WaitGroup) { | ||||||||||||||||||||||||||
| defer wg.Done() | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| srcPrefix := strings.TrimSuffix(*sourceURI.Path, "/") | ||||||||||||||||||||||||||
| dstPrefix := strings.TrimSuffix(*destinationURI.Path, "/") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| for srcObjPath := range paths { | ||||||||||||||||||||||||||
| rel := strings.TrimPrefix(srcObjPath, srcPrefix+"/") | ||||||||||||||||||||||||||
| destObjPath := dstPrefix | ||||||||||||||||||||||||||
| if rel != "" { | ||||||||||||||||||||||||||
| destObjPath = path.Join(dstPrefix, rel) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| resp, err := client.CopyObjectWithResponse(ctx, sourceURI.Repository, sourceURI.Ref, &apigen.CopyObjectParams{ | ||||||||||||||||||||||||||
| DestPath: destObjPath, | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| apigen.CopyObjectJSONRequestBody{ | ||||||||||||||||||||||||||
| SrcPath: srcObjPath, | ||||||||||||||||||||||||||
| SrcRef: &sourceURI.Ref, | ||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| if err = RetrieveError(resp, err); err != nil { | ||||||||||||||||||||||||||
| errors <- fmt.Errorf("cp %q -> %q: %w", srcObjPath, destObjPath, err) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| func copyObject(ctx context.Context, client apigen.ClientWithResponsesInterface, sourceURI, destinationURI *uri.URI) error { | ||||||||||||||||||||||||||
| resp, err := client.CopyObjectWithResponse(ctx, sourceURI.Repository, sourceURI.Ref, &apigen.CopyObjectParams{ | ||||||||||||||||||||||||||
| DestPath: *destinationURI.Path, | ||||||||||||||||||||||||||
| }, apigen.CopyObjectJSONRequestBody{ | ||||||||||||||||||||||||||
| SrcPath: *sourceURI.Path, | ||||||||||||||||||||||||||
| SrcRef: &sourceURI.Ref, | ||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||
|
Comment on lines
+129
to
+134
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not understand how this copies from a ref to another branch: destinationURI.Ref is ignored. I would expect e.g.
Suggested change
(but I could be wrong, it's been a long while...). |
||||||||||||||||||||||||||
| return RetrieveError(resp, err) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| //nolint:gochecknoinits | ||||||||||||||||||||||||||
| func init() { | ||||||||||||||||||||||||||
| const defaultConcurrency = 50 | ||||||||||||||||||||||||||
| withRecursiveFlag(fsCpCmd, "recursively copy all objects under the specified path") | ||||||||||||||||||||||||||
| fsCpCmd.Flags().IntP("concurrency", "C", defaultConcurrency, "max concurrent single copy operations to send to the lakeFS server") | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| fsCmd.AddCommand(fsCpCmd) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other places, especially ones written more recently, lakectl uses errgroup.Group, which seems like a higher-level abstraction over what you do here. Can you use it here, too?