Skip to content

Commit 680490e

Browse files
committed
in the process of simplifying pipeline use
1 parent 70160fb commit 680490e

File tree

13 files changed

+250
-1277
lines changed

13 files changed

+250
-1277
lines changed

engine/dispatcher/dispatcher.go

Lines changed: 172 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"log/slog"
11+
"math/rand"
1112
"sync"
1213
"time"
1314

@@ -16,28 +17,38 @@ import (
1617
oam "github.com/owasp-amass/open-asset-model"
1718
)
1819

20+
type limits struct {
21+
MaxQueued int
22+
HighWater int
23+
LowWater int
24+
PerSessBurst int
25+
}
26+
1927
type dynamicDispatcher struct {
2028
sync.RWMutex
21-
log *slog.Logger
22-
reg et.Registry
23-
mgr et.SessionManager
24-
done chan struct{}
25-
cqueue queue.Queue
26-
cchan chan *et.EventDataElement
27-
pools map[oam.AssetType]*pipelinePool
28-
meta *metaMap
29+
log *slog.Logger
30+
reg et.Registry
31+
mgr et.SessionManager
32+
done chan struct{}
33+
cqueue queue.Queue
34+
cchan chan *et.EventDataElement
35+
meta *metaMap
36+
shuffler *rand.Rand
2937
}
3038

