Skip to content

Commit eeaf262

Browse files
authored
Send analytics logs to Kafka (#1157)
1 parent b91de3a commit eeaf262

File tree

11 files changed

+416
-365
lines changed

11 files changed

+416
-365
lines changed

api/http.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"net/http"
66
"time"
77

8+
"github.com/golang/glog"
89
"github.com/julienschmidt/httprouter"
910
"github.com/livepeer/catalyst-api/balancer"
1011
"github.com/livepeer/catalyst-api/cluster"
1112
"github.com/livepeer/catalyst-api/config"
1213
"github.com/livepeer/catalyst-api/handlers"
14+
"github.com/livepeer/catalyst-api/handlers/analytics"
1315
"github.com/livepeer/catalyst-api/handlers/geolocation"
1416
"github.com/livepeer/catalyst-api/log"
1517
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
@@ -66,9 +68,14 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
6668

6769
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
6870
if cli.EnableAnalytics == "true" || cli.EnableAnalytics == "enabled" {
69-
analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, cli.AnalyticsMetricsURL, cli.NodeName)
70-
router.POST("/analytics/log", withLogging(withCORS(analyticsApiHandlers.Log())))
71-
router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectHandler())))
71+
logProcessor, err := analytics.NewLogProcessor(cli.KafkaBootstrapServers, cli.KafkaUser, cli.KafkaPassword, cli.AnalyticsKafkaTopic)
72+
if err != nil {
73+
glog.Fatalf("failed to configure analytics log processor, err=%v", err)
74+
} else {
75+
analyticsApiHandlers := handlers.NewAnalyticsHandlersCollection(mapic, lapi, logProcessor)
76+
router.POST("/analytics/log", withCORS(analyticsApiHandlers.Log()))
77+
router.GET("/analytics/log", withLogging(withCORS(geoHandlers.RedirectHandler())))
78+
}
7279
}
7380

7481
// Playback endpoint

