Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ type NDTResult struct {
// ndt7
Upload *model.ArchivalData `json:",omitempty"`
Download *model.ArchivalData `json:",omitempty"`
Ping *model.ArchivalData `json:",omitempty"`
}
22 changes: 22 additions & 0 deletions html/ndt7-ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* jshint esversion: 6, asi: true, worker: true */
// WebWorker entry point for the ndt7 ping subtest: opens a WebSocket to
// /ndt/v7/ping and forwards each server-originated JSON measurement to
// the page that spawned this worker.
onmessage = function (ev) {
  'use strict'
  const endpoint = new URL(ev.data.href)
  // Mirror the page's security level: https pages speak wss, http pages ws.
  endpoint.protocol = (endpoint.protocol === 'https:') ? 'wss:' : 'ws:'
  endpoint.pathname = '/ndt/v7/ping'
  const sock = new WebSocket(endpoint.toString(), 'net.measurementlab.ndt.v7')
  // A null message tells the main thread that the test is over.
  sock.onclose = function () {
    postMessage(null)
  }
  sock.onopen = function () {
    sock.onmessage = function (msg) {
      if (msg.data instanceof Blob) {
        return // binary payloads carry no measurement
      }
      const measurement = JSON.parse(msg.data)
      measurement.Origin = 'server'
      measurement.Test = 'ping'
      postMessage(measurement)
    }
  }
}
25 changes: 24 additions & 1 deletion html/ndt7.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
</head>
<body>
<div>
<div id='ping' class='result row'>[Ping]</div>
<div id='download' class='result row'>[Download]</div>
<div id='upload' class='result row'>[Upload]</div>
<div id='done' class='result row'></div>
</div>
<script type='text/javascript'>
/* jshint esversion: 6, asi: true */
Expand All @@ -49,18 +51,35 @@
}

// Runs the named ndt7 subtest via ndt7core and updates the page as
// measurement events arrive. On completion it either invokes |callback|
// (used to chain the next subtest) or fills in the 'done' row.
function runSomething(testName, callback) {
  // Latest RTT estimates in milliseconds; NaN until the first sample.
  let ws = Number.NaN;
  let tcp = Number.NaN;
  ndt7core.run(location.href, testName, function(ev, val) {
    console.log(ev, val)
    if (ev === 'complete') {
      if (callback !== undefined) {
        callback()
      } else {
        // Last subtest in the chain: show the final status.
        withElementDo('done', function (elem) {
          elem.innerHTML = 'Done.'
        })
      }
      return
    }
    // Client-side speed samples drive the download/upload rows.
    if (ev === 'measurement' && val.AppInfo !== undefined &&
      val.Origin === 'client') {
      updateView(testName, val.AppInfo)
    }
    // Server-side samples drive the ping row. MinRTT is presumably in
    // microseconds given the /1e3 conversion to ms — confirm against the
    // server's WSInfo/TCPInfo definitions.
    if (ev === 'measurement' && val.Origin === 'server' && testName === 'ping') {
      if (val.WSInfo !== undefined) {
        ws = val.WSInfo.MinRTT / 1e3
      }
      if (val.TCPInfo !== undefined) {
        tcp = val.TCPInfo.MinRTT / 1e3
      }
      // ⓻ = WebSocket-level (layer 7) RTT, ⓸ = TCP-level (layer 4) RTT.
      withElementDo('ping', function (elem) {
        elem.innerHTML = '⓻ ' + ws.toFixed(1) + ' / ⓸ ' + tcp.toFixed(1) + ' ms'
      })
    }
  })
}

Expand All @@ -72,7 +91,11 @@
runSomething('upload', callback)
}

// Runs the ping subtest and invokes |callback| when it completes.
function runPing(callback) {
  runSomething('ping', callback)
}

