Skip to content

Commit c6d19ec

Browse files
committed
Add Xgress impl. Fixes #722
1 parent 71cea50 commit c6d19ec

14 files changed

+3510
-0
lines changed

xgress/circuit_inspections.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright NetFoundry Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package xgress
18+
19+
type CircuitInspectDetail struct {
20+
CircuitId string `json:"circuitId"`
21+
Forwards map[string]string `json:"forwards"`
22+
XgressDetails map[string]*InspectDetail `json:"xgressDetails"`
23+
LinkDetails map[string]*LinkInspectDetail `json:"linkDetails"`
24+
includeGoroutines bool
25+
}
26+
27+
func (self *CircuitInspectDetail) SetIncludeGoroutines(includeGoroutines bool) {
28+
self.includeGoroutines = includeGoroutines
29+
}
30+
31+
func (self *CircuitInspectDetail) IncludeGoroutines() bool {
32+
return self.includeGoroutines
33+
}
34+
35+
func (self *CircuitInspectDetail) AddXgressDetail(xgressDetail *InspectDetail) {
36+
self.XgressDetails[xgressDetail.Address] = xgressDetail
37+
}
38+
39+
func (self *CircuitInspectDetail) AddLinkDetail(linkDetail *LinkInspectDetail) {
40+
self.LinkDetails[linkDetail.Id] = linkDetail
41+
}
42+
43+
type InspectDetail struct {
44+
Address string `json:"address"`
45+
Originator string `json:"originator"`
46+
TimeSinceLastLinkRx string `json:"timeSinceLastLinkRx"`
47+
SendBufferDetail *SendBufferDetail `json:"sendBufferDetail"`
48+
RecvBufferDetail *RecvBufferDetail `json:"recvBufferDetail"`
49+
XgressPointer string `json:"xgressPointer"`
50+
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
51+
Goroutines []string `json:"goroutines"`
52+
Sequence uint64 `json:"sequence"`
53+
Flags string `json:"flags"`
54+
}
55+
56+
type SendBufferDetail struct {
57+
WindowSize uint32 `json:"windowSize"`
58+
LinkSendBufferSize uint32 `json:"linkSendBufferSize"`
59+
LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"`
60+
Accumulator uint32 `json:"accumulator"`
61+
SuccessfulAcks uint32 `json:"successfulAcks"`
62+
DuplicateAcks uint32 `json:"duplicateAcks"`
63+
Retransmits uint32 `json:"retransmits"`
64+
Closed bool `json:"closed"`
65+
BlockedByLocalWindow bool `json:"blockedByLocalWindow"`
66+
BlockedByRemoteWindow bool `json:"blockedByRemoteWindow"`
67+
RetxScale float64 `json:"retxScale"`
68+
RetxThreshold uint32 `json:"retxThreshold"`
69+
TimeSinceLastRetx string `json:"timeSinceLastRetx"`
70+
CloseWhenEmpty bool `json:"closeWhenEmpty"`
71+
AcquiredSafely bool `json:"acquiredSafely"`
72+
}
73+
74+
type RecvBufferDetail struct {
75+
Size uint32 `json:"size"`
76+
PayloadCount uint32 `json:"payloadCount"`
77+
LastSizeSent uint32 `json:"lastSizeSent"`
78+
Sequence int32 `json:"sequence"`
79+
MaxSequence int32 `json:"maxSequence"`
80+
NextPayload string `json:"nextPayload"`
81+
AcquiredSafely bool `json:"acquiredSafely"`
82+
}
83+
84+
type LinkInspectDetail struct {
85+
Id string `json:"id"`
86+
Iteration uint32 `json:"iteration"`
87+
Key string `json:"key"`
88+
Split bool `json:"split"`
89+
Protocol string `json:"protocol"`
90+
DialAddress string `json:"dialAddress"`
91+
Dest string `json:"dest"`
92+
DestVersion string `json:"destVersion"`
93+
Dialed bool `json:"dialed"`
94+
}

