Skip to content

Commit 86384e6

Browse files
committed
feat(runway): wire merge-conflict-check controller to Merger extension
The merge-conflict-check controller was a parse-and-log stub. Wire it to the Merger extension so it performs the dry-run check and publishes the MergeResult to the merge-conflict-check-signal topic. A merge conflict is an expected outcome (ack + publish FAILED), not an infrastructure error.
1 parent 631f778 commit 86384e6

5 files changed

Lines changed: 310 additions & 31 deletions

File tree

example/runway/server/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ go_library(
1717
"//runway/controller",
1818
"//runway/controller/merge",
1919
"//runway/controller/mergeconflictcheck",
20+
"//runway/extension/merger",
21+
"//runway/extension/merger/noop",
2022
"@com_github_go_sql_driver_mysql//:mysql",
2123
"@com_github_uber_go_tally//:tally",
2224
"@org_golang_google_grpc//:grpc",

example/runway/server/main.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import (
3939
"github.com/uber/submitqueue/runway/controller"
4040
"github.com/uber/submitqueue/runway/controller/merge"
4141
"github.com/uber/submitqueue/runway/controller/mergeconflictcheck"
42+
"github.com/uber/submitqueue/runway/extension/merger"
43+
"github.com/uber/submitqueue/runway/extension/merger/noop"
4244
"go.uber.org/zap"
4345
"google.golang.org/grpc"
4446
"google.golang.org/grpc/reflection"
@@ -152,9 +154,13 @@ func run() error {
152154
),
153155
)
154156

157+
mergerFactory := newMergerFactory()
158+
155159
mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{
156160
Logger: logger.Sugar(),
157161
Scope: scope,
162+
MergerFactory: mergerFactory,
163+
Registry: registry,
158164
TopicKey: runwaymq.TopicKeyMergeConflictCheck,
159165
ConsumerGroup: "runway-mergeconflictcheck",
160166
})
@@ -235,10 +241,21 @@ func run() error {
235241
return err
236242
}
237243

