Skip to content

Commit db29e55

Browse files
mahdy-nasrtsahee
andauthored
Make compression level adaptive and configurable #NIT-4153 (#4145)
* make compression level adaptive and configurable * fix some issues * fix tests * Add changelog fragment for NIT-4153 * add the new configs in the changelog --------- Co-authored-by: Tsahi Zidenberg <[email protected]>
1 parent 670d1bf commit db29e55

File tree

6 files changed

+411
-24
lines changed

6 files changed

+411
-24
lines changed

arbnode/batch_poster.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,12 @@ type BatchPosterConfig struct {
173173
// Batch post polling interval.
174174
PollInterval time.Duration `koanf:"poll-interval" reload:"hot"`
175175
// Batch posting error delay.
176-
ErrorDelay time.Duration `koanf:"error-delay" reload:"hot"`
177-
CompressionLevel int `koanf:"compression-level" reload:"hot"`
176+
ErrorDelay time.Duration `koanf:"error-delay" reload:"hot"`
177+
// Deprecated: use CompressionLevels instead. This sets a single compression level for all backlog levels.
178+
CompressionLevel int `koanf:"compression-level" reload:"hot"`
179+
// CompressionLevels defines adaptive compression based on backlog. Each entry specifies the
180+
// compression level and recompression level to use when backlog >= the entry's backlog threshold.
181+
CompressionLevels CompressionLevelStepList `koanf:"compression-levels" reload:"hot"`
178182
AnyTrustRetentionPeriod time.Duration `koanf:"anytrust-retention-period" reload:"hot"`
179183
GasRefunderAddress string `koanf:"gas-refunder-address" reload:"hot"`
180184
DataPoster dataposter.DataPosterConfig `koanf:"data-poster" reload:"hot"`
@@ -231,6 +235,12 @@ func (c *BatchPosterConfig) Validate() error {
231235
} else {
232236
return fmt.Errorf("invalid L1 block bound tag \"%v\" (see --help for options)", c.L1BlockBound)
233237
}
238+
// Resolve compression levels from deprecated and new config fields
239+
resolved, err := ResolveCompressionLevels(c.CompressionLevel, c.CompressionLevels)
240+
if err != nil {
241+
return err
242+
}
243+
c.CompressionLevels = resolved
234244
return nil
235245
}
236246

