Skip to content

Commit 8ec8fe4

Browse files
add limits for confluence
1 parent 5fd9157 commit 8ec8fe4

File tree

5 files changed

+357
-11
lines changed

5 files changed

+357
-11
lines changed

plugins/confluence.go

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ const (
2727
flagUsername = "username"
2828
flagToken = "token"
2929
flagHistory = "history"
30+
31+
flagMaxAPIResponseMB = "max-api-response-megabytes"
32+
flagMaxPageBodyMB = "max-page-body-megabytes"
33+
flagMaxTotalScanMB = "max-total-scan-megabytes"
3034
)
3135

3236
// Confluence Cloud REST API v2 per-request limits (server caps by endpoint/param).
@@ -46,6 +50,9 @@ const (
4650
// maxPageSize is the requested number of items per page in paginated responses.
4751
// Confluence v2 accepts 1–250; we use 250 to minimize requests so we're less likely to hit rate limits.
4852
maxPageSize = 250
53+
54+
// bytesPerMegabyte is used to convert CLI megabyte values into bytes.
55+
bytesPerMegabyte = 1024 * 1024
4956
)
5057

5158
type ConfluencePlugin struct {
@@ -82,6 +89,11 @@ type ConfluencePlugin struct {
8289

8390
// invalidPageIDs holds user-provided page-ids that failed numeric/length validation.
8491
invalidPageIDs map[string]struct{}
92+
93+
// Optional limits (0 means "no limit").
94+
maxAPIResponseBytes int64
95+
maxTotalScanBytes int64
96+
maxPageBodyBytes int64
8597
}
8698

8799
// NewConfluencePlugin constructs a new Confluence plugin with a default chunker.
@@ -107,6 +119,11 @@ func (p *ConfluencePlugin) DefineCommand(items chan ISourceItem, errs chan error
107119
var username string
108120
var token string
109121

122+
// CLI values in megabytes; converted to bytes in PreRunE.
123+
var maxAPIResponseMB int
124+
var maxPageBodyMB int
125+
var maxTotalScanMB int
126+
110127
cmd := &cobra.Command{
111128
Use: fmt.Sprintf("%s <URL>", p.GetName()),
112129
Short: "Scan Confluence Cloud",
@@ -118,9 +135,21 @@ func (p *ConfluencePlugin) DefineCommand(items chan ISourceItem, errs chan error
118135
p.SpaceIDs = trimNonEmpty(p.SpaceIDs)
119136
p.PageIDs = trimNonEmpty(p.PageIDs)
120137

138+
// Convert per-run/response/page limits from megabytes to bytes.
139+
if maxAPIResponseMB > 0 {
140+
p.maxAPIResponseBytes = int64(maxAPIResponseMB) * bytesPerMegabyte
141+
}
142+
if maxTotalScanMB > 0 {
143+
p.maxTotalScanBytes = int64(maxTotalScanMB) * bytesPerMegabyte
144+
}
145+
if maxPageBodyMB > 0 {
146+
p.maxPageBodyBytes = int64(maxPageBodyMB) * bytesPerMegabyte
147+
}
148+
121149
if err := p.initialize(args[0], username, token); err != nil {
122150
return err
123151
}
152+
124153
if username == "" || token == "" {
125154
log.Warn().Msg("Confluence credentials not provided. The scan will run anonymously (public pages only).")
126155
}
@@ -144,6 +173,14 @@ func (p *ConfluencePlugin) DefineCommand(items chan ISourceItem, errs chan error
144173
flags.StringVar(&token, flagToken, "", "Confluence API/scoped token value.")
145174
flags.BoolVar(&p.History, flagHistory, false, "Also scan all page revisions (all versions).")
146175

176+
// Optional limits (0 disables each check).
177+
flags.IntVar(&maxAPIResponseMB, flagMaxAPIResponseMB, 0,
178+
"limit for per-request API response size in megabytes. When exceeded, the batch is skipped and a warning is logged (0 disables the check).")
179+
flags.IntVar(&maxPageBodyMB, flagMaxPageBodyMB, 0,
180+
"limit for individual page body size in megabytes. Pages above this size are skipped and logged as warnings (0 disables the check).")
181+
flags.IntVar(&maxTotalScanMB, flagMaxTotalScanMB, 0,
182+
"limit for total downloaded data in megabytes for this Confluence scan. When exceeded, scanning stops gracefully and logs a warning (0 disables the check).")
183+
147184
return cmd, nil
148185
}
149186

@@ -185,7 +222,12 @@ func isValidURL(_ *cobra.Command, args []string) error {
185222
// initialize stores the normalized base wiki URL and constructs the Confluence client.
186223
// It validates the base by discovering the cloudId; if discovery fails, init fails.
187224
func (p *ConfluencePlugin) initialize(base, username, token string) error {
188-
client, err := NewConfluenceClient(base, username, token)
225+
client, err := NewConfluenceClient(
226+
base,
227+
username,
228+
token,
229+
WithLimits(p.maxAPIResponseBytes, p.maxTotalScanBytes, p.maxPageBodyBytes),
230+
)
189231
if err != nil {
190232
return err
191233
}
@@ -198,34 +240,46 @@ func (p *ConfluencePlugin) initialize(base, username, token string) error {
198240
// Also tracks which selectors yielded results and logs a single consolidated warning for
199241
// invalid/unresolvable/inaccessible selectors, without issuing additional API requests.
200242
func (p *ConfluencePlugin) walkAndEmitPages(ctx context.Context) error {
243+
handleTotalLimit := func(err error) error {
244+
if errors.Is(err, ErrTotalScanVolumeExceeded) {
245+
log.Warn().Msg("Some Confluence pages could not be processed because the scan reached the configured total-volume limit.")
246+
return nil
247+
}
248+
return err
249+
}
250+
201251
if len(p.SpaceIDs) > 0 || len(p.SpaceKeys) > 0 {
202252
allSpaceIDs, err := p.resolveAndCollectSpaceIDs(ctx)
203253
if err != nil {
204-
return err
254+
return handleTotalLimit(err)
205255
}
206256
if err := p.scanBySpaceIDs(ctx, allSpaceIDs); err != nil {
207-
return err
257+
return handleTotalLimit(err)
208258
}
209259
}
210260

211261
if len(p.PageIDs) > 0 {
212262
if err := p.scanByPageIDs(ctx); err != nil {
213-
return err
263+
return handleTotalLimit(err)
214264
}
215265
}
216266

217267
if len(p.SpaceIDs) == 0 && len(p.SpaceKeys) == 0 && len(p.PageIDs) == 0 {
218268
if err := p.client.WalkAllPages(ctx, maxPageSize, func(page *Page) error {
219269
return p.emitUniquePage(ctx, page)
220270
}); err != nil {
221-
return err
271+
return handleTotalLimit(err)
222272
}
223273
}
224274

225275
if msg := p.missingSelectorsWarningMessage(); msg != "" {
226276
log.Warn().Msg(msg)
227277
}
228278

279+
if p.client.APIResponseLimitHit() {
280+
log.Warn().Msg("Some Confluence pages could not be processed because one or more API responses exceeded the configured size limit.")
281+
}
282+
229283
return nil
230284
}
231285

plugins/confluence_client.go

Lines changed: 172 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"strconv"
1313
"strings"
1414
"time"
15+
16+
"github.com/rs/zerolog/log"
1517
)
1618

1719
const (
@@ -33,6 +35,16 @@ var (
3335

3436
// ErrUnexpectedHTTPStatus is used for other non-2xx statuses not classified above.
3537
ErrUnexpectedHTTPStatus = errors.New("unexpected http status")
38+
39+
// ErrAPIResponseTooLarge is returned internally when a single API response exceeds
40+
// the configured per-request size limit. Callers treat this as a soft failure:
41+
// skip the current batch and move on when possible.
42+
ErrAPIResponseTooLarge = errors.New("confluence api response exceeded configured size limit")
43+
44+
// ErrTotalScanVolumeExceeded is returned when the total downloaded bytes across
45+
// the scan exceed the configured global limit. Callers should stop scanning
46+
// gracefully and not treat this as a hard failure.
47+
ErrTotalScanVolumeExceeded = errors.New("confluence total scan volume exceeded configured limit")
3648
)
3749

3850
// ConfluenceClient defines the operations required by the Confluence plugin.
@@ -45,6 +57,7 @@ type ConfluenceClient interface {
4557
FetchPageAtVersion(ctx context.Context, pageID string, version int) (*Page, error)
4658
WalkSpacesByKeys(ctx context.Context, spaceKeys []string, limit int, visit func(*Space) error) error
4759
WikiBaseURL() string
60+
APIResponseLimitHit() bool
4861
}
4962

5063
// httpConfluenceClient is a ConfluenceClient implementation backed by net/http.
@@ -55,14 +68,40 @@ type httpConfluenceClient struct {
5568
username string
5669
token string
5770
apiBase string
71+
72+
// Optional limits (0 means "no limit").
73+
maxAPIResponseBytes int64
74+
maxTotalScanBytes int64
75+
maxPageBodyBytes int64
76+
77+
totalScanBytes int64
78+
79+
// apiResponseLimitHit indicates that at least one API batch exceeded the
80+
// configured per-request size limit during this scan. Used by the plugin
81+
// to emit a single consolidated warning instead of one per batch.
82+
apiResponseLimitHit bool
83+
}
84+
85+
// ConfluenceClientOption is a function that adjusts an httpConfluenceClient.
// NewConfluenceClient accepts any number of these and calls each one on the
// client it is constructing.
type ConfluenceClientOption func(*httpConfluenceClient)
86+
87+
// WithLimits configures optional per-response, total-scan, and page-body size
88+
// limits in bytes for the Confluence client.
89+
func WithLimits(maxAPIResponseBytes, maxTotalScanBytes, maxPageBodyBytes int64) ConfluenceClientOption {
90+
return func(c *httpConfluenceClient) {
91+
c.maxAPIResponseBytes = maxAPIResponseBytes
92+
c.maxTotalScanBytes = maxTotalScanBytes
93+
c.maxPageBodyBytes = maxPageBodyBytes
94+
}
5895
}
5996

6097
// NewConfluenceClient constructs a ConfluenceClient for the given base input URL.
6198
// Behavior:
6299
// - Normalizes base wiki URL to "https://{host}/wiki".
63100
// - Discovers cloudId and always uses platform v2:
64101
// "https://api.atlassian.com/ex/confluence/{cloudId}/wiki/api/v2".
65-
func NewConfluenceClient(inputBaseURL, username, token string) (ConfluenceClient, error) {
102+
// - Applies any provided ConfluenceClientOption values (for example, response/scan
103+
// size limits) before issuing API requests.
104+
func NewConfluenceClient(inputBaseURL, username, token string, opts ...ConfluenceClientOption) (ConfluenceClient, error) {
66105
baseWikiURL, err := normalizeWikiBase(inputBaseURL)
67106
if err != nil {
68107
return nil, err
@@ -73,6 +112,9 @@ func NewConfluenceClient(inputBaseURL, username, token string) (ConfluenceClient
73112
username: username,
74113
token: token,
75114
}
115+
for _, opt := range opts {
116+
opt(c)
117+
}
76118
apiBase, err := c.buildAPIBase(context.Background())
77119
if err != nil {
78120
return nil, err
@@ -85,6 +127,12 @@ func NewConfluenceClient(inputBaseURL, username, token string) (ConfluenceClient
85127
// Example: "https://tenant.atlassian.net/wiki".
86128
func (c *httpConfluenceClient) WikiBaseURL() string { return c.baseWikiURL }
87129

130+
// APIResponseLimitHit reports whether any API response exceeded the configured
131+
// per-request size limit during this scan.
132+
func (c *httpConfluenceClient) APIResponseLimitHit() bool {
133+
return c.apiResponseLimitHit
134+
}
135+
88136
// normalizeWikiBase takes any Confluence-related URL (site root, /wiki, a page URL, etc.)
89137
// and returns "https://{host}/wiki".
90138
func normalizeWikiBase(inputURL string) (string, error) {
@@ -300,17 +348,87 @@ func (c *httpConfluenceClient) walkPagesPaginated(
300348
if err != nil {
301349
return err
302350
}
303-
// Prefer Link header; body may also include _links.next.
351+
352+
// Prefer Link header to discover the next cursor so we can safely move on
353+
// even if we have to stop decoding this batch early.
304354
linkNext := nextURLFromLinkHeader(headers)
305-
bodyNext, decodeErr := streamPagesFromBody(rc, visit)
355+
356+
// Wrap the body in a counting reader to enforce:
357+
// - per-response size limit (maxAPIResponseBytes)
358+
// - total scan volume limit (maxTotalScanBytes)
359+
var reader io.Reader = rc
360+
if c.maxAPIResponseBytes > 0 || c.maxTotalScanBytes > 0 {
361+
reader = &countingReader{
362+
r: rc,
363+
apiLimitBytes: c.maxAPIResponseBytes,
364+
apiConsumed: 0,
365+
totalBytes: &c.totalScanBytes,
366+
totalLimitBytes: c.maxTotalScanBytes,
367+
}
368+
}
369+
370+
// Wrap the visitor to apply per-page body-size limit when configured.
371+
limitedVisit := visit
372+
if c.maxPageBodyBytes > 0 {
373+
underlying := visit
374+
limitedVisit = func(p *Page) error {
375+
if p.Body.Storage != nil {
376+
bodySize := int64(len(p.Body.Storage.Value))
377+
if bodySize > c.maxPageBodyBytes {
378+
log.Warn().
379+
Str("page_id", p.ID).
380+
Int64("body_bytes", bodySize).
381+
Msg("Skipping page content because the Confluence page body exceeded the configured size limit.")
382+
return nil
383+
}
384+
}
385+
return underlying(p)
386+
}
387+
}
388+
389+
bodyNext, decodeErr := streamPagesFromBody(reader, limitedVisit)
306390
closeErr := rc.Close()
391+
307392
if decodeErr != nil {
308-
return decodeErr
393+
switch {
394+
case errors.Is(decodeErr, ErrAPIResponseTooLarge):
395+
// record that at least one batch exceeded the per-request
396+
// size limit and move to the next page if we know the cursor.
397+
// The plugin will emit a single consolidated warning after the scan.
398+
c.apiResponseLimitHit = true
399+
400+
if closeErr != nil {
401+
return closeErr
402+
}
403+
404+
cur := cursorFromURL(linkNext)
405+
if cur == "" {
406+
// No "next" link: nothing else to do.
407+
return nil
408+
}
409+
nextURL = withCursor(base, cur)
410+
continue
411+
412+
case errors.Is(decodeErr, ErrTotalScanVolumeExceeded):
413+
// Global total-volume limit hit: stop scanning gracefully.
414+
if closeErr != nil {
415+
return closeErr
416+
}
417+
return ErrTotalScanVolumeExceeded
418+
419+
default:
420+
if closeErr != nil {
421+
return closeErr
422+
}
423+
return decodeErr
424+
}
309425
}
426+
310427
if closeErr != nil {
311428
return closeErr
312429
}
313-
// Extract only the cursor and rebuild the next URL from our base.
430+
431+
// use Link header when available, fall back to body _links.next.
314432
cur := firstNonEmptyString(cursorFromURL(linkNext), cursorFromURL(bodyNext))
315433
if cur == "" {
316434
return nil
@@ -366,6 +484,15 @@ func (c *httpConfluenceClient) getJSON(ctx context.Context, reqURL string) ([]by
366484
if err != nil {
367485
return nil, nil, fmt.Errorf("read body: %w", err)
368486
}
487+
488+
// Track total scan volume for non-streamed responses as well.
489+
if c.maxTotalScanBytes > 0 {
490+
c.totalScanBytes += int64(len(body))
491+
if c.totalScanBytes > c.maxTotalScanBytes {
492+
return nil, nil, ErrTotalScanVolumeExceeded
493+
}
494+
}
495+
369496
return body, resp.Header.Clone(), nil
370497
}
371498

@@ -679,3 +806,43 @@ func firstNonEmptyString(primary, fallback string) string {
679806
}
680807
return fallback
681808
}
809+
810+
// countingReader wraps an io.Reader and tracks:
811+
// - bytes read for the current API response, enforcing maxAPIResponseBytes
812+
// - global total bytes read across the scan, enforcing totalLimitBytes.
813+
//
814+
// When a limit is exceeded, subsequent reads return a sentinel error and allow
815+
// callers (streaming decoders) to bail out gracefully.
816+
type countingReader struct {
817+
r io.Reader
818+
apiLimitBytes int64
819+
apiConsumed int64
820+
totalBytes *int64
821+
totalLimitBytes int64
822+
}
823+
824+
func (cr *countingReader) Read(p []byte) (int, error) {
825+
n, err := cr.r.Read(p)
826+
if n <= 0 {
827+
return n, err
828+
}
829+
830+
if cr.apiLimitBytes > 0 {
831+
cr.apiConsumed += int64(n)
832+
if cr.apiConsumed > cr.apiLimitBytes {
833+
// Surface a sentinel error to the decoder so callers can classify it
834+
// as warn for this batch.
835+
return n, ErrAPIResponseTooLarge
836+
}
837+
}
838+
839+
if cr.totalBytes != nil {
840+
*cr.totalBytes += int64(n)
841+
if cr.totalLimitBytes > 0 && *cr.totalBytes > cr.totalLimitBytes {
842+
// Global total-volume limit exceeded.
843+
return n, ErrTotalScanVolumeExceeded
844+
}
845+
}
846+
847+
return n, err
848+
}

0 commit comments

Comments
 (0)