-
Notifications
You must be signed in to change notification settings - Fork 408
Cp command #9628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Cp command #9628
Conversation
arielshaqed
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This is neat & useful.
Blocking because I think:
- It has a race condition.
- It might not be able to copy when the source is not the destination branch.
- Ideally we would have an Esti lakectl CLI test for this. I understand that this can be hard to produce; if we skip it then I will ask you to open an issue to do so.
| } | ||
|
|
||
| success := true | ||
| var errorsWg sync.WaitGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other places, especially ones written more recently, lakectl uses errgroup.Group, which seems like a higher-level abstraction over what you do here. Can you use it here, too?
| go func() { | ||
| defer errorsWg.Done() | ||
| for err := range errors { | ||
| _, _ = fmt.Fprintln(os.Stderr, err) | ||
| success = false | ||
| } | ||
| }() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other places (for instance, fs_upload.go) we use a control flow which is inverted compared to this one:
- Spawn a goroutine, compute work items there, load onto the workers channel.
- In the main goroutine, read all errors from the errors channel.
This has the advantage of avoiding a waitgroup (the main goroutine can just count errors or nil returns; or if using errgroup it can just ask for the errors directly)
|
|
||
| close(paths) | ||
| copyWg.Wait() | ||
| close(errors) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This races with the error-handling goroutine above: once all copiers are done, if the error-handling goroutine is waiting then this line may close errors before that goroutine reads again. That results in reading on a closed channel, which is a race: the errors goroutine will end up discovering an error, but the code below on l. 95 will see success==true and exit cleanly when it should fail.
Refactoring to load from a goroutine and unload from the main thread of execution helps avoid this race.
| resp, err := client.CopyObjectWithResponse(ctx, sourceURI.Repository, sourceURI.Ref, &apigen.CopyObjectParams{ | ||
| DestPath: *destinationURI.Path, | ||
| }, apigen.CopyObjectJSONRequestBody{ | ||
| SrcPath: *sourceURI.Path, | ||
| SrcRef: &sourceURI.Ref, | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand how this copies from a ref to another branch: destinationURI.Ref is ignored. I would expect e.g.
| resp, err := client.CopyObjectWithResponse(ctx, sourceURI.Repository, sourceURI.Ref, &apigen.CopyObjectParams{ | |
| DestPath: *destinationURI.Path, | |
| }, apigen.CopyObjectJSONRequestBody{ | |
| SrcPath: *sourceURI.Path, | |
| SrcRef: &sourceURI.Ref, | |
| }) | |
| resp, err := client.CopyObjectWithResponse(ctx, destinationURI.Repository, destinationURI.Ref, &apigen.CopyObjectParams{ | |
| DestPath: *destinationURI.Path, | |
| }, apigen.CopyObjectJSONRequestBody{ | |
| SrcPath: *sourceURI.Path, | |
| SrcRef: &sourceURI.Ref, | |
| }) |
(but I could be wrong, it's been a long while...).
Closes #8683
Change Description
Background
All the API functionality for copying exists, but not the cli command
New Feature
new cp command. Usage is like following:
$ lakectl fs cp lakefs://repo/branch/source lakefs://repo/branch/some/destor recursively:
$ lakectl fs cp lakefs://repo/branch/sourceDir lakefs://repo/branch/some/dest --recursiveworks both with and without slashes in the end in case of recursion
Testing Details
Manually. I didn't see any unit tests for cobra commands, if needed though I can explore
Breaking Change?
No
Additional info
./lakectl fs cp lakefs://quickstart/main/images lakefs://quickstart/main/data --recursivelog:
DEBUG [2025-10-31T13:52:34+01:00]usr/local/go/src/net/http/server.go:2294 net/http.HandlerFunc.ServeHTTP HTTP call ended host="127.0.0.1:8000" method=POST operation_id=CopyObject path="/api/v1/repositories/quickstart/branches/main/objects/copy?dest_path=data%2Fduckdb-editor-06.png" source_ip="127.0.0.1:62902" status_code=201 took=52966791 took_str=52.966791ms user=admin
DEBUG [2025-10-31T13:52:34+01:00]usr/local/go/src/net/http/server.go:2294 net/http.HandlerFunc.ServeHTTP HTTP call ended host="127.0.0.1:8000" method=POST operation_id=CopyObject path="/api/v1/repositories/quickstart/branches/main/objects/copy?dest_path=data%2Fhooks-03.png" source_ip="127.0.0.1:62898" status_code=201 took=82716458 took_str=82.716458ms user=admin
DEBUG [2025-10-31T13:52:34+01:00]usr/local/go/src/net/http/server.go:2294 net/http.HandlerFunc.ServeHTTP HTTP call ended host="127.0.0.1:8000" method=POST operation_id=CopyObject path="/api/v1/repositories/quickstart/branches/main/objects/copy?dest_path=data%2Fduckdb-main-03.png" source_ip="127.0.0.1:62892" status_code=201 took=90485333 took_str=90.485333ms user=admin
Contact Details
Slack + [email protected]