@@ -252,7 +262,11 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
252262
f.Bool(prefix+".wait-for-max-delay", DefaultBatchPosterConfig.WaitForMaxDelay, "wait for the max batch delay, even if the batch is full")
253263
f.Duration(prefix+".poll-interval", DefaultBatchPosterConfig.PollInterval, "how long to wait after no batches are ready to be posted before checking again")
254264
f.Duration(prefix+".error-delay", DefaultBatchPosterConfig.ErrorDelay, "how long to delay after error posting batch")
255-
f.Int(prefix+".compression-level", DefaultBatchPosterConfig.CompressionLevel, "batch compression level")
265+
f.Int(prefix+".compression-level", DefaultBatchPosterConfig.CompressionLevel, "DEPRECATED: use compression-levels instead. batch compression level")
266+
f.Var(&parsedCompressionLevelsConf, prefix+".compression-levels",
267+
`JSON array of compression level steps. Format: [{"backlog":<int>,"level":<int>,"recompression-level":<int>},...]. `+
268+
`First entry must have backlog:0. Both Level and recomp-level must be 0-11, weakly descending. `+
269+
`Example: [{"backlog":0,"level":11,"recompression-level":11},{"backlog":21,"level":6,"recompression-level":11}]`)
256270
f.Duration(prefix+".anytrust-retention-period", DefaultBatchPosterConfig.AnyTrustRetentionPeriod, "In AnyTrust mode, the period which AnyTrust nodes are requested to retain the stored batches.")
257271
f.String(prefix+".gas-refunder-address", DefaultBatchPosterConfig.GasRefunderAddress, "The gas refunder contract address (optional)")
258272
f.Uint64(prefix+".extra-batch-gas", DefaultBatchPosterConfig.ExtraBatchGas, "use this much more gas than estimation says is necessary to post batches")
@@ -290,7 +304,6 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
290304
ErrorDelay: time.Second * 10,
291305
MaxDelay: time.Hour,
292306
WaitForMaxDelay: false,
293-
CompressionLevel: brotli.BestCompression,
294307
AnyTrustRetentionPeriod: daprovider.DefaultAnyTrustRetentionPeriod,
295308
GasRefunderAddress: "",
296309
ExtraBatchGas: 50_000,
@@ -329,7 +342,8 @@ var TestBatchPosterConfig = BatchPosterConfig{
329342
ErrorDelay: time.Millisecond * 10,
330343
MaxDelay: 0,
331344
WaitForMaxDelay: false,
332-
CompressionLevel: 2,
345+
CompressionLevel: 0,
346+
CompressionLevels: CompressionLevelStepList{{Backlog: 0, Level: 2, RecompressionLevel: 2}},
333347
AnyTrustRetentionPeriod: daprovider.DefaultAnyTrustRetentionPeriod,
334348
GasRefunderAddress: "",
335349
ExtraBatchGas: 10_000,
@@ -967,25 +981,17 @@ func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64,
967981
maxSize -= SequencerMessageHeaderSize
968982
}
969983
compressedBuffer := bytes.NewBuffer(make([]byte, 0, maxSize*2))
970-
compressionLevel := b.config().CompressionLevel
971-
recompressionLevel := b.config().CompressionLevel
972-
if b.GetBacklogEstimate() > 20 {
973-
compressionLevel = arbmath.MinInt(compressionLevel, brotli.DefaultCompression)
974-
}
975-
if b.GetBacklogEstimate() > 40 {
976-
recompressionLevel = arbmath.MinInt(recompressionLevel, brotli.DefaultCompression)
977-
}
978-
if b.GetBacklogEstimate() > 60 {
979-
compressionLevel = arbmath.MinInt(compressionLevel, 4)
980-
}
981-
if recompressionLevel < compressionLevel {
982-
// This should never be possible
983-
log.Warn(
984-
"somehow the recompression level was lower than the compression level",
985-
"recompressionLevel", recompressionLevel,
986-
"compressionLevel", compressionLevel,
987-
)
988-
recompressionLevel = compressionLevel
984+
// Determine compression levels based on backlog using configured steps
985+
compressionLevel := config.CompressionLevels[0].Level
986+
recompressionLevel := config.CompressionLevels[0].RecompressionLevel
987+
backlog := b.GetBacklogEstimate()
988+
for _, step := range config.CompressionLevels {
989+
if backlog >= step.Backlog {
990+
compressionLevel = step.Level
991+
recompressionLevel = step.RecompressionLevel
992+
} else {
993+
break
994+
}
989995
}
990996
return &batchSegments{
991997
compressedBuffer: compressedBuffer,

arbnode/compression_level.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2021-2022, Offchain Labs, Inc.
2+
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
3+
4+
package arbnode
5+
6+
import (
7+
"encoding/json"
8+
"errors"
9+
"fmt"
10+
11+
"github.com/andybalholm/brotli"
12+
"github.com/knadh/koanf"
13+
"github.com/knadh/koanf/providers/confmap"
14+
15+
"github.com/offchainlabs/nitro/util/arbmath"
16+
)
17+
18+
// CompressionLevelStep defines compression levels to use at a given backlog threshold.
19+
type CompressionLevelStep struct {
20+
Backlog uint64 `koanf:"backlog" json:"backlog"`
21+
Level int `koanf:"level" json:"level"`
22+
RecompressionLevel int `koanf:"recompression-level" json:"recompression-level"`
23+
}
24+
25+
// CompressionLevelStepList is a list of compression level steps for configuring
26+
// adaptive compression based on batch backlog.
27+
type CompressionLevelStepList []CompressionLevelStep
28+
29+
func (l *CompressionLevelStepList) Set(jsonStr string) error {
30+
return l.UnmarshalJSON([]byte(jsonStr))
31+
}
32+
33+
func (l *CompressionLevelStepList) String() string {
34+
b, _ := json.Marshal(l)
35+
return string(b)
36+
}
37+
38+
func (l *CompressionLevelStepList) UnmarshalJSON(data []byte) error {
39+
var tmp []CompressionLevelStep
40+
if err := json.Unmarshal(data, &tmp); err != nil {
41+
return err
42+
}
43+
*l = tmp
44+
return nil
45+
}
46+
47+
func (*CompressionLevelStepList) Type() string {
48+
return "CompressionLevelStepList"
49+
}
50+
51+
// Validate checks that the compression level steps are valid:
52+
// - Must have at least one entry
53+
// - First entry must have backlog: 0
54+
// - Backlog thresholds must be strictly ascending
55+
// - Level and RecompressionLevel must be weakly descending (non-increasing)
56+
// - RecompressionLevel must be >= Level within each entry
57+
// - All levels must be in valid range: 0-11
58+
func (l CompressionLevelStepList) Validate() error {
59+
if len(l) == 0 {
60+
return errors.New("compression-levels must have at least one entry")
61+
}
62+
if l[0].Backlog != 0 {
63+
return errors.New("first compression-levels entry must have backlog: 0")
64+
}
65+
for i, step := range l {
66+
if step.Level < 0 || step.Level > 11 {
67+
return fmt.Errorf("compression-levels[%d].level must be 0-11, got %d", i, step.Level)
68+
}
69+
if step.RecompressionLevel < 0 || step.RecompressionLevel > 11 {
70+
return fmt.Errorf("compression-levels[%d].recompression-level must be 0-11, got %d", i, step.RecompressionLevel)
71+
}
72+
if step.RecompressionLevel < step.Level {
73+
return fmt.Errorf("compression-levels[%d].recompression-level (%d) must be >= level (%d)", i, step.RecompressionLevel, step.Level)
74+
}
75+
if i > 0 {
76+
if step.Backlog <= l[i-1].Backlog {
77+
return fmt.Errorf("compression-levels[%d].backlog must be > compression-levels[%d].backlog", i, i-1)
78+
}
79+
if step.Level > l[i-1].Level {
80+
return fmt.Errorf("compression-levels[%d].level must be <= compression-levels[%d].level (weakly descending)", i, i-1)
81+
}
82+
if step.RecompressionLevel > l[i-1].RecompressionLevel {
83+
return fmt.Errorf("compression-levels[%d].recompression-level must be <= compression-levels[%d].recompression-level (weakly descending)", i, i-1)
84+
}
85+
}
86+
}
87+
return nil
88+
}
89+
90+
var parsedCompressionLevelsConf CompressionLevelStepList
91+
92+
// FixCompressionLevelsCLIParsing decode compression-levels json CLI ARG
93+
func FixCompressionLevelsCLIParsing(path string, k *koanf.Koanf) error {
94+
raw := k.Get(path)
95+
if jsonStr, ok := raw.(string); ok {
96+
if err := parsedCompressionLevelsConf.Set(jsonStr); err != nil {
97+
98+
return err
99+
}
100+
tempMap := map[string]interface{}{path: parsedCompressionLevelsConf}
101+
if err := k.Load(confmap.Provider(tempMap, "."), nil); err != nil {
102+
return err
103+
}
104+
} else {
105+
return fmt.Errorf("CompressionLevels config not found in %s", path)
106+
}
107+
return nil
108+
}
109+
110+
// DefaultCompressionLevels replicates the previous hardcoded adaptive compression behavior:
111+
var DefaultCompressionLevels = CompressionLevelStepList{
112+
{Backlog: 0, Level: brotli.BestCompression, RecompressionLevel: brotli.BestCompression},
113+
{Backlog: 21, Level: brotli.DefaultCompression, RecompressionLevel: brotli.BestCompression},
114+
{Backlog: 41, Level: brotli.DefaultCompression, RecompressionLevel: brotli.DefaultCompression},
115+
{Backlog: 61, Level: 4, RecompressionLevel: brotli.DefaultCompression},
116+
}
117+
118+
// ResolveCompressionLevels resolves the compression configuration from deprecated and new fields.
119+
// Returns error if both are set. Converts deprecated format to new format if needed.
120+
func ResolveCompressionLevels(compressionLevel int, compressionLevels CompressionLevelStepList) (CompressionLevelStepList, error) {
121+
// Check for conflict: both deprecated and new config set
122+
if compressionLevel > 0 && len(compressionLevels) > 0 {
123+
return nil, errors.New("cannot specify both compression-level (deprecated) and compression-levels; use only compression-levels")
124+
}
125+
126+
// Return DefaultCompressionLevels if both (compressionLevel and compressionLevels ) are not set
127+
if len(compressionLevels) == 0 && compressionLevel == 0 {
128+
return DefaultCompressionLevels, nil
129+
}
130+
131+
// If new config is set, validate and return it
132+
if len(compressionLevels) > 0 {
133+
if err := compressionLevels.Validate(); err != nil {
134+
return nil, fmt.Errorf("invalid compression-levels: %w", err)
135+
}
136+
return compressionLevels, nil
137+
}
138+
139+
// Convert deprecated `compressionLevel` config to new format of compressionLevels
140+
resolved := CompressionLevelStepList{
141+
{Backlog: 0, Level: compressionLevel, RecompressionLevel: compressionLevel},
142+
{Backlog: 21, Level: arbmath.MinInt(compressionLevel, brotli.DefaultCompression), RecompressionLevel: compressionLevel},
143+
{Backlog: 41, Level: arbmath.MinInt(compressionLevel, brotli.DefaultCompression), RecompressionLevel: arbmath.MinInt(compressionLevel, brotli.DefaultCompression)},
144+
{Backlog: 61, Level: arbmath.MinInt(compressionLevel, 4), RecompressionLevel: arbmath.MinInt(compressionLevel, brotli.DefaultCompression)},
145+
}
146+
147+
if err := resolved.Validate(); err != nil {
148+
return nil, fmt.Errorf("invalid compression-levels derived from compression-level: %w", err)
149+
}
150+
return resolved, nil
151+
}

0 commit comments

Comments
 (0)