Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Commit cc7bd73

Browse files
burmudarjhchabran
andauthored
scaletesting: bulkreposettings - allow repos to be incrementally fetched and updated (#44203)
* allow repos to be incrementally fetched and updated * check errors and print first 5 * move log line * check result errors * move around next page logic Co-authored-by: Jean-Hadrien Chabran <[email protected]>
1 parent f5cd524 commit cc7bd73

File tree

1 file changed

+221
-51
lines changed
  • dev/scaletesting/bulkreposettings

1 file changed

+221
-51
lines changed

dev/scaletesting/bulkreposettings/main.go

Lines changed: 221 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"golang.org/x/oauth2"
1414

1515
"github.com/sourcegraph/log"
16+
1617
"github.com/sourcegraph/sourcegraph/dev/scaletesting/internal/store"
18+
"github.com/sourcegraph/sourcegraph/lib/errors"
1719
"github.com/sourcegraph/sourcegraph/lib/group"
1820
"github.com/sourcegraph/sourcegraph/lib/output"
1921
)
@@ -86,57 +88,110 @@ var app = &cli.App{
8688
return err
8789
}
8890

91+
var repoIter Iter[[]*store.Repo]
92+
var total int64
8993
if len(repos) == 0 {
90-
logger.Info("No existing state found, creating ...")
91-
repos, err = fetchRepos(cmd.Context, org, gh)
94+
logger.Info("Using GithubRepoFetcher")
95+
repoIter = &GithubRepoFetcher{
96+
client: gh,
97+
repoType: "public", // we're only interested in public repos to change visibility
98+
org: org,
99+
page: 0,
100+
done: false,
101+
err: nil,
102+
}
103+
104+
t, err := getTotalPublicRepos(ctx, gh, org)
92105
if err != nil {
93-
logger.Error("failed to fetch repositories from org", log.Error(err), log.String("github.org", org))
94-
return err
106+
logger.Fatal("failed to get total public repos size for org", log.String("org", org), log.Error(err))
95107
}
96-
if err := s.Insert(repos); err != nil {
97-
logger.Error("failed to insert repositories from org", log.Error(err), log.String("github.org", org))
98-
return err
108+
logger.Info("Estimated public repos from API", log.Int("total", t))
109+
total = int64(t)
110+
} else {
111+
logger.Info("Using StaticRepoFecther")
112+
repoIter = &MockRepoFetcher{
113+
repos: repos,
114+
iterSize: 10,
115+
start: 0,
99116
}
117+
total = int64(len(repos))
100118
}
101119

102120
out := output.NewOutput(os.Stdout, output.OutputOpts{})
103-
bars := []output.ProgressBar{
104-
{Label: "Updating repos", Max: float64(len(repos))},
105-
}
106-
progress := out.Progress(bars, nil)
107-
defer progress.Destroy()
121+
pending := out.Pending(output.Line(output.EmojiHourglass, output.StylePending, "Updating repos"))
122+
defer pending.Destroy()
108123

109124
var done int64
110-
total := len(repos)
111125

112126
g := group.NewWithResults[error]().WithMaxConcurrency(20)
113-
for _, r := range repos {
114-
r := r
115-
g.Go(func() error {
116-
if r.Pushed {
117-
return nil
118-
}
119-
var err error
120-
settings := &github.Repository{Private: github.Bool(true)}
121-
for i := 0; i < cmd.Int("retry"); i++ {
122-
_, _, err = gh.Repositories.Edit(cmd.Context, org, r.Name, settings)
123-
if err != nil {
124-
r.Failed = err.Error()
125-
} else {
126-
r.Failed = ""
127-
r.Pushed = true
128-
break
129-
}
130-
}
127+
for !repoIter.Done() && repoIter.Err() == nil {
128+
for _, r := range repoIter.Next(ctx) {
129+
r := r
131130
if err := s.SaveRepo(r); err != nil {
132131
logger.Fatal("could not save repo", log.Error(err), log.String("repo", r.Name))
133132
}
134-
atomic.AddInt64(&done, 1)
135-
progress.SetValue(0, float64(done))
136-
progress.SetLabel(0, fmt.Sprintf("Updating repos (%d/%d)", done, total))
137-
return err
138-
})
133+
134+
g.Go(func() error {
135+
if r.Pushed {
136+
return nil
137+
}
138+
var err error
139+
settings := &github.Repository{Private: github.Bool(true)}
140+
for i := 0; i < cmd.Int("retry"); i++ {
141+
_, _, err = gh.Repositories.Edit(cmd.Context, org, r.Name, settings)
142+
if err != nil {
143+
r.Failed = err.Error()
144+
} else {
145+
r.Failed = ""
146+
r.Pushed = true
147+
break
148+
}
149+
}
150+
151+
if err := s.SaveRepo(r); err != nil {
152+
logger.Fatal("could not save repo", log.Error(err), log.String("repo", r.Name))
153+
}
154+
atomic.AddInt64(&done, 1)
155+
pending.Update(fmt.Sprintf("%d repos updated (estimated total: %d)", done, total))
156+
return err
157+
})
158+
}
159+
// The total we get from GitHub is not correct (e.g. 50k when we know the org has 200k).
160+
// So when done reaches the total, we fetch the public repo count again and add it to
161+
// the estimated total shown in the pending line
162+
if atomic.LoadInt64(&done) == int64(total) {
163+
t, err := getTotalPublicRepos(ctx, gh, org)
164+
if err != nil {
165+
logger.Fatal("failed to get updated public repos count", log.Error(err))
166+
}
167+
atomic.AddInt64(&total, int64(t))
168+
pending.Update(fmt.Sprintf("%d repos updated (estimated total: %d)", done, total))
169+
}
170+
}
171+
172+
if err := repoIter.Err(); err != nil {
173+
logger.Error("repo iterator encountered an error", log.Error(err))
139174
}
175+
176+
results := g.Wait()
177+
178+
// Check that we actually got errors
179+
errs := []error{}
180+
for _, r := range results {
181+
if r != nil {
182+
errs = append(errs, r)
183+
}
184+
}
185+
186+
if len(errs) > 0 {
187+
pending.Complete(output.Line(output.EmojiFailure, output.StyleBold, fmt.Sprintf("%d errors occured while updating repos", len(errs))))
188+
out.Writef("Printing first 5 errros")
189+
for i := 0; i < len(errs) && i < 5; i++ {
190+
logger.Error("Error updating repo", log.Error(errs[i]))
191+
}
192+
return errs[0]
193+
}
194+
pending.Complete(output.Line(output.EmojiOk, output.StyleBold, fmt.Sprintf("%d repos updated", done)))
140195
return nil
141196
},
142197
},
@@ -145,22 +200,118 @@ var app = &cli.App{
145200
},
146201
}
147202

