Skip to content

Commit bd6169c

Browse files
committed
send all batch to same endpoint once
Signed-off-by: Yi Jin <yi.jin@databricks.com>
1 parent a5f8b31 commit bd6169c

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

pkg/receive/handler.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
163163
},
164164
),
165165
workers,
166+
logger,
166167
o.DialOpts...),
167168
receiverMode: o.ReceiverMode,
168169
Limiter: o.Limiter,
@@ -776,6 +777,8 @@ func (h *Handler) distributeTimeseriesToReplicas(
776777
var writeDestination = remoteWrites
777778
if endpoint == h.options.Endpoint {
778779
writeDestination = localWrites
780+
} else {
781+
endpointReplica.replica = 0
779782
}
780783
writeableSeries, ok := writeDestination[endpointReplica]
781784
if !ok {
@@ -1209,7 +1212,7 @@ type peerWorker struct {
12091212
forwardDelay prometheus.Histogram
12101213
}
12111214

1212-
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
1215+
func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, log log.Logger, dialOpts ...grpc.DialOption) peersContainer {
12131216
return &peerGroup{
12141217
dialOpts: dialOpts,
12151218
connections: map[string]*peerWorker{},
@@ -1219,6 +1222,7 @@ func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, as
12191222
expBackoff: backoff,
12201223
forwardDelay: forwardDelay,
12211224
asyncForwardWorkersCount: asyncForwardWorkersCount,
1225+
log: log,
12221226
}
12231227
}
12241228

@@ -1262,6 +1266,7 @@ type peerGroup struct {
12621266
expBackoff backoff.Backoff
12631267
forwardDelay prometheus.Histogram
12641268
asyncForwardWorkersCount uint
1269+
log log.Logger
12651270

12661271
m sync.RWMutex
12671272

@@ -1317,6 +1322,8 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableSt
13171322
}
13181323

13191324
p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
1325+
log.With(p.log).Log("msg", "established connection to peer", "peer", addr,
1326+
"asyncForwardWorkersCount", p.asyncForwardWorkersCount, "poolSize", p.connections[addr].wp.Size())
13201327
return p.connections[addr], nil
13211328
}
13221329

0 commit comments

Comments
 (0)