Skip to content

Commit a6f9b24

Browse files
committed
test infra
1 parent 2751348 commit a6f9b24

File tree

5 files changed

+550
-1
lines changed

5 files changed

+550
-1
lines changed

service/history/transfer_queue_standby_task_executor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,10 +558,11 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity(
558558
return nil
559559
}
560560

561+
activityTask := task.(*tasks.ActivityTask)
561562
pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo)
562563
return t.transferQueueTaskExecutorBase.pushActivity(
563564
ctx,
564-
task.(*tasks.ActivityTask),
565+
activityTask,
565566
pushActivityInfo.activityTaskScheduleToStartTimeout,
566567
pushActivityInfo.versionDirective,
567568
pushActivityInfo.priority,
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
package testcore
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"os"
9+
"strings"
10+
"sync"
11+
"time"
12+
13+
"google.golang.org/grpc"
14+
"google.golang.org/protobuf/encoding/protojson"
15+
"google.golang.org/protobuf/proto"
16+
)
17+
18+
// MatchingStreamRecorder captures matching service messages for testing
19+
type MatchingStreamRecorder struct {
20+
mu sync.RWMutex
21+
capturedMessages []CapturedMatchingMessage
22+
outputFile *os.File
23+
outputFilePath string
24+
}
25+
26+
// CapturedMatchingMessage represents a captured matching service message
27+
type CapturedMatchingMessage struct {
28+
Timestamp string `json:"timestamp"`
29+
Method string `json:"method"`
30+
Direction string `json:"direction"`
31+
ClusterName string `json:"clusterName"`
32+
TargetAddress string `json:"targetAddress"`
33+
MessageType string `json:"messageType"`
34+
IsStreamCall bool `json:"isStreamCall"`
35+
Request proto.Message `json:"-"` // Don't marshal directly
36+
Response proto.Message `json:"-"` // Don't marshal directly
37+
Message json.RawMessage `json:"message,omitempty"`
38+
}
39+
40+
func NewMatchingStreamRecorder() *MatchingStreamRecorder {
41+
return &MatchingStreamRecorder{
42+
capturedMessages: make([]CapturedMatchingMessage, 0),
43+
}
44+
}
45+
46+
// SetOutputFile sets the file path for writing captured messages on-demand
47+
func (r *MatchingStreamRecorder) SetOutputFile(filePath string) {
48+
r.mu.Lock()
49+
defer r.mu.Unlock()
50+
r.outputFilePath = filePath
51+
}
52+
53+
// WriteToLog writes all captured messages to the configured output file
54+
func (r *MatchingStreamRecorder) WriteToLog() error {
55+
r.mu.RLock()
56+
defer r.mu.RUnlock()
57+
58+
if r.outputFilePath == "" {
59+
return errors.New("output file path not set")
60+
}
61+
62+
// Create or truncate the output file
63+
f, err := os.Create(r.outputFilePath)
64+
if err != nil {
65+
return fmt.Errorf("failed to create output file %s: %w", r.outputFilePath, err)
66+
}
67+
defer func() {
68+
_ = f.Close()
69+
}()
70+
71+
// Write all captured messages
72+
for _, captured := range r.capturedMessages {
73+
formattedMsg := r.formatCapturedMessage(captured)
74+
if _, err := f.WriteString(formattedMsg + "\n"); err != nil {
75+
return fmt.Errorf("failed to write message: %w", err)
76+
}
77+
}
78+
79+
return f.Sync()
80+
}
81+
82+
func (r *MatchingStreamRecorder) Clear() {
83+
r.mu.Lock()
84+
defer r.mu.Unlock()
85+
r.capturedMessages = make([]CapturedMatchingMessage, 0)
86+
}
87+
88+
func (r *MatchingStreamRecorder) GetMessages() []CapturedMatchingMessage {
89+
r.mu.RLock()
90+
defer r.mu.RUnlock()
91+
result := make([]CapturedMatchingMessage, len(r.capturedMessages))
92+
copy(result, r.capturedMessages)
93+
return result
94+
}
95+
96+
func (r *MatchingStreamRecorder) recordMessage(method string, msg proto.Message, direction string, clusterName string, targetAddr string, isStreamCall bool) {
97+
r.mu.Lock()
98+
defer r.mu.Unlock()
99+
100+
captured := CapturedMatchingMessage{
101+
Method: method,
102+
Direction: direction,
103+
ClusterName: clusterName,
104+
Timestamp: time.Now().Format(time.RFC3339Nano),
105+
MessageType: string(msg.ProtoReflect().Descriptor().FullName()),
106+
TargetAddress: targetAddr,
107+
IsStreamCall: isStreamCall,
108+
}
109+
110+
// Store the message reference directly without cloning for performance
111+
if direction == DirectionSend || direction == DirectionServerSend {
112+
captured.Request = msg
113+
} else {
114+
captured.Response = msg
115+
}
116+
117+
r.capturedMessages = append(r.capturedMessages, captured)
118+
}
119+
120+
// formatCapturedMessage formats a single captured message as JSON for output
121+
func (r *MatchingStreamRecorder) formatCapturedMessage(captured CapturedMatchingMessage) string {
122+
// Get the appropriate proto message based on direction
123+
var msg proto.Message
124+
if captured.Direction == DirectionSend || captured.Direction == DirectionServerSend {
125+
msg = captured.Request
126+
} else {
127+
msg = captured.Response
128+
}
129+
130+
// Marshal the proto message to JSON and attach to Message field
131+
if msg != nil {
132+
marshaler := protojson.MarshalOptions{
133+
Multiline: false,
134+
Indent: "",
135+
}
136+
jsonBytes, err := marshaler.Marshal(msg)
137+
if err == nil {
138+
captured.Message = jsonBytes
139+
}
140+
}
141+
142+
// Marshal the entire struct to pretty JSON
143+
jsonOutput, err := json.MarshalIndent(captured, "", " ")
144+
if err != nil {
145+
return fmt.Sprintf(`{"error": "failed to marshal output: %v"}`, err)
146+
}
147+
148+
return string(jsonOutput)
149+
}
150+
151+
// UnaryInterceptor returns a gRPC unary client interceptor that captures messages
152+
func (r *MatchingStreamRecorder) UnaryInterceptor(clusterName string) grpc.UnaryClientInterceptor {
153+
return func(
154+
ctx context.Context,
155+
method string,
156+
req, reply interface{},
157+
cc *grpc.ClientConn,
158+
invoker grpc.UnaryInvoker,
159+
opts ...grpc.CallOption,
160+
) error {
161+
target := cc.Target()
162+
163+
// Capture outgoing request if it's a matching-related call
164+
if isMatchingMethod(method) {
165+
if protoReq, ok := req.(proto.Message); ok {
166+
r.recordMessage(method, protoReq, DirectionSend, clusterName, target, false)
167+
}
168+
}
169+
170+
err := invoker(ctx, method, req, reply, cc, opts...)
171+
172+
// Capture incoming response if successful
173+
if err == nil && isMatchingMethod(method) {
174+
if protoReply, ok := reply.(proto.Message); ok {
175+
r.recordMessage(method, protoReply, DirectionRecv, clusterName, target, false)
176+
}
177+
}
178+
179+
return err
180+
}
181+
}
182+
183+
// StreamInterceptor returns a gRPC stream client interceptor that captures stream messages
184+
func (r *MatchingStreamRecorder) StreamInterceptor(clusterName string) grpc.StreamClientInterceptor {
185+
return func(
186+
ctx context.Context,
187+
desc *grpc.StreamDesc,
188+
cc *grpc.ClientConn,
189+
method string,
190+
streamer grpc.Streamer,
191+
opts ...grpc.CallOption,
192+
) (grpc.ClientStream, error) {
193+
stream, err := streamer(ctx, desc, cc, method, opts...)
194+
if err != nil {
195+
return nil, err
196+
}
197+
198+
if isMatchingMethod(method) {
199+
return &recordingMatchingClientStream{
200+
ClientStream: stream,
201+
recorder: r,
202+
method: method,
203+
clusterName: clusterName,
204+
targetAddress: cc.Target(),
205+
}, nil
206+
}
207+
208+
return stream, nil
209+
}
210+
}
211+
212+
// recordingMatchingClientStream wraps a grpc.ClientStream to record messages
213+
type recordingMatchingClientStream struct {
214+
grpc.ClientStream
215+
recorder *MatchingStreamRecorder
216+
method string
217+
clusterName string
218+
targetAddress string
219+
}
220+
221+
func (s *recordingMatchingClientStream) SendMsg(m interface{}) error {
222+
if msg, ok := m.(proto.Message); ok {
223+
// SendMsg means this cluster is SENDING a message (could be request or ack)
224+
s.recorder.recordMessage(s.method, msg, DirectionSend, s.clusterName, s.targetAddress, true)
225+
}
226+
return s.ClientStream.SendMsg(m)
227+
}
228+
229+
func (s *recordingMatchingClientStream) RecvMsg(m interface{}) error {
230+
err := s.ClientStream.RecvMsg(m)
231+
if err == nil {
232+
if msg, ok := m.(proto.Message); ok {
233+
// RecvMsg means this cluster is RECEIVING a message (could be request or data)
234+
s.recorder.recordMessage(s.method, msg, DirectionRecv, s.clusterName, s.targetAddress, true)
235+
}
236+
}
237+
return err
238+
}
239+
240+
// UnaryServerInterceptor returns a gRPC unary server interceptor that captures messages
241+
func (r *MatchingStreamRecorder) UnaryServerInterceptor(clusterName string) grpc.UnaryServerInterceptor {
242+
return func(
243+
ctx context.Context,
244+
req interface{},
245+
info *grpc.UnaryServerInfo,
246+
handler grpc.UnaryHandler,
247+
) (interface{}, error) {
248+
// Capture incoming request if it's a matching-related call
249+
if isMatchingMethod(info.FullMethod) {
250+
if protoReq, ok := req.(proto.Message); ok {
251+
r.recordMessage(info.FullMethod, protoReq, DirectionServerRecv, clusterName, "server", false)
252+
}
253+
}
254+
255+
resp, err := handler(ctx, req)
256+
257+
// Capture outgoing response if successful
258+
if err == nil && isMatchingMethod(info.FullMethod) {
259+
if protoResp, ok := resp.(proto.Message); ok {
260+
r.recordMessage(info.FullMethod, protoResp, DirectionServerSend, clusterName, "server", false)
261+
}
262+
}
263+
264+
return resp, err
265+
}
266+
}
267+
268+
// StreamServerInterceptor returns a gRPC stream server interceptor that captures stream messages
269+
func (r *MatchingStreamRecorder) StreamServerInterceptor(clusterName string) grpc.StreamServerInterceptor {
270+
return func(
271+
srv interface{},
272+
ss grpc.ServerStream,
273+
info *grpc.StreamServerInfo,
274+
handler grpc.StreamHandler,
275+
) error {
276+
if isMatchingMethod(info.FullMethod) {
277+
wrappedStream := &recordingMatchingServerStream{
278+
ServerStream: ss,
279+
recorder: r,
280+
method: info.FullMethod,
281+
clusterName: clusterName,
282+
}
283+
return handler(srv, wrappedStream)
284+
}
285+
286+
return handler(srv, ss)
287+
}
288+
}
289+
290+
// recordingMatchingServerStream wraps a grpc.ServerStream to record messages
291+
type recordingMatchingServerStream struct {
292+
grpc.ServerStream
293+
recorder *MatchingStreamRecorder
294+
method string
295+
clusterName string
296+
}
297+
298+
func (s *recordingMatchingServerStream) SendMsg(m interface{}) error {
299+
if msg, ok := m.(proto.Message); ok {
300+
// Server SendMsg means this server is SENDING a message to the client
301+
s.recorder.recordMessage(s.method, msg, DirectionServerSend, s.clusterName, "server", true)
302+
}
303+
return s.ServerStream.SendMsg(m)
304+
}
305+
306+
func (s *recordingMatchingServerStream) RecvMsg(m interface{}) error {
307+
err := s.ServerStream.RecvMsg(m)
308+
if err == nil {
309+
if msg, ok := m.(proto.Message); ok {
310+
// Server RecvMsg means this server is RECEIVING a message from the client
311+
s.recorder.recordMessage(s.method, msg, DirectionServerRecv, s.clusterName, "server", true)
312+
}
313+
}
314+
return err
315+
}
316+
317+
func isMatchingMethod(method string) bool {
318+
// Capture all matching service calls
319+
return strings.Contains(method, ".matchingservice.v1.MatchingService/")
320+
}

