Skip to content

Commit 0f8b6d0

Browse files
authored
feat: implement SSE for real-time UI and ofrep evaluation updates (#5617)
* feat: implement SSE for real-time evaluation updates - Add event-stream content type marshaler for SSE support - Add streaming API endpoint in gateway - Add frontend hooks for subscribing to changes stream - Update flags and segments API to support streaming Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com> * support event streams in ofrep (ADR-0008) and ui preferences Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com> * addreess PR feedback Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com> --------- Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com>
1 parent 047693e commit 0f8b6d0

File tree

30 files changed

+864
-233
lines changed

30 files changed

+864
-233
lines changed

go.mod

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ require (
9898
golang.org/x/sync v0.20.0
9999
golang.org/x/term v0.41.0
100100
google.golang.org/api v0.272.0
101-
google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171
101+
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7
102+
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7
102103
google.golang.org/grpc v1.79.3
103104
google.golang.org/protobuf v1.36.11
104105
gopkg.in/segmentio/analytics-go.v3 v3.1.0
@@ -282,7 +283,7 @@ require (
282283
github.com/prometheus/otlptranslator v1.0.0 // indirect
283284
github.com/prometheus/procfs v0.20.1 // indirect
284285
github.com/protocolbuffers/txtpbfmt v0.0.0-20260217160748-a481f6a22f94 // indirect
285-
github.com/rakyll/gotest v0.0.6 // indirect
286+
github.com/rakyll/gotest v0.0.7 // indirect
286287
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
287288
github.com/redis/go-redis/extra/rediscmd/v9 v9.18.0 // indirect
288289
github.com/rivo/uniseg v0.4.7 // indirect
@@ -335,16 +336,15 @@ require (
335336
go.uber.org/multierr v1.11.0 // indirect
336337
go.yaml.in/yaml/v2 v2.4.3 // indirect
337338
go.yaml.in/yaml/v3 v3.0.4 // indirect
338-
golang.org/x/mod v0.33.0 // indirect
339+
golang.org/x/mod v0.34.0 // indirect
339340
golang.org/x/sys v0.42.0 // indirect
340-
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
341+
golang.org/x/telemetry v0.0.0-20260316223853-b6b0c46d1ccd // indirect
341342
golang.org/x/text v0.35.0 // indirect
342343
golang.org/x/time v0.15.0 // indirect
343-
golang.org/x/tools v0.42.0 // indirect
344+
golang.org/x/tools v0.43.0 // indirect
344345
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
345346
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d // indirect
346-
google.golang.org/genproto/googleapis/rpc v0.0.0-20260311181403-84a4fc48630c // indirect
347-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
347+
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.1 // indirect
348348
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect
349349
sigs.k8s.io/yaml v1.6.0 // indirect
350350
)

go.sum

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg
282282
github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA=
283283
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4=
284284
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM=
285-
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
286285
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
287286
github.com/fatih/color v1.19.0 h1:Zp3PiM21/9Ld6FzSKyL5c/BULoe/ONr9KlbYVOfG8+w=
288287
github.com/fatih/color v1.19.0/go.mod h1:zNk67I0ZUT1bEGsSGyCZYZNrHuTkJJB+r6Q9VuMi0LE=
@@ -522,13 +521,11 @@ github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae/go.mod h1:ilwx/Dt
522521
github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE=
523522
github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
524523
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
525-
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
526524
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
527525
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
528526
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
529527
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
530528
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
531-
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
532529
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
533530
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
534531
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
@@ -644,8 +641,8 @@ github.com/prometheus/sigv4 v0.4.1 h1:EIc3j+8NBea9u1iV6O5ZAN8uvPq2xOIUPcqCTivHuX
644641
github.com/prometheus/sigv4 v0.4.1/go.mod h1:eu+ZbRvsc5TPiHwqh77OWuCnWK73IdkETYY46P4dXOU=
645642
github.com/protocolbuffers/txtpbfmt v0.0.0-20260217160748-a481f6a22f94 h1:2PC6Ql3jipz1KvBlqUHjjk6v4aMwE86mfDu1XMH0LR8=
646643
github.com/protocolbuffers/txtpbfmt v0.0.0-20260217160748-a481f6a22f94/go.mod h1:JSbkp0BviKovYYt9XunS95M3mLPibE9bGg+Y95DsEEY=
647-
github.com/rakyll/gotest v0.0.6 h1:hBTqkO3jiuwYW/M9gL4bu0oTYcm8J6knQAAPUsJsz1I=
648-
github.com/rakyll/gotest v0.0.6/go.mod h1:SkoesdNCWmiD4R2dljIUcfSnNdVZ12y8qK4ojDkc2Sc=
644+
github.com/rakyll/gotest v0.0.7 h1:CL4D+fVEL0cUS5ys1cjrd+pN7sb8s1uf2LUy3UqyhAo=
645+
github.com/rakyll/gotest v0.0.7/go.mod h1:F/7ufCiqpm6I79Epl+SQ7tc03zSdgcf7yZsGyBH60+Q=
649646
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
650647
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
651648
github.com/redis/go-redis/extra/rediscmd/v9 v9.18.0 h1:QY4nmPHLFAJjtT5O4OMUEOxP8WVaRNOFpcbmxT2NLZU=
@@ -859,8 +856,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
859856
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
860857
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
861858
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
862-
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
863-
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
859+
golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI=
860+
golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY=
864861
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
865862
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
866863
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -902,7 +899,6 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h
902899
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
903900
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
904901
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
905-
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
906902
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
907903
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
908904
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -928,8 +924,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
928924
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
929925
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
930926
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
931-
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0=
932-
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548=
927+
golang.org/x/telemetry v0.0.0-20260316223853-b6b0c46d1ccd h1:QbR6Giw8AyR6v6Vff72jiZRUdZnetfgYRndQuKa806k=
928+
golang.org/x/telemetry v0.0.0-20260316223853-b6b0c46d1ccd/go.mod h1:TpUTTEp9frx7rTdLpC9gFG9kdI7zVLFTFFlqaH2Cncw=
933929
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
934930
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
935931
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -961,8 +957,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
961957
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
962958
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
963959
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
964-
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
965-
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
960+
golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s=
961+
golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0=
966962
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
967963
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
968964
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -981,10 +977,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
981977
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
982978
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d h1:vsOm753cOAMkt76efriTCDKjpCbK18XGHMJHo0JUKhc=
983979
google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:0oz9d7g9QLSdv9/lgbIjowW1JoxMbxmBVNe8i6tORJI=
984-
google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171 h1:tu/dtnW1o3wfaxCOjSLn5IRX4YDcJrtlpzYkhHhGaC4=
985-
google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171/go.mod h1:M5krXqk4GhBKvB596udGL3UyjL4I1+cTbK0orROM9ng=
986-
google.golang.org/genproto/googleapis/rpc v0.0.0-20260311181403-84a4fc48630c h1:xgCzyF2LFIO/0X2UAoVRiXKU5Xg6VjToG4i2/ecSswk=
987-
google.golang.org/genproto/googleapis/rpc v0.0.0-20260311181403-84a4fc48630c/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
980+
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 h1:41r6JMbpzBMen0R/4TZeeAmGXSJC7DftGINUodzTkPI=
981+
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:EIQZ5bFCfRQDV4MhRle7+OgjNtZ6P1PiZBgAKuxXu/Y=
982+
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7 h1:ndE4FoJqsIceKP2oYSnUZqhTdYufCYYkqwtFzfrhI7w=
983+
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
988984
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
989985
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
990986
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
@@ -993,8 +989,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
993989
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
994990
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
995991
google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
996-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A=
997-
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA=
992+
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.1 h1:/WILD1UcXj/ujCxgoL/DvRgt2CP3txG8+FwkUbb9110=
993+
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.1/go.mod h1:YNKnb2OAApgYn2oYY47Rn7alMr1zWjb2U8Q0aoGWiNc=
998994
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
999995
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
1000996
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

internal/gateway/gateway.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package gateway
22

33
import (
4+
"context"
5+
"errors"
6+
"fmt"
47
"net/http"
58
"slices"
69
"sync"
710

811
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
12+
"go.flipt.io/flipt/rpc/v2/evaluation"
913
"go.uber.org/zap"
14+
"google.golang.org/genproto/googleapis/rpc/status"
15+
"google.golang.org/grpc/codes"
16+
grpcstatus "google.golang.org/grpc/status"
1017
"google.golang.org/protobuf/encoding/protojson"
18+
"google.golang.org/protobuf/proto"
1119
)
1220

1321
// commonMuxOptions are options for gateway mux which are used for multiple instances.
@@ -20,11 +28,20 @@ var (
2028
once sync.Once
2129
)
2230

31+
const eventStreamContentType = "text/event-stream"
32+
2333
// NewGatewayServeMux builds a new gateway serve mux with common options.
2434
func NewGatewayServeMux(logger *zap.Logger, opts ...runtime.ServeMuxOption) *runtime.ServeMux {
2535
once.Do(func() {
2636
commonMuxOptions = []runtime.ServeMuxOption{
2737
runtime.WithMarshalerOption(runtime.MIMEWildcard, NewV1toV2MarshallerAdapter(logger)),
38+
runtime.WithMarshalerOption(eventStreamContentType, &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}),
39+
runtime.WithStreamErrorHandler(func(_ context.Context, err error) *grpcstatus.Status {
40+
if errors.Is(err, context.Canceled) || grpcstatus.Code(err) == codes.Canceled {
41+
return grpcstatus.New(codes.Canceled, "client disconnected")
42+
}
43+
return grpcstatus.Convert(err)
44+
}),
2845
runtime.WithMarshalerOption("application/json+pretty", &runtime.JSONPb{
2946
MarshalOptions: protojson.MarshalOptions{
3047
Indent: " ",
@@ -49,3 +66,69 @@ func NewGatewayServeMux(logger *zap.Logger, opts ...runtime.ServeMuxOption) *run
4966

5067
return runtime.NewServeMux(append(opts, commonMuxOptions...)...)
5168
}
69+
70+
type eventSourceMarshaler struct {
71+
runtime.JSONPb
72+
}
73+
74+
func (m *eventSourceMarshaler) ContentType(_ any) string {
75+
return eventStreamContentType
76+
}
77+
78+
func (m *eventSourceMarshaler) StreamContentType(_ any) string {
79+
return eventStreamContentType
80+
}
81+
82+
func (m *eventSourceMarshaler) Marshal(v any) ([]byte, error) {
83+
switch val := v.(type) {
84+
case map[string]any:
85+
return m.marshalEventSourceMap(val)
86+
case map[string]proto.Message:
87+
converted := make(map[string]any, len(val))
88+
for k, v := range val {
89+
converted[k] = v
90+
}
91+
return m.marshalEventSourceMap(converted)
92+
default:
93+
return nil, fmt.Errorf("unsupported event-stream payload %T", v)
94+
}
95+
}
96+
97+
func (m *eventSourceMarshaler) Unmarshal(_ []byte, _ any) error {
98+
return errors.New("text/event-stream unmarshaler is unsupported")
99+
}
100+
101+
func (m *eventSourceMarshaler) Delimiter() []byte {
102+
return []byte{}
103+
}
104+
105+
func (m *eventSourceMarshaler) marshalEventSourceMap(val map[string]any) ([]byte, error) {
106+
// expected happy path
107+
if vv, ok := val["result"].(*evaluation.EvaluationNamespaceSnapshot); ok {
108+
b, err := m.JSONPb.Marshal(map[string]string{
109+
"type": "refetchEvaluation",
110+
"etag": vv.Digest,
111+
})
112+
if err != nil {
113+
return nil, err
114+
}
115+
return fmt.Appendf(nil, "id: %s\ndata: %s\n\n", vv.Digest[:min(len(vv.Digest), 9)], b), nil
116+
}
117+
118+
if errStatus, ok := val["error"].(*status.Status); ok {
119+
if codes.Code(errStatus.Code) == codes.Canceled {
120+
return nil, nil
121+
}
122+
b, err := m.JSONPb.Marshal(map[string]any{
123+
"type": "error",
124+
"code": errStatus.Code,
125+
"message": errStatus.Message,
126+
})
127+
if err != nil {
128+
return nil, err
129+
}
130+
return fmt.Appendf(nil, "data: %s\n\n", b), nil
131+
}
132+
133+
return nil, fmt.Errorf("unsupported event-stream payload map: %T", val)
134+
}

internal/gateway/gateway_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package gateway
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
8+
"github.com/stretchr/testify/require"
9+
"go.flipt.io/flipt/rpc/v2/evaluation"
10+
"google.golang.org/grpc/codes"
11+
grpcstatus "google.golang.org/grpc/status"
12+
"google.golang.org/protobuf/proto"
13+
)
14+
15+
func TestEventSourceMarshalerTypes(t *testing.T) {
16+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
17+
18+
require.Equal(t, eventStreamContentType, m.ContentType(nil))
19+
require.Equal(t, eventStreamContentType, m.StreamContentType(nil))
20+
require.Empty(t, m.Delimiter())
21+
}
22+
23+
func TestEventSourceMarshalerSnapshot(t *testing.T) {
24+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
25+
snapshot := &evaluation.EvaluationNamespaceSnapshot{Digest: "etag-123"}
26+
27+
buf, err := m.Marshal(map[string]any{"result": snapshot})
28+
require.NoError(t, err)
29+
30+
payload, ok := extractEventPayload(t, buf)
31+
require.True(t, ok)
32+
33+
var got map[string]string
34+
require.NoError(t, json.Unmarshal(payload, &got))
35+
require.Equal(t, map[string]string{
36+
"type": "refetchEvaluation",
37+
"etag": "etag-123",
38+
}, got)
39+
}
40+
41+
func TestEventSourceMarshalerSnapshotProtoMap(t *testing.T) {
42+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
43+
snapshot := &evaluation.EvaluationNamespaceSnapshot{Digest: "etag-456"}
44+
45+
buf, err := m.Marshal(map[string]proto.Message{"result": snapshot})
46+
require.NoError(t, err)
47+
48+
payload, ok := extractEventPayload(t, buf)
49+
require.True(t, ok)
50+
51+
var got map[string]string
52+
require.NoError(t, json.Unmarshal(payload, &got))
53+
require.Equal(t, map[string]string{
54+
"type": "refetchEvaluation",
55+
"etag": "etag-456",
56+
}, got)
57+
}
58+
59+
func TestEventSourceMarshalerError(t *testing.T) {
60+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
61+
errStatus := grpcstatus.New(codes.Internal, "boom").Proto()
62+
63+
buf, err := m.Marshal(map[string]any{"error": errStatus})
64+
require.NoError(t, err)
65+
66+
payload, ok := extractEventPayload(t, buf)
67+
require.True(t, ok)
68+
69+
var got map[string]any
70+
require.NoError(t, json.Unmarshal(payload, &got))
71+
72+
require.Equal(t, "error", got["type"])
73+
require.InDelta(t, float64(codes.Internal), got["code"], 0)
74+
require.Equal(t, "boom", got["message"])
75+
}
76+
77+
func TestEventSourceMarshalerCanceledErrorIsIgnored(t *testing.T) {
78+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
79+
errStatus := grpcstatus.New(codes.Canceled, "client disconnected").Proto()
80+
81+
buf, err := m.Marshal(map[string]any{"error": errStatus})
82+
require.NoError(t, err)
83+
require.Nil(t, buf)
84+
}
85+
86+
func TestEventSourceMarshalerUnsupported(t *testing.T) {
87+
m := &eventSourceMarshaler{JSONPb: runtime.JSONPb{}}
88+
89+
buf, err := m.Marshal(map[string]any{"result": "nope"})
90+
require.Error(t, err)
91+
require.Nil(t, buf)
92+
}
93+
94+
func extractEventPayload(t *testing.T, buf []byte) ([]byte, bool) {
95+
t.Helper()
96+
97+
// Handle both with and without id field
98+
if len(buf) > 0 && string(buf[:3]) == "id:" {
99+
// Find "data: " after the id line
100+
const dataPrefix = "data: "
101+
idx := findSubstring(buf, dataPrefix)
102+
if idx == -1 {
103+
return nil, false
104+
}
105+
// Find "\n\n" after data
106+
endIdx := findSubstring(buf[idx:], "\n\n")
107+
if endIdx == -1 {
108+
return nil, false
109+
}
110+
return buf[idx+len(dataPrefix) : idx+endIdx], true
111+
}
112+
113+
const prefix = "data: "
114+
const suffix = "\n\n"
115+
116+
if len(buf) < len(prefix)+len(suffix) {
117+
return nil, false
118+
}
119+
120+
if string(buf[:len(prefix)]) != prefix {
121+
return nil, false
122+
}
123+
if string(buf[len(buf)-len(suffix):]) != suffix {
124+
return nil, false
125+
}
126+
127+
return buf[len(prefix) : len(buf)-len(suffix)], true
128+
}
129+
130+
func findSubstring(buf []byte, substr string) int {
131+
for i := 0; i <= len(buf)-len(substr); i++ {
132+
if string(buf[i:i+len(substr)]) == substr {
133+
return i
134+
}
135+
}
136+
return -1
137+
}

0 commit comments

Comments
 (0)