Skip to content

Commit 1b84fd9

Browse files
committed
MEDIUM: maps: parallelize writing of maps to haproxy and fs
1 parent 2ddac90 commit 1b84fd9

File tree

6 files changed

+279
-28
lines changed

6 files changed

+279
-28
lines changed

.aspell.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ allowed:
3131
- passthrough
3232
- ssl
3333
- unix
34+
- parallelize

pkg/controller/controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/haproxytech/client-native/v5/models"
2525
"github.com/haproxytech/kubernetes-ingress/pkg/annotations"
26+
"github.com/haproxytech/kubernetes-ingress/pkg/fs"
2627
gateway "github.com/haproxytech/kubernetes-ingress/pkg/gateways"
2728
"github.com/haproxytech/kubernetes-ingress/pkg/haproxy"
2829
"github.com/haproxytech/kubernetes-ingress/pkg/haproxy/instance"
@@ -196,7 +197,10 @@ func (c *HAProxyController) updateHAProxy() {
196197
c.setToReady()
197198
}
198199

200+
fs.Writer.WaitUntilWritesDone()
201+
199202
if instance.NeedReload() {
203+
fs.RunDelayedFuncs()
200204
if err = c.haproxy.Service("reload"); err != nil {
201205
logger.Error(err)
202206
} else {

pkg/fs/fs-delayed.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2019 HAProxy Technologies LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fs
16+
17+
import (
18+
"sync"
19+
)
20+
21+
var (
22+
delayedFunc map[string]func()
23+
muDelayed sync.Mutex
24+
)
25+
var delayedWriter = New()
26+
27+
// AddDelayedFunc adds a function to be called prior to restarting of HAProxy
28+
func AddDelayedFunc(name string, f func()) {
29+
muDelayed.Lock()
30+
defer muDelayed.Unlock()
31+
if delayedFunc == nil {
32+
delayedFunc = make(map[string]func())
33+
}
34+
delayedFunc[name] = f
35+
}
36+
37+
func RunDelayedFuncs() {
38+
muDelayed.Lock()
39+
defer muDelayed.Unlock()
40+
if delayedFunc == nil {
41+
return
42+
}
43+
for _, f := range delayedFunc {
44+
delayedWriter.Write(f)
45+
}
46+
clear(delayedFunc)
47+
delayedWriter.WaitUntilWritesDone()
48+
}

pkg/fs/fs.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2019 HAProxy Technologies LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fs
16+
17+
import (
18+
"sync"
19+
)
20+
21+
const FS_WRITE_LIMIT = 20 //nolint:stylecheck
22+
23+
var Writer = New()
24+
25+
type writer struct {
26+
writeLimiter chan struct{}
27+
wg *sync.WaitGroup
28+
mu *sync.Mutex
29+
}
30+
31+
// New creates new writer that will parallelize fs writes
32+
func New() writer {
33+
w := writer{
34+
writeLimiter: make(chan struct{}, FS_WRITE_LIMIT),
35+
wg: &sync.WaitGroup{},
36+
mu: &sync.Mutex{},
37+
}
38+
return w
39+
}
40+
41+
// Write ensures function to be executed in a separate goroutine
42+
// this also ensures that we do not put to much pressure on the FS
43+
// while still allowing some parallelization
44+
//
45+
// NOTE: this will block calling of WaitUntilWritesDone
46+
func (w *writer) Write(writeFunc func()) {
47+
w.mu.Lock()
48+
defer w.mu.Unlock()
49+
w.wg.Add(1)
50+
go func() {
51+
defer func() {
52+
<-w.writeLimiter
53+
w.wg.Done()
54+
}()
55+
w.writeLimiter <- struct{}{}
56+
writeFunc()
57+
}()
58+
}
59+
60+
// WaitUntilWritesDone waits for all fs writes to complete.
61+
//
62+
// NOTE: this will block calling of Write
63+
func (w *writer) WaitUntilWritesDone() {
64+
w.mu.Lock()
65+
defer w.mu.Unlock()
66+
w.wg.Wait()
67+
}

pkg/fs/fs_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2019 HAProxy Technologies LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fs
16+
17+
import (
18+
"sync/atomic"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
)
24+
25+
func TestWrite(t *testing.T) {
26+
var counter atomic.Int32
27+
w := New()
28+
w.Write(func() {
29+
counter.Add(1)
30+
})
31+
w.Write(func() {
32+
time.Sleep(time.Second)
33+
counter.Add(1)
34+
})
35+
w.WaitUntilWritesDone()
36+
37+
assert.Equal(t, int32(2), counter.Load())
38+
}
39+
40+
func TestWriteSecondTime(t *testing.T) {
41+
var counter atomic.Int32
42+
w := New()
43+
w.Write(func() {
44+
counter.Add(1)
45+
})
46+
w.Write(func() {
47+
counter.Add(1)
48+
})
49+
w.WaitUntilWritesDone()
50+
51+
assert.Equal(t, int32(2), counter.Load())
52+
53+
counter.Store(0)
54+
w.Write(func() {
55+
time.Sleep(time.Second)
56+
counter.Add(1)
57+
})
58+
w.Write(func() {
59+
counter.Add(1)
60+
})
61+
w.WaitUntilWritesDone()
62+
63+
assert.Equal(t, int32(2), counter.Load())
64+
}
65+
66+
func TestWriteSecondQueue(t *testing.T) {
67+
var counter atomic.Int32
68+
w := New()
69+
numWrites := int32(FS_WRITE_LIMIT*2 + 2)
70+
71+
for range numWrites {
72+
w.Write(func() {
73+
counter.Add(1)
74+
})
75+
}
76+
w.WaitUntilWritesDone()
77+
78+
assert.Equal(t, numWrites, counter.Load())
79+
80+
counter.Store(0)
81+
for range numWrites {
82+
w.Write(func() {
83+
counter.Add(1)
84+
})
85+
}
86+
w.WaitUntilWritesDone()
87+
88+
assert.Equal(t, numWrites, counter.Load())
89+
}
90+
91+
func TestWriteSecondQueueTime(t *testing.T) {
92+
var counter atomic.Int32
93+
w := New()
94+
numWrites := FS_WRITE_LIMIT*2 + 2
95+
96+
start := time.Now()
97+
for range numWrites {
98+
w.Write(func() {
99+
time.Sleep(time.Second)
100+
counter.Add(1)
101+
})
102+
}
103+
w.WaitUntilWritesDone()
104+
diffTime := time.Since(start)
105+
106+
if counter.Load() != int32(numWrites) {
107+
t.Errorf("expected %d writes, got %d", numWrites, counter.Load())
108+
}
109+
110+
numSeconds := numWrites/FS_WRITE_LIMIT + 1
111+
112+
assert.Less(t, time.Second*time.Duration(numSeconds), diffTime)
113+
assert.Less(t, diffTime, time.Second*time.Duration(numSeconds+1))
114+
}

pkg/haproxy/maps/main.go

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"strings"
2424

2525
"github.com/google/renameio"
26+
"github.com/haproxytech/kubernetes-ingress/pkg/fs"
2627
"github.com/haproxytech/kubernetes-ingress/pkg/haproxy/api"
2728
"github.com/haproxytech/kubernetes-ingress/pkg/haproxy/instance"
2829
"github.com/haproxytech/kubernetes-ingress/pkg/utils"
@@ -117,36 +118,52 @@ func (m mapFiles) RefreshMaps(client api.HAProxyClient) {
117118
if mapFile.hash == hash {
118119
continue
119120
}
120-
var f *os.File
121-
var err error
122-
filename := GetPath(name)
123-
if len(content) == 0 && !mapFile.persistent {
124-
logger.Error(os.Remove(string(filename)))
125-
delete(m, name)
126-
continue
127-
} else if f, err = os.Create(string(filename)); err != nil {
128-
logger.Error(err)
129-
continue
130-
}
131-
f.Close()
132-
var buff strings.Builder
133-
buff.Grow(api.BufferSize * len(content))
134-
for _, d := range content {
135-
buff.WriteString(d)
136-
}
137-
err = renameio.WriteFile(string(filename), []byte(buff.String()), 0o666)
138-
if err != nil {
139-
logger.Error(err)
140-
continue
141-
}
142-
mapFile.hash = hash
143-
if err = client.SetMapContent(string(name), content); err != nil {
144-
if errors.Is(err, api.ErrMapNotFound) {
145-
instance.Reload("Map file %s created", string(name))
121+
// parallelize writing of files
122+
fs.Writer.Write(func() {
123+
var err error
124+
filename := GetPath(name)
125+
if len(content) == 0 && !mapFile.persistent {
126+
fs.AddDelayedFunc(string(filename), func() {
127+
logger.Error(os.Remove(string(filename)))
128+
})
129+
delete(m, name)
130+
return
146131
} else {
147-
instance.Reload("Runtime update of map file '%s' failed : %s", string(name), err.Error())
132+
if _, err = os.Stat(string(filename)); err != nil {
133+
if os.IsNotExist(err) {
134+
err = renameio.WriteFile(string(filename), []byte{}, 0o666)
135+
if err != nil {
136+
logger.Error(err)
137+
return
138+
}
139+
} else {
140+
logger.Error(err)
141+
return
142+
}
143+
}
148144
}
149-
}
145+
var buff strings.Builder
146+
buff.Grow(api.BufferSize * len(content))
147+
for _, d := range content {
148+
buff.WriteString(d)
149+
}
150+
fs.AddDelayedFunc(string(filename), func() {
151+
err = renameio.WriteFile(string(filename), []byte(buff.String()), 0o666)
152+
if err != nil {
153+
logger.Error(err)
154+
return
155+
}
156+
})
157+
158+
mapFile.hash = hash
159+
if err = client.SetMapContent(string(name), content); err != nil {
160+
if errors.Is(err, api.ErrMapNotFound) {
161+
instance.Reload("Map file %s created", string(name))
162+
} else {
163+
instance.Reload("Runtime update of map file '%s' failed : %s", string(name), err.Error())
164+
}
165+
}
166+
})
150167
}
151168
}
152169

0 commit comments

Comments
 (0)