Skip to content

Commit 29c6b41

Browse files
authored
chore(shard-manager): provide metadata in constructor (#7711)
**What changed?** Refactoring for #7712: Providing shard executor metadata to the constructor instead of Initalising this separately. In addition, unifying panic -> logger.Fatal as we do in other cases. **Why?** +Better readability as code is more aligned with other usages of log.Fatal. Passing metadata to constructor allows to avoid partial structure initialization. **How did you test it?** go test -v ./service/matching/handler **Potential risks** **Release notes** **Documentation Changes** Signed-off-by: Jan Kisel <dkrot@uber.com>
1 parent 0a14583 commit 29c6b41

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

service/matching/handler/engine.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -198,31 +198,31 @@ func (e *matchingEngineImpl) setupExecutor(shardDistributorExecutorClient execut
198198
}
199199
e.taskListsFactory = taskListFactory
200200

201+
// Get the IP address to advertise to external services
202+
// This respects bindOnLocalHost config (127.0.0.1 for local dev, external IP for production)
203+
hostIP, err := rpc.GetListenIP(e.config.RPCConfig)
204+
if err != nil {
205+
e.logger.Fatal("Failed to get listen IP", tag.Error(err))
206+
}
207+
201208
params := executorclient.Params[tasklist.ShardProcessor]{
202209
ExecutorClient: shardDistributorExecutorClient,
203210
MetricsScope: e.metricsScope,
204211
Logger: e.logger,
205212
ShardProcessorFactory: taskListFactory,
206213
Config: cfg,
207214
TimeSource: e.timeSource,
215+
Metadata: map[string]string{
216+
"tchannel": fmt.Sprintf("%d", e.config.RPCConfig.Port),
217+
"grpc": fmt.Sprintf("%d", e.config.RPCConfig.GRPCPort),
218+
"hostIP": hostIP.String(),
219+
},
208220
}
209221
executor, err := executorclient.NewExecutor[tasklist.ShardProcessor](params)
210222
if err != nil {
211-
panic(err)
223+
e.logger.Fatal("Failed to create new executor", tag.Error(err))
212224
}
213225

214-
// Get the IP address to advertise to external services
215-
// This respects bindOnLocalHost config (127.0.0.1 for local dev, external IP for production)
216-
hostIP, err := rpc.GetListenIP(e.config.RPCConfig)
217-
if err != nil {
218-
e.logger.Fatal("Failed to get listen IP", tag.Error(err))
219-
}
220-
221-
executor.SetMetadata(map[string]string{
222-
"tchannel": fmt.Sprintf("%d", e.config.RPCConfig.Port),
223-
"grpc": fmt.Sprintf("%d", e.config.RPCConfig.GRPCPort),
224-
"hostIP": hostIP.String(),
225-
})
226226
e.executor = executor
227227
}
228228

service/sharddistributor/client/executorclient/clientimpl.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ func (e *executorImpl[SP]) Stop() {
138138
}
139139

140140
func (e *executorImpl[SP]) GetShardProcess(ctx context.Context, shardID string) (SP, error) {
141-
shardProcess, ok := e.managedProcessors.Load(shardID)
142141
e.processorsToLastUse.Store(shardID, e.timeSource.Now())
143-
if !ok {
144142

143+
shardProcess, ok := e.managedProcessors.Load(shardID)
144+
if !ok {
145145
if e.getMigrationMode() == types.MigrationModeLOCALPASSTHROUGH {
146146
// Fail immediately if we are in LOCAL_PASSTHROUGH mode
147147
var zero SP
@@ -162,6 +162,7 @@ func (e *executorImpl[SP]) GetShardProcess(ctx context.Context, shardID string)
162162
return zero, fmt.Errorf("shard process not found for shard ID: %s", shardID)
163163
}
164164
}
165+
165166
return shardProcess.processor, nil
166167
}
167168

0 commit comments

Comments
 (0)