Skip to content

Commit 8b4b499

Browse files
committed
IGP getConfig3 support
1 parent 07cb650 commit 8b4b499

File tree

1 file changed

+67
-3
lines changed

1 file changed

+67
-3
lines changed

cmd/statshouse-igp/ingress_proxy.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ import (
1818
"os"
1919
"path/filepath"
2020
"runtime"
21+
"slices"
2122
"strings"
2223
"sync"
2324
"time"
2425

2526
"github.com/vkcom/statshouse/internal/agent"
27+
"github.com/vkcom/statshouse/internal/data_model"
2628
"github.com/vkcom/statshouse/internal/data_model/gen2/constants"
2729
"github.com/vkcom/statshouse/internal/data_model/gen2/tlstatshouse"
2830
"github.com/vkcom/statshouse/internal/format"
@@ -137,6 +139,7 @@ func RunIngressProxy(ctx context.Context, config ConfigIngressProxy, aesPwd stri
137139
startTime: uint32(time.Now().Unix()),
138140
firstClientConn: make(map[string]bool),
139141
}
142+
restart := make(chan int)
140143
if config.UpstreamAddr != "" {
141144
addresses := strings.Split(config.UpstreamAddr, ",")
142145
p.agent = &agent.Agent{GetConfigResult: tlstatshouse.GetConfigResult3{
@@ -183,6 +186,10 @@ func RunIngressProxy(ctx context.Context, config ConfigIngressProxy, aesPwd stri
183186
if err != nil {
184187
log.Fatalf("error creating agent: %v", err)
185188
}
189+
go func() {
190+
p.agent.GoGetConfig() // if terminates, proxy must restart
191+
close(restart)
192+
}()
186193
p.agent.Run(0, 0, 0)
187194
}
188195
p.uniqueStartTime.Store(p.startTime)
@@ -223,7 +230,10 @@ func RunIngressProxy(ctx context.Context, config ConfigIngressProxy, aesPwd stri
223230
log.Printf("Running ingress proxy v2, PID %d\n", os.Getpid())
224231
tcp4.run()
225232
tcp6.run()
226-
<-ctx.Done()
233+
select {
234+
case <-ctx.Done():
235+
case <-restart:
236+
}
227237
shutdown()
228238
p.group.Wait()
229239
return nil
@@ -233,12 +243,10 @@ func (p *ingressProxy) newProxyServer(network string) proxyServer {
233243
return proxyServer{
234244
ingressProxy: p,
235245
config2: tlstatshouse.GetConfigResult{
236-
Addresses: make([]string, 0, len(p.agent.GetConfigResult.Addresses)),
237246
MaxAddressesCount: int32(len(p.agent.GetConfigResult.Addresses)),
238247
PreviousAddresses: int32(len(p.agent.GetConfigResult.Addresses)),
239248
},
240249
config3: tlstatshouse.GetConfigResult3{
241-
Addresses: make([]string, 0, len(p.agent.GetConfigResult.Addresses)),
242250
ShardByMetricCount: p.agent.GetConfigResult.ShardByMetricCount,
243251
},
244252
network: network,
@@ -420,6 +428,13 @@ func (p *proxyConn) run() {
420428
return // server shutdown
421429
}
422430
if firstReq.tip == rpcInvokeReqHeaderTLTag {
431+
if firstReq.RequestTag() == constants.StatshouseGetConfig3 {
432+
// GetConfig3 does not send shardReplica
433+
if res := firstReq.process(p); res.Error() != nil {
434+
return // failed serve GetConfig3 request
435+
}
436+
continue
437+
}
423438
break
424439
}
425440
log.Printf("Client skip #%d looking for invoke request, addr %v\n", firstReq.tip, p.clientConn.RemoteAddr())
@@ -532,9 +547,12 @@ func (p *proxyConn) readRequest() (req proxyRequest, err error) {
532547
}
533548
switch req.RequestTag() {
534549
case constants.StatshouseGetConfig2,
550+
constants.StatshouseGetConfig3,
535551
constants.StatshouseGetTagMapping2,
536552
constants.StatshouseSendKeepAlive2,
553+
constants.StatshouseSendKeepAlive3,
537554
constants.StatshouseSendSourceBucket2,
555+
constants.StatshouseSendSourceBucket3,
538556
constants.StatshouseTestConnection2,
539557
constants.StatshouseGetTargets2,
540558
constants.StatshouseGetTagMappingBootstrap,
@@ -609,19 +627,50 @@ func (req *proxyRequest) process(p *proxyConn) (res rpc.ForwardPacketsResult) {
609627
case rpcInvokeReqHeaderTLTag:
610628
switch req.RequestTag() {
611629
case constants.StatshouseGetConfig2:
630+
var autoConfigStatus int32
612631
var args tlstatshouse.GetConfig2
613632
if _, err = args.ReadBoxed(req.Request); err == nil {
614633
if args.Cluster != p.cluster {
615634
err = fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, p.cluster)
616635
p.logClientError("GetConfig2", err, rpc.PacketHeaderCircularBuffer{})
636+
autoConfigStatus = format.TagValueIDAutoConfigWrongCluster
617637
} else {
618638
req.Response, _ = args.WriteResult(req.Response[:0], p.config2)
639+
autoConfigStatus = format.TagValueIDAutoConfigOK
619640
}
641+
p.sendAutoConfigStatus(&args.Header, autoConfigStatus)
620642
}
621643
if err = req.WriteReponseAndFlush(p.clientConn, err); err != nil {
622644
p.logClientError("write", err, rpc.PacketHeaderCircularBuffer{})
623645
// not an error ("requestLoop" exits on request read-write errors only)
624646
}
647+
case constants.StatshouseGetConfig3:
648+
var autoConfigStatus int32
649+
var args tlstatshouse.GetConfig3
650+
if _, err = args.ReadBoxed(req.Request); err == nil {
651+
if args.Cluster != p.cluster {
652+
err = fmt.Errorf("statshouse misconfiguration! cluster requested %q does not match actual cluster connected %q", args.Cluster, p.cluster)
653+
p.logClientError("GetConfig3", err, rpc.PacketHeaderCircularBuffer{})
654+
autoConfigStatus = format.TagValueIDAutoConfigWrongCluster
655+
} else {
656+
equalConfig := args.IsSetPreviousConfig() &&
657+
slices.Equal(p.config3.Addresses, args.PreviousConfig.Addresses) &&
658+
p.config3.ShardByMetricCount == args.PreviousConfig.ShardByMetricCount
659+
if equalConfig {
660+
autoConfigStatus = format.TagValueIDAutoConfigErrorKeepAlive
661+
} else {
662+
req.Response, _ = args.WriteResult(req.Response[:0], p.config3)
663+
autoConfigStatus = format.TagValueIDAutoConfigOK
664+
}
665+
}
666+
p.sendAutoConfigStatus(&args.Header, autoConfigStatus)
667+
}
668+
if autoConfigStatus == format.TagValueIDAutoConfigOK || err != nil {
669+
if err = req.WriteReponseAndFlush(p.clientConn, err); err != nil {
670+
p.logClientError("write", err, rpc.PacketHeaderCircularBuffer{})
671+
// not an error ("requestLoop" exits on request read-write errors only)
672+
}
673+
}
625674
default:
626675
req.setIngressProxy(p)
627676
if err = req.forwardAndFlush(p); err != nil {
@@ -652,6 +701,21 @@ func (req *proxyRequest) forwardAndFlush(p *proxyConn) error {
652701
return nil
653702
}
654703

704+
func (p *ingressProxy) sendAutoConfigStatus(h *tlstatshouse.CommonProxyHeader, status int32) {
705+
p.agent.AddCounterHostAERA(
706+
uint32(time.Now().Unix()),
707+
format.BuiltinMetricMetaAutoConfig,
708+
[]int32{0, 0, 0, 0, status},
709+
1, // count
710+
data_model.TagUnionBytes{I: p.hostnameID.Load()},
711+
format.AgentEnvRouteArch{
712+
AgentEnv: format.TagValueIDProduction,
713+
Route: format.TagValueIDRouteIngressProxy,
714+
BuildArch: format.FilterBuildArch(h.BuildArch),
715+
})
716+
717+
}
718+
655719
func (req *proxyRequest) setIngressProxy(p *proxyConn) {
656720
if len(req.Request) < 32 {
657721
return // test environment

0 commit comments

Comments
 (0)