3139
func NewDispatcher(l *slog.Logger, r et.Registry, mgr et.SessionManager) et.Dispatcher {
40+
// Create a new random source
41+
shuffler := rand.New(rand.NewSource(time.Now().UnixNano()))
42+
3243
d := &dynamicDispatcher{
33-
log: l,
34-
reg: r,
35-
mgr: mgr,
36-
done: make(chan struct{}),
37-
cchan: make(chan *et.EventDataElement, 1000),
38-
cqueue: queue.NewQueue(),
39-
pools: make(map[oam.AssetType]*pipelinePool),
40-
meta: newMetaMap(),
44+
log: l,
45+
reg: r,
46+
mgr: mgr,
47+
done: make(chan struct{}),
48+
cchan: make(chan *et.EventDataElement, 1000),
49+
cqueue: queue.NewQueue(),
50+
meta: newMetaMap(),
51+
shuffler: shuffler,
4152
}
4253

4354
go d.runEvents()
@@ -56,9 +67,6 @@ func (d *dynamicDispatcher) Shutdown() {
5667
}
5768

5869
func (d *dynamicDispatcher) runEvents() {
59-
scale := time.NewTicker(5 * time.Second)
60-
defer scale.Stop()
61-
6270
for {
6371
select {
6472
case <-d.done:
@@ -67,13 +75,6 @@ func (d *dynamicDispatcher) runEvents() {
6775
}
6876

6977
select {
70-
case <-scale.C:
71-
for _, pool := range d.pools {
72-
if stats, err := d.snapshotSessBacklogStats(pool.eventTy); err == nil {
73-
_ = pool.maybeScale(stats)
74-
pool.maybeAdjustFanout(stats)
75-
}
76-
}
7778
case e := <-d.cchan:
7879
d.cqueue.Append(e)
7980
case <-d.cqueue.Signal():
@@ -92,8 +93,10 @@ func (d *dynamicDispatcher) completedCallback(ede *et.EventDataElement) {
9293
_ = d.meta.DeleteSessionEntry(ede.Event.Session.ID().String(), ede.Event.Entity.ID)
9394
}
9495

95-
if inst, ok := ede.Ref.(*pipelineInstance); ok {
96-
inst.onDequeue()
96+
if ap, ok := ede.Ref.(*et.AssetPipeline); ok {
97+
if ap.Queue.Len() <= int(limitsByAssetType(ede.Event.Entity.Asset.AssetType()).LowWater) {
98+
pump()
99+
}
97100
}
98101

99102
if err := ede.Error; err != nil {
@@ -158,38 +161,6 @@ func (d *dynamicDispatcher) ResubmitEvent(e *et.Event) error {
158161
return pool.Dispatch(e)
159162
}
160163

161-
func (d *dynamicDispatcher) getOrCreatePool(atype oam.AssetType) *pipelinePool {
162-
d.RLock()
163-
pool := d.pools[atype]
164-
d.RUnlock()
165-
if pool != nil {
166-
return pool
167-
}
168-
169-
d.Lock()
170-
defer d.Unlock()
171-
// check if the pool was created while waiting
172-
if pool = d.pools[atype]; pool != nil {
173-
return pool
174-
}
175-
176-
min, max := assetTypeToPoolMinMax(atype)
177-
pool = newPipelinePool(d, atype, min, max)
178-
d.pools[atype] = pool
179-
return pool
180-
}
181-
182-
func assetTypeToPoolMinMax(atype oam.AssetType) (int, int) {
183-
switch atype {
184-
case oam.FQDN:
185-
return 4, 32
186-
case oam.IPAddress:
187-
return 4, 32
188-
default:
189-
return 1, 4
190-
}
191-
}
192-
193164
type sessStatsMap map[string]sessBacklogStats
194165

195166
type sessBacklogStats struct {
@@ -245,3 +216,144 @@ func (d *dynamicDispatcher) removeKilledSessions() {
245216

246217
d.meta.RemoveInactiveSessions(sids)
247218
}
219+
220+
func limitsByAssetType(atype oam.AssetType) *limits {
221+
switch atype {
222+
case oam.FQDN:
223+
fallthrough
224+
case oam.IPAddress:
225+
return &limits{
226+
MaxQueued: 200,
227+
HighWater: 175,
228+
LowWater: 100,
229+
PerSessBurst: 10,
230+
}
231+
case oam.Service:
232+
fallthrough
233+
case oam.TLSCertificate:
234+
fallthrough
235+
case oam.URL:
236+
return &limits{
237+
MaxQueued: 100,
238+
HighWater: 75,
239+
LowWater: 25,
240+
PerSessBurst: 5,
241+
}
242+
}
243+
244+
return &limits{
245+
MaxQueued: 20,
246+
HighWater: 15,
247+
LowWater: 5,
248+
PerSessBurst: 1,
249+
}
250+
}
251+
252+
// Dispatch wakes up the pump for admission.
253+
func (p *pipelinePool) Dispatch(e *et.Event) error {
254+
// decide whether to fill work queues
255+
inst := p.pickInstance(p.workShardKey(e))
256+
if inst != nil && inst.queueLen() <= p.limits.LowWater {
257+
p.notifyCapacity()
258+
}
259+
return nil
260+
}
261+
262+
func (p *pipelinePool) runPump() {
263+
tick := time.NewTicker(time.Second)
264+
defer tick.Stop()
265+
266+
for {
267+
select {
268+
case <-p.dis.done:
269+
return
270+
case <-p.wake:
271+
p.pumpOnce()
272+
case <-tick.C:
273+
p.pumpOnce()
274+
}
275+
}
276+
}
277+
278+
func (d *dynamicDispatcher) pumpOnce(atype oam.AssetType) {
279+
limits := limitsByAssetType(atype)
280+
if limits == nil {
281+
return
282+
}
283+
284+
stats, err := d.snapshotSessBacklogStats(atype)
285+
if err != nil {
286+
return
287+
}
288+
289+
// build a list of the session IDs
290+
sessions := make([]string, 0, len(stats))
291+
for sid := range stats {
292+
sessions = append(sessions, sid)
293+
}
294+
295+
// shuffle the sessions for random selection
296+
d.shuffler.Shuffle(len(sessions), func(i, j int) {
297+
sessions[i], sessions[j] = sessions[j], sessions[i]
298+
})
299+
300+
for _, sess := range sessions {
301+
s := stats[sess]
302+
if s.Queued == 0 {
303+
// there are zero entities to claim from the backlog
304+
continue
305+
}
306+
307+
pipe := s.Session.Pipelines()[atype]
308+
numOfClaims := int(limits.MaxQueued) - pipe.Queue.Len()
309+
if numOfClaims <= 0 {
310+
continue
311+
}
312+
numOfClaims = min(numOfClaims, limits.PerSessBurst)
313+
314+
entities, err := s.Session.Backlog().ClaimNext(atype, numOfClaims)
315+
if err != nil {
316+
continue
317+
}
318+
319+
for _, ent := range entities {
320+
a := ent.Asset
321+
name := string(atype) + ": " + a.Key()
322+
meta, _ := d.meta.GetEntry(sess, ent.ID)
323+
324+
event := &et.Event{
325+
Name: name,
326+
Entity: ent,
327+
Meta: meta,
328+
Dispatcher: d,
329+
Session: s.Session,
330+
}
331+
332+
data := et.NewEventDataElement(event)
333+
data.Exit = d.cchan
334+
data.Ref = pipe // keep a ref to the pipeline
335+
if err := pipe.Queue.Append(data); err != nil {
336+
_ = s.Session.Backlog().Release(ent, atype, false)
337+
}
338+
}
339+
}
340+
}
341+
342+
func (p *pipelinePool) hasCapacity(num int64) bool {
343+
p.RLock()
344+
defer p.RUnlock()
345+
346+
for _, inst := range p.instances {
347+
if inst.queueLen()+num <= p.limits.MaxQueued {
348+
return true
349+
}
350+
}
351+
return false
352+
}
353+
354+
func (p *pipelinePool) notifyCapacity() {
355+
select {
356+
case p.wake <- struct{}{}:
357+
default:
358+
}
359+
}

engine/dispatcher/hashring.go

Lines changed: 0 additions & 76 deletions
This file was deleted.

0 commit comments

Comments
 (0)