11/*
2- Copyright 2020, 2021 The Flux authors
2+ Copyright 2022 The Flux authors
33
44Licensed under the Apache License, Version 2.0 (the "License");
55you may not use this file except in compliance with the License.
@@ -17,27 +17,36 @@ limitations under the License.
1717package controllers
1818
1919import (
20- "bytes"
2120 "context"
22- "crypto/sha256"
2321 "fmt"
24- "io"
25- "net/http"
2622 "os"
2723
28- "k8s.io/apimachinery/pkg/runtime"
2924 ctrl "sigs.k8s.io/controller-runtime"
3025 "sigs.k8s.io/controller-runtime/pkg/builder"
3126 "sigs.k8s.io/controller-runtime/pkg/client"
3227
33- "github.com/fluxcd/pkg/untar"
28+ "github.com/fluxcd/pkg/http/fetch"
29+ "github.com/fluxcd/pkg/tar"
3430 sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
3531)
3632
3733// GitRepositoryWatcher watches GitRepository objects for revision changes
3834type GitRepositoryWatcher struct {
3935 client.Client
40- Scheme * runtime.Scheme
36+ artifactFetcher * fetch.ArchiveFetcher
37+ HttpRetry int
38+ }
39+
40+ func (r * GitRepositoryWatcher ) SetupWithManager (mgr ctrl.Manager ) error {
41+ r .artifactFetcher = fetch .NewArchiveFetcher (
42+ r .HttpRetry ,
43+ tar .UnlimitedUntarSize ,
44+ os .Getenv ("SOURCE_CONTROLLER_LOCALHOST" ),
45+ )
46+
47+ return ctrl .NewControllerManagedBy (mgr ).
48+ For (& sourcev1.GitRepository {}, builder .WithPredicates (GitRepositoryRevisionChangePredicate {})).
49+ Complete (r )
4150}
4251
4352// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch
@@ -52,7 +61,8 @@ func (r *GitRepositoryWatcher) Reconcile(ctx context.Context, req ctrl.Request)
5261 return ctrl.Result {}, client .IgnoreNotFound (err )
5362 }
5463
55- log .Info ("New revision detected" , "revision" , repository .Status .Artifact .Revision )
64+ artifact := repository .Status .Artifact
65+ log .Info ("New revision detected" , "revision" , artifact .Revision )
5666
5767 // create tmp dir
5868 tmpDir , err := os .MkdirTemp ("" , repository .Name )
@@ -62,12 +72,10 @@ func (r *GitRepositoryWatcher) Reconcile(ctx context.Context, req ctrl.Request)
6272 defer os .RemoveAll (tmpDir )
6373
6474 // download and extract artifact
65- summary , err := r .fetchArtifact (ctx , repository , tmpDir )
66- if err != nil {
75+ if err := r .artifactFetcher .Fetch (artifact .URL , artifact .Checksum , tmpDir ); err != nil {
6776 log .Error (err , "unable to fetch artifact" )
6877 return ctrl.Result {}, err
6978 }
70- log .Info (summary )
7179
7280 // list artifact content
7381 files , err := os .ReadDir (tmpDir )
@@ -82,71 +90,3 @@ func (r *GitRepositoryWatcher) Reconcile(ctx context.Context, req ctrl.Request)
8290
8391 return ctrl.Result {}, nil
8492}
85-
86- func (r * GitRepositoryWatcher ) SetupWithManager (mgr ctrl.Manager ) error {
87- return ctrl .NewControllerManagedBy (mgr ).
88- For (& sourcev1.GitRepository {}, builder .WithPredicates (GitRepositoryRevisionChangePredicate {})).
89- Complete (r )
90- }
91-
92- func (r * GitRepositoryWatcher ) fetchArtifact (ctx context.Context , repository sourcev1.GitRepository , dir string ) (string , error ) {
93- if repository .Status .Artifact == nil {
94- return "" , fmt .Errorf ("respository %s does not containt an artifact" , repository .Name )
95- }
96-
97- url := repository .Status .Artifact .URL
98-
99- // for local run:
100- // kubectl -n flux-system port-forward svc/source-controller 8080:80
101- // export SOURCE_HOST=localhost:8080
102- if hostname := os .Getenv ("SOURCE_HOST" ); hostname != "" {
103- url = fmt .Sprintf ("http://%s/gitrepository/%s/%s/latest.tar.gz" , hostname , repository .Namespace , repository .Name )
104- }
105-
106- // download the tarball
107- req , err := http .NewRequest ("GET" , url , nil )
108- if err != nil {
109- return "" , fmt .Errorf ("failed to create HTTP request, error: %w" , err )
110- }
111-
112- resp , err := http .DefaultClient .Do (req .WithContext (ctx ))
113- if err != nil {
114- return "" , fmt .Errorf ("failed to download artifact from %s, error: %w" , url , err )
115- }
116- defer resp .Body .Close ()
117-
118- // check response
119- if resp .StatusCode != http .StatusOK {
120- return "" , fmt .Errorf ("failed to download artifact, status: %s" , resp .Status )
121- }
122-
123- var buf bytes.Buffer
124-
125- // verify checksum matches origin
126- if err := r .verifyArtifact (repository .GetArtifact (), & buf , resp .Body ); err != nil {
127- return "" , err
128- }
129-
130- // extract
131- summary , err := untar .Untar (& buf , dir )
132- if err != nil {
133- return "" , fmt .Errorf ("faild to untar artifact, error: %w" , err )
134- }
135-
136- return summary , nil
137- }
138-
139- func (r * GitRepositoryWatcher ) verifyArtifact (artifact * sourcev1.Artifact , buf * bytes.Buffer , reader io.Reader ) error {
140- hasher := sha256 .New ()
141- mw := io .MultiWriter (hasher , buf )
142- if _ , err := io .Copy (mw , reader ); err != nil {
143- return err
144- }
145-
146- if checksum := fmt .Sprintf ("%x" , hasher .Sum (nil )); checksum != artifact .Checksum {
147- return fmt .Errorf ("failed to verify artifact: computed checksum '%s' doesn't match advertised '%s'" ,
148- checksum , artifact .Checksum )
149- }
150-
151- return nil
152- }
0 commit comments