Skip to content

Commit 4c09ad2

Browse files
authored
Bugfix/decrease receive send sizes (#82)
* Add stricter limits for receiving and responding * Add stricter limits for receiving and responding * Make max send and receive message size configurable * Move exploratory integration test into separate directory * Add code to avoid merge conflict
1 parent 98319e0 commit 4c09ad2

File tree

6 files changed

+58
-35
lines changed

6 files changed

+58
-35
lines changed

legacy/app/archive-query-service/main.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ func run() error {
4343
StatusServiceGrpcHost string `conf:"default:127.0.0.1:9901"`
4444
StatusDataCacheTtl time.Duration `conf:"default:1s"` // nolint:revive
4545
EmptyTicksTtl time.Duration `conf:"default:24h"` // nolint:revive
46+
MaxRecvSizeInMb int `conf:"default:1"`
47+
MaxSendSizeInMb int `conf:"default:10"`
4648
}
4749
ElasticSearch struct {
4850
Address []string `conf:"default:https://localhost:9200"`
@@ -128,12 +130,20 @@ func run() error {
128130
go cache.Start()
129131
defer cache.Stop()
130132

133+
startCfg := rpc.StartConfig{
134+
ListenAddrGRPC: cfg.Server.GrpcHost,
135+
ListenAddrHTTP: cfg.Server.HttpHost,
136+
MaxRecvMsgSize: cfg.Server.MaxRecvSizeInMb * 1024 * 1024,
137+
MaxSendMsgSize: cfg.Server.MaxSendSizeInMb * 1024 * 1024,
138+
}
139+
131140
queryService := rpc.NewQueryService(cfg.ElasticSearch.TransactionsIndex, cfg.ElasticSearch.TickDataIndex, cfg.ElasticSearch.ComputorListIndex, elasticClient, cache)
132-
rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, queryService, statusServiceClient)
141+
rpcServer := rpc.NewServer(queryService, statusServiceClient)
133142
tickInBoundsInterceptor := rpc.NewTickWithinBoundsInterceptor(statusServiceClient, cache)
134143
var identitiesValidatorInterceptor rpc.IdentitiesValidatorInterceptor
135144
var logTechnicalErrorInterceptor rpc.LogTechnicalErrorInterceptor
136-
err = rpcServer.Start(srvMetrics.UnaryServerInterceptor(),
145+
err = rpcServer.Start(startCfg,
146+
srvMetrics.UnaryServerInterceptor(),
137147
logTechnicalErrorInterceptor.GetInterceptor,
138148
tickInBoundsInterceptor.GetInterceptor,
139149
identitiesValidatorInterceptor.GetInterceptor)

legacy/elastic/query_tick_data_integration_test.go renamed to legacy/elastic/useless/query_tick_data_integration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//go:build !ci
22
// +build !ci
33

4-
package elastic
4+
package useless
55

66
import (
77
"context"
@@ -16,12 +16,13 @@ import (
1616
"github.com/ardanlabs/conf"
1717
"github.com/elastic/go-elasticsearch/v8"
1818
"github.com/joho/godotenv"
19+
"github.com/qubic/archive-query-service/legacy/elastic"
1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
2122
)
2223

2324
var (
24-
elasticClient *Client
25+
elasticClient *elastic.Client
2526
)
2627

2728
func TestElasticClient_QueryEmptyTicks_ReturnEmptyTicksAtBounds(t *testing.T) {
@@ -52,7 +53,7 @@ func TestMain(m *testing.M) {
5253

5354
func setup() {
5455
const envPrefix = "QUBIC_LTS_QUERY_SERVICE"
55-
err := godotenv.Load("../.env.local")
56+
err := godotenv.Load("../../.env.local")
5657
if err != nil {
5758
log.Printf("[WARN] no env file found")
5859
}
@@ -83,5 +84,5 @@ func setup() {
8384
if err != nil {
8485
log.Fatalf("error creating elastic client: %v", err)
8586
}
86-
elasticClient = NewElasticClient(cfg.Elastic.TransactionsIndex, cfg.Elastic.TickDataIndex, cfg.Elastic.ComputorListIndex, esClient)
87+
elasticClient = elastic.NewElasticClient(cfg.Elastic.TransactionsIndex, cfg.Elastic.TickDataIndex, cfg.Elastic.ComputorListIndex, esClient)
8788
}

legacy/rpc/rpc_server.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,23 @@ var ErrNotFound = errors.New("store resource not found")
2929

3030
var _ protobuf.TransactionsServiceServer = &Server{}
3131

32+
type StartConfig struct {
33+
ListenAddrGRPC string
34+
ListenAddrHTTP string
35+
MaxRecvMsgSize int
36+
MaxSendMsgSize int
37+
}
38+
3239
type Server struct {
3340
protobuf.UnimplementedTransactionsServiceServer
34-
listenAddrGRPC string
35-
listenAddrHTTP string
36-
qb *QueryService
37-
statusService statusPb.StatusServiceClient
41+
qb *QueryService
42+
statusService statusPb.StatusServiceClient
3843
}
3944

40-
func NewServer(listenAddrGRPC, listenAddrHTTP string, qb *QueryService, statusClient statusPb.StatusServiceClient) *Server {
45+
func NewServer(qb *QueryService, statusClient statusPb.StatusServiceClient) *Server {
4146
return &Server{
42-
listenAddrGRPC: listenAddrGRPC,
43-
listenAddrHTTP: listenAddrHTTP,
44-
qb: qb,
45-
statusService: statusClient,
47+
qb: qb,
48+
statusService: statusClient,
4649
}
4750
}
4851

@@ -688,17 +691,17 @@ func recomputeSendManyMoneyFlew(tx *protobuf.Transaction) (bool, error) {
688691
return true, nil
689692
}
690693

691-
func (s *Server) Start(interceptors ...grpc.UnaryServerInterceptor) error {
694+
func (s *Server) Start(cfg StartConfig, interceptors ...grpc.UnaryServerInterceptor) error {
692695

693696
srv := grpc.NewServer(
694-
grpc.MaxRecvMsgSize(600*1024*1024),
695-
grpc.MaxSendMsgSize(600*1024*1024),
697+
grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize),
698+
grpc.MaxSendMsgSize(cfg.MaxSendMsgSize),
696699
grpc.ChainUnaryInterceptor(interceptors...),
697700
)
698701
protobuf.RegisterTransactionsServiceServer(srv, s)
699702
reflection.Register(srv)
700703

701-
lis, err := net.Listen("tcp", s.listenAddrGRPC)
704+
lis, err := net.Listen("tcp", cfg.ListenAddrGRPC)
702705
if err != nil {
703706
return fmt.Errorf("listening on grpc port: %w", err)
704707
}
@@ -709,29 +712,28 @@ func (s *Server) Start(interceptors ...grpc.UnaryServerInterceptor) error {
709712
}
710713
}()
711714

712-
if s.listenAddrHTTP != "" {
715+
if cfg.ListenAddrHTTP != "" {
713716
go func() {
714717
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
715718
MarshalOptions: protojson.MarshalOptions{EmitDefaultValues: true, EmitUnpopulated: true},
716719
}))
717720
opts := []grpc.DialOption{
718721
grpc.WithTransportCredentials(insecure.NewCredentials()),
719722
grpc.WithDefaultCallOptions(
720-
grpc.MaxCallRecvMsgSize(600*1024*1024),
721-
grpc.MaxCallSendMsgSize(600*1024*1024),
723+
grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize),
724+
grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize),
722725
),
723726
}
724-
725727
if err := protobuf.RegisterTransactionsServiceHandlerFromEndpoint(
726728
context.Background(),
727729
mux,
728-
s.listenAddrGRPC,
730+
cfg.ListenAddrGRPC,
729731
opts,
730732
); err != nil {
731733
panic(err)
732734
}
733735

734-
if err := http.ListenAndServe(s.listenAddrHTTP, mux); err != nil {
736+
if err := http.ListenAndServe(cfg.ListenAddrHTTP, mux); err != nil {
735737
panic(err)
736738
}
737739
}()

v2/cmd/archive-query-service/main.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ func run() error {
4747
StatusDataCacheTTL time.Duration `conf:"default:1s"`
4848
CacheEnabled bool `conf:"default:false"`
4949
CacheTTLFile string `conf:"default:cache_ttl.json"`
50+
MaxRecvSizeInMb int `conf:"default:1"`
51+
MaxSendSizeInMb int `conf:"default:10"`
5052
}
5153
Pagination struct {
5254
MaxPageSize uint32 `conf:"default:1000"`
@@ -157,10 +159,6 @@ func run() error {
157159
tickInBoundsInterceptor := rpc.NewTickWithinBoundsInterceptor(statusService)
158160
var identitiesValidatorInterceptor rpc.IdentitiesValidatorInterceptor
159161
var logTechnicalErrorInterceptor rpc.LogTechnicalErrorInterceptor
160-
startCfg := rpc.StartConfig{
161-
ListenAddrGRPC: cfg.Server.GrpcHost,
162-
ListenAddrHTTP: cfg.Server.HttpHost,
163-
}
164162

165163
var interceptors = []grpc.UnaryServerInterceptor{
166164
srvMetrics.UnaryServerInterceptor(),
@@ -194,6 +192,13 @@ func run() error {
194192
interceptors = append([]grpc.UnaryServerInterceptor{cacheInterceptor.GetInterceptor}, interceptors...)
195193
}
196194

195+
startCfg := rpc.StartConfig{
196+
ListenAddrGRPC: cfg.Server.GrpcHost,
197+
ListenAddrHTTP: cfg.Server.HttpHost,
198+
MaxRecvMsgSize: cfg.Server.MaxRecvSizeInMb * 1024 * 1024,
199+
MaxSendMsgSize: cfg.Server.MaxSendSizeInMb * 1024 * 1024,
200+
}
201+
197202
srvErrorsChan := make(chan error, 1)
198203
err = rpcServer.Start(startCfg, srvErrorsChan, interceptors...)
199204
if err != nil {

v2/grpc/server.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,28 @@ package grpc
33
import (
44
"context"
55
"fmt"
6+
"net"
7+
"net/http"
8+
69
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
710
api "github.com/qubic/archive-query-service/v2/api/archive-query-service/v2"
811
"google.golang.org/grpc"
912
"google.golang.org/grpc/credentials/insecure"
1013
"google.golang.org/grpc/reflection"
1114
"google.golang.org/protobuf/encoding/protojson"
12-
"net"
13-
"net/http"
1415
)
1516

1617
type StartConfig struct {
1718
ListenAddrGRPC string
1819
ListenAddrHTTP string
20+
MaxRecvMsgSize int // limit receive size (request)
21+
MaxSendMsgSize int // limit send size (response)
1922
}
2023

2124
func (s *ArchiveQueryService) Start(cfg StartConfig, errCh chan error, interceptors ...grpc.UnaryServerInterceptor) error {
2225
srv := grpc.NewServer(
23-
grpc.MaxRecvMsgSize(600*1024*1024),
24-
grpc.MaxSendMsgSize(600*1024*1024),
26+
grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize),
27+
grpc.MaxSendMsgSize(cfg.MaxSendMsgSize),
2528
grpc.ChainUnaryInterceptor(interceptors...),
2629
)
2730
api.RegisterArchiveQueryServiceServer(srv, s)
@@ -46,8 +49,8 @@ func (s *ArchiveQueryService) Start(cfg StartConfig, errCh chan error, intercept
4649
opts := []grpc.DialOption{
4750
grpc.WithTransportCredentials(insecure.NewCredentials()),
4851
grpc.WithDefaultCallOptions(
49-
grpc.MaxCallRecvMsgSize(600*1024*1024),
50-
grpc.MaxCallSendMsgSize(600*1024*1024),
52+
grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize),
53+
grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize),
5154
),
5255
}
5356

v2/test/server_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ func (s *ServerTestSuite) SetupSuite() {
4646
var logTechnicalErrorInterceptor rpc.LogTechnicalErrorInterceptor
4747
startCfg := rpc.StartConfig{
4848
ListenAddrGRPC: "127.0.0.1:0", // Use a random port for testing
49+
MaxRecvMsgSize: 1 * 1024 * 1024,
50+
MaxSendMsgSize: 1 * 1024 * 1024,
4951
}
5052

5153
err := rpcServer.Start(startCfg, srvErrorsChan,

0 commit comments

Comments
 (0)