Skip to content

Commit bcd8e93

Browse files
committed
Revive the rate-limited filesystem post-rebase
1 parent 0681807 commit bcd8e93

File tree

7 files changed

+223
-22
lines changed

7 files changed

+223
-22
lines changed

generate/param_generator.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func GenParamEnum() {
8383
stringParamMap := make(map[string]string)
8484
stringSliceParamMap := make(map[string]string)
8585
intParamMap := make(map[string]string)
86+
byteRateParamMap := make(map[string]string)
8687
boolParamMap := make(map[string]string)
8788
durationParamMap := make(map[string]string)
8889
objectParamMap := make(map[string]string)
@@ -177,8 +178,7 @@ func GenParamEnum() {
177178
case "int":
178179
intParamMap[name] = rawName
179180
case "byterate":
180-
// byterate is stored as int (bytes per second) internally
181-
intParamMap[name] = rawName
181+
byteRateParamMap[name] = rawName
182182
case "bool":
183183
boolParamMap[name] = rawName
184184
case "duration":
@@ -207,13 +207,14 @@ func GenParamEnum() {
207207
StringMap map[string]string
208208
StringSliceMap map[string]string
209209
IntMap map[string]string
210+
ByteRateMap map[string]string
210211
BoolMap map[string]string
211212
DurationMap map[string]string
212213
ObjectMap map[string]string
213214
DeprecatedMap map[string][]string
214215
RuntimeConfigurableMap map[string]bool
215216
AllParamNames []string
216-
}{StringMap: stringParamMap, StringSliceMap: stringSliceParamMap, IntMap: intParamMap, BoolMap: boolParamMap, DurationMap: durationParamMap, ObjectMap: objectParamMap, DeprecatedMap: deprecatedMap, RuntimeConfigurableMap: runtimeConfigurableMap, AllParamNames: allParamNames})
217+
}{StringMap: stringParamMap, StringSliceMap: stringSliceParamMap, IntMap: intParamMap, ByteRateMap: byteRateParamMap, BoolMap: boolParamMap, DurationMap: durationParamMap, ObjectMap: objectParamMap, DeprecatedMap: deprecatedMap, RuntimeConfigurableMap: runtimeConfigurableMap, AllParamNames: allParamNames})
217218