148-
func fetchRepos(ctx context.Context, org string, gh *github.Client) ([]*store.Repo, error) {
203+
type Iter[T any] interface {
204+
Err() error
205+
Next(ctx context.Context) T
206+
Done() bool
207+
}
208+
209+
var _ Iter[[]*store.Repo] = (*GithubRepoFetcher)(nil)
210+
211+
// MockRepoFetcher satisfies the Iter interface, allowing one to iterate over a static array of repos. To change
212+
// how many repos are returned per invocation of next, set iterSize (default 10). To start iterating at a different
213+
// index, set start to a different value.
214+
//
215+
// The iteration is considered done when start >= len(repos)
216+
type MockRepoFetcher struct {
217+
repos []*store.Repo
218+
iterSize int
219+
start int
220+
}
221+
222+
// Err returns the last error (if any) encountered by Iter. For MockRepoFetcher, this always returns nil.
223+
func (m *MockRepoFetcher) Err() error {
224+
return nil
225+
}
226+
227+
// Done determines whether this Iter can produce more items. When start >= length of repos, then this will return true
228+
func (m *MockRepoFetcher) Done() bool {
229+
return m.start >= len(m.repos)
230+
}
231+
232+
// Next returns the next set of Repos. The amount of repos returned is determined by iterSize. When Done() is true,
233+
// nil is returned.
234+
func (m *MockRepoFetcher) Next(_ context.Context) []*store.Repo {
235+
if m.iterSize == 0 {
236+
m.iterSize = 10
237+
}
238+
if m.Done() {
239+
return nil
240+
}
241+
if m.start+m.iterSize > len(m.repos) {
242+
results := m.repos[m.start:]
243+
m.start = len(m.repos)
244+
return results
245+
}
246+
247+
results := m.repos[m.start : m.start+m.iterSize]
248+
// advance the start index
249+
m.start += m.iterSize
250+
return results
251+
252+
}
253+
254+
type GithubRepoFetcher struct {
255+
client *github.Client
256+
repoType string
257+
org string
258+
page int
259+
perPage int
260+
done bool
261+
err error
262+
}
263+
264+
// Done determines whether more repos can be retrieved from Github.
265+
func (g *GithubRepoFetcher) Done() bool {
266+
return g.done
267+
}
268+
269+
// Err returns the last error encountered by Iter
270+
func (g *GithubRepoFetcher) Err() error {
271+
return g.err
272+
}
273+
274+
// Next retrieves the next set of repos by contacting GitHub. The number of repos fetched is determined by perPage.
275+
// The next page start is automatically advanced based on the response received from Github. When the next page response
276+
// from Github is 0, it means there are no more repos to fetch and this Iter is done, thus done is then set to true and
277+
// Done() will also return true.
278+
//
279+
// If any error is encountered during retrieval of Repos the err value will be set and can be retrieved with Err()
280+
func (g *GithubRepoFetcher) Next(ctx context.Context) []*store.Repo {
281+
if g.done {
282+
return nil
283+
}
284+
285+
results, next, err := g.listRepos(ctx, g.org, g.page, g.perPage)
286+
if err != nil {
287+
g.err = err
288+
return nil
289+
}
290+
291+
// when next is 0, it means the GitHub API returned the nextPage as 0, which indicates that there are no more pages to fetch
292+
if next > 0 {
293+
// Ensure that the next request starts at the next page
294+
g.page = next
295+
} else {
296+
g.done = true
297+
}
298+
299+
return results
300+
}
301+
302+
func (g *GithubRepoFetcher) listRepos(ctx context.Context, org string, start int, size int) ([]*store.Repo, int, error) {
149303
opts := github.RepositoryListByOrgOptions{
150-
ListOptions: github.ListOptions{},
304+
Type: g.repoType,
305+
ListOptions: github.ListOptions{Page: start, PerPage: size},
306+
}
307+
308+
repos, resp, err := g.client.Repositories.ListByOrg(ctx, org, &opts)
309+
if err != nil {
310+
return nil, 0, err
151311
}
152-
var repos []*github.Repository
153-
for {
154-
rs, resp, err := gh.Repositories.ListByOrg(ctx, org, &opts)
155-
if err != nil {
156-
return nil, err
157-
}
158-
repos = append(repos, rs...)
159-
160-
if resp.NextPage == 0 {
161-
break
162-
}
163-
opts.ListOptions.Page = resp.NextPage
312+
313+
if resp.StatusCode >= 300 {
314+
return nil, 0, errors.Newf("failed to list repos for org %s. Got status %d code", org, resp.StatusCode)
164315
}
165316

166317
res := make([]*store.Repo, 0, len(repos))
@@ -171,7 +322,26 @@ func fetchRepos(ctx context.Context, org string, gh *github.Client) ([]*store.Re
171322
})
172323
}
173324

174-
return res, nil
325+
next := resp.NextPage
326+
// If NextPage is 0 but the page we just requested is not LastPage, continue from LastPage instead of stopping
327+
if next == 0 && g.page != resp.LastPage {
328+
next = resp.LastPage
329+
}
330+
331+
return res, next, nil
332+
}
333+
334+
func getTotalPublicRepos(ctx context.Context, client *github.Client, org string) (int, error) {
335+
orgRes, resp, err := client.Organizations.Get(ctx, org)
336+
if err != nil {
337+
return 0, err
338+
}
339+
340+
if resp.StatusCode >= 300 {
341+
return 0, errors.Newf("failed to get org %s. Got status %d code", org, resp.StatusCode)
342+
}
343+
344+
return *orgRes.PublicRepos, nil
175345
}
176346

177347
func main() {

0 commit comments

Comments
 (0)