Skip to content

Commit ad64996

Browse files
committed
solver: pipe implementation utilizes generics for better typing
This updates the pipe library to use generics for the request payload and the status value. This allows the solver to put in explicit types rather than rely on type casting from interfaces which helps with type safety and understandability. The status value used by the solver uses the `any` type instead of an explicit type because the `unpark` method takes a generic list of pipes and the different pipes have different result types. We can likely change this in the future or create a discriminated union for the types that can be used in this package. That is left for future work because at least the request payload is typed now. Signed-off-by: Jonathan A. Sternberg <[email protected]>
1 parent 3a70550 commit ad64996

File tree

4 files changed

+115
-109
lines changed

4 files changed

+115
-109
lines changed

solver/edge.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
2929
e := &edge{
3030
edge: ed,
3131
op: op,
32-
depRequests: map[pipe.Receiver]*dep{},
32+
depRequests: map[pipeReceiver]*dep{},
3333
keyMap: map[string]struct{}{},
3434
cacheRecords: map[string]*CacheRecord{},
3535
cacheRecordsLoaded: map[string]struct{}{},
@@ -65,14 +65,14 @@ type edge struct {
6565
op activeOp
6666

6767
edgeState
68-
depRequests map[pipe.Receiver]*dep
68+
depRequests map[pipeReceiver]*dep
6969
deps []*dep
7070

71-
cacheMapReq pipe.Receiver
71+
cacheMapReq pipeReceiver
7272
cacheMapDone bool
7373
cacheMapIndex int
7474
cacheMapDigests []digest.Digest
75-
execReq pipe.Receiver
75+
execReq pipeReceiver
7676
execCacheLoad bool
7777
err error
7878
cacheRecords map[string]*CacheRecord
@@ -99,11 +99,11 @@ type edge struct {
9999

100100
// dep holds state for a dependant edge
101101
type dep struct {
102-
req pipe.Receiver
102+
req pipeReceiver
103103
edgeState
104104
index Index
105105
keyMap map[string]*CacheKey
106-
slowCacheReq pipe.Receiver
106+
slowCacheReq pipeReceiver
107107
slowCacheComplete bool
108108
slowCacheFoundKey bool
109109
slowCacheKey *ExportableCacheKey
@@ -122,7 +122,7 @@ func newDep(i Index) *dep {
122122

123123
// edgePipe is a pipe for requests between two edges
124124
type edgePipe struct {
125-
*pipe.Pipe
125+
*pipe.Pipe[*edgeRequest, any]
126126
From, Target *edge
127127
mu sync.Mutex
128128
}
@@ -198,21 +198,21 @@ func (e *edge) isComplete() bool {
198198
}
199199

200200
// finishIncoming finalizes the incoming pipe request
201-
func (e *edge) finishIncoming(req pipe.Sender) {
201+
func (e *edge) finishIncoming(req pipeSender) {
202202
err := e.err
203203
if req.Request().Canceled && err == nil {
204204
err = context.Canceled
205205
}
206206
if e.debug {
207-
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
207+
bklog.G(context.TODO()).Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.desiredState)
208208
}
209209
req.Finalize(&e.edgeState, err)
210210
}
211211

212212
// updateIncoming updates the current value of incoming pipe request
213-
func (e *edge) updateIncoming(req pipe.Sender) {
213+
func (e *edge) updateIncoming(req pipeSender) {
214214
if e.debug {
215-
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
215+
bklog.G(context.TODO()).Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.desiredState)
216216
}
217217
req.Update(&e.edgeState)
218218
}
@@ -353,7 +353,7 @@ func (e *edge) skipPhase2FastCache(dep *dep) bool {
353353
// requests were not completed
354354
// 2. this function may not return outgoing requests if it has completed all
355355
// incoming requests
356-
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
356+
func (e *edge) unpark(incoming []pipeSender, updates, allPipes []pipeReceiver, f *pipeFactory) {
357357
// process all incoming changes
358358
depChanged := false
359359
for _, upt := range updates {
@@ -414,7 +414,7 @@ func (e *edge) markFailed(f *pipeFactory, err error) {
414414
}
415415

416416
// processUpdate is called by unpark for every updated pipe request
417-
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
417+
func (e *edge) processUpdate(upt pipeReceiver) (depChanged bool) {
418418
// response for cachemap request
419419
if upt == e.cacheMapReq && upt.Status().Completed {
420420
if err := upt.Status().Err; err != nil {
@@ -719,7 +719,7 @@ func (e *edge) recalcCurrentState() {
719719

720720
// respondToIncoming responds to all incoming requests. completing or
721721
// updating them when possible
722-
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
722+
func (e *edge) respondToIncoming(incoming []pipeSender, allPipes []pipeReceiver) (edgeStatusType, bool) {
723723
// detect the result state for the requests
724724
allIncomingCanComplete := true
725725
desiredState := e.state
@@ -731,7 +731,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
731731
for _, req := range incoming {
732732
if !req.Request().Canceled {
733733
allCanceled = false
734-
if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
734+
if r := req.Request().Payload; desiredState < r.desiredState {
735735
desiredState = r.desiredState
736736
if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
737737
allIncomingCanComplete = false
@@ -757,7 +757,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
757757
}
758758

759759
// can close all but one requests
760-
var leaveOpen pipe.Sender
760+
var leaveOpen pipeSender
761761
for _, req := range incoming {
762762
if !req.Request().Canceled {
763763
leaveOpen = req
@@ -784,7 +784,7 @@ func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receive
784784

785785
// update incoming based on current state
786786
for _, req := range incoming {
787-
r := req.Request().Payload.(*edgeRequest)
787+
r := req.Request().Payload
788788
if req.Request().Canceled {
789789
e.finishIncoming(req)
790790
} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
@@ -803,7 +803,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
803803

804804
// initialize deps state
805805
if e.deps == nil {
806-
e.depRequests = make(map[pipe.Receiver]*dep)
806+
e.depRequests = make(map[pipeReceiver]*dep)
807807
e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
808808
for i := range e.edge.Vertex.Inputs() {
809809
e.deps = append(e.deps, newDep(Index(i)))
@@ -842,13 +842,13 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
842842
if dep.state < desiredStateDep {
843843
addNew := true
844844
if dep.req != nil && !dep.req.Status().Completed {
845-
if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
845+
if dep.req.Request().desiredState != desiredStateDep {
846846
if e.debug {
847847
bklog.G(context.TODO()).
848848
WithField("edge_vertex_name", e.edge.Vertex.Name()).
849849
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
850850
WithField("dep_index", dep.index).
851-
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
851+
WithField("dep_req_desired_state", dep.req.Request().desiredState).
852852
WithField("dep_desired_state", desiredStateDep).
853853
WithField("dep_state", dep.state).
854854
Debug("cancel input request")
@@ -860,7 +860,7 @@ func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory,
860860
WithField("edge_vertex_name", e.edge.Vertex.Name()).
861861
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
862862
WithField("dep_index", dep.index).
863-
WithField("dep_req_desired_state", dep.req.Request().(*edgeRequest).desiredState).
863+
WithField("dep_req_desired_state", dep.req.Request().desiredState).
864864
WithField("dep_desired_state", desiredStateDep).
865865
WithField("dep_state", dep.state).
866866
Debug("skip input request based on existing request")
@@ -1062,3 +1062,13 @@ func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyW
10621062
}
10631063
return out
10641064
}
1065+
1066+
type (
1067+
pipeRequest = pipe.Request[*edgeRequest]
1068+
pipeSender = pipe.Sender[*edgeRequest, any]
1069+
pipeReceiver = pipe.Receiver[*edgeRequest, any]
1070+
)
1071+
1072+
func newPipe(req pipeRequest) *pipe.Pipe[*edgeRequest, any] {
1073+
return pipe.New[*edgeRequest, any](req)
1074+
}

0 commit comments

Comments
 (0)