config/cli.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ type Cli struct {
6565
MaxBitrateFactor float64
6666
BlockedJWTs []string
6767
EnableAnalytics string
68-
AnalyticsMetricsURL string
68+
KafkaBootstrapServers string
69+
KafkaUser string
70+
KafkaPassword string
71+
AnalyticsKafkaTopic string
6972

7073
// mapping playbackId to value between 0.0 to 100.0
7174
CdnRedirectPlaybackPct map[string]float64

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/hashicorp/serf v0.10.1
1818
github.com/julienschmidt/httprouter v1.3.0
1919
github.com/lib/pq v1.10.9
20-
github.com/livepeer/go-api-client v0.4.15
20+
github.com/livepeer/go-api-client v0.4.18
2121
github.com/livepeer/go-tools v0.3.6
2222
github.com/livepeer/joy4 v0.1.1
2323
github.com/livepeer/livepeer-data v0.8.1
@@ -28,6 +28,7 @@ require (
2828
github.com/peterbourgon/ff/v3 v3.4.0
2929
github.com/pquerna/cachecontrol v0.2.0
3030
github.com/prometheus/client_golang v1.18.0
31+
github.com/segmentio/kafka-go v0.4.47
3132
github.com/shirou/gopsutil/v3 v3.24.2
3233
github.com/stretchr/testify v1.8.4
3334
github.com/u2takey/ffmpeg-go v0.5.0
@@ -122,6 +123,7 @@ require (
122123
github.com/opentracing/opentracing-go v1.2.0 // indirect
123124
github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 // indirect
124125
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
126+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
125127
github.com/pkg/errors v0.9.1 // indirect
126128
github.com/polydawn/refmt v0.89.0 // indirect
127129
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect

go.sum

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
405405
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
406406
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
407407
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
408+
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
408409
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
409410
github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
410411
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -446,8 +447,10 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n
446447
github.com/libp2p/go-netroute v0.2.0/go.mod h1:Vio7LTzZ+6hoT4CMZi5/6CpY3Snzh2vgZhWgxMNwlQI=
447448
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
448449
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
449-
github.com/livepeer/go-api-client v0.4.15 h1:M9QlqvGOzhibdejugTYZ3pLABYV4byaWafSO2eBArOc=
450-
github.com/livepeer/go-api-client v0.4.15/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
450+
github.com/livepeer/go-api-client v0.4.18-0.20240305122931-8f6d8c6543ad h1:eSqYAvAqyt857xBuNEta/FOEb8EtBFbuTW2KnJQzlv0=
451+
github.com/livepeer/go-api-client v0.4.18-0.20240305122931-8f6d8c6543ad/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
452+
github.com/livepeer/go-api-client v0.4.18 h1:grDZK6oMBm/6N9ZqsAk4ae2ohGDVzRPd2U12DrTRv2s=
453+
github.com/livepeer/go-api-client v0.4.18/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
451454
github.com/livepeer/go-tools v0.3.6 h1:LhRnoVVGFCtfBh6WyKdwJ2bPD/h5gaRvsAszmCqKt1Q=
452455
github.com/livepeer/go-tools v0.3.6/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
453456
github.com/livepeer/joy4 v0.1.1 h1:Tz7gVcmvpG/nfUKHU+XJn6Qke/k32mTWMiH9qB0bhnM=
@@ -567,6 +570,8 @@ github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 h1:6ob53CVz+ja2i7e
567570
github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
568571
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
569572
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
573+
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
574+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
570575
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
571576
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
572577
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -635,6 +640,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
635640
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
636641
github.com/secure-io/sio-go v0.3.1 h1:dNvY9awjabXTYGsTF1PiCySl9Ltofk9GA3VdWlo7rRc=
637642
github.com/secure-io/sio-go v0.3.1/go.mod h1:+xbkjDzPjwh4Axd07pRKSNriS9SCiYksWnZqdnfpQxs=
643+
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
644+
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
638645
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0/go.mod h1:Ad7IjTpvzZO8Fl0vh9AzQ+j/jYZfyp2diGwI8m5q+ns=
639646
github.com/shirou/gopsutil/v3 v3.24.2 h1:kcR0erMbLg5/3LcInpw0X/rrPSqq4CDPyI6A6ZRC18Y=
640647
github.com/shirou/gopsutil/v3 v3.24.2/go.mod h1:tSg/594BcA+8UdQU2XcW803GWYgdtauFFPgJCJKZlVk=
@@ -705,6 +712,12 @@ github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvS
705712
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
706713
github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 h1:XYEgH2nJgsrcrj32p+SAbx6T3s/6QknOXezXtz7kzbg=
707714
github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ=
715+
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
716+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
717+
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
718+
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
719+
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
720+
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
708721
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
709722
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
710723
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
@@ -847,6 +860,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
847860
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
848861
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
849862
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
863+
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
864+
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
850865
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
851866
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
852867
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -871,6 +886,7 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
871886
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
872887
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
873888
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
889+
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
874890
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
875891
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
876892
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -935,14 +951,19 @@ golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBc
935951
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
936952
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
937953
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
954+
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
938955
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
939956
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
940957
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
958+
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
941959
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
942960
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
943961
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
944962
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
945963
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
964+
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
965+
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
966+
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
946967
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
947968
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
948969
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -951,6 +972,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
951972
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
952973
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
953974
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
975+
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
976+
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
954977
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
955978
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
956979
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

handlers/analytics.go

Lines changed: 117 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handlers
22

33
import (
4+
"crypto/sha256"
45
"encoding/json"
56
"fmt"
67
"github.com/golang/glog"
@@ -13,8 +14,10 @@ import (
1314
"github.com/mmcloughlin/geohash"
1415
"github.com/xeipuuv/gojsonschema"
1516
"io"
17+
"net"
1618
"net/http"
1719
"strconv"
20+
"time"
1821
)
1922

2023
const (
@@ -29,39 +32,62 @@ type AnalyticsLog struct {
2932
PageURL string `json:"page_url"`
3033
SourceURL string `json:"source_url"`
3134
Player string `json:"player"`
35+
Version string `json:"version"`
3236
UserAgent string `json:"user_agent"`
3337
UID string `json:"uid"`
3438
Events []AnalyticsLogEvent `json:"events"`
3539
}
3640

3741
type AnalyticsLogEvent struct {
38-
Type string `json:"type"`
39-
Timestamp int64 `json:"timestamp"`
40-
Errors int `json:"errors,omitempty"`
41-
PlaytimeMS int `json:"playtime_ms,omitempty"`
42-
TTFFMS int `json:"ttff_ms,omitempty"`
43-
PreloadTimeMS int `json:"preload_time_ms,omitempty"`
44-
AutoplayStatus string `json:"autoplay_status,omitempty"`
45-
BufferMS int `json:"buffer_ms,omitempty"`
46-
ErrorMessage string `json:"error_message,omitempty"`
42+
// Shared fields by all events
43+
Type string `json:"type"`
44+
Timestamp int64 `json:"timestamp"`
45+
46+
// Heartbeat event
47+
Errors *int `json:"errors"`
48+
AutoplayStatus *string `json:"autoplay_status"`
49+
StalledCount *int `json:"stalled_count"`
50+
WaitingCount *int `json:"waiting_count"`
51+
TimeErroredMS *int `json:"time_errored_ms"`
52+
TimeStalledMS *int `json:"time_stalled_ms"`
53+
TimePlayingMS *int `json:"time_playing_ms"`
54+
TimeWaitingMS *int `json:"time_waiting_ms"`
55+
MountToPlayMS *int `json:"mount_to_play_ms"`
56+
MountToFirstFrameMS *int `json:"mount_to_first_frame_ms"`
57+
PlayToFirstFrameMS *int `json:"play_to_first_frame_ms"`
58+
DurationMS *int `json:"duration_ms"`
59+
OffsetMS *int `json:"offset_ms"`
60+
PlayerHeightPX *int `json:"player_height_px"`
61+
PlayerWidthPX *int `json:"player_width_px"`
62+
VideoHeightPX *int `json:"video_height_px"`
63+
VideoWidthPX *int `json:"video_width_px"`
64+
WindowHeightPX *int `json:"window_height_px"`
65+
WindowWidthPX *int `json:"window_width_px"`
66+
67+
// Error event
68+
ErrorMessage *string `json:"error_message"`
69+
Category *string `json:"category"`
4770
}
4871

4972
type AnalyticsGeo struct {
5073
GeoHash string
74+
Continent string
5175
Country string
76+
CountryCode string
5277
Subdivision string
5378
Timezone string
79+
IP string
5480
}
5581

5682
type AnalyticsHandlersCollection struct {
5783
extFetcher analytics.IExternalDataFetcher
5884
logProcessor analytics.ILogProcessor
5985
}
6086

61-
func NewAnalyticsHandlersCollection(streamCache mistapiconnector.IStreamCache, lapi *api.Client, metricsURL string, host string) AnalyticsHandlersCollection {
87+
func NewAnalyticsHandlersCollection(streamCache mistapiconnector.IStreamCache, lapi *api.Client, lp analytics.ILogProcessor) AnalyticsHandlersCollection {
6288
return AnalyticsHandlersCollection{
6389
extFetcher: analytics.NewExternalDataFetcher(streamCache, lapi),
64-
logProcessor: analytics.NewLogProcessor(metricsURL, host),
90+
logProcessor: lp,
6591
}
6692
}
6793

@@ -119,10 +145,11 @@ func parseAnalyticsLog(r *http.Request, schema *gojsonschema.Schema) (*Analytics
119145
}
120146

121147
func parseAnalyticsGeo(r *http.Request) (AnalyticsGeo, error) {
122-
res := AnalyticsGeo{}
148+
res := AnalyticsGeo{IP: getIP(r)}
123149
var missingHeader []string
124150

125151
res.Country, missingHeader = getOrAddMissing("X-City-Country-Name", r.Header, missingHeader)
152+
res.CountryCode, missingHeader = getOrAddMissing("X-City-Country-Code", r.Header, missingHeader)
126153
res.Subdivision, missingHeader = getOrAddMissing("X-Region-Name", r.Header, missingHeader)
127154
res.Timezone, missingHeader = getOrAddMissing("X-Time-Zone", r.Header, missingHeader)
128155

@@ -146,6 +173,16 @@ func parseAnalyticsGeo(r *http.Request) (AnalyticsGeo, error) {
146173
return res, nil
147174
}
148175

176+
func getIP(r *http.Request) string {
177+
ip := r.RemoteAddr
178+
host, _, err := net.SplitHostPort(ip)
179+
if err != nil {
180+
// If not possible to split, then just use RemoteAddr
181+
return ip
182+
}
183+
return host
184+
}
185+
149186
func getOrAddMissing(key string, headers http.Header, missingHeaders []string) (string, []string) {
150187
if h := headers.Get(key); h != "" {
151188
return h, missingHeaders
@@ -158,23 +195,70 @@ func toAnalyticsData(log *AnalyticsLog, geo AnalyticsGeo, extData analytics.Exte
158195
ua := useragent.Parse(log.UserAgent)
159196
var res []analytics.LogData
160197
for _, e := range log.Events {
161-
if e.Type == "heartbeat" {
162-
res = append(res, analytics.LogData{
163-
SessionID: log.SessionID,
164-
PlaybackID: log.PlaybackID,
165-
Browser: ua.Name,
166-
DeviceType: deviceTypeOf(ua),
167-
Country: geo.Country,
168-
UserID: extData.UserID,
169-
PlaytimeMs: e.PlaytimeMS,
170-
BufferMs: e.BufferMS,
171-
Errors: e.Errors,
172-
})
198+
if !isSupportedEvent(e.Type) {
199+
continue
173200
}
201+
res = append(res, analytics.LogData{
202+
SessionID: log.SessionID,
203+
ServerTimestamp: time.Now().UnixMilli(),
204+
PlaybackID: log.PlaybackID,
205+
ViewerHash: hashViewer(log, geo),
206+
Protocol: log.Protocol,
207+
PageURL: log.PageURL,
208+
SourceURL: log.SourceURL,
209+
Player: log.Player,
210+
Version: log.Version,
211+
UserID: extData.UserID,
212+
DStorageURL: extData.DStorageURL,
213+
Source: extData.SourceType,
214+
CreatorID: extData.CreatorID,
215+
DeviceType: deviceTypeOf(ua),
216+
DeviceModel: ua.Device,
217+
Browser: ua.Name,
218+
OS: ua.OS,
219+
PlaybackGeoHash: geo.GeoHash,
220+
PlaybackContinentName: geo.Continent,
221+
PlaybackCountryCode: geo.CountryCode,
222+
PlaybackCountryName: geo.Country,
223+
PlaybackSubdivision: geo.Subdivision,
224+
EventType: e.Type,
225+
EventTimestamp: e.Timestamp,
226+
EventData: analytics.LogDataEvent{
227+
Errors: e.Errors,
228+
AutoplayStatus: e.AutoplayStatus,
229+
StalledCount: e.StalledCount,
230+
WaitingCount: e.WaitingCount,
231+
TimeErroredMS: e.TimeErroredMS,
232+
TimeStalledMS: e.TimeStalledMS,
233+
TimePlayingMS: e.TimePlayingMS,
234+
TimeWaitingMS: e.TimeWaitingMS,
235+
MountToPlayMS: e.MountToPlayMS,
236+
MountToFirstFrameMS: e.MountToFirstFrameMS,
237+
PlayToFirstFrameMS: e.PlayToFirstFrameMS,
238+
DurationMS: e.DurationMS,
239+
OffsetMS: e.OffsetMS,
240+
PlayerHeightPX: e.PlayerHeightPX,
241+
PlayerWidthPX: e.PlayerWidthPX,
242+
VideoHeightPX: e.VideoHeightPX,
243+
VideoWidthPX: e.VideoWidthPX,
244+
WindowHeightPX: e.WindowHeightPX,
245+
WindowWidthPX: e.WindowWidthPX,
246+
247+
ErrorMessage: e.ErrorMessage,
248+
Category: e.Category,
249+
},
250+
})
174251
}
175252
return res
176253
}
177254

255+
func isSupportedEvent(eventType string) bool {
256+
if eventType == "heartbeat" || eventType == "error" {
257+
return true
258+
}
259+
return false
260+
}
261+
178262
func deviceTypeOf(ua useragent.UserAgent) string {
179263
if ua.Mobile {
180264
return "mobile"
@@ -185,3 +269,12 @@ func deviceTypeOf(ua useragent.UserAgent) string {
185269
}
186270
return "unknown"
187271
}
272+
273+
func hashViewer(log *AnalyticsLog, geo AnalyticsGeo) string {
274+
if log.UID != "" {
275+
// If user defined the unique viewer ID, then we just use it
276+
return log.UID
277+
}
278+
// If user didn't define the unique viewer ID, then we hash IP and user agent data
279+
return fmt.Sprintf("%x", sha256.Sum256([]byte(log.UserAgent+geo.IP)))
280+
}

0 commit comments

Comments
 (0)