|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "errors" |
| 6 | + "flag" |
| 7 | + "fmt" |
| 8 | + "os" |
| 9 | + "reflect" |
| 10 | + "time" |
| 11 | + |
| 12 | + grpcpb "github.com/codeblooded/grpc-proto/genproto/grpc/testing" |
| 13 | + "github.com/golang/protobuf/jsonpb" |
| 14 | + "github.com/golang/protobuf/proto" |
| 15 | + "github.com/golang/protobuf/ptypes" |
| 16 | + svcpb "github.com/grpc/grpc/testctrl/proto/scheduling/v1" |
| 17 | + lrpb "google.golang.org/genproto/googleapis/longrunning" |
| 18 | + "google.golang.org/grpc" |
| 19 | +) |
| 20 | + |
| 21 | +const ( |
| 22 | + // Success (exit code 0) shows the command finished without an error. |
| 23 | + Success = 0 |
| 24 | + |
| 25 | + // FlagError (exit code 2) shows the command was unable to run or |
| 26 | + // complete due to the combination or lack of flags. |
| 27 | + FlagError = 2 |
| 28 | + |
| 29 | + // ConnectionError (exit code 3) shows the command could not establish a |
| 30 | + // connection to services over the internet. |
| 31 | + ConnectionError = 3 |
| 32 | + |
| 33 | + // SchedulingError (exit code 4) shows that the test session could not |
| 34 | + // be scheduled to run on the cluster. |
| 35 | + SchedulingError = 4 |
| 36 | + |
| 37 | + // OperationError (exit code 5) shows that the test session was scheduled |
| 38 | + // but there was a problem checking the status of the operation. |
| 39 | + OperationError = 5 |
| 40 | +) |
| 41 | + |
| 42 | +// ScheduleFlags is the set of flags necessary to schedule test sessions. |
| 43 | +type ScheduleFlags struct { |
| 44 | + address string |
| 45 | + driver string |
| 46 | + server string |
| 47 | + driverPool string |
| 48 | + serverPool string |
| 49 | + clientPool string |
| 50 | + clients clientList |
| 51 | + scenario scenario |
| 52 | +} |
| 53 | + |
| 54 | +// validate ensures that a scenario and driver are provided for the test. If |
| 55 | +// they are missing, an error is returned. |
| 56 | +func (s *ScheduleFlags) validate() error { |
| 57 | + if s.driver == "" { |
| 58 | + return errors.New("-driver is required to orchestrate the test, but missing") |
| 59 | + } |
| 60 | + |
| 61 | + if s.scenario.String() == "<nil>" { |
| 62 | + return errors.New("-scenario is required to configure the test, but missing") |
| 63 | + } |
| 64 | + |
| 65 | + return nil |
| 66 | +} |
| 67 | + |
| 68 | +// clientList contains a list of client container images. It implements the |
| 69 | +// flag.Value interface, allowing it to be parsed alongside flags with primitive |
| 70 | +// types. |
| 71 | +type clientList struct { |
| 72 | + clients []string |
| 73 | +} |
| 74 | + |
| 75 | +// String returns a string representation of the list of clients. |
| 76 | +func (cl *clientList) String() string { |
| 77 | + return fmt.Sprintf("%v", cl.clients) |
| 78 | +} |
| 79 | + |
| 80 | +// Set parses a client flag and appends it to the list. |
| 81 | +func (cl *clientList) Set(client string) error { |
| 82 | + cl.clients = append(cl.clients, client) |
| 83 | + return nil |
| 84 | +} |
| 85 | + |
| 86 | +// scenario wraps the scenario protobuf, implementing the flag.Value interface. |
| 87 | +// This allows it to be parsed alongside flags with primitive types. |
| 88 | +type scenario struct { |
| 89 | + proto *grpcpb.Scenario |
| 90 | +} |
| 91 | + |
| 92 | +// String returns a string representation of the proto. |
| 93 | +func (sc *scenario) String() string { |
| 94 | + return fmt.Sprintf("%v", sc.proto) |
| 95 | +} |
| 96 | + |
| 97 | +// Set parses the JSON string into a protobuf as the flag is parsed. It returns |
| 98 | +// an error is the flag is malformed or cannot be marshaled into a proto. |
| 99 | +func (sc *scenario) Set(scenarioJSON string) error { |
| 100 | + if scenarioJSON == "" { |
| 101 | + return errors.New("a valid scenario is required, but missing") |
| 102 | + } |
| 103 | + |
| 104 | + sc.proto = &grpcpb.Scenario{} |
| 105 | + err := jsonpb.UnmarshalString(scenarioJSON, sc.proto) |
| 106 | + if err != nil { |
| 107 | + return fmt.Errorf("could not parse scenario json: %v", err) |
| 108 | + } |
| 109 | + |
| 110 | + return nil |
| 111 | +} |
| 112 | + |
| 113 | +// connect establishes a connection to a server at a specified address, |
| 114 | +// returning a client connection object. If there is a problem connecting or |
| 115 | +// the context's deadline is exceeded, an error is returned. |
| 116 | +func connect(ctx context.Context, address string) (*grpc.ClientConn, error) { |
| 117 | + dialCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 118 | + defer cancel() |
| 119 | + |
| 120 | + fmt.Printf("dialing server at %v\n", address) |
| 121 | + return grpc.DialContext(dialCtx, address, grpc.WithInsecure(), |
| 122 | + grpc.WithBlock(), grpc.WithDisableRetry()) |
| 123 | +} |
| 124 | + |
| 125 | +// newScheduleRequest uses a ScheduleFlags struct to construct a |
| 126 | +// StartTestSessionRequest protobuf. |
| 127 | +func newScheduleRequest(flags ScheduleFlags) *svcpb.StartTestSessionRequest { |
| 128 | + var workers []*svcpb.Component |
| 129 | + if flags.server != "" { |
| 130 | + workers = append(workers, &svcpb.Component{ |
| 131 | + ContainerImage: flags.server, |
| 132 | + Kind: svcpb.Component_SERVER, |
| 133 | + Pool: flags.serverPool, |
| 134 | + }) |
| 135 | + } |
| 136 | + for _, client := range flags.clients.clients { |
| 137 | + workers = append(workers, &svcpb.Component{ |
| 138 | + ContainerImage: client, |
| 139 | + Kind: svcpb.Component_CLIENT, |
| 140 | + Pool: flags.clientPool, |
| 141 | + }) |
| 142 | + } |
| 143 | + |
| 144 | + return &svcpb.StartTestSessionRequest{ |
| 145 | + Scenario: flags.scenario.proto, |
| 146 | + Driver: &svcpb.Component{ |
| 147 | + ContainerImage: flags.driver, |
| 148 | + Kind: svcpb.Component_DRIVER, |
| 149 | + Pool: flags.driverPool, |
| 150 | + }, |
| 151 | + Workers: workers, |
| 152 | + } |
| 153 | +} |
| 154 | + |
| 155 | +// startSession attempts to create a test session. It returns a longrunning |
| 156 | +// operation upon success. If the context's deadline is exceeded or a networking |
| 157 | +// problem occurs, an error is returned. |
| 158 | +func startSession(ctx context.Context, client svcpb.SchedulingServiceClient, request *svcpb.StartTestSessionRequest) (*lrpb.Operation, error) { |
| 159 | + scheduleCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 160 | + defer cancel() |
| 161 | + |
| 162 | + fmt.Printf("scheduling session with test %q\n", request.Scenario.Name) |
| 163 | + return client.StartTestSession(scheduleCtx, request) |
| 164 | +} |
| 165 | + |
| 166 | +// awaitSession polls the service for the status of a running operation until it |
| 167 | +// is done. If the context's deadline is exceeded or there is a problem getting |
| 168 | +// the status of the operation, an error is returned. Otherwise, the result of |
| 169 | +// the tests are returned. |
| 170 | +func awaitSession(ctx context.Context, operationName string, client lrpb.OperationsClient) (*svcpb.TestSessionResult, error) { |
| 171 | + awaitCtx, cancel := context.WithCancel(ctx) |
| 172 | + defer cancel() |
| 173 | + |
| 174 | + var lastEvent *svcpb.Event |
| 175 | + |
| 176 | + for { |
| 177 | + operation, err := client.GetOperation( |
| 178 | + awaitCtx, |
| 179 | + &lrpb.GetOperationRequest{Name: operationName}, |
| 180 | + ) |
| 181 | + if err != nil { |
| 182 | + return nil, fmt.Errorf("could not get operation status: %v", err) |
| 183 | + } |
| 184 | + |
| 185 | + var metadata svcpb.TestSessionMetadata |
| 186 | + if err := proto.Unmarshal(operation.Metadata.GetValue(), &metadata); err == nil { |
| 187 | + event := metadata.LatestEvent |
| 188 | + timestamp, err := ptypes.Timestamp(event.Time) |
| 189 | + if err != nil { |
| 190 | + return nil, fmt.Errorf("could not marshal timestamp: %v", err) |
| 191 | + } |
| 192 | + |
| 193 | + if lastEvent == nil || !reflect.DeepEqual(lastEvent, event) { |
| 194 | + fmt.Printf("[%s] [%s] %s\n", timestamp.Format("Jan 2 2006 15:04:05"), |
| 195 | + event.Kind, event.Description) |
| 196 | + } |
| 197 | + |
| 198 | + lastEvent = event |
| 199 | + } |
| 200 | + |
| 201 | + if operation.Done { |
| 202 | + var result svcpb.TestSessionResult |
| 203 | + if err := proto.Unmarshal(operation.GetResponse().GetValue(), &result); err != nil { |
| 204 | + return nil, fmt.Errorf("could not marshal test result: %v", err) |
| 205 | + } |
| 206 | + |
| 207 | + return &result, nil |
| 208 | + } |
| 209 | + |
| 210 | + time.Sleep(5 * time.Second) |
| 211 | + } |
| 212 | +} |
| 213 | + |
| 214 | +// exit logs an error message and terminates the process with the provided |
| 215 | +// status code. |
| 216 | +func exit(code int, messageFmt string, args ...interface{}) { |
| 217 | + fmt.Fprintf(os.Stderr, messageFmt+"\n", args...) |
| 218 | + os.Exit(code) |
| 219 | +} |
| 220 | + |
| 221 | +// Schedule accepts command line arguments and uses them to schedule a test, |
| 222 | +// reporting progress as it runs. |
| 223 | +func Schedule(args []string) { |
| 224 | + flags := ScheduleFlags{} |
| 225 | + scheduleFlags := flag.NewFlagSet("testctl", flag.ExitOnError) |
| 226 | + scheduleFlags.StringVar(&flags.address, "address", "127.0.0.1:50051", "host and port of the scheduling server") |
| 227 | + scheduleFlags.StringVar(&flags.driver, "driver", "", "container image with a driver for testing") |
| 228 | + scheduleFlags.StringVar(&flags.server, "server", "", "container image with a server for testing") |
| 229 | + scheduleFlags.Var(&flags.clients, "client", "container image with a client for testing") |
| 230 | + scheduleFlags.Var(&flags.scenario, "scenario", "protobuf which configures the test (as a JSON string)") |
| 231 | + scheduleFlags.StringVar(&flags.driverPool, "driverPool", "drivers", "pool of machines where the driver should run") |
| 232 | + scheduleFlags.StringVar(&flags.serverPool, "serverPool", "workers-8core", "pool of machines where the server should run") |
| 233 | + scheduleFlags.StringVar(&flags.clientPool, "clientPool", "workers-8core", "pool of machines where the client should run") |
| 234 | + scheduleFlags.Parse(args) |
| 235 | + |
| 236 | + if err := flags.validate(); err != nil { |
| 237 | + exit(FlagError, err.Error()) |
| 238 | + } |
| 239 | + |
| 240 | + ctx, cancel := context.WithCancel(context.Background()) |
| 241 | + defer cancel() |
| 242 | + |
| 243 | + conn, err := connect(ctx, flags.address) |
| 244 | + if err != nil { |
| 245 | + exit(ConnectionError, "could not connect to server: %v", err) |
| 246 | + } |
| 247 | + defer conn.Close() |
| 248 | + |
| 249 | + scheduleClient := svcpb.NewSchedulingServiceClient(conn) |
| 250 | + operationsClient := lrpb.NewOperationsClient(conn) |
| 251 | + |
| 252 | + request := newScheduleRequest(flags) |
| 253 | + operation, err := startSession(ctx, scheduleClient, request) |
| 254 | + if err != nil { |
| 255 | + exit(SchedulingError, "scheduling session failed: %v", err) |
| 256 | + } |
| 257 | + fmt.Printf("%v has been created\n", operation.Name) |
| 258 | + |
| 259 | + result, err := awaitSession(ctx, operation.Name, operationsClient) |
| 260 | + if err != nil { |
| 261 | + exit(OperationError, "service did not report status of operation: %v", err) |
| 262 | + } |
| 263 | + fmt.Printf("%s\n", result.DriverLogs) |
| 264 | +} |
| 265 | + |
| 266 | +func main() { |
| 267 | + Schedule(os.Args[1:]) |
| 268 | +} |
0 commit comments