238-
// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues.
239-
// Runway is the consumer of the merge-conflict-check and merge queues; each is
240-
// registered with a consuming subscription. The corresponding signal queues
241-
// (where results are published) are not wired yet.
244+
// newMergerFactory returns a merger.Factory for the example server. The noop
245+
// implementation always succeeds; a real deployment wires a VCS-backed factory.
246+
func newMergerFactory() merger.Factory {
247+
return &noopMergerFactory{}
248+
}
249+
250+
type noopMergerFactory struct{}
251+
252+
func (f *noopMergerFactory) For(_ merger.Config) (merger.Merger, error) {
253+
return noop.New(), nil
254+
}
255+
256+
// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Inbound
257+
// topics (merge-conflict-check, merge) have subscriptions; outbound signal topics
258+
// are publish-only.
242259
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
243260
return consumer.NewTopicRegistry([]consumer.TopicConfig{
244261
{
@@ -249,6 +266,11 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
249266
subscriberName, "runway-mergeconflictcheck",
250267
),
251268
},
269+
{
270+
Key: runwaymq.TopicKeyMergeConflictCheckSignal,
271+
Name: "merge-conflict-check-signal",
272+
Queue: q,
273+
},
252274
{
253275
Key: runwaymq.TopicKeyMerge,
254276
Name: "merge",

runway/controller/mergeconflictcheck/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//api/runway/messagequeue",
10+
"//api/runway/messagequeue/protopb",
11+
"//platform/base/messagequeue",
1012
"//platform/consumer",
1113
"//platform/metrics",
14+
"//runway/extension/merger",
1215
"@com_github_uber_go_tally//:tally",
1316
"@org_uber_go_zap//:zap",
1417
],
@@ -20,8 +23,12 @@ go_test(
2023
embed = [":mergeconflictcheck"],
2124
deps = [
2225
"//api/runway/messagequeue",
26+
"//api/runway/messagequeue/protopb",
2327
"//platform/base/messagequeue",
28+
"//platform/consumer",
2429
"//platform/extension/messagequeue/mock",
30+
"//runway/extension/merger",
31+
"//runway/extension/merger/mock",
2532
"@com_github_stretchr_testify//assert",
2633
"@com_github_stretchr_testify//require",
2734
"@com_github_uber_go_tally//:tally",

runway/controller/mergeconflictcheck/mergeconflictcheck.go

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,26 @@
1414

1515
// Package mergeconflictcheck consumes dry-run merge-conflict check requests from
1616
// Runway's merge-conflict-check queue. A request asks whether an ordered sequence
17-
// of merge steps applies cleanly onto the target branch without committing.
17+
// of merge steps can be applied cleanly onto the merge target.
1818
//
19-
// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
20-
// and logs it. The real check (attempt the merge without committing and publish a
21-
// MergeResult to the merge-conflict-check-signal queue) is not wired yet.
19+
// The controller obtains a Merger for the request's merge target, calls
20+
// CheckMergeability, and publishes the MergeResult to the
21+
// merge-conflict-check-signal queue. A merge conflict is an expected outcome
22+
// (ack + publish FAILED result), not an infrastructure error.
2223
package mergeconflictcheck
2324

2425
import (
2526
"context"
27+
"errors"
2628
"fmt"
2729

2830
"github.com/uber-go/tally"
2931
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
32+
runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
33+
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
3034
"github.com/uber/submitqueue/platform/consumer"
3135
"github.com/uber/submitqueue/platform/metrics"
36+
"github.com/uber/submitqueue/runway/extension/merger"
3237
"go.uber.org/zap"
3338
)
3439

@@ -39,6 +44,8 @@ var _ consumer.Controller = (*Controller)(nil)
3944
type Controller struct {
4045
logger *zap.SugaredLogger
4146
metricsScope tally.Scope
47+
mergerFactory merger.Factory
48+
registry consumer.TopicRegistry
4249
topicKey consumer.TopicKey
4350
consumerGroup string
4451
}
@@ -48,6 +55,9 @@ type Params struct {
4855
TopicKey consumer.TopicKey
4956
ConsumerGroup string
5057

58+
MergerFactory merger.Factory
59+
Registry consumer.TopicRegistry
60+
5161
Scope tally.Scope
5262
Logger *zap.SugaredLogger
5363
}
@@ -57,13 +67,15 @@ func NewController(p Params) *Controller {
5767
return &Controller{
5868
logger: p.Logger.Named("mergeconflictcheck_controller"),
5969
metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"),
70+
mergerFactory: p.MergerFactory,
71+
registry: p.Registry,
6072
topicKey: p.TopicKey,
6173
consumerGroup: p.ConsumerGroup,
6274
}
6375
}
6476

65-
// Process deserializes the merge request and logs it. Returns nil to ack, or an
66-
// error to nack.
77+
// Process deserializes the merge request, performs a dry-run merge check, and
78+
// publishes the result. Returns nil to ack, or an error to nack.
6779
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
6880
const opName = "process"
6981

@@ -75,13 +87,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
7587
request := &runwaymq.MergeRequest{}
7688
if err := runwaymq.Unmarshal(msg.Payload, request); err != nil {
7789
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
78-
// Non-retryable: a malformed payload will never deserialize on retry.
7990
return fmt.Errorf("failed to deserialize merge request: %w", err)
8091
}
8192

82-
// TODO: attempt the ordered merge steps without committing and publish a
83-
// MergeResult to the merge-conflict-check-signal queue. For now the request
84-
// is only logged after parsing.
8593
c.logger.Infow("received merge-conflict-check request",
8694
"id", request.Id,
8795
"queue_name", request.QueueName,
@@ -90,6 +98,60 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
9098
"partition_key", msg.PartitionKey,
9199
)
92100

101+
m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()})
102+
if err != nil {
103+
metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1)
104+
return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err)
105+
}
106+
107+
result, err := m.CheckMergeability(ctx, request)
108+
if err != nil && !errors.Is(err, merger.ErrConflict) {
109+
metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1)
110+
return fmt.Errorf("failed to check mergeability for %s: %w", request.GetId(), err)
111+
} else if err != nil {
112+
metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1)
113+
c.logger.Infow("merge conflict detected",
114+
"id", request.GetId(),
115+
"queue_name", request.GetQueueName(),
116+
)
117+
result = &runwaymq.MergeResult{
118+
Id: request.GetId(),
119+
Outcome: runwaypb.Outcome_FAILED,
120+
Reason: err.Error(),
121+
}
122+
}
123+
124+
if err := c.publish(ctx, runwaymq.TopicKeyMergeConflictCheckSignal, result, msg.PartitionKey); err != nil {
125+
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
126+
return fmt.Errorf("failed to publish merge-conflict-check result for %s: %w", request.GetId(), err)
127+
}
128+
129+
return nil
130+
}
131+
132+
// publish serializes a MergeResult and publishes it to the given signal topic.
133+
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error {
134+
payload, err := runwaymq.Marshal(result)
135+
if err != nil {
136+
return fmt.Errorf("failed to serialize merge result: %w", err)
137+
}
138+
139+
msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil)
140+
141+
q, ok := c.registry.Queue(key)
142+
if !ok {
143+
return fmt.Errorf("no queue registered for topic key %s", key)
144+
}
145+
146+
topicName, ok := c.registry.TopicName(key)
147+
if !ok {
148+
return fmt.Errorf("no topic name registered for topic key %s", key)
149+
}
150+
151+
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
152+
return fmt.Errorf("failed to publish message: %w", err)
153+
}
154+
93155
return nil
94156
}
95157

0 commit comments

Comments
 (0)