Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/api/keppel/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func (a *API) handleGetTrivyReport(w http.ResponseWriter, r *http.Request) {
if respondwith.ObfuscatedErrorText(w, err) {
return
}
defer buf.Close()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(buf)
io.Copy(w, buf) //nolint:errcheck // we could only log that the client closed the connection early
return
}

Expand Down
2 changes: 1 addition & 1 deletion internal/api/keppel/manifests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestManifestsAPI(t *testing.T) {
must.SucceedT(t, s.SD.WriteManifest(
s.Ctx,
models.ReducedAccount{Name: repo.AccountName},
repo.Name, dummyDigest, []byte(strings.Repeat("x", sizeBytes)),
repo.Name, dummyDigest, strings.NewReader(strings.Repeat("x", sizeBytes)),
))
must.SucceedT(t, s.DB.Insert(&models.TrivySecurityInfo{
RepositoryID: int64(repoID),
Expand Down
2 changes: 1 addition & 1 deletion internal/api/keppel/repos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestReposAPI(t *testing.T) {
PushedAt: manifestPushedAt,
NextValidationAt: manifestPushedAt.Add(models.ManifestValidationInterval),
}))
must.SucceedT(t, s.SD.WriteManifest(s.Ctx, models.ReducedAccount{Name: "test1"}, "repo1-3", dummyDigest, []byte("data")))
must.SucceedT(t, s.SD.WriteManifest(s.Ctx, models.ReducedAccount{Name: "test1"}, "repo1-3", dummyDigest, strings.NewReader("data")))
must.SucceedT(t, s.DB.Insert(&models.TrivySecurityInfo{
RepositoryID: filledRepo.ID,
Digest: dummyDigest,
Expand Down
10 changes: 9 additions & 1 deletion internal/api/peer/delegatedpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package peerv1

import (
"io"
"net/http"
"strconv"

Expand Down Expand Up @@ -41,7 +42,7 @@ func (a *API) handleDelegatedPullManifest(w http.ResponseWriter, r *http.Request
Password: r.Header.Get("X-Keppel-Delegated-Pull-Password"), // may be empty
}
ref := models.ParseManifestReference(vars["reference"])
manifestBytes, manifestMediaType, err := rc.DownloadManifest(r.Context(), ref, &opts)
manifestReader, manifestMediaType, err := rc.DownloadManifest(r.Context(), ref, &opts)

if err != nil {
if rerr, ok := errext.As[*keppel.RegistryV2Error](err); ok {
Expand All @@ -52,6 +53,13 @@ func (a *API) handleDelegatedPullManifest(w http.ResponseWriter, r *http.Request
return
}
}
defer manifestReader.Close()

manifestBytes, err := io.ReadAll(manifestReader)
if err != nil {
respondwith.ObfuscatedErrorText(w, err)
return
}

w.Header().Set("Content-Type", manifestMediaType)
w.Header().Set("Content-Length", strconv.Itoa(len(manifestBytes)))
Expand Down
7 changes: 6 additions & 1 deletion internal/api/registry/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (a *API) handleGetOrHeadManifest(w http.ResponseWriter, r *http.Request) {
logg.Info("could not read manifest %s@%s from DB (falling back to read from storage): %s",
repo.FullName(), dbManifest.Digest, err.Error())
}
manifestBytes, err = a.sd.ReadManifest(r.Context(), *account, repo.Name, dbManifest.Digest)
manifestReader, err := a.sd.ReadManifest(r.Context(), *account, repo.Name, dbManifest.Digest)
if respondWithError(w, r, err) {
return
}
defer manifestReader.Close()
manifestBytes, err = io.ReadAll(manifestReader)
if respondWithError(w, r, err) {
return
}
Expand Down
19 changes: 5 additions & 14 deletions internal/client/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type DownloadManifestOpts struct {
ExtraHeaders http.Header
}

// DownloadManifest fetches a manifest from this repository. If an error is
// returned, it's usually a *keppel.RegistryV2Error.
func (c *RepoClient) DownloadManifest(ctx context.Context, reference models.ManifestReference, opts *DownloadManifestOpts) (contents []byte, mediaType string, returnErr error) {
// DownloadManifest fetches a manifest from this repository.
// If an error is returned, it's usually a *keppel.RegistryV2Error.
// The caller is responsible for closing the returned ReadCloser.
func (c *RepoClient) DownloadManifest(ctx context.Context, reference models.ManifestReference, opts *DownloadManifestOpts) (contents io.ReadCloser, mediaType string, returnErr error) {
if opts == nil {
opts = &DownloadManifestOpts{}
}
Expand All @@ -69,15 +70,5 @@ func (c *RepoClient) DownloadManifest(ctx context.Context, reference models.Mani
return nil, "", err
}

respBytes, err := io.ReadAll(resp.Body)
if err == nil {
err = resp.Body.Close()
} else {
resp.Body.Close()
}
if err != nil {
return nil, "", err
}

return respBytes, resp.Header.Get("Content-Type"), nil
return resp.Body, resp.Header.Get("Content-Type"), nil
}
18 changes: 7 additions & 11 deletions internal/client/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/sapcc/keppel/internal/auth"
"github.com/sapcc/keppel/internal/keppel"
Expand Down Expand Up @@ -39,18 +38,19 @@ func (c *Client) initToken(ctx context.Context, cfg keppel.Configuration, scope
ourUserName := "replication@" + cfg.APIPublicHostname
authHeader := map[string]string{"Authorization": keppel.BuildBasicAuthHeader(ourUserName, c.peer.OurPassword)}

respBodyBytes, respStatusCode, _, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, authHeader)
respBody, respStatusCode, _, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, authHeader)
if err != nil {
return err
}
defer respBody.Close()
if respStatusCode != http.StatusOK {
return fmt.Errorf("expected 200 OK, but got %d: %s", respStatusCode, strings.TrimSpace(string(respBodyBytes)))
return fmt.Errorf("expected 200 OK, but got %d: %s", respStatusCode, tryReadAllAndTrimSpace(respBody))
}

var data struct {
Token string `json:"token"`
}
err = json.Unmarshal(respBodyBytes, &data)
err = json.NewDecoder(respBody).Decode(&data)
if err != nil {
return err
}
Expand All @@ -62,7 +62,8 @@ func (c Client) buildRequestURL(path string) string {
return fmt.Sprintf("https://%s/%s", c.peer.HostName, path)
}

func (c Client) doRequest(ctx context.Context, method, url string, body io.Reader, headers map[string]string) (respBodyBytes []byte, respStatusCode int, respHeader http.Header, err error) {
// The caller is responsible for closing respBodyBytes if it's non-nil.
func (c Client) doRequest(ctx context.Context, method, url string, body io.Reader, headers map[string]string) (respBodyBytes io.ReadCloser, respStatusCode int, respHeader http.Header, err error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, 0, nil, fmt.Errorf("during %s %s: %w", method, url, err)
Expand All @@ -78,11 +79,6 @@ func (c Client) doRequest(ctx context.Context, method, url string, body io.Reade
if err != nil {
return nil, 0, nil, fmt.Errorf("during %s %s: %w", method, url, err)
}
defer resp.Body.Close()
respBodyBytes, err = io.ReadAll(resp.Body)
if err != nil {
return nil, 0, nil, fmt.Errorf("during %s %s: %w", method, url, err)
}

return respBodyBytes, resp.StatusCode, resp.Header, nil
return resp.Body, resp.StatusCode, resp.Header, nil
}
51 changes: 33 additions & 18 deletions internal/client/peer/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"

"github.com/sapcc/keppel/internal/keppel"
"github.com/sapcc/keppel/internal/models"
Expand All @@ -17,7 +19,8 @@ import (
// DownloadManifestViaPullDelegation asks the peer to download a manifest from
// an external registry for us. This gets used when the external registry
// denies the pull to us because we hit our rate limit.
func (c Client) DownloadManifestViaPullDelegation(ctx context.Context, imageRef models.ImageReference, userName, password string) (respBodyBytes []byte, contentType string, err error) {
// The caller is responsible for closing the returned ReadCloser.
func (c Client) DownloadManifestViaPullDelegation(ctx context.Context, imageRef models.ImageReference, userName, password string) (respBodyBytes io.ReadCloser, contentType string, err error) {
reqURL := c.buildRequestURL(fmt.Sprintf(
"peer/v1/delegatedpull/%s/v2/%s/manifests/%s",
imageRef.Host, imageRef.RepoName, imageRef.Reference,
Expand All @@ -27,15 +30,16 @@ func (c Client) DownloadManifestViaPullDelegation(ctx context.Context, imageRef
"X-Keppel-Delegated-Pull-Password": password,
}

respBodyBytes, respStatusCode, respHeader, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, reqHeaders)
respBody, respStatusCode, respHeader, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, reqHeaders)
if err != nil {
return nil, "", err
}
if respStatusCode != http.StatusOK {
return nil, "", fmt.Errorf("during GET %s: expected 200, got %d with response: %s",
reqURL, respStatusCode, string(respBodyBytes))
respBody.Close()
return nil, "", fmt.Errorf("during GET %s: expected 200, got %d with response: %s", reqURL, respStatusCode, tryReadAllAndTrimSpace(respBody))
}
return respBodyBytes, respHeader.Get("Content-Type"), nil

return respBody, respHeader.Get("Content-Type"), nil
}

// GetForeignAccountConfiguration asks the peer for the configuration of the
Expand All @@ -48,19 +52,19 @@ func (c Client) DownloadManifestViaPullDelegation(ctx context.Context, imageRef
func (c Client) GetForeignAccountConfigurationInto(ctx context.Context, target any, accountName models.AccountName) error {
reqURL := c.buildRequestURL("keppel/v1/accounts/" + string(accountName))

respBodyBytes, respStatusCode, _, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, nil)
respBody, respStatusCode, _, err := c.doRequest(ctx, http.MethodGet, reqURL, http.NoBody, nil)
if err != nil {
return err
}
defer respBody.Close()
if respStatusCode != http.StatusOK {
return fmt.Errorf("during GET %s: expected 200, got %d with response: %s",
reqURL, respStatusCode, string(respBodyBytes))
return fmt.Errorf("during GET %s: expected 200, got %d with response: %s", reqURL, respStatusCode, tryReadAllAndTrimSpace(respBody))
}

data := struct {
Target any `json:"account"`
}{target}
err = jsonUnmarshalStrict(respBodyBytes, &data)
err = jsonUnmarshalStrict(respBody, &data)
if err != nil {
return fmt.Errorf("while parsing response for GET %s: %w", reqURL, err)
}
Expand All @@ -72,19 +76,20 @@ func (c Client) GetForeignAccountConfigurationInto(ctx context.Context, target a
func (c Client) GetSubleaseToken(ctx context.Context, accountName models.AccountName) (keppel.SubleaseToken, error) {
reqURL := c.buildRequestURL("keppel/v1/accounts/" + string(accountName) + "/sublease")

respBodyBytes, respStatusCode, _, err := c.doRequest(ctx, http.MethodPost, reqURL, http.NoBody, nil)
respBody, respStatusCode, _, err := c.doRequest(ctx, http.MethodPost, reqURL, http.NoBody, nil)
if err != nil {
return keppel.SubleaseToken{}, err
}
defer respBody.Close()
if respStatusCode != http.StatusOK {
return keppel.SubleaseToken{}, fmt.Errorf("while obtaining sublease token with POST %s: expected 200, got %d with response: %s",
reqURL, respStatusCode, string(respBodyBytes))
reqURL, respStatusCode, tryReadAllAndTrimSpace(respBody))
}

data := struct {
SubleaseToken string `json:"sublease_token"`
}{}
err = jsonUnmarshalStrict(respBodyBytes, &data)
err = jsonUnmarshalStrict(respBody, &data)
if err != nil {
return keppel.SubleaseToken{}, fmt.Errorf("while parsing sublease token response from POST %s: %w", reqURL, err)
}
Expand All @@ -104,31 +109,41 @@ func (c Client) PerformReplicaSync(ctx context.Context, fullRepoName string, pay
return nil, fmt.Errorf("while marshalling POST %s: %w", reqURL, err)
}

respBodyBytes, respStatusCode, _, err := c.doRequest(ctx, http.MethodPost, reqURL, bytes.NewReader(reqBodyBytes), nil)
respBody, respStatusCode, _, err := c.doRequest(ctx, http.MethodPost, reqURL, bytes.NewReader(reqBodyBytes), nil)
if err != nil {
return nil, err
}
defer respBody.Close()
if respStatusCode == http.StatusNotFound {
// 404 can occur when the repo has been deleted on primary; in this case,
// fall back to verifying the deletion explicitly using the normal API
return nil, nil
}
if respStatusCode != http.StatusOK {
return nil, fmt.Errorf("during POST %s: expected 200, got %d with response: %s",
reqURL, respStatusCode, string(respBodyBytes))
return nil, fmt.Errorf("during POST %s: expected 200, got %d with response: %s", reqURL, respStatusCode, tryReadAllAndTrimSpace(respBody))
}

var respPayload keppel.ReplicaSyncPayload
err = jsonUnmarshalStrict(respBodyBytes, &respPayload)
err = jsonUnmarshalStrict(respBody, &respPayload)
if err != nil {
return nil, fmt.Errorf("while parsing response from POST %s: %w", reqURL, err)
}
return &respPayload, nil
}

// Like yaml.UnmarshalStrict(), but for JSON.
func jsonUnmarshalStrict(buf []byte, target any) error {
dec := json.NewDecoder(bytes.NewReader(buf))
func jsonUnmarshalStrict(in io.Reader, target any) error {
dec := json.NewDecoder(in)
dec.DisallowUnknownFields()
return dec.Decode(target)
}

func tryReadAllAndTrimSpace(respBody io.ReadCloser) (respString string) {
respBodyBytes, err := io.ReadAll(respBody)
if err == nil {
respString = strings.TrimSpace(string(respBodyBytes))
} else {
respString = "<could not read response body>"
}
return respString
}
9 changes: 8 additions & 1 deletion internal/client/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,17 @@ func (c *RepoClient) doValidateManifest(ctx context.Context, reference models.Ma
}
}()

manifestBytes, manifestMediaType, err := c.DownloadManifest(ctx, reference, nil)
manifestReader, manifestMediaType, err := c.DownloadManifest(ctx, reference, nil)
if err != nil {
return err
}
defer manifestReader.Close()

manifestBytes, err := io.ReadAll(manifestReader)
if err != nil {
return err
}

manifest, err := keppel.ParseManifest(manifestMediaType, manifestBytes)
if err != nil {
return err
Expand Down
23 changes: 14 additions & 9 deletions internal/drivers/filesystem/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,25 @@ func (d *StorageDriver) DeleteBlob(ctx context.Context, account models.ReducedAc
}

// ReadManifest implements the keppel.StorageDriver interface.
func (d *StorageDriver) ReadManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest) ([]byte, error) {
func (d *StorageDriver) ReadManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest) (io.ReadCloser, error) {
path := d.getManifestPath(account, repoName, manifestDigest)
return os.ReadFile(path)
return os.Open(path)
}

// WriteManifest implements the keppel.StorageDriver interface.
func (d *StorageDriver) WriteManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, contents []byte) error {
func (d *StorageDriver) WriteManifest(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, contents io.Reader) (err error) {
path := d.getManifestPath(account, repoName, manifestDigest)
tmpPath := path + ".tmp"
err := os.MkdirAll(filepath.Dir(tmpPath), 0777) // subject to umask
err = os.MkdirAll(filepath.Dir(tmpPath), 0777) // subject to umask
if err != nil {
return err
}
err = os.WriteFile(tmpPath, contents, 0666) // subject to umask
file, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) // subject to umask
if err != nil {
return err
}
defer func() { err = file.Close() }()
_, err = io.Copy(file, contents)
if err != nil {
return err
}
Expand All @@ -153,16 +158,16 @@ func (d *StorageDriver) DeleteManifest(ctx context.Context, account models.Reduc
}

// ReadTrivyReport implements the keppel.StorageDriver interface.
func (d *StorageDriver) ReadTrivyReport(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, format string) ([]byte, error) {
func (d *StorageDriver) ReadTrivyReport(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, format string) (io.ReadCloser, error) {
path := d.getTrivyReportPath(account, repoName, manifestDigest, format)
return os.ReadFile(path)
return os.Open(path)
}

// WriteTrivyReport implements the keppel.StorageDriver interface.
func (d *StorageDriver) WriteTrivyReport(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, payload trivy.ReportPayload) error {
func (d *StorageDriver) WriteTrivyReport(ctx context.Context, account models.ReducedAccount, repoName string, manifestDigest digest.Digest, payload trivy.ReportPayload) (err error) {
path := d.getTrivyReportPath(account, repoName, manifestDigest, payload.Format)
tmpPath := path + ".tmp"
err := os.MkdirAll(filepath.Dir(tmpPath), 0777) // subject to umask
err = os.MkdirAll(filepath.Dir(tmpPath), 0777) // subject to umask
if err != nil {
return err
}
Expand Down
Loading
Loading