From 5c4268c3e2304b6b7ecca7a92f5ace371cfe97cd Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Fri, 21 Jun 2024 17:38:44 +0000 Subject: [PATCH 1/6] routers/private: fix push-on-create being incorrectly triggered The code here checks if the repo being requested doesn't exist. If it doesn't, then a write operation might create it. But a read operation doesn't make any sense, and should error out. So simply check the access mode. I assume this was the intent here, but only checked for one "verb" instead, while there exist other read-only verbs as well. And ofc more can be introduced in the future ;) Possibly some write verbs don't make sense as well (presumably those that only add stuff incrementally to existing repos)? --- routers/private/serv.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/routers/private/serv.go b/routers/private/serv.go index dbb28cc2bb072..4dd7d06fb36e2 100644 --- a/routers/private/serv.go +++ b/routers/private/serv.go @@ -136,16 +136,15 @@ func ServCommand(ctx *context.PrivateContext) { if err != nil { if repo_model.IsErrRepoNotExist(err) { repoExist = false - for _, verb := range ctx.FormStrings("verb") { - if verb == "git-upload-pack" { - // User is fetching/cloning a non-existent repository - log.Warn("Failed authentication attempt (cannot find repository: %s/%s) from %s", results.OwnerName, results.RepoName, ctx.RemoteAddr()) - ctx.JSON(http.StatusNotFound, private.Response{ - UserMsg: fmt.Sprintf("Cannot find repository: %s/%s", results.OwnerName, results.RepoName), - }) - return - } + if mode == perm.AccessModeRead { + // User is fetching/cloning a non-existent repository + log.Warn("Failed authentication attempt (cannot find repository: %s/%s) from %s", results.OwnerName, results.RepoName, ctx.RemoteAddr()) + ctx.JSON(http.StatusNotFound, private.Response{ + UserMsg: fmt.Sprintf("Cannot find repository: %s/%s", results.OwnerName, results.RepoName), + }) + return } + // else fallthrough (push-to-create may kick in below) } else { log.Error("Unable to get repository: %s/%s Error: %v", results.OwnerName, results.RepoName, err) ctx.JSON(http.StatusInternalServerError, private.Response{ From a177bb0533d47f5071b2376f7f98286885527dad Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Fri, 21 Jun 2024 12:34:44 +0000 Subject: [PATCH 2/6] cmd: refactor runServ() Coalesce access mode detection into one place. Yes, "upload" really has opposite semantics for git commands vs. git-lfs commands. Wow. This commit makes no functional changes. --- cmd/serv.go | 84 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 34 deletions(-) diff --git a/cmd/serv.go b/cmd/serv.go index 2bfd1110617e5..36064928444e3 100644 --- a/cmd/serv.go +++ b/cmd/serv.go @@ -36,7 +36,10 @@ import ( ) const ( - lfsAuthenticateVerb = "git-lfs-authenticate" + verbUploadPack = "git-upload-pack" + verbUploadArchive = "git-upload-archive" + verbReceivePack = "git-receive-pack" + verbLfsAuthenticate = "git-lfs-authenticate" ) // CmdServ represents the available serv sub-command. 
@@ -73,11 +76,16 @@ func setup(ctx context.Context, debug bool) { } var ( - allowedCommands = map[string]perm.AccessMode{ - "git-upload-pack": perm.AccessModeRead, - "git-upload-archive": perm.AccessModeRead, - "git-receive-pack": perm.AccessModeWrite, - lfsAuthenticateVerb: perm.AccessModeNone, + // anything not in map will return false (zero value) + // keep getAccessMode() in sync + allowedCommands = map[string]bool{ + verbUploadPack: true, + verbUploadArchive: true, + verbReceivePack: true, + verbLfsAuthenticate: true, + } + allowedCommandsLfs = map[string]bool{ + verbLfsAuthenticate: true, } alphaDashDotPattern = regexp.MustCompile(`[^\w-\.]`) ) @@ -124,6 +132,24 @@ func handleCliResponseExtra(extra private.ResponseExtra) error { return nil } +func getAccessMode(verb string, lfsVerb string) perm.AccessMode { + switch verb { + case verbUploadPack, verbUploadArchive: + return perm.AccessModeRead + case verbReceivePack: + return perm.AccessModeWrite + case verbLfsAuthenticate: + switch lfsVerb { + case "upload": + return perm.AccessModeWrite + case "download": + return perm.AccessModeRead + } + } + // should be unreachable + return perm.AccessModeNone +} + func runServ(c *cli.Context) error { ctx, cancel := installSignals() defer cancel() @@ -193,17 +219,7 @@ func runServ(c *cli.Context) error { if repoPath[0] == '/' { repoPath = repoPath[1:] } - var lfsVerb string - if verb == lfsAuthenticateVerb { - if !setting.LFS.StartServer { - return fail(ctx, "Unknown git command", "LFS authentication request over SSH denied, LFS support is disabled") - } - - if len(words) > 2 { - lfsVerb = words[2] - } - } rr := strings.SplitN(repoPath, "/", 2) if len(rr) != 2 { @@ -240,20 +256,20 @@ func runServ(c *cli.Context) error { }() } - requestedMode, has := allowedCommands[verb] - if !has { + if allowedCommands[verb] { + if allowedCommandsLfs[verb] { + if !setting.LFS.StartServer { + return fail(ctx, "Unknown git command", "LFS authentication request over SSH denied, LFS support is disabled") + } + if len(words) > 2 { + lfsVerb = words[2] + } + } + } else { return fail(ctx, "Unknown git command", "Unknown git command %s", verb) } - if verb == lfsAuthenticateVerb { - if lfsVerb == "upload" { - requestedMode = perm.AccessModeWrite - } else if lfsVerb == "download" { - requestedMode = perm.AccessModeRead - } else { - return fail(ctx, "Unknown LFS verb", "Unknown lfs verb %s", lfsVerb) - } - } + requestedMode := getAccessMode(verb, lfsVerb) results, extra := private.ServCommand(ctx, keyID, username, reponame, requestedMode, verb, lfsVerb) if extra.HasError() { @@ -261,7 +277,7 @@ func runServ(c *cli.Context) error { } // LFS token authentication - if verb == lfsAuthenticateVerb { + if verb == verbLfsAuthenticate { url := fmt.Sprintf("%s%s/%s.git/info/lfs", setting.AppURL, url.PathEscape(results.OwnerName), url.PathEscape(results.RepoName)) now := time.Now() @@ -296,22 +312,22 @@ func runServ(c *cli.Context) error { return nil } - var gitcmd *exec.Cmd gitBinPath := filepath.Dir(git.GitExecutable) // e.g. /usr/bin gitBinVerb := filepath.Join(gitBinPath, verb) // e.g. 
/usr/bin/git-upload-pack + gitExe := gitBinVerb + gitArgs := make([]string, 0, 3) // capacity to accommodate max args if _, err := os.Stat(gitBinVerb); err != nil { // if the command "git-upload-pack" doesn't exist, try to split "git-upload-pack" to use the sub-command with git // ps: Windows only has "git.exe" in the bin path, so Windows always uses this way verbFields := strings.SplitN(verb, "-", 2) if len(verbFields) == 2 { // use git binary with the sub-command part: "C:\...\bin\git.exe", "upload-pack", ... - gitcmd = exec.CommandContext(ctx, git.GitExecutable, verbFields[1], repoPath) + gitExe = git.GitExecutable + gitArgs = append(gitArgs, verbFields[1]) } } - if gitcmd == nil { - // by default, use the verb (it has been checked above by allowedCommands) - gitcmd = exec.CommandContext(ctx, gitBinVerb, repoPath) - } + gitArgs = append(gitArgs, repoPath) + gitcmd := exec.CommandContext(ctx, gitExe, gitArgs...) process.SetSysProcAttribute(gitcmd) gitcmd.Dir = setting.RepoRootPath From abc01ea84debba4d88d609d22ae8af196957f387 Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Thu, 27 Jun 2024 08:43:16 +0530 Subject: [PATCH 3/6] modules/lfs: add GetMeta method to ContentStore A download command in the SSH protocol doesn't specify size, so this was necessary. --- modules/lfs/content_store.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/modules/lfs/content_store.go b/modules/lfs/content_store.go index 0d9c0c98acca0..b7854d24a8aa4 100644 --- a/modules/lfs/content_store.go +++ b/modules/lfs/content_store.go @@ -16,6 +16,8 @@ import ( ) var ( + // ErrObjectNotInStore occurs if the OID is not in store + ErrObjectNotInStore = errors.New("content hash does not match OID") // ErrHashMismatch occurs if the content has does not match OID ErrHashMismatch = errors.New("content hash does not match OID") // ErrSizeMismatch occurs if the content size does not match @@ -89,6 +91,20 @@ func (s *ContentStore) Exists(pointer Pointer) (bool, error) { return true, nil } +// GetMeta takes a pointer with OID and returns a pointer with Size +func (s *ContentStore) GetMeta(pointer Pointer) (Pointer, error) { + p := pointer.RelativePath() + fi, err := s.ObjectStorage.Stat(p) + if os.IsNotExist(err) { + return pointer, ErrObjectNotInStore + } else if err != nil { + log.Error("Unable stat file: %s for LFS OID[%s] Error: %v", p, pointer.Oid, err) + return pointer, err + } + pointer.Size = fi.Size() + return pointer, nil +} + // Verify returns true if the object exists in the content store and size is correct. func (s *ContentStore) Verify(pointer Pointer) (bool, error) { p := pointer.RelativePath() From 5b3f2a3e9dafd585cbc5ee11469a3325fa6dda41 Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Thu, 27 Jun 2024 02:26:17 +0530 Subject: [PATCH 4/6] modules/lfstransfer/transfer: vendor from charmbracelet Big changes are inevitable due to the difference in Gitea's approach to storing LFS files. And upstream hasn't tagged for a while anyway. 
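For orientation before the vendored code below, this is a minimal sketch (not part of the patch series) of how the transfer package is meant to be driven: a Pktline frames stdin/stdout into pkt-lines, the server first advertises its capabilities followed by a flush packet, and a Processor then reads commands and dispatches them to a Backend. The backend value is assumed to be provided elsewhere; patch 6 wires a real Gitea-backed implementation into runServ.

package main

import (
	"fmt"
	"os"

	"code.gitea.io/gitea/modules/lfstransfer/transfer"
)

// serveTransfer is a sketch only: frame stdin/stdout as pkt-lines, advertise
// capabilities, then let the Processor dispatch client commands to the backend.
// op is transfer.UploadOperation or transfer.DownloadOperation.
func serveTransfer(backend transfer.Backend, op string) error {
	pl := transfer.NewPktline(os.Stdin, os.Stdout, nil) // nil logger falls back to the package's no-op logger

	// The server speaks first: one pkt-line per capability, then a flush packet.
	for _, c := range transfer.Capabilities {
		if err := pl.WritePacketText(c); err != nil {
			return fmt.Errorf("advertising capability %q: %w", c, err)
		}
	}
	if err := pl.WriteFlush(); err != nil {
		return err
	}

	// The processor then loops over commands (version, batch, put-object,
	// get-object, verify-object, lock, unlock, quit) until EOF or quit.
	return transfer.NewProcessor(pl, backend, nil).ProcessCommands(op)
}

The real wiring added in patch 6 (modules/lfstransfer/main.go) follows this same shape, with the backend built from the repository and the LFS content store.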
--- go.mod | 1 + go.sum | 2 + modules/lfstransfer/transfer/LICENSE | 21 + modules/lfstransfer/transfer/README.md | 8 + modules/lfstransfer/transfer/args.go | 50 +++ modules/lfstransfer/transfer/backend.go | 45 ++ modules/lfstransfer/transfer/batch.go | 12 + modules/lfstransfer/transfer/caps.go | 10 + modules/lfstransfer/transfer/errors.go | 28 ++ modules/lfstransfer/transfer/hash.go | 42 ++ modules/lfstransfer/transfer/log.go | 14 + modules/lfstransfer/transfer/oid.go | 44 ++ modules/lfstransfer/transfer/pktline.go | 169 +++++++ modules/lfstransfer/transfer/processor.go | 508 ++++++++++++++++++++++ modules/lfstransfer/transfer/status.go | 116 +++++ 15 files changed, 1070 insertions(+) create mode 100644 modules/lfstransfer/transfer/LICENSE create mode 100644 modules/lfstransfer/transfer/README.md create mode 100644 modules/lfstransfer/transfer/args.go create mode 100644 modules/lfstransfer/transfer/backend.go create mode 100644 modules/lfstransfer/transfer/batch.go create mode 100644 modules/lfstransfer/transfer/caps.go create mode 100644 modules/lfstransfer/transfer/errors.go create mode 100644 modules/lfstransfer/transfer/hash.go create mode 100644 modules/lfstransfer/transfer/log.go create mode 100644 modules/lfstransfer/transfer/oid.go create mode 100644 modules/lfstransfer/transfer/pktline.go create mode 100644 modules/lfstransfer/transfer/processor.go create mode 100644 modules/lfstransfer/transfer/status.go diff --git a/go.mod b/go.mod index 06c2fbd0c8534..dd2e27ed2c74f 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/ethantkoenig/rupture v1.0.1 github.com/felixge/fgprof v0.9.4 github.com/fsnotify/fsnotify v1.7.0 + github.com/git-lfs/pktline v0.0.0-20210330133718-06e9096e2825 github.com/gliderlabs/ssh v0.3.7 github.com/go-ap/activitypub v0.0.0-20240408091739-ba76b44c2594 github.com/go-ap/jsonld v0.0.0-20221030091449-f2a191312c73 diff --git a/go.sum b/go.sum index dbbbf342d6c99..868e1b4fdca6c 100644 --- a/go.sum +++ b/go.sum @@ -276,6 +276,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA= github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/git-lfs/pktline v0.0.0-20210330133718-06e9096e2825 h1:riQhgheTL7tMF4d5raz9t3+IzoR1i1wqxE1kZC6dY+U= +github.com/git-lfs/pktline v0.0.0-20210330133718-06e9096e2825/go.mod h1:fenKRzpXDjNpsIBhuhUzvjCKlDjKam0boRAenTE0Q6A= github.com/gliderlabs/ssh v0.3.7 h1:iV3Bqi942d9huXnzEF2Mt+CY9gLu8DNM4Obd+8bODRE= github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= diff --git a/modules/lfstransfer/transfer/LICENSE b/modules/lfstransfer/transfer/LICENSE new file mode 100644 index 0000000000000..3d2e904d47afc --- /dev/null +++ b/modules/lfstransfer/transfer/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022-2023 Charmbracelet, Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished 
to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/modules/lfstransfer/transfer/README.md b/modules/lfstransfer/transfer/README.md new file mode 100644 index 0000000000000..29508e3df41b7 --- /dev/null +++ b/modules/lfstransfer/transfer/README.md @@ -0,0 +1,8 @@ +# LFS SSH transfer protocol server + +Module vendored from charmbracelet's implementation of the protocol: +https://github.com/charmbracelet/git-lfs-transfer/ +Modified to suit Gitea's internal LFS API. + +MIT Licensed, written by original authors with reference from: +https://github.com/bk2204/scutiger diff --git a/modules/lfstransfer/transfer/args.go b/modules/lfstransfer/transfer/args.go new file mode 100644 index 0000000000000..3f7ae4cefe543 --- /dev/null +++ b/modules/lfstransfer/transfer/args.go @@ -0,0 +1,50 @@ +package transfer + +import ( + "fmt" + "strings" +) + +// batch request argument keys. +const ( + HashAlgoKey = "hash-algo" + TransferKey = "transfer" + RefnameKey = "refname" + ExpiresInKey = "expires-in" + ExpiresAtKey = "expires-at" + SizeKey = "size" + PathKey = "path" + LimitKey = "limit" + CursorKey = "cursor" +) + +// ParseArgs parses the given args. +func ParseArgs(parts []string) (Args, error) { + args := make(Args, 0) + for _, line := range parts { + parts := strings.SplitN(line, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid argument: %q", line) + } + key, value := parts[0], parts[1] + args[key] = value + } + return args, nil +} + +// ArgsToList converts the given args to a list. +func ArgsToList(args Args) []string { + list := make([]string, 0) + for key, value := range args { + list = append(list, fmt.Sprintf("%s=%s", key, value)) + } + return list +} + +// Args is a key-value pair of arguments. +type Args map[string]string + +// String returns the string representation of the arguments. +func (a Args) String() string { + return strings.Join(ArgsToList(a), " ") +} diff --git a/modules/lfstransfer/transfer/backend.go b/modules/lfstransfer/transfer/backend.go new file mode 100644 index 0000000000000..2b10235593e8c --- /dev/null +++ b/modules/lfstransfer/transfer/backend.go @@ -0,0 +1,45 @@ +package transfer + +import ( + "io" + "io/fs" +) + +const ( + // UploadOperation is an upload operation. + UploadOperation = "upload" + // DownloadOperation is a download operation. + DownloadOperation = "download" +) + +// Backend is a Git LFS backend. +type Backend interface { + Batch(op string, pointers []BatchItem, args Args) ([]BatchItem, error) + StartUpload(oid string, r io.Reader, args Args) (io.Closer, error) + FinishUpload(state io.Closer, args Args) error + Verify(oid string, args Args) (Status, error) + Download(oid string, args Args) (fs.File, error) + LockBackend(args Args) LockBackend +} + +// Lock is a Git LFS lock. 
+type Lock interface { + Unlock() error + ID() string + Path() string + FormattedTimestamp() string + OwnerName() string + AsLockSpec(ownerID bool) ([]string, error) + AsArguments() []string +} + +// LockBackend is a Git LFS lock backend. +type LockBackend interface { + // Create creates a lock for the given path and refname. + // Refname can be empty. + Create(path, refname string) (Lock, error) + Unlock(lock Lock) error + FromPath(path string) (Lock, error) + FromID(id string) (Lock, error) + Range(cursor string, limit int, iter func(Lock) error) (string, error) +} diff --git a/modules/lfstransfer/transfer/batch.go b/modules/lfstransfer/transfer/batch.go new file mode 100644 index 0000000000000..6f590a2b17791 --- /dev/null +++ b/modules/lfstransfer/transfer/batch.go @@ -0,0 +1,12 @@ +package transfer + +// BatchItem is a Git LFS batch item. +type BatchItem struct { + Pointer + + // Present is used to determine the action to take for the batch item. + Present bool + + // Args is an optional oid-line key-value pairs. + Args Args +} diff --git a/modules/lfstransfer/transfer/caps.go b/modules/lfstransfer/transfer/caps.go new file mode 100644 index 0000000000000..ffcdfe3da6497 --- /dev/null +++ b/modules/lfstransfer/transfer/caps.go @@ -0,0 +1,10 @@ +package transfer + +// Version is the git-lfs-transfer protocol version number. +const Version = "1" + +// Capabilities is a list of Git LFS capabilities supported by this package. +var Capabilities = []string{ + "version=" + Version, + "locking", +} diff --git a/modules/lfstransfer/transfer/errors.go b/modules/lfstransfer/transfer/errors.go new file mode 100644 index 0000000000000..7e28642c446a3 --- /dev/null +++ b/modules/lfstransfer/transfer/errors.go @@ -0,0 +1,28 @@ +package transfer + +import ( + "errors" +) + +var ( + // ErrConflict is the conflict error. + ErrConflict = errors.New("conflict") + // ErrParseError is the parse error. + ErrParseError = errors.New("parse error") + // ErrMissingData is the missing data error. + ErrMissingData = errors.New("missing data") + // ErrExtraData is the extra data error. + ErrExtraData = errors.New("extra data") + // ErrCorruptData is the corrupt data error. + ErrCorruptData = errors.New("corrupt data") + // ErrNotAllowed is the not allowed error. + ErrNotAllowed = errors.New("not allowed") + // ErrInvalidPacket is the invalid packet error. + ErrInvalidPacket = errors.New("invalid packet") + // ErrNotFound is the not found error. + ErrNotFound = errors.New("not found") + // ErrUnauthorized is the unauthorized error. + ErrUnauthorized = errors.New("unauthorized") + // ErrUnauthorized is the forbidden error. + ErrForbidden = errors.New("forbidden") +) diff --git a/modules/lfstransfer/transfer/hash.go b/modules/lfstransfer/transfer/hash.go new file mode 100644 index 0000000000000..aba044e4a3572 --- /dev/null +++ b/modules/lfstransfer/transfer/hash.go @@ -0,0 +1,42 @@ +package transfer + +import ( + "encoding/hex" + "hash" + "io" +) + +var _ io.Reader = (*HashingReader)(nil) + +// HashingReader is a reader that hashes the data it reads. +type HashingReader struct { + r io.Reader + hash hash.Hash + size int64 +} + +// NewHashingReader creates a new hashing reader. +func NewHashingReader(r io.Reader, hash hash.Hash) *HashingReader { + return &HashingReader{ + r: r, + hash: hash, + } +} + +// Size returns the number of bytes read. +func (h *HashingReader) Size() int64 { + return h.size +} + +// Oid returns the hash of the data read. 
+func (h *HashingReader) Oid() string { + return hex.EncodeToString(h.hash.Sum(nil)) +} + +// Read reads data from the underlying reader and hashes it. +func (h *HashingReader) Read(p []byte) (int, error) { + n, err := h.r.Read(p) + h.size += int64(n) + h.hash.Write(p[:n]) + return n, err +} diff --git a/modules/lfstransfer/transfer/log.go b/modules/lfstransfer/transfer/log.go new file mode 100644 index 0000000000000..937402d943b5e --- /dev/null +++ b/modules/lfstransfer/transfer/log.go @@ -0,0 +1,14 @@ +package transfer + +// Logger is a logging interface. +type Logger interface { + // Log logs the given message and structured arguments. + Log(msg string, kv ...interface{}) +} + +type noopLogger struct{} + +var _ Logger = (*noopLogger)(nil) + +// Log implements Logger. +func (*noopLogger) Log(string, ...interface{}) {} diff --git a/modules/lfstransfer/transfer/oid.go b/modules/lfstransfer/transfer/oid.go new file mode 100644 index 0000000000000..ba4d99c33e4ef --- /dev/null +++ b/modules/lfstransfer/transfer/oid.go @@ -0,0 +1,44 @@ +package transfer + +import ( + "fmt" + "path" + "regexp" +) + +// Pointer is a Git LFS pointer. +type Pointer struct { + Oid string `json:"oid"` + Size int64 `json:"size"` +} + +// String returns the string representation of the pointer. +func (p Pointer) String() string { + return fmt.Sprintf("%s %d", p.Oid, p.Size) +} + +var oidPattern = regexp.MustCompile(`^[a-f\d]{64}$`) + +// IsValid checks if the pointer has a valid structure. +// It doesn't check if the pointed-to-content exists. +func (p Pointer) IsValid() bool { + if len(p.Oid) != 64 { + return false + } + if !oidPattern.MatchString(p.Oid) { + return false + } + if p.Size < 0 { + return false + } + return true +} + +// RelativePath returns the relative storage path of the pointer. +func (p Pointer) RelativePath() string { + if len(p.Oid) < 5 { + return p.Oid + } + + return path.Join(p.Oid[0:2], p.Oid[2:4], p.Oid) +} diff --git a/modules/lfstransfer/transfer/pktline.go b/modules/lfstransfer/transfer/pktline.go new file mode 100644 index 0000000000000..e2e832c20741c --- /dev/null +++ b/modules/lfstransfer/transfer/pktline.go @@ -0,0 +1,169 @@ +package transfer + +import ( + "fmt" + "io" + + "github.com/git-lfs/pktline" +) + +const ( + // Flush is the flush packet. + Flush = '\x00' + // Delim is the delimiter packet. + Delim = '\x01' +) + +// List of Git LFS commands. +const ( + versionCommand = "version" + batchCommand = "batch" + putObjectCommand = "put-object" + verifyObjectCommand = "verify-object" + getObjectCommand = "get-object" + lockCommand = "lock" + listLockCommand = "list-lock" + unlockCommand = "unlock" + quitCommand = "quit" +) + +// PktLine is a Git packet line handler. +type Pktline struct { + *pktline.Pktline + r io.Reader + w io.Writer + logger Logger +} + +// NewPktline creates a new Git packet line handler. +func NewPktline(r io.Reader, w io.Writer, logger Logger) *Pktline { + if logger == nil { + logger = new(noopLogger) + } + return &Pktline{ + Pktline: pktline.NewPktline(r, w), + r: r, + w: w, + logger: logger, + } +} + +// SendError sends an error msg. 
+func (p *Pktline) SendError(status uint32, message string) error { + p.logger.Log("sending error status", "code", status, "msg", message) + if err := p.WritePacketText(fmt.Sprintf("status %03d", status)); err != nil { + p.logger.Log("failed to write packet", "err", err) + } + if err := p.WriteDelim(); err != nil { + p.logger.Log("failed to write delimiter", "err", err) + } + if message != "" { + if err := p.WritePacketText(message); err != nil { + p.logger.Log("failed to write message", "err", err) + } + } + return p.WriteFlush() +} + +// SendStatus sends a status message. +func (p *Pktline) SendStatus(status Status) error { + p.logger.Log("sending status", "code", status) + if err := p.WritePacketText(fmt.Sprintf("status %03d", status.Code())); err != nil { + p.logger.Log("failed to write status", "err", err) + } + if args := status.Args(); len(args) > 0 { + for _, arg := range args { + if err := p.WritePacketText(arg); err != nil { + p.logger.Log("failed to write argument", "arg", arg, "err", err) + } + } + } + if msgs := status.Messages(); msgs != nil { + if err := p.WriteDelim(); err != nil { + p.logger.Log("failed to write delimiter", "err", err) + } + for _, msg := range msgs { + if err := p.WritePacketText(msg); err != nil { + p.logger.Log("failed to write message", "err", err) + } + } + } else if r := status.Reader(); r != nil { + p.logger.Log("sending reader") + // Close reader if it implements io.Closer. + if c, ok := r.(io.Closer); ok { + defer c.Close() + } + if err := p.WriteDelim(); err != nil { + p.logger.Log("failed to write delimiter", "err", err) + } + w := p.Writer() + if _, err := io.Copy(w, r); err != nil { + p.logger.Log("failed to copy reader", "err", err) + } + defer p.logger.Log("done copying") + return w.Flush() + } + return p.WriteFlush() +} + +// Reader returns a reader for the packet line. +func (p *Pktline) Reader() *pktline.PktlineReader { + return p.ReaderWithSize(pktline.MaxPacketLength) +} + +// ReaderWithSize returns a reader for the packet line with the given size. +func (p *Pktline) ReaderWithSize(size int) *pktline.PktlineReader { + return pktline.NewPktlineReaderFromPktline(p.Pktline, size) +} + +// Writer returns a writer for the packet line. +func (p *Pktline) Writer() *pktline.PktlineWriter { + return p.WriterWithSize(pktline.MaxPacketLength) +} + +// WriterWithSize returns a writer for the packet line with the given size. +func (p *Pktline) WriterWithSize(size int) *pktline.PktlineWriter { + return pktline.NewPktlineWriterFromPktline(p.Pktline, size) +} + +// ReadPacketListToDelim reads as many packets as possible using the `readPacketText` +// function before encountering a delim packet. It returns a slice of all the +// packets it read, or an error if one was encountered. +func (p *Pktline) ReadPacketListToDelim() ([]string, error) { + var list []string + for { + data, pktLen, err := p.ReadPacketTextWithLength() + if err != nil { + return nil, err + } + + if pktLen == Delim { + break + } + + list = append(list, data) + } + + return list, nil +} + +// ReadPacketListToFlush reads as many packets as possible using the `readPacketText` +// function before encountering a flush packet. It returns a slice of all the +// packets it read, or an error if one was encountered. 
+func (p *Pktline) ReadPacketListToFlush() ([]string, error) { + var list []string + for { + data, pktLen, err := p.ReadPacketTextWithLength() + if err != nil { + return nil, err + } + + if pktLen == Flush { + break + } + + list = append(list, data) + } + + return list, nil +} diff --git a/modules/lfstransfer/transfer/processor.go b/modules/lfstransfer/transfer/processor.go new file mode 100644 index 0000000000000..91bc0291e286b --- /dev/null +++ b/modules/lfstransfer/transfer/processor.go @@ -0,0 +1,508 @@ +package transfer + +import ( + "crypto/sha256" + "errors" + "fmt" + "io" + "io/fs" + "os" + "strconv" + "strings" +) + +// Processor is a transfer processor. +type Processor struct { + handler *Pktline + backend Backend + logger Logger +} + +// NewProcessor creates a new transfer processor. +func NewProcessor(line *Pktline, backend Backend, logger Logger) *Processor { + if logger == nil { + logger = new(noopLogger) + } + return &Processor{ + handler: line, + backend: backend, + logger: logger, + } +} + +// Version returns the version of the transfer protocol. +func (p *Processor) Version() (Status, error) { + _, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, err + } + return NewSuccessStatusWithArgs([]string{}), nil +} + +// Error returns a transfer protocol error. +func (p *Processor) Error(code uint32, message string, args ...string) (Status, error) { + return NewStatusWithArgs(code, []string{message}, args...), nil +} + +// ReadBatch reads a batch request. +func (p *Processor) ReadBatch(op string, args Args) ([]BatchItem, error) { + data, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + hashAlgo := args[HashAlgoKey] + switch hashAlgo { + case "", "sha256": + default: + return nil, fmt.Errorf("%w: %s", ErrNotAllowed, fmt.Sprintf("unsupported hash algorithm: %s", hashAlgo)) + } + p.logger.Log("read batch", "operation", op, "args-len", len(args), "args", args, "data-len", len(data), "data", data) + items := make([]BatchItem, 0) + for _, line := range data { + if line == "" { + return nil, ErrInvalidPacket + } + parts := strings.Split(line, " ") + if len(parts) < 2 || parts[1] == "" { + return nil, ErrParseError + } + size, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("%w: invalid integer, got: %q", ErrParseError, parts[1]) + } + var oidArgs Args + if len(parts) > 2 { + oidArgs, err = ParseArgs(parts[2:]) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + } + item := BatchItem{ + Pointer: Pointer{ + Oid: parts[0], + Size: size, + }, + Args: oidArgs, + } + items = append(items, item) + } + p.logger.Log("batch items", "items", items) + its, err := p.backend.Batch(op, items, args) + if err != nil { + return nil, err + } + p.logger.Log("batch items", "items", items) + return its, nil +} + +// BatchData writes batch data to the transfer protocol. 
+func (p *Processor) BatchData(op string, presentAction string, missingAction string) (Status, error) { + ar, err := p.handler.ReadPacketListToDelim() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + batch, err := p.ReadBatch(op, args) + if err != nil { + return nil, err + } + oids := make([]string, 0) + for _, item := range batch { + action := missingAction + if item.Present { + action = presentAction + } + line := fmt.Sprintf("%s %s", item.Pointer, action) + if len(item.Args) > 0 { + line = fmt.Sprintf("%s %s", line, item.Args) + } + oids = append(oids, line) + } + return NewSuccessStatus(oids...), nil +} + +// UploadBatch writes upload data to the transfer protocol. +func (p *Processor) UploadBatch() (Status, error) { + return p.BatchData(UploadOperation, "noop", "upload") +} + +// DownloadBatch writes download data to the transfer protocol. +func (p *Processor) DownloadBatch() (Status, error) { + return p.BatchData(DownloadOperation, "download", "noop") +} + +// SizeFromArgs returns the size from the given args. +func SizeFromArgs(args Args) (int64, error) { + size, ok := args[SizeKey] + if !ok { + return 0, fmt.Errorf("missing required size header") + } + n, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid size: %w", err) + } + return n, nil +} + +// PutObject writes an object ID to the transfer protocol. +func (p *Processor) PutObject(oid string) (Status, error) { + ar, err := p.handler.ReadPacketListToDelim() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + expectedSize, err := SizeFromArgs(args) + if err != nil { + return nil, err + } + r := p.handler.Reader() + rdr := NewHashingReader(r, sha256.New()) + state, err := p.backend.StartUpload(oid, rdr, args) + if err != nil { + return nil, err + } + defer state.Close() // nolint: errcheck + actualSize := rdr.Size() + if actualSize != expectedSize { + err := fmt.Errorf("invalid size, expected %d, got %d", expectedSize, actualSize) + if actualSize > expectedSize { + err = fmt.Errorf("%w: %s", ErrExtraData, err) + } else { + err = fmt.Errorf("%w: %s", ErrMissingData, err) + } + return nil, err + } + if actualOid := rdr.Oid(); actualOid != oid { + return nil, fmt.Errorf("%w: %s", ErrCorruptData, fmt.Sprintf("invalid object ID, expected %s, got %s", oid, actualOid)) + } + if err := p.backend.FinishUpload(state, args); err != nil { + return nil, err + } + return SuccessStatus(), nil +} + +// VerifyObject verifies an object ID. +func (p *Processor) VerifyObject(oid string) (Status, error) { + ar, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + return p.backend.Verify(oid, args) +} + +// GetObject writes an object ID to the transfer protocol. 
+func (p *Processor) GetObject(oid string) (Status, error) { + ar, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + r, err := p.backend.Download(oid, args) + if errors.Is(err, fs.ErrNotExist) { + return NewStatus(StatusNotFound, fmt.Sprintf("object %s not found", oid)), nil + } + if err != nil { + return nil, err + } + info, err := r.Stat() + if err != nil { + return nil, err + } + return NewSuccessStatusWithReader(r, fmt.Sprintf("size=%d", info.Size())), nil +} + +// Lock writes a lock to the transfer protocol. +func (p *Processor) Lock() (Status, error) { + data, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(data) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + path := args[PathKey] + refname := args[RefnameKey] + if path == "" { + return nil, fmt.Errorf("%w: %s", ErrMissingData, "path and refname are required") + } + lockBackend := p.backend.LockBackend(args) + retried := false + for { + lock, err := lockBackend.Create(path, refname) + if errors.Is(err, ErrConflict) { + p.logger.Log("lock conflict") + if lock == nil { + lock, err = lockBackend.FromPath(path) + if err != nil { + p.logger.Log("lock conflict, but no lock found") + if retried { + p.logger.Log("lock conflict, but no lock found, and retried") + return nil, err + } + retried = true + continue + } + } + return NewStatusWithArgs(StatusConflict, []string{"conflict"}, lock.AsArguments()...), nil + } + if err != nil { + p.logger.Log("failed to create lock", "err", err) + return nil, err + } + p.logger.Log("lock success", "lock", lock) + return NewStatusWithArgs(StatusCreated, nil, lock.AsArguments()...), nil + } + // unreachable +} + +// ListLocksForPath lists locks for a path. cursor can be empty. +func (p *Processor) ListLocksForPath(path string, cursor string, useOwnerID bool, args map[string]string) (Status, error) { + lock, err := p.backend.LockBackend(args).FromPath(path) + if err != nil { + return nil, err + } + if (lock == nil && cursor == "") || + (lock.ID() < cursor) { + return p.Error(StatusNotFound, fmt.Sprintf("lock for path %s not found", path)) + } + spec, err := lock.AsLockSpec(useOwnerID) + if err != nil { + return nil, err + } + return NewSuccessStatus(spec...), nil +} + +// ListLocks lists locks. +func (p *Processor) ListLocks(useOwnerID bool) (Status, error) { + ar, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + + limit, _ := strconv.Atoi(args[LimitKey]) + if limit <= 0 { + limit = 20 + } else if limit > 100 { + // Try to avoid DoS attacks. + limit = 100 + } + + cursor := args[CursorKey] + if path, ok := args[PathKey]; ok && path != "" { + return p.ListLocksForPath(path, cursor, useOwnerID, args) + } + + locks := make([]Lock, 0) + lb := p.backend.LockBackend(args) + nextCursor, err := lb.Range(cursor, limit, func(lock Lock) error { + if len(locks) >= limit { + // stop iterating when limit is reached. 
+ return io.EOF + } + if lock == nil { + // skip nil locks + return nil + } + p.logger.Log("adding lock", "path", lock.Path(), "id", lock.ID()) + locks = append(locks, lock) + return nil + }) + if err != nil { + if err != io.EOF { + return nil, err + } + } + + msgs := make([]string, 0, len(locks)) + for _, item := range locks { + specs, err := item.AsLockSpec(useOwnerID) + if err != nil { + return nil, err + } + msgs = append(msgs, specs...) + } + + dataArgs := []string{} + if nextCursor != "" { + dataArgs = append(dataArgs, fmt.Sprintf("next-cursor=%s", nextCursor)) + } + + return NewSuccessStatusWithArgs(msgs, dataArgs...), nil +} + +// Unlock unlocks a lock. +func (p *Processor) Unlock(id string) (Status, error) { + ar, err := p.handler.ReadPacketListToFlush() + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + args, err := ParseArgs(ar) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + lock, err := p.backend.LockBackend(args).FromID(id) + if err != nil && !errors.Is(err, ErrNotFound) { + return nil, err + } + if lock == nil || errors.Is(err, ErrNotFound) { + return p.Error(StatusNotFound, fmt.Sprintf("lock %s not found", id)) + } + if err := lock.Unlock(); err != nil { + switch { + case errors.Is(err, os.ErrNotExist): + return p.Error(StatusNotFound, fmt.Sprintf("lock %s not found", id)) + case errors.Is(err, os.ErrPermission): + return p.Error(StatusForbidden, fmt.Sprintf("lock %s not owned by you", id)) + default: + return nil, err + } + } + return NewSuccessStatusWithArgs(nil, lock.AsArguments()...), nil +} + +// ProcessCommands processes commands from the transfer protocol. +func (p *Processor) ProcessCommands(op string) error { + p.logger.Log("processing commands") + for { + pkt, err := p.handler.ReadPacketText() + if errors.Is(err, io.EOF) { + return nil + } + if err != nil { + return err + } + p.logger.Log("received packet", "packet", pkt) + if pkt == "" { + if err := p.handler.SendError(StatusBadRequest, "unknown command"); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + continue + } + msgs := strings.Split(pkt, " ") + if len(msgs) < 1 { + if err := p.handler.SendError(StatusBadRequest, "no command provided"); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + continue + } + p.logger.Log("received command", "command", msgs[0], "messages", msgs[1:]) + var status Status + switch msgs[0] { + case versionCommand: + if len(msgs) > 0 && msgs[1] == Version { + status, err = p.Version() + } else { + err = p.handler.SendError(StatusBadRequest, "unknown version") + } + case batchCommand: + switch op { + case UploadOperation: + p.logger.Log("upload batch command received") + status, err = p.UploadBatch() + case DownloadOperation: + p.logger.Log("download batch command received") + status, err = p.DownloadBatch() + default: + err = p.handler.SendError(StatusBadRequest, "unknown operation") + } + case putObjectCommand: + if len(msgs) > 1 { + status, err = p.PutObject(msgs[1]) + } else { + err = p.handler.SendError(StatusBadRequest, "bad request") + } + case verifyObjectCommand: + if len(msgs) > 1 { + status, err = p.VerifyObject(msgs[1]) + } else { + err = p.handler.SendError(StatusBadRequest, "bad request") + } + case getObjectCommand: + if len(msgs) > 1 { + status, err = p.GetObject(msgs[1]) + } else { + err = p.handler.SendError(StatusBadRequest, "bad request") + } + case lockCommand: + status, err = p.Lock() + case listLockCommand, "list-locks": + switch op { + case UploadOperation: + 
status, err = p.ListLocks(true) + case DownloadOperation: + status, err = p.ListLocks(false) + } + p.logger.Log("list lock command", "status", status, "err", err) + case unlockCommand: + if len(msgs) > 1 { + status, err = p.Unlock(msgs[1]) + } else { + err = p.handler.SendError(StatusBadRequest, "unknown command") + } + case quitCommand: + if err := p.handler.SendStatus(SuccessStatus()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + return nil + default: + err = p.handler.SendError(StatusBadRequest, "unknown command") + } + if err != nil { + switch { + case errors.Is(err, ErrExtraData), + errors.Is(err, ErrParseError), + errors.Is(err, ErrInvalidPacket), + errors.Is(err, ErrCorruptData): + if err := p.handler.SendError(StatusBadRequest, fmt.Errorf("error: %w", err).Error()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + case errors.Is(err, ErrNotAllowed): + if err := p.handler.SendError(StatusMethodNotAllowed, fmt.Errorf("error: %w", err).Error()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + case errors.Is(err, ErrNotFound): + if err := p.handler.SendError(StatusNotFound, fmt.Errorf("error: %w", err).Error()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + case errors.Is(err, ErrUnauthorized): + if err := p.handler.SendError(StatusUnauthorized, fmt.Errorf("error: %w", err).Error()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + case errors.Is(err, ErrForbidden): + if err := p.handler.SendError(StatusForbidden, fmt.Errorf("error: %w", err).Error()); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + default: + p.logger.Log("failed to process command", "err", err) + if err := p.handler.SendError(StatusInternalServerError, "internal error"); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + } + } + if status != nil { + if err := p.handler.SendStatus(status); err != nil { + p.logger.Log("failed to send pktline", "err", err) + } + } + p.logger.Log("processed command") + } +} diff --git a/modules/lfstransfer/transfer/status.go b/modules/lfstransfer/transfer/status.go new file mode 100644 index 0000000000000..a4170bc18420f --- /dev/null +++ b/modules/lfstransfer/transfer/status.go @@ -0,0 +1,116 @@ +package transfer + +import ( + "fmt" + "io" + "net/http" + "strings" +) + +// Status codes. +const ( + StatusContinue uint32 = http.StatusContinue + StatusOK uint32 = http.StatusOK + StatusCreated uint32 = http.StatusCreated + StatusAccepted uint32 = http.StatusAccepted + StatusBadRequest uint32 = http.StatusBadRequest + StatusForbidden uint32 = http.StatusForbidden + StatusNotFound uint32 = http.StatusNotFound + StatusMethodNotAllowed uint32 = http.StatusMethodNotAllowed + StatusConflict uint32 = http.StatusConflict + StatusInternalServerError uint32 = http.StatusInternalServerError + StatusUnauthorized uint32 = http.StatusUnauthorized +) + +// StatusString returns the status string lowercased for a status code. +func StatusText(code uint32) string { + return strings.ToLower(http.StatusText(int(code))) +} + +// Status is a Git LFS status. +type Status interface { + Code() uint32 + Args() []string + Messages() []string + Reader() io.Reader +} + +type status struct { + code uint32 + args []string + messages []string + reader io.Reader +} + +// String returns the string representation of the status. 
+func (s status) String() string { + var b strings.Builder + fmt.Fprintf(&b, "status %d ", s.code) + fmt.Fprintf(&b, "args %v ", s.args) + fmt.Fprintf(&b, "messages %v ", s.messages) + if s.reader != nil { + fmt.Fprintf(&b, "reader %v ", s.reader) + } + return b.String() +} + +// Code returns the status code. +func (s *status) Code() uint32 { + return s.code +} + +// Args returns the status args. +func (s *status) Args() []string { + return s.args +} + +// Messages returns the status messages. +func (s *status) Messages() []string { + return s.messages +} + +// Reader returns the status reader. +func (s *status) Reader() io.Reader { + return s.reader +} + +// SuccessStatus returns a successful status. +func SuccessStatus() Status { + return NewSuccessStatus() +} + +// NewSuccessStatus returns a new successful status. +func NewSuccessStatus(messages ...string) Status { + return NewSuccessStatusWithArgs(messages) +} + +// NewSuccessStatusWithArgs returns a new successful status with data. +func NewSuccessStatusWithArgs(messages []string, args ...string) Status { + return NewStatusWithArgs(StatusOK, messages, args...) +} + +// NewStatus returns a new status with messages. +func NewStatus(code uint32, messages ...string) Status { + return &status{ + code: code, + messages: messages, + } +} + +// NewStatusWithArgs returns a new successful status with data. +func NewStatusWithArgs(code uint32, messages []string, args ...string) Status { + return &status{ + code: code, + args: args, + messages: messages, + } +} + +// NewSuccessStatusWithReader returns a new status with a reader. +func NewSuccessStatusWithReader(reader io.Reader, args ...string) Status { + return &status{ + code: StatusOK, + args: args, + reader: reader, + } +} From 8492908340902e8c788eb4f07609685359089f56 Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Thu, 27 Jun 2024 04:09:28 +0530 Subject: [PATCH 5/6] modules/lfstransfer: modify transfer.Backend interface Make a few changes to make the transfer backend suit Gitea better. Merge Upload into one method: Makes things simpler since out content store verifies size & hashes itself Change Download return into io.ReadCloser: Removes dependency on filesystem Add size: int64 arguments wherever appropriate In arguments for Upload and Verify, and in return for Download --- modules/lfstransfer/transfer/backend.go | 8 ++--- modules/lfstransfer/transfer/processor.go | 37 ++++++----------------- 2 files changed, 12 insertions(+), 33 deletions(-) diff --git a/modules/lfstransfer/transfer/backend.go b/modules/lfstransfer/transfer/backend.go index 2b10235593e8c..155f1323395bc 100644 --- a/modules/lfstransfer/transfer/backend.go +++ b/modules/lfstransfer/transfer/backend.go @@ -2,7 +2,6 @@ package transfer import ( "io" - "io/fs" ) const ( @@ -15,10 +14,9 @@ const ( // Backend is a Git LFS backend. 
type Backend interface { Batch(op string, pointers []BatchItem, args Args) ([]BatchItem, error) - StartUpload(oid string, r io.Reader, args Args) (io.Closer, error) - FinishUpload(state io.Closer, args Args) error - Verify(oid string, args Args) (Status, error) - Download(oid string, args Args) (fs.File, error) + Upload(oid string, size int64, r io.Reader, args Args) error + Verify(oid string, size int64, args Args) (Status, error) + Download(oid string, args Args) (io.ReadCloser, int64, error) LockBackend(args Args) LockBackend } diff --git a/modules/lfstransfer/transfer/processor.go b/modules/lfstransfer/transfer/processor.go index 91bc0291e286b..be275d1b6f8aa 100644 --- a/modules/lfstransfer/transfer/processor.go +++ b/modules/lfstransfer/transfer/processor.go @@ -1,7 +1,6 @@ package transfer import ( - "crypto/sha256" "errors" "fmt" "io" @@ -161,29 +160,11 @@ func (p *Processor) PutObject(oid string) (Status, error) { if err != nil { return nil, err } - r := p.handler.Reader() - rdr := NewHashingReader(r, sha256.New()) - state, err := p.backend.StartUpload(oid, rdr, args) + rdr := p.handler.Reader() + err = p.backend.Upload(oid, expectedSize, rdr, args) if err != nil { return nil, err } - defer state.Close() // nolint: errcheck - actualSize := rdr.Size() - if actualSize != expectedSize { - err := fmt.Errorf("invalid size, expected %d, got %d", expectedSize, actualSize) - if actualSize > expectedSize { - err = fmt.Errorf("%w: %s", ErrExtraData, err) - } else { - err = fmt.Errorf("%w: %s", ErrMissingData, err) - } - return nil, err - } - if actualOid := rdr.Oid(); actualOid != oid { - return nil, fmt.Errorf("%w: %s", ErrCorruptData, fmt.Sprintf("invalid object ID, expected %s, got %s", oid, actualOid)) - } - if err := p.backend.FinishUpload(state, args); err != nil { - return nil, err - } return SuccessStatus(), nil } @@ -197,7 +178,11 @@ func (p *Processor) VerifyObject(oid string) (Status, error) { if err != nil { return nil, fmt.Errorf("%w: %s", ErrParseError, err) } - return p.backend.Verify(oid, args) + size, err := SizeFromArgs(args) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrParseError, err) + } + return p.backend.Verify(oid, size, args) } // GetObject writes an object ID to the transfer protocol. @@ -210,18 +195,14 @@ func (p *Processor) GetObject(oid string) (Status, error) { if err != nil { return nil, fmt.Errorf("%w: %s", ErrParseError, err) } - r, err := p.backend.Download(oid, args) + r, size, err := p.backend.Download(oid, args) if errors.Is(err, fs.ErrNotExist) { return NewStatus(StatusNotFound, fmt.Sprintf("object %s not found", oid)), nil } if err != nil { return nil, err } - info, err := r.Stat() - if err != nil { - return nil, err - } - return NewSuccessStatusWithReader(r, fmt.Sprintf("size=%d", info.Size())), nil + return NewSuccessStatusWithReader(r, fmt.Sprintf("size=%d", size)), nil } // Lock writes a lock to the transfer protocol. From 81e8e90d2765f63b6fec69438f71751ad160584f Mon Sep 17 00:00:00 2001 From: ConcurrentCrab Date: Thu, 27 Jun 2024 08:45:03 +0530 Subject: [PATCH 6/6] modules/lfstransfer: add a backend and runner Also add handler in runServ() The protocol lib supports locking but the backend does not, as neither does Gitea. Support can be added later and the capability advertised. 
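As a quick sanity check of the dispatch added in cmd/serv.go: the client ends up invoking something like "ssh git@host git-lfs-transfer owner/repo.git upload" (or "download"), so runServ sees verb = "git-lfs-transfer", the repository path, and the LFS verb, and getAccessMode() must map the LFS verb to the right permission. A hypothetical test sketch, with name and placement assumed, in package cmd:

package cmd

import (
	"testing"

	"code.gitea.io/gitea/models/perm"
)

// Sketch only: assert that the new git-lfs-transfer verb resolves to the same
// access modes as git-lfs-authenticate ("upload" writes, "download" reads).
func TestGetAccessModeLfsTransfer(t *testing.T) {
	cases := []struct {
		lfsVerb string
		want    perm.AccessMode
	}{
		{"upload", perm.AccessModeWrite},
		{"download", perm.AccessModeRead},
	}
	for _, c := range cases {
		if got := getAccessMode(verbLfsTransfer, c.lfsVerb); got != c.want {
			t.Errorf("getAccessMode(%q, %q) = %v, want %v", verbLfsTransfer, c.lfsVerb, got, c.want)
		}
	}
}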
--- cmd/serv.go | 11 +- modules/lfstransfer/backend/backend.go | 164 +++++++++++++++++++++++++ modules/lfstransfer/logger.go | 18 +++ modules/lfstransfer/main.go | 70 +++++++++++ 4 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 modules/lfstransfer/backend/backend.go create mode 100644 modules/lfstransfer/logger.go create mode 100644 modules/lfstransfer/main.go diff --git a/cmd/serv.go b/cmd/serv.go index 36064928444e3..bf6b765c9445a 100644 --- a/cmd/serv.go +++ b/cmd/serv.go @@ -22,6 +22,7 @@ import ( "code.gitea.io/gitea/models/perm" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/json" + "code.gitea.io/gitea/modules/lfstransfer" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/pprof" "code.gitea.io/gitea/modules/private" @@ -40,6 +41,7 @@ const ( verbUploadArchive = "git-upload-archive" verbReceivePack = "git-receive-pack" verbLfsAuthenticate = "git-lfs-authenticate" + verbLfsTransfer = "git-lfs-transfer" ) // CmdServ represents the available serv sub-command. @@ -83,9 +85,11 @@ var ( verbUploadArchive: true, verbReceivePack: true, verbLfsAuthenticate: true, + verbLfsTransfer: true, } allowedCommandsLfs = map[string]bool{ verbLfsAuthenticate: true, + verbLfsTransfer: true, } alphaDashDotPattern = regexp.MustCompile(`[^\w-\.]`) ) @@ -138,7 +142,7 @@ func getAccessMode(verb string, lfsVerb string) perm.AccessMode { return perm.AccessModeRead case verbReceivePack: return perm.AccessModeWrite - case verbLfsAuthenticate: + case verbLfsAuthenticate, verbLfsTransfer: switch lfsVerb { case "upload": return perm.AccessModeWrite @@ -276,6 +280,11 @@ func runServ(c *cli.Context) error { return fail(ctx, extra.UserMsg, "ServCommand failed: %s", extra.Error) } + // LFS SSH protocol + if verb == verbLfsTransfer { + return lfstransfer.Main(ctx, repoPath, lfsVerb) + } + // LFS token authentication if verb == verbLfsAuthenticate { url := fmt.Sprintf("%s%s/%s.git/info/lfs", setting.AppURL, url.PathEscape(results.OwnerName), url.PathEscape(results.RepoName)) diff --git a/modules/lfstransfer/backend/backend.go b/modules/lfstransfer/backend/backend.go new file mode 100644 index 0000000000000..a5c55b255bf86 --- /dev/null +++ b/modules/lfstransfer/backend/backend.go @@ -0,0 +1,164 @@ +package backend + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + + git_model "code.gitea.io/gitea/models/git" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/lfs" + "code.gitea.io/gitea/modules/lfstransfer/transfer" +) + +// Version is the git-lfs-transfer protocol version number. +const Version = "1" + +// Capabilities is a list of Git LFS capabilities supported by this package. 
+var Capabilities = []string{ + "version=" + Version, + // "locking", // no support yet in gitea backend +} + +// GiteaBackend is an adapter between git-lfs-transfer library and Gitea's internal LFS API +type GiteaBackend struct { + ctx context.Context + repo *repo_model.Repository + store *lfs.ContentStore +} + +var _ transfer.Backend = &GiteaBackend{} + +// Batch implements transfer.Backend +func (g *GiteaBackend) Batch(_ string, pointers []transfer.BatchItem, _ transfer.Args) ([]transfer.BatchItem, error) { + for i := range pointers { + pointers[i].Present = false + pointer := lfs.Pointer{Oid: pointers[i].Oid, Size: pointers[i].Size} + exists, err := g.store.Verify(pointer) + if err != nil || !exists { + continue + } + accessible, err := g.repoHasAccess(pointers[i].Oid) + if err != nil || !accessible { + continue + } + pointers[i].Present = true + } + return pointers, nil +} + +// Download implements transfer.Backend. The returned reader must be closed by the +// caller. +func (g *GiteaBackend) Download(oid string, _ transfer.Args) (io.ReadCloser, int64, error) { + pointer := lfs.Pointer{Oid: oid} + pointer, err := g.store.GetMeta(pointer) + if err != nil { + return nil, 0, err + } + obj, err := g.store.Get(pointer) + if err != nil { + return nil, 0, err + } + accessible, err := g.repoHasAccess(oid) + if err != nil { + return nil, 0, err + } + if !accessible { + return nil, 0, fmt.Errorf("LFS Meta Object [%v] not accessible from repo: %v", oid, g.repo.RepoPath()) + } + return obj, pointer.Size, nil +} + +// StartUpload implements transfer.Backend. +func (g *GiteaBackend) Upload(oid string, size int64, r io.Reader, _ transfer.Args) error { + if r == nil { + return fmt.Errorf("%w: received null data", transfer.ErrMissingData) + } + pointer := lfs.Pointer{Oid: oid, Size: size} + exists, err := g.store.Verify(pointer) + if err != nil { + return err + } + if exists { + accessible, err := g.repoHasAccess(oid) + if err != nil { + return err + } + if accessible { + // we already have this object in the store and metadata + return nil + } + // we have this object in the store but not accessible + // so verify hash and size, and add it to metadata + hash := sha256.New() + written, err := io.Copy(hash, r) + if err != nil { + return fmt.Errorf("error creating hash: %v", err) + } + if written != size { + return fmt.Errorf("uploaded object [%v] has unexpected size: %v expected != %v received", oid, size, written) + } + recvOid := hex.EncodeToString(hash.Sum(nil)) != oid + if recvOid { + return fmt.Errorf("uploaded object [%v] has hash mismatch: %v received", oid, recvOid) + } + } else { + err = g.store.Put(pointer, r) + if err != nil { + return err + } + } + _, err = git_model.NewLFSMetaObject(g.ctx, g.repo.ID, pointer) + if err != nil { + return fmt.Errorf("could not create LFS Meta Object: %v", err) + } + return nil +} + +// Verify implements transfer.Backend. 
+func (g *GiteaBackend) Verify(oid string, size int64, args transfer.Args) (transfer.Status, error) { + pointer := lfs.Pointer{Oid: oid, Size: size} + exists, err := g.store.Verify(pointer) + if err != nil { + return transfer.NewStatus(transfer.StatusNotFound, err.Error()), err + } + if !exists { + return transfer.NewStatus(transfer.StatusNotFound, "not found"), fmt.Errorf("LFS Meta Object [%v] does not exist", oid) + } + accessible, err := g.repoHasAccess(oid) + if err != nil { + return transfer.NewStatus(transfer.StatusNotFound, "not found"), err + } + if !accessible { + return transfer.NewStatus(transfer.StatusNotFound, "not found"), fmt.Errorf("LFS Meta Object [%v] not accessible from repo: %v", oid, g.repo.RepoPath()) + } + return transfer.SuccessStatus(), nil +} + +// LockBackend implements transfer.Backend. +func (g *GiteaBackend) LockBackend(_ transfer.Args) transfer.LockBackend { + // Gitea doesn't support the locking API + // this should never be called as we don't advertise the capability + return (transfer.LockBackend)(nil) +} + +// repoHasAccess checks if the repo already has the object with OID stored +func (g *GiteaBackend) repoHasAccess(oid string) (bool, error) { + // check if OID is in global LFS store + exists, err := g.store.Exists(lfs.Pointer{Oid: oid}) + if err != nil || !exists { + return false, err + } + // check if OID is in repo LFS store + metaObj, err := git_model.GetLFSMetaObjectByOid(g.ctx, g.repo.ID, oid) + if err != nil || metaObj == nil { + return false, err + } + return true, nil +} + +func New(ctx context.Context, r *repo_model.Repository, s *lfs.ContentStore) transfer.Backend { + return &GiteaBackend{ctx: ctx, repo: r, store: s} +} diff --git a/modules/lfstransfer/logger.go b/modules/lfstransfer/logger.go new file mode 100644 index 0000000000000..b9f33ca266dd7 --- /dev/null +++ b/modules/lfstransfer/logger.go @@ -0,0 +1,18 @@ +package lfstransfer + +import ( + "code.gitea.io/gitea/modules/lfstransfer/transfer" +) + +// noop logger for passing into transfer +type GiteaLogger struct{} + +// Log implements transfer.Logger +func (g *GiteaLogger) Log(msg string, itms ...interface{}) { +} + +var _ transfer.Logger = (*GiteaLogger)(nil) + +func newLogger() transfer.Logger { + return &GiteaLogger{} +} diff --git a/modules/lfstransfer/main.go b/modules/lfstransfer/main.go new file mode 100644 index 0000000000000..81cf21b5e0dd6 --- /dev/null +++ b/modules/lfstransfer/main.go @@ -0,0 +1,70 @@ +package lfstransfer + +import ( + "context" + "fmt" + "os" + "strings" + + db_model "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/lfs" + "code.gitea.io/gitea/modules/lfstransfer/backend" + "code.gitea.io/gitea/modules/lfstransfer/transfer" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" +) + +func initServices(ctx context.Context) error { + setting.MustInstalled() + setting.LoadDBSetting() + setting.InitSQLLoggersForCli(log.INFO) + if err := db_model.InitEngine(ctx); err != nil { + return fmt.Errorf("unable to initialize the database using configuration [%q]: %w", setting.CustomConf, err) + } + if err := storage.Init(); err != nil { + return fmt.Errorf("unable to initialise storage: %v", err) + } + return nil +} + +func getRepo(ctx context.Context, path string) (*repo_model.Repository, error) { + // runServ ensures repoPath is [owner]/[name].git + pathSeg := strings.Split(path, "/") + pathSeg[1] = strings.TrimSuffix(pathSeg[1], ".git") + return 
repo_model.GetRepositoryByOwnerAndName(ctx, pathSeg[0], pathSeg[1]) +} + +func Main(ctx context.Context, repoPath string, verb string) error { + if err := initServices(ctx); err != nil { + return err + } + + logger := newLogger() + pktline := transfer.NewPktline(os.Stdin, os.Stdout, logger) + repo, err := getRepo(ctx, repoPath) + if err != nil { + return fmt.Errorf("unable to get repository: %s Error: %v", repoPath, err) + } + giteaBackend := backend.New(ctx, repo, lfs.NewContentStore()) + + for _, cap := range backend.Capabilities { + if err := pktline.WritePacketText(cap); err != nil { + log.Error("error sending capability [%v] due to error: %v", cap, err) + } + } + if err := pktline.WriteFlush(); err != nil { + log.Error("error flushing capabilities: %v", err) + } + p := transfer.NewProcessor(pktline, giteaBackend, logger) + defer log.Info("done processing commands") + switch verb { + case "upload": + return p.ProcessCommands(transfer.UploadOperation) + case "download": + return p.ProcessCommands(transfer.DownloadOperation) + default: + return fmt.Errorf("unknown operation %q", verb) + } +}
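For completeness, a throwaway in-memory transfer.Backend is enough to exercise the processor and the pkt-line framing without a running Gitea instance. The sketch below is illustrative only (all names hypothetical, hash verification deliberately skipped); it implements the final interface from patch 5 and can be wired up exactly like the GiteaBackend, via transfer.NewPktline and transfer.NewProcessor.

package main

import (
	"bytes"
	"fmt"
	"io"

	"code.gitea.io/gitea/modules/lfstransfer/transfer"
)

// memBackend stores LFS objects in a map keyed by OID, for tests and experiments.
type memBackend struct {
	objects map[string][]byte
}

func newMemBackend() *memBackend {
	return &memBackend{objects: map[string][]byte{}}
}

// Batch marks each requested pointer as present or missing in the store.
func (m *memBackend) Batch(_ string, pointers []transfer.BatchItem, _ transfer.Args) ([]transfer.BatchItem, error) {
	for i := range pointers {
		_, ok := m.objects[pointers[i].Oid]
		pointers[i].Present = ok
	}
	return pointers, nil
}

// Upload reads the object body and checks the declared size; unlike the real
// backend, it does not verify the SHA-256 against the OID.
func (m *memBackend) Upload(oid string, size int64, r io.Reader, _ transfer.Args) error {
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	switch {
	case int64(len(data)) < size:
		return fmt.Errorf("%w: expected %d bytes, got %d", transfer.ErrMissingData, size, len(data))
	case int64(len(data)) > size:
		return fmt.Errorf("%w: expected %d bytes, got %d", transfer.ErrExtraData, size, len(data))
	}
	m.objects[oid] = data
	return nil
}

// Verify reports whether the object exists with the expected size.
func (m *memBackend) Verify(oid string, size int64, _ transfer.Args) (transfer.Status, error) {
	data, ok := m.objects[oid]
	if !ok || int64(len(data)) != size {
		return transfer.NewStatus(transfer.StatusNotFound, "not found"), nil
	}
	return transfer.SuccessStatus(), nil
}

// Download returns a reader over the stored bytes plus their size.
func (m *memBackend) Download(oid string, _ transfer.Args) (io.ReadCloser, int64, error) {
	data, ok := m.objects[oid]
	if !ok {
		return nil, 0, transfer.ErrNotFound
	}
	return io.NopCloser(bytes.NewReader(data)), int64(len(data)), nil
}

// LockBackend returns nil because, like the Gitea backend, locking is not supported.
func (m *memBackend) LockBackend(_ transfer.Args) transfer.LockBackend {
	return nil
}

Driving it is then one line in a test: transfer.NewProcessor(transfer.NewPktline(in, out, nil), newMemBackend(), nil).ProcessCommands(transfer.UploadOperation), where in and out are the test's pipe ends.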