Skip to content

Commit 38a70c0

Browse files
Fix generate identity hang and add more metrics (#666)
In a rare case when there are no logs in a batch of 1000, the batch start point never moves and the entire indexer hangs. Fix it. Also add metrics that were used to debug this. Fixes #659 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced connection monitoring with a new interceptor for managing open connections in the API server. - Refined logging across key operations, including block handling and identity updates, with precise timestamp details. - Integrated Prometheus-based metrics to track active API connections for improved performance insights. - **Bug Fixes** - Corrected the logic for updating the `fromBlock` variable to ensure it reflects the latest available block. - **Documentation** - Added detailed logging statements to improve observability of operations. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent ad551a4 commit 38a70c0

File tree

6 files changed

+93
-7
lines changed

6 files changed

+93
-7
lines changed

pkg/api/server.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ func NewAPIServer(
6363
return nil, err
6464
}
6565

66+
openConnectionsInterceptor, err := server.NewOpenConnectionsInterceptor()
67+
if err != nil {
68+
return nil, err
69+
}
70+
6671
srvMetrics := grpcprom.NewServerMetrics(
6772
grpcprom.WithServerHandlingTimeHistogram(
6873
grpcprom.WithHistogramBuckets(
@@ -73,9 +78,13 @@ func NewAPIServer(
7378

7479
unary := []grpc.UnaryServerInterceptor{
7580
srvMetrics.UnaryServerInterceptor(),
81+
openConnectionsInterceptor.Unary(),
82+
loggingInterceptor.Unary(),
7683
}
7784
stream := []grpc.StreamServerInterceptor{
7885
srvMetrics.StreamServerInterceptor(),
86+
openConnectionsInterceptor.Stream(),
87+
loggingInterceptor.Stream(),
7988
}
8089

8190
if jwtVerifier != nil {
@@ -95,9 +104,6 @@ func NewAPIServer(
95104
PermitWithoutStream: true,
96105
MinTime: 15 * time.Second,
97106
}),
98-
grpc.ChainUnaryInterceptor(loggingInterceptor.Unary()),
99-
grpc.ChainStreamInterceptor(loggingInterceptor.Stream()),
100-
101107
// grpc.MaxRecvMsgSize(s.Config.Options.MaxMsgSize),
102108
}
103109

pkg/blockchain/rpcLogStreamer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ func (r *RpcLogStreamer) watchContract(watcher ContractConfig) {
168168
// reset self-termination timer
169169
timer.Reset(watcher.maxDisconnectTime)
170170

171+
if nextBlock != nil {
172+
fromBlock = *nextBlock
173+
}
174+
171175
if len(logs) == 0 {
172176
time.Sleep(NO_LOGS_SLEEP_TIME)
173177
continue
@@ -177,14 +181,12 @@ func (r *RpcLogStreamer) watchContract(watcher ContractConfig) {
177181
"Got logs",
178182
zap.Int("numLogs", len(logs)),
179183
zap.Uint64("fromBlock", fromBlock),
184+
zap.Time("time", time.Now()),
180185
)
181186

182187
for _, log := range logs {
183188
watcher.EventChannel <- log
184189
}
185-
if nextBlock != nil {
186-
fromBlock = *nextBlock
187-
}
188190
}
189191
}
190192
}

pkg/indexer/storer/identityUpdate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ func (s *IdentityUpdateStorer) StoreLog(
5858
if err != nil {
5959
return NewUnrecoverableLogStorageError(err)
6060
}
61-
6261
err = db.RunInTx(
6362
ctx,
6463
s.db,
@@ -83,6 +82,7 @@ func (s *IdentityUpdateStorer) StoreLog(
8382
s.logger.Info(
8483
"Inserting identity update from contract",
8584
zap.String("topic", messageTopic.String()),
85+
zap.Time("time", time.Now()),
8686
)
8787

8888
clientEnvelope, err := envelopes.NewClientEnvelopeFromBytes(msgSent.Update)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"github.com/xmtp/xmtpd/pkg/metrics"
6+
"google.golang.org/grpc"
7+
)
8+
9+
// OpenConnectionsInterceptor reports open connections for unary and stream RPCs.
10+
type OpenConnectionsInterceptor struct {
11+
}
12+
13+
// NewOpenConnectionsInterceptor creates a new instance of OpenConnectionsInterceptor.
14+
func NewOpenConnectionsInterceptor() (*OpenConnectionsInterceptor, error) {
15+
return &OpenConnectionsInterceptor{}, nil
16+
}
17+
18+
// Unary intercepts unary RPC calls to log errors.
19+
func (i *OpenConnectionsInterceptor) Unary() grpc.UnaryServerInterceptor {
20+
return func(
21+
ctx context.Context,
22+
req interface{},
23+
info *grpc.UnaryServerInfo,
24+
handler grpc.UnaryHandler,
25+
) (interface{}, error) {
26+
oc := metrics.NewApiOpenConnection("unary", info.FullMethod)
27+
defer oc.Close()
28+
return handler(ctx, req) // Call the actual RPC handler
29+
}
30+
}
31+
32+
// Stream intercepts stream RPC calls to log errors.
33+
func (i *OpenConnectionsInterceptor) Stream() grpc.StreamServerInterceptor {
34+
return func(
35+
srv interface{},
36+
ss grpc.ServerStream,
37+
info *grpc.StreamServerInfo,
38+
handler grpc.StreamHandler,
39+
) error {
40+
oc := metrics.NewApiOpenConnection("stream", info.FullMethod)
41+
defer oc.Close()
42+
return handler(srv, ss) // Call the actual stream handler
43+
}
44+
}

pkg/metrics/api.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
)
6+
7+
var apiOpenConnections = prometheus.NewGaugeVec(
8+
prometheus.GaugeOpts{
9+
Name: "xmtp_api_open_connections_gauge",
10+
Help: "Number of open API connections",
11+
},
12+
[]string{"style", "method"},
13+
)
14+
15+
type ApiOpenConnection struct {
16+
style string
17+
method string
18+
}
19+
20+
func NewApiOpenConnection(style string, method string) *ApiOpenConnection {
21+
oc := ApiOpenConnection{
22+
style: style,
23+
method: method,
24+
}
25+
26+
apiOpenConnections.With(prometheus.Labels{"style": oc.style, "method": oc.method}).Inc()
27+
28+
return &oc
29+
}
30+
31+
func (oc *ApiOpenConnection) Close() {
32+
apiOpenConnections.With(prometheus.Labels{"style": oc.style, "method": oc.method}).Dec()
33+
}

pkg/metrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func registerCollectors(reg prometheus.Registerer) {
8383
syncOutgoingSyncConnections,
8484
syncFailedOutgoingSyncConnections,
8585
syncFailedOutgoingSyncConnectionCounter,
86+
apiOpenConnections,
8687
}
8788

8889
for _, col := range cols {

0 commit comments

Comments
 (0)