Skip to content

Commit eb35801

Browse files
committed
propagate stamp
1 parent a6f9b24 commit eb35801

File tree

4 files changed

+37
-35
lines changed

4 files changed

+37
-35
lines changed

service/history/workflow/task_generator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ func (r *TaskGeneratorImpl) GenerateActivityTasks(
551551
TaskQueue: activityInfo.TaskQueue,
552552
ScheduledEventID: activityInfo.ScheduledEventId,
553553
Version: activityInfo.Version,
554+
Stamp: activityInfo.Stamp,
554555
})
555556

556557
return nil

tests/testcore/matching_stream_recorder.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (r *MatchingStreamRecorder) recordMessage(method string, msg proto.Message,
108108
}
109109

110110
// Store the message reference directly without cloning for performance
111-
if direction == DirectionSend || direction == DirectionServerSend {
111+
if direction == DirectionRequest || direction == DirectionServerRequest {
112112
captured.Request = msg
113113
} else {
114114
captured.Response = msg
@@ -121,7 +121,7 @@ func (r *MatchingStreamRecorder) recordMessage(method string, msg proto.Message,
121121
func (r *MatchingStreamRecorder) formatCapturedMessage(captured CapturedMatchingMessage) string {
122122
// Get the appropriate proto message based on direction
123123
var msg proto.Message
124-
if captured.Direction == DirectionSend || captured.Direction == DirectionServerSend {
124+
if captured.Direction == DirectionRequest || captured.Direction == DirectionServerRequest {
125125
msg = captured.Request
126126
} else {
127127
msg = captured.Response
@@ -163,7 +163,7 @@ func (r *MatchingStreamRecorder) UnaryInterceptor(clusterName string) grpc.Unary
163163
// Capture outgoing request if it's a matching-related call
164164
if isMatchingMethod(method) {
165165
if protoReq, ok := req.(proto.Message); ok {
166-
r.recordMessage(method, protoReq, DirectionSend, clusterName, target, false)
166+
r.recordMessage(method, protoReq, DirectionRequest, clusterName, target, false)
167167
}
168168
}
169169

@@ -172,7 +172,7 @@ func (r *MatchingStreamRecorder) UnaryInterceptor(clusterName string) grpc.Unary
172172
// Capture incoming response if successful
173173
if err == nil && isMatchingMethod(method) {
174174
if protoReply, ok := reply.(proto.Message); ok {
175-
r.recordMessage(method, protoReply, DirectionRecv, clusterName, target, false)
175+
r.recordMessage(method, protoReply, DirectionResponse, clusterName, target, false)
176176
}
177177
}
178178

@@ -221,7 +221,7 @@ type recordingMatchingClientStream struct {
221221
func (s *recordingMatchingClientStream) SendMsg(m interface{}) error {
222222
if msg, ok := m.(proto.Message); ok {
223223
// SendMsg means this cluster is SENDING a message (could be request or ack)
224-
s.recorder.recordMessage(s.method, msg, DirectionSend, s.clusterName, s.targetAddress, true)
224+
s.recorder.recordMessage(s.method, msg, DirectionRequest, s.clusterName, s.targetAddress, true)
225225
}
226226
return s.ClientStream.SendMsg(m)
227227
}
@@ -231,7 +231,7 @@ func (s *recordingMatchingClientStream) RecvMsg(m interface{}) error {
231231
if err == nil {
232232
if msg, ok := m.(proto.Message); ok {
233233
// RecvMsg means this cluster is RECEIVING a message (could be request or data)
234-
s.recorder.recordMessage(s.method, msg, DirectionRecv, s.clusterName, s.targetAddress, true)
234+
s.recorder.recordMessage(s.method, msg, DirectionResponse, s.clusterName, s.targetAddress, true)
235235
}
236236
}
237237
return err
@@ -248,7 +248,7 @@ func (r *MatchingStreamRecorder) UnaryServerInterceptor(clusterName string) grpc
248248
// Capture incoming request if it's a matching-related call
249249
if isMatchingMethod(info.FullMethod) {
250250
if protoReq, ok := req.(proto.Message); ok {
251-
r.recordMessage(info.FullMethod, protoReq, DirectionServerRecv, clusterName, "server", false)
251+
r.recordMessage(info.FullMethod, protoReq, DirectionServerRequest, clusterName, "server", false)
252252
}
253253
}
254254

@@ -257,7 +257,7 @@ func (r *MatchingStreamRecorder) UnaryServerInterceptor(clusterName string) grpc
257257
// Capture outgoing response if successful
258258
if err == nil && isMatchingMethod(info.FullMethod) {
259259
if protoResp, ok := resp.(proto.Message); ok {
260-
r.recordMessage(info.FullMethod, protoResp, DirectionServerSend, clusterName, "server", false)
260+
r.recordMessage(info.FullMethod, protoResp, DirectionServerResponse, clusterName, "server", false)
261261
}
262262
}
263263

@@ -298,7 +298,7 @@ type recordingMatchingServerStream struct {
298298
func (s *recordingMatchingServerStream) SendMsg(m interface{}) error {
299299
if msg, ok := m.(proto.Message); ok {
300300
// Server SendMsg means this server is SENDING a message to the client
301-
s.recorder.recordMessage(s.method, msg, DirectionServerSend, s.clusterName, "server", true)
301+
s.recorder.recordMessage(s.method, msg, DirectionServerResponse, s.clusterName, "server", true)
302302
}
303303
return s.ServerStream.SendMsg(m)
304304
}
@@ -308,7 +308,7 @@ func (s *recordingMatchingServerStream) RecvMsg(m interface{}) error {
308308
if err == nil {
309309
if msg, ok := m.(proto.Message); ok {
310310
// Server RecvMsg means this server is RECEIVING a message from the client
311-
s.recorder.recordMessage(s.method, msg, DirectionServerRecv, s.clusterName, "server", true)
311+
s.recorder.recordMessage(s.method, msg, DirectionServerRequest, s.clusterName, "server", true)
312312
}
313313
}
314314
return err

tests/testcore/replication_stream_recorder.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ import (
1616

1717
// Message direction constants
1818
const (
19-
DirectionSend = "send"
20-
DirectionRecv = "recv"
21-
DirectionServerSend = "server_send"
22-
DirectionServerRecv = "server_recv"
19+
DirectionRequest = "request"
20+
DirectionResponse = "response"
21+
DirectionServerRequest = "server_request" // For server-initiated streams
22+
DirectionServerResponse = "server_response"
2323
)
2424

2525
// ReplicationStreamRecorder captures replication stream messages for testing
@@ -115,7 +115,7 @@ func (r *ReplicationStreamRecorder) recordMessage(method string, msg proto.Messa
115115
}
116116

117117
// Store the message reference directly without cloning for performance
118-
if direction == DirectionSend || direction == DirectionServerSend {
118+
if direction == DirectionRequest || direction == DirectionServerRequest {
119119
captured.Request = msg
120120
} else {
121121
captured.Response = msg
@@ -128,7 +128,7 @@ func (r *ReplicationStreamRecorder) recordMessage(method string, msg proto.Messa
128128
func (r *ReplicationStreamRecorder) formatCapturedMessage(captured CapturedReplicationMessage) string {
129129
// Get the appropriate proto message based on direction
130130
var msg proto.Message
131-
if captured.Direction == DirectionSend || captured.Direction == DirectionServerSend {
131+
if captured.Direction == DirectionRequest || captured.Direction == DirectionServerRequest {
132132
msg = captured.Request
133133
} else {
134134
msg = captured.Response
@@ -170,7 +170,7 @@ func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.Un
170170
// Capture outgoing request if it's a replication-related call
171171
if isReplicationMethod(method) {
172172
if protoReq, ok := req.(proto.Message); ok {
173-
r.recordMessage(method, protoReq, DirectionSend, clusterName, target, false)
173+
r.recordMessage(method, protoReq, DirectionRequest, clusterName, target, false)
174174
}
175175
}
176176

@@ -179,7 +179,7 @@ func (r *ReplicationStreamRecorder) UnaryInterceptor(clusterName string) grpc.Un
179179
// Capture incoming response if successful
180180
if err == nil && isReplicationMethod(method) {
181181
if protoReply, ok := reply.(proto.Message); ok {
182-
r.recordMessage(method, protoReply, DirectionRecv, clusterName, target, false)
182+
r.recordMessage(method, protoReply, DirectionResponse, clusterName, target, false)
183183
}
184184
}
185185

@@ -228,7 +228,7 @@ type recordingClientStream struct {
228228
func (s *recordingClientStream) SendMsg(m interface{}) error {
229229
if msg, ok := m.(proto.Message); ok {
230230
// SendMsg means this cluster is SENDING a message (could be request or ack)
231-
s.recorder.recordMessage(s.method, msg, DirectionSend, s.clusterName, s.targetAddress, true)
231+
s.recorder.recordMessage(s.method, msg, DirectionRequest, s.clusterName, s.targetAddress, true)
232232
}
233233
return s.ClientStream.SendMsg(m)
234234
}
@@ -238,7 +238,7 @@ func (s *recordingClientStream) RecvMsg(m interface{}) error {
238238
if err == nil {
239239
if msg, ok := m.(proto.Message); ok {
240240
// RecvMsg means this cluster is RECEIVING a message (could be request or data)
241-
s.recorder.recordMessage(s.method, msg, DirectionRecv, s.clusterName, s.targetAddress, true)
241+
s.recorder.recordMessage(s.method, msg, DirectionResponse, s.clusterName, s.targetAddress, true)
242242
}
243243
}
244244
return err
@@ -255,7 +255,7 @@ func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) g
255255
// Capture incoming request if it's a replication-related call
256256
if isReplicationMethod(info.FullMethod) {
257257
if protoReq, ok := req.(proto.Message); ok {
258-
r.recordMessage(info.FullMethod, protoReq, DirectionServerRecv, clusterName, "server", false)
258+
r.recordMessage(info.FullMethod, protoReq, DirectionServerRequest, clusterName, "server", false)
259259
}
260260
}
261261

@@ -264,7 +264,7 @@ func (r *ReplicationStreamRecorder) UnaryServerInterceptor(clusterName string) g
264264
// Capture outgoing response if successful
265265
if err == nil && isReplicationMethod(info.FullMethod) {
266266
if protoResp, ok := resp.(proto.Message); ok {
267-
r.recordMessage(info.FullMethod, protoResp, DirectionServerSend, clusterName, "server", false)
267+
r.recordMessage(info.FullMethod, protoResp, DirectionServerResponse, clusterName, "server", false)
268268
}
269269
}
270270

@@ -305,7 +305,7 @@ type recordingServerStream struct {
305305
func (s *recordingServerStream) SendMsg(m interface{}) error {
306306
if msg, ok := m.(proto.Message); ok {
307307
// Server SendMsg means this server is SENDING a message to the client
308-
s.recorder.recordMessage(s.method, msg, DirectionServerSend, s.clusterName, "server", true)
308+
s.recorder.recordMessage(s.method, msg, DirectionServerResponse, s.clusterName, "server", true)
309309
}
310310
return s.ServerStream.SendMsg(m)
311311
}
@@ -315,7 +315,7 @@ func (s *recordingServerStream) RecvMsg(m interface{}) error {
315315
if err == nil {
316316
if msg, ok := m.(proto.Message); ok {
317317
// Server RecvMsg means this server is RECEIVING a message from the client
318-
s.recorder.recordMessage(s.method, msg, DirectionServerRecv, s.clusterName, "server", true)
318+
s.recorder.recordMessage(s.method, msg, DirectionServerRequest, s.clusterName, "server", true)
319319
}
320320
}
321321
return err

tests/xdc/stream_based_replication_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,7 @@ func (s *streamBasedReplicationTestSuite) TestCloseTransferTaskAckedReplication(
928928
s.T().Log("Checking replication stream for close transfer task acknowledgment in versioned transition artifact...")
929929
s.Eventually(func() bool {
930930
for _, msg := range recorder.GetMessages() {
931-
if msg.Direction != testcore.DirectionServerSend {
931+
if msg.Direction != testcore.DirectionServerResponse {
932932
continue
933933
}
934934

@@ -1093,17 +1093,18 @@ func (s *streamBasedReplicationTestSuite) TestStreamRecorders() {
10931093
}
10941094

10951095
// Print summary
1096-
sendCount := 0
1097-
recvCount := 0
1096+
requestCount := 0
1097+
responseCount := 0
10981098
for _, msg := range messages {
1099-
if msg.Direction == testcore.DirectionSend {
1100-
sendCount++
1101-
} else if msg.Direction == testcore.DirectionRecv {
1102-
recvCount++
1099+
switch msg.Direction {
1100+
case testcore.DirectionRequest:
1101+
requestCount++
1102+
case testcore.DirectionResponse:
1103+
responseCount++
11031104
}
11041105
}
1105-
s.T().Logf(" - Sent: %d messages", sendCount)
1106-
s.T().Logf(" - Received: %d messages", recvCount)
1106+
s.T().Logf(" - Requests: %d messages", requestCount)
1107+
s.T().Logf(" - Responses: %d messages", responseCount)
11071108
}
11081109

11091110
// Dump matching stream messages for all clusters
@@ -1171,8 +1172,8 @@ func longRunningRecorderTestActivity(ctx context.Context, input string) (string,
11711172

11721173
activity.GetLogger(ctx).Info("Activity started", "input", input, "attempt", attempt)
11731174

1174-
// Fail the first 2 attempts to trigger retries that standby will need to push to matching
1175-
if attempt < 3 {
1175+
// Fail the first 3 attempts to trigger retries that standby will need to push to matching
1176+
if attempt <= 3 {
11761177
activity.GetLogger(ctx).Info("Activity failing intentionally to trigger retry", "attempt", attempt)
11771178
time.Sleep(2 * time.Second) // Small delay before failing
11781179
return "", fmt.Errorf("intentional failure on attempt %d", attempt)

0 commit comments

Comments
 (0)