
Commit 3a4113e

ZPascal authored and chombium committed
Make ingress buffer parameters configurable
This change exposes the ingress buffer configuration parameters to the outside world. Depending on the scaling of Log Cache, the number of sources (CF applications and platform components), and the log load, ingress drops may occur when Log Cache nodes send items between each other. The Log Cache `ingress_dropped` metric should be monitored to make sure that there are no drops. In such cases, the following three parameters can be adjusted until the log loss is gone:

- Ingress Buffer Size - The ingress buffer (diode) size, in number of items, used when Log Cache nodes send items between each other. The default size is 10000. Can be increased when ingress drops occur.
- Ingress Buffer Read Batch Size - The size of the ingress buffer read batch, in number of items, used when Log Cache nodes send items between each other. The default size is 100. Can be increased when ingress drops occur.
- Ingress Buffer Read Batch Interval - The ingress buffer read interval, in milliseconds. The default value is 250. Can be adjusted when ingress drops occur.
1 parent 192b676 commit 3a4113e

File tree

9 files changed: +111 −29 lines changed


README.md

Lines changed: 7 additions & 0 deletions
@@ -43,6 +43,13 @@ Memory limit - Increasing memory limit allows for more storage, but may cause ou
 
 Larger CPUs - Increasing the CPU budget per instance should allow higher throughput
 
+Sometimes, depending on the scaling of Log Cache, the number of sources (CF applications and platform components), and the log load, ingress drops may occur when Log Cache nodes send items between each other. The Log Cache `ingress_dropped` metric should be monitored to make sure that there are no drops. In such cases, the following three parameters can be adjusted until the log loss is gone.
+
+- Ingress Buffer Size - The ingress buffer (diode) size, in number of items, used when Log Cache nodes send items between each other. The default size is 10000. Can be increased when ingress drops occur.
+- Ingress Buffer Read Batch Size - The size of the ingress buffer read batch, in number of items, used when Log Cache nodes send items between each other. The default size is 100. Can be increased when ingress drops occur.
+- Ingress Buffer Read Batch Interval - The ingress buffer read interval, in milliseconds. The default value is 250. Can be adjusted when ingress drops occur.
+
+
 Log Cache is known to exceed memory limits under high throughput/stress. If you see your log-cache reaching higher memory
 than you have set, you might want to scale your log-cache up. Either solely in terms of CPU per instance, or more instances.

jobs/log-cache/spec

Lines changed: 12 additions & 0 deletions
@@ -47,6 +47,18 @@ properties:
     description: "The maximum number of items stored in LogCache per source."
     default: 100000
 
+  ingress_buffer_size:
+    description: "The ingress buffer (diode) size, in number of items. The size of the ingress buffer used when LogCache nodes send items between each other. In some cases, the buffer size must be raised in order to avoid ingress drops. Must be an integer."
+    default: 10000
+
+  ingress_buffer_read_batch_size:
+    description: "The ingress buffer read batch size, in number of items. The size of the ingress buffer read batch used when LogCache nodes send envelopes between each other. In some cases, the batch size must be raised in order to make room in the buffer faster and avoid ingress drops. Must be an integer."
+    default: 100
+
+  ingress_buffer_read_batch_interval:
+    description: "The ingress buffer read batch interval, in milliseconds. The interval at which data is read out of the ingress buffer used when LogCache nodes send envelopes between each other. In some cases, the read interval must be adjusted in order to make room in the buffer faster and avoid ingress drops. Must be an integer."
+    default: 250
+
   truncation_interval:
     description: "The amount of time between log-cache checking if it needs to prune"
     default: "1s"

jobs/log-cache/templates/bpm.yml.erb

Lines changed: 3 additions & 0 deletions
@@ -26,6 +26,9 @@ processes:
       ADDR: "<%= ":#{p('port')}" %>"
       MEMORY_LIMIT_PERCENT: "<%= p('memory_limit_percent') %>"
       MAX_PER_SOURCE: "<%= p('max_per_source') %>"
+      INGRESS_BUFFER_SIZE: "<%= p('ingress_buffer_size') %>"
+      INGRESS_BUFFER_READ_BATCH_SIZE: "<%= p('ingress_buffer_read_batch_size') %>"
+      INGRESS_BUFFER_READ_BATCH_INTERVAL: "<%= p('ingress_buffer_read_batch_interval') %>"
      QUERY_TIMEOUT: "<%= p('promql.query_timeout') %>"
      TRUNCATION_INTERVAL: "<%= p('truncation_interval') %>"
      PRUNES_PER_GC: "<%= p('prunes_per_gc') %>"

src/cmd/log-cache/config.go

Lines changed: 27 additions & 6 deletions
@@ -57,6 +57,24 @@ type Config struct {
     // assumed that the current node is the only one.
     NodeAddrs []string `env:"NODE_ADDRS, report"`
 
+    // IngressBufferSize sets the size of the ingress buffer (diode), in number of items,
+    // used when LogCache nodes send items between each other. Depending on the log
+    // envelope load, it might be useful to raise the size in order to avoid
+    // ingress drops. Default is 10000.
+    IngressBufferSize int `env:"INGRESS_BUFFER_SIZE, report"`
+
+    // IngressBufferReadBatchSize sets the size of the ingress buffer (diode) read batch,
+    // in number of items, used when LogCache nodes send items between each other. Depending
+    // on the log envelope load, it might be useful to raise the size in order to avoid
+    // ingress drops. Default is 100.
+    IngressBufferReadBatchSize int `env:"INGRESS_BUFFER_READ_BATCH_SIZE, report"`
+
+    // IngressBufferReadBatchInterval sets the interval, in milliseconds, at which items are
+    // read out of the ingress buffer (diode) used when LogCache nodes send items between each
+    // other. Depending on the log envelope load, it might be useful to adjust the interval in
+    // order to avoid ingress drops. Default is 250.
+    IngressBufferReadBatchInterval int `env:"INGRESS_BUFFER_READ_BATCH_INTERVAL, report"`
+
     TLS           tls.TLS
     MetricsServer config.MetricsServer
     UseRFC339     bool `env:"USE_RFC339"`
@@ -65,12 +83,15 @@ type Config struct {
 // LoadConfig creates Config object from environment variables
 func LoadConfig() (*Config, error) {
     c := Config{
-        Addr:               ":8080",
-        QueryTimeout:       10 * time.Second,
-        MemoryLimitPercent: 50,
-        MaxPerSource:       100000,
-        TruncationInterval: 1 * time.Second,
-        PrunesPerGC:        int64(3),
+        Addr:                           ":8080",
+        QueryTimeout:                   10 * time.Second,
+        MemoryLimitPercent:             50,
+        MaxPerSource:                   100000,
+        TruncationInterval:             1 * time.Second,
+        PrunesPerGC:                    int64(3),
+        IngressBufferSize:              10000,
+        IngressBufferReadBatchSize:     100,
+        IngressBufferReadBatchInterval: 250,
         MetricsServer: config.MetricsServer{
             Port: 6060,
         },
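For illustration, the sketch below mirrors how the three new environment variables map onto the Config fields and their defaults. It is a simplified, self-contained stand-in that reads the variables with the standard library; the actual binary populates Config through its env struct tags and its own loader, and only the variable names and defaults are taken from the diff above.

package main

import (
    "fmt"
    "os"
    "strconv"
)

// ingressBufferConfig is a hypothetical, trimmed-down stand-in for the
// ingress-buffer fields added to Config in config.go.
type ingressBufferConfig struct {
    IngressBufferSize              int
    IngressBufferReadBatchSize     int
    IngressBufferReadBatchInterval int
}

// intFromEnv reads an integer environment variable, falling back to the
// given default when the variable is unset or not a valid integer.
func intFromEnv(name string, def int) int {
    v, ok := os.LookupEnv(name)
    if !ok {
        return def
    }
    n, err := strconv.Atoi(v)
    if err != nil {
        return def
    }
    return n
}

func main() {
    // Defaults mirror LoadConfig above: 10000 items, batches of 100, 250 ms.
    cfg := ingressBufferConfig{
        IngressBufferSize:              intFromEnv("INGRESS_BUFFER_SIZE", 10000),
        IngressBufferReadBatchSize:     intFromEnv("INGRESS_BUFFER_READ_BATCH_SIZE", 100),
        IngressBufferReadBatchInterval: intFromEnv("INGRESS_BUFFER_READ_BATCH_INTERVAL", 250),
    }
    fmt.Printf("ingress buffer config: %+v\n", cfg)
}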

src/cmd/log-cache/main.go

Lines changed: 3 additions & 0 deletions
@@ -97,6 +97,9 @@ func main() {
         WithQueryTimeout(cfg.QueryTimeout),
         WithTruncationInterval(cfg.TruncationInterval),
         WithPrunesPerGC(cfg.PrunesPerGC),
+        WithIngressBufferSize(cfg.IngressBufferSize),
+        WithIngressBufferReadBatchSize(cfg.IngressBufferReadBatchSize),
+        WithIngressBufferReadBatchInterval(cfg.IngressBufferReadBatchInterval),
     }
     var transport grpc.DialOption
     if cfg.TLS.HasAnyCredential() {

src/internal/cache/log_cache.go

Lines changed: 49 additions & 15 deletions
@@ -37,12 +37,15 @@ type LogCache struct {
     metrics Metrics
     closing int64
 
-    maxPerSource       int
-    memoryLimitPercent float64
-    memoryLimit        uint64
-    queryTimeout       time.Duration
-    truncationInterval time.Duration
-    prunesPerGC        int64
+    maxPerSource                   int
+    memoryLimitPercent             float64
+    memoryLimit                    uint64
+    queryTimeout                   time.Duration
+    truncationInterval             time.Duration
+    prunesPerGC                    int64
+    ingressBufferSize              int
+    ingressBufferReadBatchSize     int
+    ingressBufferReadBatchInterval int
 
     // Cluster Properties
     addr string
@@ -60,13 +63,16 @@ type LogCache struct {
 // NewLogCache creates a new LogCache.
 func New(m Metrics, logger *log.Logger, opts ...LogCacheOption) *LogCache {
     cache := &LogCache{
-        log:                logger,
-        metrics:            m,
-        maxPerSource:       100000,
-        memoryLimitPercent: 50,
-        queryTimeout:       10 * time.Second,
-        truncationInterval: 1 * time.Second,
-        prunesPerGC:        int64(3),
+        log:                            logger,
+        metrics:                        m,
+        maxPerSource:                   100000,
+        ingressBufferSize:              10000,
+        ingressBufferReadBatchSize:     100,
+        ingressBufferReadBatchInterval: 250,
+        memoryLimitPercent:             50,
+        queryTimeout:                   10 * time.Second,
+        truncationInterval:             1 * time.Second,
+        prunesPerGC:                    int64(3),
 
         addr:     ":8080",
         dialOpts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
@@ -95,6 +101,33 @@ func WithMaxPerSource(size int) LogCacheOption {
     }
 }
 
+// WithIngressBufferSize returns a LogCacheOption that configures the size
+// of the ingress buffer (diode), in number of envelopes. Defaults to
+// 10000 envelopes.
+func WithIngressBufferSize(size int) LogCacheOption {
+    return func(c *LogCache) {
+        c.ingressBufferSize = size
+    }
+}
+
+// WithIngressBufferReadBatchSize returns a LogCacheOption that configures the size
+// of the ingress buffer (diode) read batch, in number of envelopes.
+// Defaults to 100 envelopes.
+func WithIngressBufferReadBatchSize(size int) LogCacheOption {
+    return func(c *LogCache) {
+        c.ingressBufferReadBatchSize = size
+    }
+}
+
+// WithIngressBufferReadBatchInterval returns a LogCacheOption that configures the read
+// interval of the ingress buffer (diode), in milliseconds.
+// Defaults to 250 milliseconds.
+func WithIngressBufferReadBatchInterval(interval int) LogCacheOption {
+    return func(c *LogCache) {
+        c.ingressBufferReadBatchInterval = interval
+    }
+}
+
 // WithTruncationInterval returns a LogCacheOption that configures the
 // interval in ms on the store's truncation loop. Defaults to 1s.
 func WithTruncationInterval(interval time.Duration) LogCacheOption {
@@ -225,8 +258,9 @@ func (c *LogCache) setupRouting(s *store.Store) {
         }
 
         bw := routing.NewBatchedIngressClient(
-            100,
-            250*time.Millisecond,
+            c.ingressBufferReadBatchSize,
+            c.ingressBufferSize,
+            time.Duration(c.ingressBufferReadBatchInterval)*time.Millisecond,
             logcache_v1.NewIngressClient(conn),
             c.metrics.NewCounter(
                 "ingress_dropped",

src/internal/routing/batched_ingress_client.go

Lines changed: 6 additions & 5 deletions
@@ -20,7 +20,7 @@ type BatchedIngressClient struct {
     c rpc.IngressClient
 
     buffer            *diodes.OneToOne
-    size              int
+    batchSize         int
     interval          time.Duration
     log               *log.Logger
     sendFailureMetric metrics.Counter
@@ -36,7 +36,8 @@ func WithLocalOnlyDisabled(b *BatchedIngressClient) {
 
 // NewBatchedIngressClient returns a new BatchedIngressClient.
 func NewBatchedIngressClient(
-    size int,
+    batchSize int,
+    bufferSize int,
     interval time.Duration,
     c rpc.IngressClient,
     droppedMetric metrics.Counter,
@@ -46,13 +47,13 @@ func NewBatchedIngressClient(
 ) *BatchedIngressClient {
     b := &BatchedIngressClient{
         c:                 c,
-        size:              size,
+        batchSize:         batchSize,
         interval:          interval,
         log:               log,
         sendFailureMetric: sendFailureMetric,
         localOnly:         true,
 
-        buffer: diodes.NewOneToOne(10000, diodes.AlertFunc(func(dropped int) {
+        buffer: diodes.NewOneToOne(bufferSize, diodes.AlertFunc(func(dropped int) {
             log.Printf("dropped %d envelopes", dropped)
             droppedMetric.Add(float64(dropped))
         })),
@@ -77,7 +78,7 @@ func (b *BatchedIngressClient) Send(ctx context.Context, in *rpc.SendRequest, op
 }
 
 func (b *BatchedIngressClient) start() {
-    batcher := batching.NewBatcher(b.size, b.interval, batching.WriterFunc(b.write))
+    batcher := batching.NewBatcher(b.batchSize, b.interval, batching.WriterFunc(b.write))
     for {
         e, ok := b.buffer.TryNext()
         if !ok {
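NewBatchedIngressClient now takes the batch size and the diode size as separate arguments, while the flush policy itself is unchanged: a batch is written out when it reaches the batch size or when the interval elapses. The standalone sketch below illustrates that size-or-interval policy with plain standard-library code; it is not the go-batching or go-diodes implementation, and every name in it is made up for the example.

package main

import (
    "fmt"
    "time"
)

// batcher flushes collected items either when the batch reaches batchSize or
// when interval has elapsed since the last flush -- the same policy the
// BatchedIngressClient relies on. Simplified stand-in, not the real library.
type batcher struct {
    batchSize int
    interval  time.Duration
    lastFlush time.Time
    batch     []string
    write     func([]string)
}

func newBatcher(batchSize int, interval time.Duration, write func([]string)) *batcher {
    return &batcher{batchSize: batchSize, interval: interval, lastFlush: time.Now(), write: write}
}

// add appends an item and flushes as soon as the batch is full.
func (b *batcher) add(item string) {
    b.batch = append(b.batch, item)
    if len(b.batch) >= b.batchSize {
        b.flush()
    }
}

// flushIfStale is meant to be called periodically (for example when the ingress
// buffer is momentarily empty) so partially filled batches still go out.
func (b *batcher) flushIfStale() {
    if len(b.batch) > 0 && time.Since(b.lastFlush) >= b.interval {
        b.flush()
    }
}

func (b *batcher) flush() {
    b.write(b.batch)
    b.batch = nil
    b.lastFlush = time.Now()
}

func main() {
    b := newBatcher(3, 250*time.Millisecond, func(batch []string) {
        fmt.Printf("flushing %d envelopes\n", len(batch))
    })
    for i := 0; i < 7; i++ {
        b.add(fmt.Sprintf("envelope-%d", i)) // flushes after every 3rd item
    }
    time.Sleep(300 * time.Millisecond)
    b.flushIfStale() // interval elapsed, the remaining envelope goes out
}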

src/internal/routing/batched_ingress_client_test.go

Lines changed: 3 additions & 2 deletions
@@ -29,7 +29,7 @@ var _ = Describe("BatchedIngressClient", func() {
         m = testhelpers.NewMetricsRegistry()
         spyDropped = m.NewCounter("nodeX_dropped", "some help text")
         ingressClient = newSpyIngressClient()
-        c = routing.NewBatchedIngressClient(5, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
+        c = routing.NewBatchedIngressClient(5, 10, time.Hour, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
     })
 
     It("sends envelopes by batches because of size", func() {
@@ -49,7 +49,7 @@ var _ = Describe("BatchedIngressClient", func() {
     })
 
     It("sends envelopes by batches because of interval", func() {
-        c = routing.NewBatchedIngressClient(5, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
+        c = routing.NewBatchedIngressClient(5, 10, time.Microsecond, ingressClient, spyDropped, m.NewCounter("send_failure", "some help text"), log.New(io.Discard, "", 0))
         _, err := c.Send(context.Background(), &rpc.SendRequest{
             Envelopes: &loggregator_v2.EnvelopeBatch{
                 Batch: []*loggregator_v2.Envelope{
@@ -117,6 +117,7 @@ var _ = Describe("BatchedIngressClient", func() {
     It("sends envelopes with LocalOnly false with option", func() {
         c = routing.NewBatchedIngressClient(
             5,
+            10,
             time.Hour,
             ingressClient,
             spyDropped,

src/vendor/code.cloudfoundry.org/go-batching/batcher.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.