tests/testcore/onebox.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ type (
116116
chasmRegistry *chasm.Registry
117117
grpcClientInterceptor *grpcinject.Interceptor
118118
replicationStreamRecorder *ReplicationStreamRecorder
119+
matchingStreamRecorder *MatchingStreamRecorder
119120
spanExporters map[telemetry.SpanExporterType]sdktrace.SpanExporter
120121
}
121122

@@ -217,6 +218,7 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
217218
hostsByProtocolByService: params.HostsByProtocolByService,
218219
grpcClientInterceptor: grpcinject.NewInterceptor(),
219220
replicationStreamRecorder: NewReplicationStreamRecorder(),
221+
matchingStreamRecorder: NewMatchingStreamRecorder(),
220222
spanExporters: params.SpanExporters,
221223
}
222224

@@ -225,6 +227,9 @@ func newTemporal(t *testing.T, params *TemporalParams) *TemporalImpl {
225227
outputFile := fmt.Sprintf("/tmp/replication_stream_messages_%s.txt", clusterName)
226228
impl.replicationStreamRecorder.SetOutputFile(outputFile)
227229

230+
matchingOutputFile := fmt.Sprintf("/tmp/matching_stream_messages_%s.txt", clusterName)
231+
impl.matchingStreamRecorder.SetOutputFile(matchingOutputFile)
232+
228233
for k, v := range dynamicConfigOverrides {
229234
impl.overrideDynamicConfig(t, k, v)
230235
}
@@ -764,6 +769,13 @@ func (c *TemporalImpl) newRPCFactory(
764769
grpc.WithChainStreamInterceptor(c.replicationStreamRecorder.StreamInterceptor(c.clusterMetadataConfig.CurrentClusterName)),
765770
)
766771
}
772+
// Add matching stream recorder interceptor
773+
if c.matchingStreamRecorder != nil {
774+
options = append(options,
775+
grpc.WithChainUnaryInterceptor(c.matchingStreamRecorder.UnaryInterceptor(c.clusterMetadataConfig.CurrentClusterName)),
776+
grpc.WithChainStreamInterceptor(c.matchingStreamRecorder.StreamInterceptor(c.clusterMetadataConfig.CurrentClusterName)),
777+
)
778+
}
767779
rpcConfig := config.RPC{BindOnIP: host, GRPCPort: port, HTTPPort: int(httpPort)}
768780
cfg := &config.Config{
769781
Services: map[string]config.Service{

tests/testcore/test_cluster.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,10 @@ func (tc *TestCluster) GetReplicationStreamRecorder() *ReplicationStreamRecorder
609609
return tc.host.replicationStreamRecorder
610610
}
611611

612+
func (tc *TestCluster) GetMatchingStreamRecorder() *MatchingStreamRecorder {
613+
return tc.host.matchingStreamRecorder
614+
}
615+
612616
func (tc *TestCluster) OverrideDynamicConfig(t *testing.T, key dynamicconfig.GenericSetting, value any) (cleanup func()) {
613617
return tc.host.overrideDynamicConfig(t, key.Key(), value)
614618
}

0 commit comments

Comments
 (0)