Skip to content

Commit e5b06d2

Browse files
korniltseveh-am
andauthored
feat: flush queue before next event polling (#18)
Co-authored-by: eduardo aleixo <[email protected]> Co-authored-by: eduardo aleixo <[email protected]>
1 parent 9214f26 commit e5b06d2

File tree

6 files changed

+212
-21
lines changed

6 files changed

+212
-21
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,5 @@ examples/go
1919
!examples/go/main.go
2020

2121
node_modules
22+
23+
.idea/

README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ If needed, the `PYROSCOPE_AUTH_TOKEN` can be supplied.
2323
For a complete list of variables check the section below.
2424

2525
## Configuration
26-
| env var | default | description |
27-
| -------------------------- | -------------------------------- | ---------------------------------------------- |
28-
| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to |
29-
| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) |
30-
| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not |
31-
| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` |
32-
| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) |
33-
| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types |
26+
| env var | default | description |
27+
|------------------------------|----------------------------------|---------------------------------------------------------------------------------|
28+
| `PYROSCOPE_REMOTE_ADDRESS` | `https://ingest.pyroscope.cloud` | the pyroscope instance data will be relayed to |
29+
| `PYROSCOPE_AUTH_TOKEN` | `""` | authorization key (token authentication) |
30+
| `PYROSCOPE_SELF_PROFILING` | `false` | whether to profile the extension itself or not |
31+
| `PYROSCOPE_LOG_LEVEL` | `info` | `error` or `info` or `debug` or `trace` |
32+
| `PYROSCOPE_TIMEOUT` | `10s` | http client timeout ([go duration format](https://pkg.go.dev/time#Duration)) |
33+
| `PYROSCOPE_NUM_WORKERS` | `5` | num of relay workers, pick based on the number of profile types |
34+
| `PYROSCOPE_FLUSH_ON_INVOKE` | `false` | wait for all relay requests to be finished/flushed before next `Invocation` event is allowed |
3435

3536
# How it works
3637
The profiler will run as normal, and periodically will send data to the relay server (the server running at `http://localhost:4040`).

main.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ var (
3737

3838
// profile the extension?
3939
selfProfiling = getEnvBool("PYROSCOPE_SELF_PROFILING")
40+
41+
flushOnInvoke = getEnvBool("PYROSCOPE_FLUSH_ON_INVOKE")
4042
)
4143

4244
func main() {
@@ -82,7 +84,7 @@ func main() {
8284
runDevMode(ctx, logger, orch)
8385
} else {
8486
// Register extension and start listening for events
85-
runProdMode(ctx, logger, orch)
87+
runProdMode(ctx, logger, orch, queue)
8688
}
8789
}
8890

@@ -110,17 +112,18 @@ func runDevMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestra
110112
}
111113
}
112114

113-
func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator) {
115+
func runProdMode(ctx context.Context, logger *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) {
114116
res, err := extensionClient.Register(ctx, extensionName)
115117
if err != nil {
116118
panic(err)
117119
}
118120
logger.Trace("Register response", res)
119121

120122
// Will block until shutdown event is received or cancelled via the context.
121-
processEvents(ctx, logger, orch)
123+
processEvents(ctx, logger, orch, queue)
122124
}
123-
func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator) {
125+
126+
func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestrator, queue *relay.RemoteQueue) {
124127
log.Debug("Starting processing events")
125128

126129
shutdown := func() {
@@ -153,6 +156,9 @@ func processEvents(ctx context.Context, log *logrus.Entry, orch *relay.Orchestra
153156
shutdown()
154157
return
155158
}
159+
if res.EventType == extension.Invoke && flushOnInvoke {
160+
queue.Flush()
161+
}
156162
}
157163
}
158164
}

relay/remotequeue.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ type RemoteQueueCfg struct {
1414
}
1515