218219
if err != nil {
219220
panic(err)
@@ -370,8 +371,7 @@ func GenParamStruct() {
370371
case "int":
371372
goType = "int"
372373
case "byterate":
373-
// byterate is parsed as string but stored as int (bytes per second)
374-
goType = "int"
374+
goType = "byte_rate.ByteRate"
375375
case "bool":
376376
goType = "bool"
377377
case "duration":
@@ -459,6 +459,8 @@ import (
459459
"time"
460460
461461
"github.com/spf13/viper"
462+
463+
"github.com/pelicanplatform/pelican/byte_rate"
462464
)
463465
464466
type StringParam struct {
@@ -477,6 +479,10 @@ type IntParam struct {
477479
name string
478480
}
479481
482+
type ByteRateParam struct {
483+
name string
484+
}
485+
480486
type DurationParam struct {
481487
name string
482488
}
@@ -604,6 +610,31 @@ func (iP IntParam) GetEnvVarName() string {
604610
return paramNameToEnvVar(iP.name)
605611
}
606612
613+
func (bRP ByteRateParam) GetByteRate() byte_rate.ByteRate {
614+
config := getOrCreateConfig()
615+
switch bRP.name {
616+
case "Origin.TransferRateLimit":
617+
return config.Origin.TransferRateLimit
618+
}
619+
return 0
620+
}
621+
622+
func (bRP ByteRateParam) GetName() string {
623+
return bRP.name
624+
}
625+
626+
func (bRP ByteRateParam) IsSet() bool {
627+
return viper.IsSet(bRP.name)
628+
}
629+
630+
func (bRP ByteRateParam) IsRuntimeConfigurable() bool {
631+
return IsRuntimeConfigurable(bRP.name)
632+
}
633+
634+
func (bRP ByteRateParam) GetEnvVarName() string {
635+
return paramNameToEnvVar(bRP.name)
636+
}
637+
607638
func (bP BoolParam) GetBool() bool {
608639
config := getOrCreateConfig()
609640
switch bP.name {
@@ -702,6 +733,11 @@ var ({{range $key, $value := .IntMap}}
702733
{{- end}}
703734
)
704735
736+
var ({{range $key, $value := .ByteRateMap}}
737+
{{$key}} = ByteRateParam{{"{"}}{{printf "%q" $value}}{{"}"}}
738+
{{- end}}
739+
)
740+
705741
var ({{range $key, $value := .BoolMap}}
706742
{{$key}} = BoolParam{{"{"}}{{printf "%q" $value}}{{"}"}}
707743
{{- end}}
@@ -741,6 +777,8 @@ package param
741777
742778
import (
743779
"time"
780+
781+
"github.com/pelicanplatform/pelican/byte_rate"
744782
)
745783
746784
{{.GeneratedConfig}}

origin_serve/handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,9 +319,9 @@ func InitializeHandlers(exports []server_utils.OriginExport) error {
319319
exportPrefixMap = make(map[string]string) // Initialize the global map
320320

321321
// Get optional rate limit for testing
322-
readRateLimit := param.Origin_TransferRateLimit.GetInt()
322+
readRateLimit := param.Origin_TransferRateLimit.GetByteRate()
323323
if readRateLimit > 0 {
324-
log.Infof("Applying read rate limit: %d bytes/sec", readRateLimit)
324+
log.Infof("Applying read rate limit: %s", readRateLimit.String())
325325
}
326326

327327
for _, export := range exports {

origin_serve/rate_limited_fs.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/***************************************************************
2+
*
3+
* Copyright (C) 2026, Pelican Project, Morgridge Institute for Research
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License"); you
6+
* may not use this file except in compliance with the License. You may
7+
* obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
***************************************************************/
18+
19+
package origin_serve
20+
21+
import (
22+
"context"
23+
"os"
24+
25+
"github.com/spf13/afero"
26+
"golang.org/x/time/rate"
27+
28+
"github.com/pelicanplatform/pelican/byte_rate"
29+
)
30+
31+
type rateLimitedFs struct {
32+
afero.Fs
33+
limiter *rate.Limiter
34+
}
35+
36+
type rateLimitedFile struct {
37+
afero.File
38+
limiter *rate.Limiter
39+
}
40+
41+
func newRateLimitedFs(fs afero.Fs, rateLimit byte_rate.ByteRate) afero.Fs {
42+
if rateLimit <= 0 {
43+
return fs
44+
}
45+
limit := int(rateLimit)
46+
limiter := rate.NewLimiter(rate.Limit(limit), limit)
47+
return &rateLimitedFs{Fs: fs, limiter: limiter}
48+
}
49+
50+
func (r *rateLimitedFs) Open(name string) (afero.File, error) {
51+
file, err := r.Fs.Open(name)
52+
if err != nil {
53+
return nil, err
54+
}
55+
return &rateLimitedFile{File: file, limiter: r.limiter}, nil
56+
}
57+
58+
func (r *rateLimitedFs) OpenFile(name string, flag int, perm os.FileMode) (afero.File, error) {
59+
file, err := r.Fs.OpenFile(name, flag, perm)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return &rateLimitedFile{File: file, limiter: r.limiter}, nil
64+
}
65+
66+
func (f *rateLimitedFile) Read(p []byte) (int, error) {
67+
n, err := f.File.Read(p)
68+
if n > 0 {
69+
if waitErr := f.waitN(n); waitErr != nil {
70+
return n, waitErr
71+
}
72+
}
73+
return n, err
74+
}
75+
76+
func (f *rateLimitedFile) Write(p []byte) (int, error) {
77+
n, err := f.File.Write(p)
78+
if n > 0 {
79+
if waitErr := f.waitN(n); waitErr != nil {
80+
return n, waitErr
81+
}
82+
}
83+
return n, err
84+
}
85+
86+
func (f *rateLimitedFile) ReadAt(p []byte, off int64) (int, error) {
87+
n, err := f.File.ReadAt(p, off)
88+
if n > 0 {
89+
if waitErr := f.waitN(n); waitErr != nil {
90+
return n, waitErr
91+
}
92+
}
93+
return n, err
94+
}
95+
96+
func (f *rateLimitedFile) WriteAt(p []byte, off int64) (int, error) {
97+
n, err := f.File.WriteAt(p, off)
98+
if n > 0 {
99+
if waitErr := f.waitN(n); waitErr != nil {
100+
return n, waitErr
101+
}
102+
}
103+
return n, err
104+
}
105+
106+
func (f *rateLimitedFile) waitN(n int) error {
107+
remaining := n
108+
for remaining > 0 {
109+
burst := f.limiter.Burst()
110+
if burst <= 0 {
111+
burst = 1
112+
}
113+
step := remaining
114+
if step > burst {
115+
step = burst
116+
}
117+
if err := f.limiter.WaitN(context.Background(), step); err != nil {
118+
return err
119+
}
120+
remaining -= step
121+
}
122+
return nil
123+
}

param/param.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func stringToSliceHookFunc() mapstructure.DecodeHookFunc {
123123
}
124124
}
125125

126-
// stringToByteRateHookFunc returns a DecodeHookFunc that converts strings to integers
126+
// stringToByteRateHookFunc returns a DecodeHookFunc that converts strings to byte rates
127127
// representing bytes per second. It supports human-readable rate formats like:
128128
// - "10MB/s", "100Mbps", "1.5GiB/m"
129129
//
@@ -132,8 +132,9 @@ func stringToSliceHookFunc() mapstructure.DecodeHookFunc {
132132
// so other hooks or default conversions can handle them.
133133
func stringToByteRateHookFunc() mapstructure.DecodeHookFunc {
134134
return func(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
135-
// Only convert string to int
136-
if f.Kind() != reflect.String || t.Kind() != reflect.Int {
135+
// Only convert string to int or ByteRate
136+
byteRateType := reflect.TypeOf(byte_rate.ByteRate(0))
137+
if f.Kind() != reflect.String || (t.Kind() != reflect.Int && t != byteRateType) {
137138
return data, nil
138139
}
139140

@@ -144,6 +145,9 @@ func stringToByteRateHookFunc() mapstructure.DecodeHookFunc {
144145

145146
// Empty or "0" means no rate limiting
146147
if raw == "" || raw == "0" {
148+
if t == byteRateType {
149+
return byte_rate.ByteRate(0), nil
150+
}
147151
return 0, nil
148152
}
149153

@@ -169,7 +173,10 @@ func stringToByteRateHookFunc() mapstructure.DecodeHookFunc {
169173
return nil, errors.Wrapf(err, "failed to parse byte rate '%s'", raw)
170174
}
171175

172-
// Return as integer bytes per second
176+
if t == byteRateType {
177+
return rate, nil
178+
}
179+
// Return as integer bytes per second for int targets
173180
return int(rate), nil
174181
}
175182
}

param/param_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"github.com/spf13/viper"
3030
"github.com/stretchr/testify/assert"
3131
"github.com/stretchr/testify/require"
32+
33+
"github.com/pelicanplatform/pelican/byte_rate"
3234
)
3335

3436
func TestSetAndGet(t *testing.T) {
@@ -663,7 +665,7 @@ func TestByteRateDecoding(t *testing.T) {
663665
require.NoError(t, err)
664666

665667
// 10MB/s should be 10 * 1048576 (MiB) = 10485760 bytes/second
666-
expected := 10 * 1048576
668+
expected := byte_rate.ByteRate(10 * 1048576)
667669
assert.Equal(t, expected, cfg.Origin.TransferRateLimit, "Should decode 10MB/s correctly")
668670
})
669671

@@ -688,7 +690,7 @@ func TestByteRateDecoding(t *testing.T) {
688690
require.NoError(t, err)
689691

690692
// 100Mbps = 100 * 1048576 / 8 = 13107200 bytes/second
691-
expected := 100 * 1048576 / 8
693+
expected := byte_rate.ByteRate(100 * 1048576 / 8)
692694
assert.Equal(t, expected, cfg.Origin.TransferRateLimit, "Should decode 100Mbps correctly")
693695
})
694696

@@ -712,7 +714,7 @@ func TestByteRateDecoding(t *testing.T) {
712714
cfg, err := DecodeConfig(v)
713715
require.NoError(t, err)
714716

715-
assert.Equal(t, 0, cfg.Origin.TransferRateLimit, "Should handle zero rate")
717+
assert.Equal(t, byte_rate.ByteRate(0), cfg.Origin.TransferRateLimit, "Should handle zero rate")
716718
})
717719

718720
t.Run("decode-invalid-rate-should-error", func(t *testing.T) {
@@ -747,7 +749,7 @@ func TestByteRateDecoding(t *testing.T) {
747749
require.NoError(t, err)
748750

749751
// 5GB/s = 5 * 1073741824 = 5368709120 bytes/second
750-
expected := 5 * 1073741824
752+
expected := byte_rate.ByteRate(5 * 1073741824)
751753
assert.Equal(t, expected, cfg.Origin.TransferRateLimit, "Should decode rate set via viper.Set")
752754
})
753755

@@ -763,8 +765,8 @@ func TestByteRateDecoding(t *testing.T) {
763765

764766
// Accessor should work even without explicit config creation
765767
// because getOrCreateConfig will create it
766-
rateLimit := Origin_TransferRateLimit.GetInt()
767-
expected := 50 * 1048576
768+
rateLimit := Origin_TransferRateLimit.GetByteRate()
769+
expected := byte_rate.ByteRate(50 * 1048576)
768770
assert.Equal(t, expected, rateLimit, "Accessor should return correct byte rate value")
769771

770772
// Verify config was created and stored

0 commit comments

Comments
 (0)