-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimesync.go
More file actions
86 lines (70 loc) · 2.12 KB
/
timesync.go
File metadata and controls
86 lines (70 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package libphonelabgo
import (
phonelab "github.com/shaseley/phonelab-go"
)
// TimeSyncPreprocessor emits offsets to enable tighter time sync between
// clocks. There can be clock skew between the jiffy-based tracetime monotonic
// clock and systemTime(), which uses the POSIX monotonic clock.
type TimeSyncPreprocessor struct {
	// Source is the upstream processor whose output Process consumes.
	Source phonelab.Processor
}
// Time unit conversion factors. The plain names are int64 for integer
// timestamp arithmetic; the F-suffixed names are the same factors as float64
// for scaling floating-point timestamps.
const (
	msPerSec int64 = 1000
	usPerSec int64 = 1000 * 1000
	nsPerSec int64 = 1000 * 1000 * 1000
	nsPerMs  int64 = 1000 * 1000

	msPerSecF = float64(msPerSec)
	usPerSecF = float64(usPerSec)
	nsPerSecF = float64(nsPerSec)
	nsPerMsF  = float64(nsPerMs)
)
// TimeSyncMsg is emitted by TimeSyncPreprocessor.Process whenever the offset
// between trace time and system time changes.
type TimeSyncMsg struct {
	// OffsetNs is the trace time minus the system timestamp, so adding it to
	// a system timestamp yields the corresponding trace time.
	OffsetNs int64

	// TraceTimeNs is the trace time of the logline that produced this
	// message, scaled to nanoseconds.
	TraceTimeNs int64

	// SysTimeNs is the SysTimestamp of the SFFpsLog payload that produced
	// this message, copied unchanged (presumably nanoseconds, given the
	// field name — confirm against SFFpsLog).
	SysTimeNs int64
}
// adjustTimestamp shifts ts by offset and converts the result to fractional
// seconds. ts and offset are expressed in the same integer unit, and
// unitsPerSec is how many of those units make up one second.
//
// The whole-second and sub-second parts are converted to float64 separately
// and then summed. unitsPerSec must be non-zero; a zero value causes an
// integer divide-by-zero panic.
func adjustTimestamp(ts, offset, unitsPerSec int64) float64 {
	adjusted := ts + offset
	whole, frac := adjusted/unitsPerSec, adjusted%unitsPerSec
	return float64(whole) + float64(frac)/float64(unitsPerSec)
}
func adjustTimestampMsToS(ts, offset int64) float64 {
return adjustTimestamp(ts, offset, msPerSec)
}
// Process starts a goroutine that drains the Source processor's output and
// forwards it on the returned channel. The returned channel is closed once
// the source channel has been closed and fully consumed.
//
// Only non-nil *phonelab.Logline values are forwarded; any other value read
// from the source is dropped. When a logline's payload is an *SFFpsLog and
// the difference between the logline's trace time and the payload's
// SysTimestamp differs from the last difference seen, a *TimeSyncMsg carrying
// the new offset is sent immediately before that logline.
func (p *TimeSyncPreprocessor) Process() <-chan interface{} {
	outChan := make(chan interface{})

	go func() {
		defer close(outChan)

		// Clock skew between the two monotonic clocks, computed as trace
		// time minus system time. Add it to a system timestamp to get the
		// corresponding trace timestamp.
		curOffset := int64(0)

		for iLog := range p.Source.Process() {
			ll, ok := iLog.(*phonelab.Logline)
			if !ok || ll == nil {
				// NOTE(review): values that are not loglines are dropped
				// rather than passed through; this preserves the existing
				// behavior — confirm it is intended.
				continue
			}

			if fpsLog, ok := ll.Payload.(*SFFpsLog); ok {
				traceTsNanos := int64(ll.TraceTime * nsPerSecF)
				newOffset := traceTsNanos - fpsLog.SysTimestamp

				// Announce the offset only when it changes, and before the
				// logline that revealed the change.
				if newOffset != curOffset {
					curOffset = newOffset
					outChan <- &TimeSyncMsg{
						OffsetNs:    curOffset,
						TraceTimeNs: traceTsNanos,
						SysTimeNs:   fpsLog.SysTimestamp,
					}
				}
			}

			outChan <- iLog
		}
	}()

	return outChan
}
// TimeSyncPreprocessorGenerator creates TimeSyncPreprocessor instances via
// its GenerateProcessor method. It carries no state.
type TimeSyncPreprocessorGenerator struct{}
// GenerateProcessor returns a new TimeSyncPreprocessor whose Source is the
// processor of the given pipeline source instance. kwargs is not used.
func (g *TimeSyncPreprocessorGenerator) GenerateProcessor(source *phonelab.PipelineSourceInstance,
	kwargs map[string]interface{}) phonelab.Processor {
	proc := new(TimeSyncPreprocessor)
	proc.Source = source.Processor
	return proc
}