xgress/decoder.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
Copyright NetFoundry Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package xgress
18+
19+
import (
20+
"fmt"
21+
"github.com/michaelquigley/pfxlog"
22+
"github.com/openziti/channel/v4"
23+
)
24+
25+
type Decoder struct{}
26+
27+
const DECODER = "data"
28+
29+
func (d Decoder) Decode(msg *channel.Message) ([]byte, bool) {
30+
switch msg.ContentType {
31+
case int32(ContentTypePayloadType):
32+
if payload, err := UnmarshallPayload(msg); err == nil {
33+
return DecodePayload(payload)
34+
} else {
35+
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling payload msg")
36+
}
37+
38+
case int32(ContentTypeAcknowledgementType):
39+
if ack, err := UnmarshallAcknowledgement(msg); err == nil {
40+
meta := channel.NewTraceMessageDecode(DECODER, "Acknowledgement")
41+
meta["circuitId"] = ack.CircuitId
42+
meta["sequence"] = fmt.Sprintf("len(%d)", len(ack.Sequence))
43+
switch ack.GetOriginator() {
44+
case Initiator:
45+
meta["originator"] = "i"
46+
case Terminator:
47+
meta["originator"] = "e"
48+
}
49+
50+
data, err := meta.MarshalTraceMessageDecode()
51+
if err != nil {
52+
return nil, true
53+
}
54+
55+
return data, true
56+
57+
} else {
58+
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling ack msg")
59+
}
60+
case int32(ContentTypeControlType):
61+
if control, err := UnmarshallControl(msg); err == nil {
62+
meta := channel.NewTraceMessageDecode(DECODER, "Control")
63+
meta["circuitId"] = control.CircuitId
64+
meta["type"] = control.Type.String()
65+
if control.Type == ControlTypeTraceRoute || control.Type == ControlTypeTraceRouteResponse {
66+
if ts, found := msg.GetUint64Header(ControlTimestamp); found {
67+
meta["ts"] = ts
68+
}
69+
if hop, found := msg.GetUint32Header(ControlHopCount); found {
70+
meta["hopCount"] = hop
71+
}
72+
if hopType, found := msg.GetStringHeader(ControlHopType); found {
73+
meta["hopType"] = hopType
74+
}
75+
if hopId, found := msg.GetStringHeader(ControlHopId); found {
76+
meta["hopId"] = hopId
77+
}
78+
if userVal, found := msg.GetUint32Header(ControlUserVal); found {
79+
meta["uv"] = userVal
80+
}
81+
if hopErr, found := msg.GetUint32Header(ControlError); found {
82+
meta["err"] = hopErr
83+
}
84+
}
85+
data, err := meta.MarshalTraceMessageDecode()
86+
if err != nil {
87+
return nil, true
88+
}
89+
90+
return data, true
91+
92+
} else {
93+
pfxlog.Logger().WithError(err).Error("unexpected error unmarshalling control msg")
94+
}
95+
}
96+
97+
return nil, false
98+
}
99+
100+
func DecodePayload(payload *Payload) ([]byte, bool) {
101+
meta := channel.NewTraceMessageDecode(DECODER, "Payload")
102+
meta["circuitId"] = payload.CircuitId
103+
meta["sequence"] = payload.Sequence
104+
switch payload.GetOriginator() {
105+
case Initiator:
106+
meta["originator"] = "i"
107+
case Terminator:
108+
meta["originator"] = "e"
109+
}
110+
if payload.Flags != 0 {
111+
meta["flags"] = payload.Flags
112+
}
113+
meta["length"] = len(payload.Data)
114+
115+
data, err := meta.MarshalTraceMessageDecode()
116+
if err != nil {
117+
return nil, true
118+
}
119+
120+
return data, true
121+
}

xgress/heartbeat_transformer.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
Copyright NetFoundry Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package xgress
18+
19+
import (
20+
"encoding/binary"
21+
"github.com/openziti/channel/v4"
22+
"time"
23+
)
24+
25+
type PayloadTransformer struct {
26+
}
27+
28+
func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {}
29+
30+
func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) {
31+
if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 {
32+
if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 {
33+
now := time.Now().UnixNano()
34+
m.PutUint64Header(channel.HeartbeatHeader, uint64(now))
35+
binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now))
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)