// Subtest chain: ping first (measured before the link is loaded by the
// bulk-transfer subtests), then download, then upload.
runPing(function() { runDownload(function() { runUpload(); }); })
</script>
</body>
</html>
1 change: 1 addition & 0 deletions ndt-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func main() {
}
ndt7Mux.Handle(spec.DownloadURLPath, http.HandlerFunc(ndt7Handler.Download))
ndt7Mux.Handle(spec.UploadURLPath, http.HandlerFunc(ndt7Handler.Upload))
ndt7Mux.Handle(spec.PingURLPath, http.HandlerFunc(ndt7Handler.Ping))
ndt7Server := &http.Server{
Addr: *ndt7Addr,
Handler: logging.MakeAccessLogHandler(ndt7Mux),
Expand Down
11 changes: 7 additions & 4 deletions ndt7/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package download

import (
"context"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/ndt-server/ndt7/download/sender"
Expand All @@ -15,13 +16,15 @@ import (
// Do implements the download subtest. The ctx argument is the parent
// context for the subtest. The conn argument is the open WebSocket
// connection. The resultfp argument is the file where to save results. Both
// arguments are owned by the caller of this function.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File) {
// arguments are owned by the caller of this function. The start argument is
// the test start time used to calculate ElapsedTime and deadlines.
func Do(ctx context.Context, conn *websocket.Conn, resultfp *results.File, start time.Time) {
// Implementation note: use child context so that, if we cannot save the
// results in the loop below, we terminate the goroutines early
wholectx, cancel := context.WithCancel(ctx)
defer cancel()
senderch := sender.Start(conn, measurer.Start(wholectx, conn, resultfp.Data.UUID))
receiverch := receiver.StartDownloadReceiver(wholectx, conn)
measurerch := measurer.Start(wholectx, conn, resultfp.Data.UUID, start)
receiverch, pongch := receiver.StartDownloadReceiver(wholectx, conn, start, measurerch)
senderch := sender.Start(conn, measurerch, start, pongch)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused by the fact that a new channel has been introduced. Isn't it possible to use the measurerch for passing around information? I understood that the PING is another nullable pointer within a Measurement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

download.sender.loop goroutine may be naturally blocked on conn.WritePreparedMessage, so there is a risk that send(measurerch) will block receiver.loop goroutine. Blocked measurer.loop is good, it'll just skip a few ticks. Blocked receiver.loop leads to skewed RTT measurements and I'd rather avoid that.

There are several possible options to overcome that.

Current implementation creates buffered channel for RTT measurements with a buffer size equal of maximum possible amount of pings issued during the test. It was good enough for PoC, but I consider it a wasteful behavior optimized for the worst case.

I'm refactoring the code to send less WSPingInfo messages to the client. Download sub-test will send only the first message back to the client via a buffered channel with small buffer and tail-drop other messages if the channel is not writable due to sender being blocked. The WSPingInfo messages would be still logged, and I assume that writing to the logging channel does not naturally block for an unpredictable amount of time (unlike conn.WritePreparedMessage in download.sender.loop). I think, logging WSPingInfo in ClientMeasurements section is okay. Anyway, the elapsed value in ping/pong frame is not signed, so client can fake it, so putting that to ClientMeasurements makes sense to me :-)

BTW, should ping be signed or formatted as {ndt7: $value} as a safety net against a client sending unsolicited pongs with some numerical value? Clients can send unsolicited pongs for heartbeat purposes per RFC.

It may make sense to apply the same logic to the measurements coming from measurer.loop. Currently the messages are logged if and only if dst is available for writing (sender is not blocked). Maybe it makes sense to split that channel into two: one channel coming to the log (that blocks on "disk") and another coming to the client (that may tail-drop messages if the client is not fast enough to fetch WritePreparedMessage blob). What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, slow client blocking TCPInfo logging seems to be a real-world case, not a hypothetical situation. I've checked my few ndt7 data samples, some of them have a gap as large as 1.2s between TCPInfo.ElapsedTime samples (that's twice as large as spec.MaxPoissonSamplingInterval being 625ms). Those 11 files with 321 TCPInfo samples have 15 TCPInfo samples being more than 650ms apart.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about client side traces or server side traces?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That you don't get measurements on the client side because of head-of-line blocking is a known issue that I think we cannot address. However, the channel has a buffer, so the measurements were actually saved on disk when I checked. The improvement that I foresee here is actually separating saving measurements from sending measurements to the client. In the second case, I think it is reasonable to consider sending the client the most recent sample, not the most ancient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thought that I have is that I'd try to not change the channel structure in this PR but rather consider changing the channel structure later. The code is using a pipeline pattern and channels are closed to signal EOF. However, there are better patterns where there are multiple workers per stage and where contexts are used rather than closing channels. I have a sense that the right solution here is to measure and reorganise channels to have more parallelism when needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, the channel has buffer

As far as I see, the created channels do not have buffers, all make(chan...) calls under ndt7/ do not specify any non-zero capacity. Do you mean golang channels here?

I was worried about the following backpressure situation:

  • the client is slow and his TCP-proxy has large TCP buffer (the easiest way for me to reproduce the case is tor)
  • download.sender.loop goroutine blocks on conn.WritePreparedMessage for some time (tor case gives me delays like 8500ms)
  • download.sender.loop does not fetch messages from m, ok := <-src as the goroutine is blocked
  • measurer.loop goroutine blocks on sending to dst as the channel is not drained and has no buffer
  • measurer.loop samples TCP_INFO and BRR_INFO possibly less often that MaxPoissonSamplingInterval as it's blocked on sending to dst and skips some of ticker.C ticks
  • the TCPInfo samples are not written to the log as they're not actually sampled :-)

Note, I assume that ndt7 has a (weak) guarantee to take TCPInfo samples at least as often as MaxPoissonSamplingInterval. That's just my assumption and it may be wrong.

This backpressure case worries me as it means that receiver.loop being blocked on writing RTT estimate to the channel drained by sender may provide wrong RTT estimate for the next sample in a queue. receiver should be mostly blocked on conn.ReadMessage to provide reasonably accurate data.

The original goal of introducing pongch was to mitigate that backpressure with quite a deep buffer. But I agree that this backpressure issue should be out of the scope of this pull request.

Back to answering your original question. My goal is to have some of the WSPingInfo samples sent back to the ndt7 client during the ping test runtime. The master branch has the channel passing both the messages and the EOF signal from measurer.loop to ${testname}.sender.loop. Having two concurrent writers (measurer.loop and receiver.loop) sending to the channel that is eventually closed(!) will lead to goroutine panic (on an attempt to send to closed channel).

So, I'm still going to use pongch as a channel from receiver.loop to ${testname}.sender.loop to keep the "pipeline + EOF" pattern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TL;DR I acknowledge the issue you report (did see it myself). I will not accept a patch with pongch. I explain how I’d change the architecture to avoid losing measurements. I am not asking you to implement that, however. I am just asking you to kindly focus here on a simple patch that implements the /ping you need and does not intersect with existing subtests.

The channel has buffer [...]

I was wrong. I used a buffer in a development branch then. I must have concluded that was not the right fix, which is to change the architecture.

[...] the pipeline+EOF pattern

That pattern is wrong. A pattern where an expired ctx tells you a goroutine to stop is right. We’ll fix that.

[...] I am still using to use pongch

Your attitude is wrong. The right attitude is to listen. We are on the same page on more than 99% of the topics here.

Let me summarize my position: thank your for exposing a bunch of architectural issues. I am not going to accept a patch that complicates significantly the code base. Please focus on writing a /ping endpoint that does its job and does not intersect with the implementation of other experiments.

That patch, I will expedite its review. I am not willing to spend further time to argue on the merits of adding complexity to the code base when this can be avoided. That is 無駄無駄無駄無駄.

Worried about the following back pressure situation [...]

Your analysis is right. Your remediation is wrong. Thank you for spelling out this issue so clearly. As I mentioned to you (I guess in private?) I did see the same for a 2G connection.

[...] weak guarantee to take TCP samples

This is right or wrong depending on the meaning of take. It’s right if take also implies save. It’s wrong if take also imply sending the data to the client. We send data to the client on a best effort basis but we should be doing our best to collect and save samples.

(On this note: M-Lab has a sidecar service that samples TCPInfo at high frequency. The data is collected separately and can be joined after submission. We are still collecting BBR and TCPInfo here, why? The original design was to stop early when BBR bandwidth estimation did become stable, but we have not implemented this yet. Anyway, this should help you understand why dealing with this issue has priority medium and not high.)

The right fix that significantly mitigates all that have been said so far is to change the code architecture as follows:

  1. the measurer is a class and stores measurements in itself in a lock free way

  2. the channels become all non blocking when writing

  3. LIFO is better than FIFO when passing measurements downstream

  4. we use a WaitGroup to wait for all the goroutines to complete

  5. then we’re again single threaded and we just save on disk accessing directly what was measured

This is the right architectural fix for the current situation.

goal is to send back samples to the client

We already agreed that your use case is best served by a separate subtest called ping. We already agreed that we also need a single sample at the beginning of the /download. Everything else seems irrelevant to the objective, hence wrong.

If adding support for WebSocket level ping to download and upload is such a burden with the current architecture, the right thing to do is to open an issue, just take the first sample of the connection, and get rid of the remaining code. A future code architecture may accommodate this feature with less effort. This is something that would tell us that the architecture is right. Now it is clearly wrong because it does not allow us to do that easily.

Instead, please focus on writing /ping to fulfill your goals. Please do so without complicating the implementation of existing endpoints in exchange for seriously marginal gains.

✌️

Copy link
Contributor Author

@darkk darkk Jan 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(writing that down, so the essence is not lost in a log of a private chat)

I've refactored the code under following rules:

  • keep measurer and saver intact
  • avoid two ping frames in-flight (drop ping sample if the previous pong has not arrived yet)
  • have LIFO logic for samples sent to client
  • make the channels "almost non-blocking" for sender. I mean structuring the code in a way that blocking on chan <- value MAY happen in some case, but this case is an "erratic" code path, not a "usual" one.
  • use EOF as a completion signal for most of the goroutines to terminate them in a predictable order (as the final sink, namely saver, depends on EOF)
  • use context.cancel() to stop memoryless timer early in case of a network error from a client

complicating ... in exchange for seriously marginal gains

Yeah, I'm sorry, that's an (unlucky) trait I have indeed! Thanks (no kidding) for nudging me towards awareness, better focus and simplicity.

M-Lab has a sidecar service that samples TCPInfo at high frequency

I've also completely forgotten about that. That's why I overestimated the utility of TCPInfo samples being logged.

saver.SaveAll(resultfp, senderch, receiverch)
}
32 changes: 26 additions & 6 deletions ndt7/download/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ func makePreparedMessage(size int) (*websocket.PreparedMessage, error) {
return websocket.NewPreparedMessage(websocket.BinaryMessage, data)
}

func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.Measurement) {
func loop(
conn *websocket.Conn, src <-chan model.Measurement,
dst chan<- model.Measurement, start time.Time, pongch <-chan model.WSInfo,
) {
logging.Logger.Debug("sender: start")
defer logging.Logger.Debug("sender: stop")
defer close(dst)
defer func() {
for range src {
// make sure we drain the channel
}
for range pongch {
// it should be buffered channel, but let's drain it anyway
}
}()
logging.Logger.Debug("sender: generating random buffer")
bulkMessageSize := 1 << 13
Expand All @@ -38,12 +44,17 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
logging.Logger.WithError(err).Warn("sender: makePreparedMessage failed")
return
}
deadline := time.Now().Add(spec.MaxRuntime)
deadline := start.Add(spec.MaxRuntime)
err = conn.SetWriteDeadline(deadline) // Liveness!
if err != nil {
logging.Logger.WithError(err).Warn("sender: conn.SetWriteDeadline failed")
return
}
// only the first RTT sample taken before flooding the conn is not affected by HOL
if err := ping.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
return
}
var totalSent int64
for {
select {
Expand All @@ -57,10 +68,19 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
return
}
dst <- m // Liveness: this is blocking
if err := ping.SendTicks(conn, deadline); err != nil {
if err := ping.SendTicks(conn, start, deadline); err != nil {
logging.Logger.WithError(err).Warn("sender: ping.SendTicks failed")
return
}
case wsinfo := <-pongch:
m := model.Measurement{
WSInfo: &wsinfo,
}
if err := conn.WriteJSON(m); err != nil {
logging.Logger.WithError(err).Warn("sender: conn.WriteJSON failed")
return
}
dst <- m // Liveness: this is blocking write to log
default:
if err := conn.WritePreparedMessage(preparedMessage); err != nil {
logging.Logger.WithError(err).Warn(
Expand Down Expand Up @@ -99,9 +119,9 @@ func loop(conn *websocket.Conn, src <-chan model.Measurement, dst chan<- model.M
// Liveness guarantee: the sender will not be stuck sending for more then
// the MaxRuntime of the subtest, provided that the consumer will
// continue reading from the returned channel. This is enforced by
// setting the write deadline to Time.Now() + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement) <-chan model.Measurement {
// setting the write deadline to |start| + MaxRuntime.
func Start(conn *websocket.Conn, src <-chan model.Measurement, start time.Time, pongch <-chan model.WSInfo) <-chan model.Measurement {
dst := make(chan model.Measurement)
go loop(conn, src, dst)
go loop(conn, src, dst, start, pongch)
return dst
}
20 changes: 14 additions & 6 deletions ndt7/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ func warnAndClose(writer http.ResponseWriter, message string) {
// testerFunc is the function implementing a subtest. The first argument
// is the subtest context. The second argument is the connected websocket. The
// third argument is the open file where to write results. This function does
// not own the second or the third argument.
type testerFunc = func(context.Context, *websocket.Conn, *results.File)
// not own the second or the third argument. The fourth argument is the base
// start time of the test.
type testerFunc = func(context.Context, *websocket.Conn, *results.File, time.Time)

// downloadOrUpload implements both download and upload. The writer argument
// is the HTTP response writer. The request argument is the HTTP request
// that we received. The kind argument must be spec.SubtestDownload or
// spec.SubtestUpload. The tester is a function actually implementing the
// requested ndt7 subtest.
// that we received. The kind argument must be spec.SubtestDownload,
// spec.SubtestUpload, or SubtestPing. The tester is a function actually
// implementing the requested ndt7 subtest.
func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Request, kind spec.SubtestKind, tester testerFunc) {
logging.Logger.Debug("downloadOrUpload: upgrading to WebSockets")
if request.Header.Get("Sec-WebSocket-Protocol") != spec.SecWebSocketProtocol {
Expand Down Expand Up @@ -106,6 +107,8 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
result.Download = resultfp.Data
} else if kind == spec.SubtestUpload {
result.Upload = resultfp.Data
} else if kind == spec.SubtestPing {
result.Ping = resultfp.Data
} else {
logging.Logger.Warn(string(kind) + ": data not saved")
}
Expand All @@ -114,7 +117,7 @@ func (h Handler) downloadOrUpload(writer http.ResponseWriter, request *http.Requ
}
warnonerror.Close(resultfp, string(kind)+": ignoring resultfp.Close error")
}()
tester(request.Context(), conn, resultfp)
tester(request.Context(), conn, resultfp, result.StartTime)
}

// Download handles the download subtest.
Expand All @@ -126,3 +129,8 @@ func (h Handler) Download(writer http.ResponseWriter, request *http.Request) {
func (h Handler) Upload(writer http.ResponseWriter, request *http.Request) {
h.downloadOrUpload(writer, request, spec.SubtestUpload, upload.Do)
}

// Ping handles the ping subtest.
// NOTE(review): the tester passed here is upload.Do — presumably the ping
// subtest is driven by the same receive loop as the upload subtest;
// confirm that a dedicated ping tester is not intended instead.
func (h Handler) Ping(writer http.ResponseWriter, request *http.Request) {
	h.downloadOrUpload(writer, request, spec.SubtestPing, upload.Do)
}
7 changes: 3 additions & 4 deletions ndt7/measurer/measurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func measure(measurement *model.Measurement, sockfp *os.File, elapsed time.Durat
}
}

func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement) {
func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- model.Measurement, start time.Time) {
logging.Logger.Debug("measurer: start")
defer logging.Logger.Debug("measurer: stop")
defer close(dst)
Expand All @@ -66,7 +66,6 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod
return
}
defer sockfp.Close()
start := time.Now()
connectionInfo := &model.ConnectionInfo{
Client: conn.RemoteAddr().String(),
Server: conn.LocalAddr().String(),
Expand Down Expand Up @@ -104,9 +103,9 @@ func loop(ctx context.Context, conn *websocket.Conn, UUID string, dst chan<- mod
// a timeout of DefaultRuntime seconds, provided that the consumer
// continues reading from the returned channel.
func Start(
	ctx context.Context, conn *websocket.Conn, UUID string, start time.Time,
) <-chan model.Measurement {
	// The channel is unbuffered; loop closes it when it returns, which is
	// the EOF signal for the downstream consumer.
	dst := make(chan model.Measurement)
	go loop(ctx, conn, UUID, dst, start)
	return dst
}
1 change: 1 addition & 0 deletions ndt7/model/measurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Measurement struct {
ConnectionInfo *ConnectionInfo `json:",omitempty" bigquery:"-"`
BBRInfo *BBRInfo `json:",omitempty"`
TCPInfo *TCPInfo `json:",omitempty"`
WSInfo *WSInfo `json:",omitempty"`
}
10 changes: 10 additions & 0 deletions ndt7/model/wsinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package model

// WSInfo contains application-level (WebSocket ping/pong) RTT
// measurement data. It may eventually be melded into AppInfo.
// FIXME: this structure is not yet described in the ndt7 specification.
type WSInfo struct {
	// ElapsedTime is the time since the start of the test.
	// NOTE(review): units are not established here — presumably
	// microseconds, matching the client's /1e3-to-ms conversion; confirm.
	ElapsedTime int64
	// LastRTT is a single raw RTT sample; TCPInfo.RTT is smoothed RTT,
	// LastRTT is just a sample.
	LastRTT int64
	// MinRTT is the minimum observed RTT. The bundled web client divides
	// it by 1e3 and labels the result "ms", which suggests microseconds —
	// TODO confirm.
	MinRTT int64
}
21 changes: 12 additions & 9 deletions ndt7/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,33 @@ package ping

import (
"encoding/json"
"errors"
"time"

"github.com/gorilla/websocket"
)

// SendTicks sends the current ticks as a ping message.
func SendTicks(conn *websocket.Conn, deadline time.Time) error {
// TODO(bassosimone): when we'll have a unique base time.Time reference for
// the whole test, we should use that, since UnixNano() is not monotonic.
ticks := int64(time.Now().UnixNano())
func SendTicks(conn *websocket.Conn, start time.Time, deadline time.Time) error {
var ticks int64 = time.Since(start).Nanoseconds()
data, err := json.Marshal(ticks)
if err == nil {
err = conn.WriteControl(websocket.PingMessage, data, deadline)
}
return err
}

func ParseTicks(s string) (d int64, err error) {
// TODO(bassosimone): when we'll have a unique base time.Time reference for
// the whole test, we should use that, since UnixNano() is not monotonic.
func ParseTicks(s string, start time.Time) (elapsed time.Duration, d time.Duration, err error) {
elapsed = time.Since(start)
var prev int64
err = json.Unmarshal([]byte(s), &prev)
if err == nil {
d = (int64(time.Now().UnixNano()) - prev)
if err != nil {
return
}
if 0 <= prev && prev <= elapsed.Nanoseconds() {
d = time.Duration(elapsed.Nanoseconds() - prev)
} else {
err = errors.New("RTT is negative")
}
return
}
Loading