1616
type RemoteQueue struct {
17-
config *RemoteQueueCfg
18-
jobs chan *http.Request
19-
done chan struct{}
20-
wg sync.WaitGroup
21-
log *logrus.Entry
22-
relayer Relayer
17+
config *RemoteQueueCfg
18+
jobs chan *http.Request
19+
done chan struct{}
20+
wg sync.WaitGroup
21+
flushWG sync.WaitGroup
22+
flushGuard sync.Mutex
23+
log *logrus.Entry
24+
relayer Relayer
2325
}
2426

2527
type Relayer interface {
@@ -66,15 +68,26 @@ func (r *RemoteQueue) Stop(_ context.Context) error {
6668

6769
// Send adds a request to the queue to be processed later
6870
func (r *RemoteQueue) Send(req *http.Request) error {
71+
r.flushGuard.Lock() // block if we are currently trying to Flush
72+
defer r.flushGuard.Unlock()
73+
r.flushWG.Add(1)
6974
select {
7075
case r.jobs <- req:
7176
default:
77+
r.flushWG.Done()
7278
r.log.Error("Request queue is full, dropping a profile job.")
7379
return fmt.Errorf("request queue is full")
7480
}
7581

7682
return nil
7783
}
84+
func (r *RemoteQueue) Flush() {
85+
r.log.Debugf("Flush: Waiting for enqueued jobs to finish")
86+
r.flushGuard.Lock()
87+
defer r.flushGuard.Unlock()
88+
r.flushWG.Wait()
89+
r.log.Debugf("Flush: Done")
90+
}
7891

7992
func (r *RemoteQueue) handleJobs(workerID int) {
8093
for {
@@ -89,6 +102,7 @@ func (r *RemoteQueue) handleJobs(workerID int) {
89102
r.wg.Add(1)
90103
err := r.relayer.Send(job)
91104
r.wg.Done()
105+
r.flushWG.Done()
92106

93107
if err != nil {
94108
log.Error("Failed to relay request: ", err)

relay/remotequeue_flush_test.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package relay_test
2+
3+
import (
4+
"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
5+
"github.com/sirupsen/logrus"
6+
"net/http"
7+
"sync"
8+
"testing"
9+
"time"
10+
)
11+
12+
type asyncJob struct {
13+
name string
14+
m sync.Mutex
15+
t *testing.T
16+
}
17+
18+
func newAsyncJob(t *testing.T, name string, f func()) *asyncJob {
19+
res := &asyncJob{t: t, name: name}
20+
res.m.Lock()
21+
go func() {
22+
f()
23+
res.m.Unlock()
24+
}()
25+
return res
26+
}
27+
28+
func (j *asyncJob) assertNotFinished() {
29+
locked := j.m.TryLock()
30+
if locked {
31+
j.t.Fatalf("should be still working... " + j.name)
32+
}
33+
}
34+
35+
func (j *asyncJob) assertFinished() {
36+
j.m.Lock()
37+
}
38+
39+
type flushTestHelper struct {
40+
t *testing.T
41+
log *logrus.Entry
42+
responses chan struct{}
43+
requests chan struct{}
44+
req *http.Request
45+
queue *relay.RemoteQueue
46+
}
47+
48+
func newFlushMockRelay(t *testing.T) *flushTestHelper {
49+
req, _ := http.NewRequest(http.MethodPost, "/", nil)
50+
log := logrus.WithFields(logrus.Fields{"svc": "flush-test"})
51+
res := &flushTestHelper{
52+
t: t,
53+
log: log,
54+
responses: make(chan struct{}, 128),
55+
requests: make(chan struct{}, 128),
56+
req: req,
57+
}
58+
res.queue = relay.NewRemoteQueue(log, &relay.RemoteQueueCfg{
59+
NumWorkers: 2,
60+
}, res)
61+
logrus.SetLevel(logrus.DebugLevel)
62+
63+
return res
64+
}
65+
66+
func (h *flushTestHelper) Send(_ *http.Request) error {
67+
//h.log.Debug("flushTestHelper.send 1")
68+
h.requests <- struct{}{}
69+
//h.log.Debug("flushTestHelper.send 2")
70+
<-h.responses
71+
//h.log.Debug("flushTestHelper.send 3")
72+
return nil
73+
}
74+
75+
func (h *flushTestHelper) respond() {
76+
h.responses <- struct{}{}
77+
}
78+
79+
func (h *flushTestHelper) flushAsync() *asyncJob {
80+
return newAsyncJob(h.t, "flush", func() {
81+
h.queue.Flush()
82+
})
83+
}
84+
85+
func (h *flushTestHelper) sendAsync() *asyncJob {
86+
return newAsyncJob(h.t, "send", func() {
87+
_ = h.queue.Send(h.req)
88+
})
89+
}
90+
func (h *flushTestHelper) send() {
91+
_ = h.queue.Send(h.req)
92+
}
93+
94+
func (h *flushTestHelper) step() {
95+
time.Sleep(100 * time.Millisecond)
96+
}
97+
98+
func (h *flushTestHelper) assertRequestsProcessed(n int) {
99+
if n != len(h.requests) {
100+
h.t.Fatalf("expected %d got %d", n, len(h.responses))
101+
}
102+
}
103+
104+
func TestFlushWaitsForAllEnqueuedRequests(t *testing.T) {
105+
n := 3
106+
h := newFlushMockRelay(t)
107+
_ = h.queue.Start()
108+
for i := 0; i < n; i++ {
109+
h.send()
110+
}
111+
f := h.flushAsync()
112+
for i := 0; i < n; i++ {
113+
h.step()
114+
f.assertNotFinished()
115+
h.respond()
116+
}
117+
f.assertFinished()
118+
h.assertRequestsProcessed(n)
119+
}
120+
121+
func TestFlushWaitsForAllEnqueuedRequestsWhenQueueIsFullAndSomeAreDropped(t *testing.T) {
122+
n := 30
123+
h := newFlushMockRelay(t)
124+
//queueSize := cap(h.queue.jobs)
125+
queueSize := 20
126+
for i := 0; i < n; i++ { //send 30, 10 are dropped
127+
h.send()
128+
}
129+
_ = h.queue.Start()
130+
f := h.flushAsync()
131+
for i := 0; i < queueSize; i++ { //20 are processed
132+
h.step()
133+
f.assertNotFinished()
134+
h.respond()
135+
}
136+
f.assertFinished()
137+
h.assertRequestsProcessed(queueSize)
138+
}
139+
140+
func TestFlushWithQueueEmpty(t *testing.T) {
141+
h := newFlushMockRelay(t)
142+
_ = h.queue.Start()
143+
f := h.flushAsync()
144+
f.assertFinished()
145+
h.assertRequestsProcessed(0)
146+
}
147+
148+
func TestFlushSendEventDuringFlushBlocks(t *testing.T) {
149+
n := 3
150+
h := newFlushMockRelay(t)
151+
_ = h.queue.Start()
152+
for i := 0; i < n; i++ {
153+
h.send()
154+
}
155+
f := h.flushAsync()
156+
h.step()
157+
s := h.sendAsync()
158+
for i := 0; i < n; i++ {
159+
h.step()
160+
f.assertNotFinished()
161+
s.assertNotFinished()
162+
}
163+
for i := 0; i < n; i++ {
164+
h.respond()
165+
}
166+
f.assertFinished()
167+
s.assertFinished()
168+
169+
}

relay/remotequeue_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ package relay_test
22

33
import (
44
"context"
5+
"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
6+
"github.com/stretchr/testify/assert"
57
"net/http"
68
"sync"
79
"testing"
8-
9-
"github.com/pyroscope-io/pyroscope-lambda-extension/relay"
10-
"github.com/stretchr/testify/assert"
1110
)
1211

1312
type mockRelayer struct {

0 commit comments

Comments
 (0)