Skip to content

Commit b473b7a

Browse files
authored
Merge pull request moby#5323 from jsternberg/generic-pipes
solver: pipe implementation utilizes generics for better typing
2 parents 2a7accc + ad64996 commit b473b7a

File tree

4 files changed

+115
-109
lines changed

4 files changed

+115
-109
lines changed

solver/edge.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
2929
e := &edge{
3030
edge: ed,
3131
op: op,
32-
depRequests: map[pipe.Receiver]*dep{},
32+
depRequests: map[pipeReceiver]*dep{},
3333
keyMap: map[string]struct{}{},
3434
cacheRecords: map[string]*CacheRecord{},
3535
cacheRecordsLoaded: map[string]struct{}{},
@@ -65,14 +65,14 @@ type edge struct {
6565
op activeOp
6666

6767
edgeState
68-
depRequests map[pipe.Receiver]*dep
68+
depRequests map[pipeReceiver]*dep
6969
deps []*dep
7070

71-
cacheMapReq pipe.Receiver
71+
cacheMapReq pipeReceiver
7272
cacheMapDone bool
7373
cacheMapIndex int
7474
cacheMapDigests []digest.Digest
75-
execReq pipe.Receiver
75+
execReq pipeReceiver
7676
execCacheLoad bool
7777
err error
7878
cacheRecords map[string]*CacheRecord
@@ -99,11 +99,11 @@ type edge struct {
9999

100100
// dep holds state for a dependant edge
101101
type dep struct {
102-
req pipe.Receiver
102+
req pipeReceiver
103103
edgeState
104104
index Index
105105
keyMap map[string]*CacheKey
106-
slowCacheReq pipe.Receiver
106+
slowCacheReq pipeReceiver
107107
slowCacheComplete bool
108108
slowCacheFoundKey bool
109109
slowCacheKey *ExportableCacheKey
@@ -122,7 +122,7 @@ func newDep(i Index) *dep {
122122

123123
// edgePipe is a pipe for requests between two edges
124124
type edgePipe struct {
125-
*pipe.Pipe
125+
*pipe.Pipe[*edgeRequest, any]
126126
From, Target *edge
127127
mu sync.Mutex
128128
}
@@ -198,21 +198,21 @@ func (e *edge) isComplete() bool {
198198
}
199199

200200
// finishIncoming finalizes the incoming pipe request
201-
func (e *edge) finishIncoming(req pipe.Sender) {
201+
func (e *edge) finishIncoming(req pipeSender) {
202202
err := e.err
203203
if req.Request().Canceled && err == nil {
204204
err = context.Canceled
205205
}
206206
if e.debug {
207-
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
207+
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
208208
}
209209
req.Finalize(&e.edgeState, err)
210210
}
211211

212212
// updateIncoming updates the current value of incoming pipe request
213-
func (e *edge) updateIncoming(req pipe.Sender) {
213+
func (e *edge) updateIncoming(req pipeSender) {
214214
if e.debug {
215-
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
215+
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
216216
}
217217
req.Update(&e.edgeState)
218218
}
@@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
353353
// requests were not completed
354354
// 2. this function may not return outgoing requests if it has completed all
355355
// incoming requests
356-
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
356+
func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
357357
// process all incoming changes
358358
depChanged := false
359359
for _, upt := range updates {
@@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
414414
}
415415

416416
// processUpdate is called by unpark for every updated pipe request
417-
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
417+
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
418418
// response for cachemap request
419419
if upt == e.cacheMapReq && upt.Status().Completed {
420420
if err := upt.Status().Err; err != nil {
@@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() {
719719

720720
// respondToIncoming responds to all incoming requests. completing or
721721
// updating them when possible
722-
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
722+
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
723723
// detect the result state for the requests
724724
allIncomingCanComplete := true
725725
desiredState := e.state
@@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
731731
for _, req := range incoming {
732732
if !req.Request().Canceled {
733733
allCanceled = false
734-
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
734+
if r := req.Request().Payload; desiredState < r.desiredState {
735735
desiredState = r.desiredState
736736
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
737737
allIncomingCanComplete = false
@@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
757757
}
758758

759759
// can close all but one requests
760-
var leaveOpen pipe.Sender
760+
var leaveOpen pipeSender
761761
for _, req := range incoming {
762762
if !req.Request().Canceled {
763763
leaveOpen = req
@@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
784784

785785
// update incoming based on current state
786786
for _, req := range incoming {
787-
r := req.Request().Payload.(*edgeRequest)
787+
r := req.Request().Payload
788788
if req.Request().Canceled {
789789
e.finishIncoming(req)
790790
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
@@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
803803

804804
// initialize deps state
805805
if e.deps == nil {
806-
e.depRequests = make(map[pipe.Receiver]*dep)
806+
e.depRequests = make(map[pipeReceiver]*dep)
807807
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
808808
for i := range e.edge.Vertex.Inputs() {
809809
e.deps = append(e.deps, newDep(Index(i)))
@@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
842842
if dep.state < desiredStateDep {
843843
addNew := true
844844
if dep.req != nil && !dep.req.Status().Completed {
845-
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
845+
if dep.req.Request().desiredState != desiredStateDep {
846846
if e.debug {
847847
bklog.G(context.TODO()).
848848
WithField("edge_vertex_name", e.edge.Vertex.Name()).
849849
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
850850
WithField("dep_index", dep.index).
851-
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
851+
WithField("dep_req_desired_state", dep.req.Request().desiredState).
852852
WithField("dep_desired_state", desiredStateDep).
853853
WithField("dep_state", dep.state).
854854
Debug("cancel input request")
@@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
860860
WithField("edge_vertex_name", e.edge.Vertex.Name()).
861861
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
862862
WithField("dep_index", dep.index).
863-
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
863+
WithField("dep_req_desired_state", dep.req.Request().desiredState).
864864
WithField("dep_desired_state", desiredStateDep).
865865
WithField("dep_state", dep.state).
866866
Debug("skip input request based on existing request")
@@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
10621062
}
10631063
return out
10641064
}
1065+
1066+
type (
1067+
pipeRequest = pipe.Request[*edgeRequest]
1068+
pipeSender = pipe.Sender[*edgeRequest, any]
1069+
pipeReceiver = pipe.Receiver[*edgeRequest, any]
1070+
)
1071+
1072+
func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
1073+
return pipe.New[*edgeRequest, any](req)
1074+
}

0 commit comments

Comments
 (0)