|
32 | 32 | */
|
33 | 33 | class RpcServer extends Server
|
34 | 34 | {
|
35 |
| - protected $call; |
36 |
| - // [ <String method_full_path> => [ |
37 |
| - // 'service' => <Object service>, |
38 |
| - // 'method' => <String method_name>, |
39 |
| - // 'request' => <Object request>, |
40 |
| - // ] ] |
41 |
| - protected $paths_map; |
| 35 | + // [ <String method_full_path> => MethodDescriptor ] |
| 36 | + private $paths_map = []; |
42 | 37 |
|
43 |
| - private function waitForNextEvent() { |
| 38 | + private function waitForNextEvent() |
| 39 | + { |
44 | 40 | return $this->requestCall();
|
45 | 41 | }
|
46 | 42 |
|
47 |
| - private function loadRequest($request) { |
48 |
| - if (!$this->call) { |
49 |
| - throw new Exception("serverCall is not ready"); |
50 |
| - } |
51 |
| - $event = $this->call->startBatch([ |
52 |
| - OP_RECV_MESSAGE => true, |
53 |
| - ]); |
54 |
| - if (!$event->message) { |
55 |
| - throw new Exception("Did not receive a proper message"); |
56 |
| - } |
57 |
| - $request->mergeFromString($event->message); |
58 |
| - return $request; |
59 |
| - } |
60 |
| - |
61 |
| - protected function sendOkResponse($response) { |
62 |
| - if (!$this->call) { |
63 |
| - throw new Exception("serverCall is not ready"); |
64 |
| - } |
65 |
| - $this->call->startBatch([ |
66 |
| - OP_SEND_INITIAL_METADATA => [], |
67 |
| - OP_SEND_MESSAGE => ['message' => |
68 |
| - $response->serializeToString()], |
69 |
| - OP_SEND_STATUS_FROM_SERVER => [ |
70 |
| - 'metadata' => [], |
71 |
| - 'code' => STATUS_OK, |
72 |
| - 'details' => 'OK', |
73 |
| - ], |
74 |
| - OP_RECV_CLOSE_ON_SERVER => true, |
75 |
| - ]); |
76 |
| - } |
77 |
| - |
78 | 43 | /**
|
79 | 44 | * Add a service to this server
|
80 | 45 | *
|
81 | 46 | * @param Object $service The service to be added
|
82 | 47 | */
|
83 |
| - public function handle($service) { |
84 |
| - $rf = new \ReflectionClass($service); |
85 |
| - |
86 |
| - // If input does not have a parent class, which should be the |
87 |
| - // generated stub, don't proceeed. This might change in the |
88 |
| - // future. |
89 |
| - if (!$rf->getParentClass()) return; |
90 |
| - |
91 |
| - // The input class name needs to match the service name |
92 |
| - $service_name = $rf->getName(); |
93 |
| - $namespace = $rf->getParentClass()->getNamespaceName(); |
94 |
| - $prefix = ""; |
95 |
| - if ($namespace) { |
96 |
| - $parts = explode("\\", $namespace); |
97 |
| - foreach ($parts as $part) { |
98 |
| - $prefix .= lcfirst($part) . "."; |
99 |
| - } |
| 48 | + public function handle($service) |
| 49 | + { |
| 50 | + $methodDescriptors = $service->getMethodDescriptors(); |
| 51 | + $exist_methods = array_intersect_key($this->paths_map, $methodDescriptors); |
| 52 | + if (!empty($exist_methods)) { |
| 53 | + fwrite(STDERR, "WARNING: " . 'override already registered methods: ' . |
| 54 | + implode(', ', array_keys($exist_methods)) . PHP_EOL); |
100 | 55 | }
|
101 |
| - $base_path = "/" . $prefix . $service_name; |
102 |
| - |
103 |
| - // Right now, assume all the methods in the class are RPC method |
104 |
| - // implementations. Might change in the future. |
105 |
| - $methods = $rf->getMethods(); |
106 |
| - foreach ($methods as $method) { |
107 |
| - $method_name = $method->getName(); |
108 |
| - $full_path = $base_path . "/" . ucfirst($method_name); |
109 | 56 |
|
110 |
| - $method_params = $method->getParameters(); |
111 |
| - // RPC should have exactly 1 request param |
112 |
| - if (count($method_params) != 1) continue; |
113 |
| - $request_param = $method_params[0]; |
114 |
| - // Method implementation must have type hint for request param |
115 |
| - if (!$request_param->getType()) continue; |
116 |
| - $request_type = $request_param->getType()->getName(); |
117 |
| - |
118 |
| - // $full_path needs to match the incoming event->method |
119 |
| - // from requestCall() for us to know how to handle the request |
120 |
| - $this->paths_map[$full_path] = [ |
121 |
| - 'service' => $service, |
122 |
| - 'method' => $method_name, |
123 |
| - 'request' => new $request_type(), |
124 |
| - ]; |
125 |
| - } |
| 57 | + $this->paths_map = array_merge($this->paths_map, $methodDescriptors); |
| 58 | + return $this->paths_map; |
126 | 59 | }
|
127 | 60 |
|
128 |
| - public function run() { |
| 61 | + public function run() |
| 62 | + { |
129 | 63 | $this->start();
|
130 |
| - while (true) { |
| 64 | + while (true) try { |
131 | 65 | // This blocks until the server receives a request
|
132 | 66 | $event = $this->waitForNextEvent();
|
133 |
| - if (!$event) { |
134 |
| - throw new Exception( |
135 |
| - "Unexpected error: server->waitForNextEvent delivers" |
136 |
| - . " an empty event"); |
137 |
| - } |
138 |
| - if (!$event->call) { |
139 |
| - throw new Exception( |
140 |
| - "Unexpected error: server->waitForNextEvent delivers" |
141 |
| - . " an event without a call"); |
142 |
| - } |
143 |
| - $this->call = $event->call; |
144 |
| - $full_path = $event->method; |
145 |
| - |
146 |
| - // TODO: Can send a proper UNIMPLEMENTED response in the future |
147 |
| - if (!array_key_exists($full_path, $this->paths_map)) continue; |
148 | 67 |
|
149 |
| - $service = $this->paths_map[$full_path]['service']; |
150 |
| - $method = $this->paths_map[$full_path]['method']; |
151 |
| - $request = $this->paths_map[$full_path]['request']; |
152 |
| - |
153 |
| - $request = $this->loadRequest($request); |
154 |
| - if (!$request) { |
155 |
| - throw new Exception("Unexpected error: fail to parse request"); |
156 |
| - } |
157 |
| - if (!method_exists($service, $method)) { |
158 |
| - // TODO: Can send a proper UNIMPLEMENTED response in the future |
159 |
| - throw new Exception("Method not implemented"); |
| 68 | + $full_path = $event->method; |
| 69 | + $context = new ServerContext($event); |
| 70 | + $server_writer = new ServerCallWriter($event->call, $context); |
| 71 | + |
| 72 | + if (!array_key_exists($full_path, $this->paths_map)) { |
| 73 | + $context->setStatus(Status::unimplemented()); |
| 74 | + $server_writer->finish(); |
| 75 | + continue; |
| 76 | + }; |
| 77 | + |
| 78 | + $method_desc = $this->paths_map[$full_path]; |
| 79 | + $server_reader = new ServerCallReader( |
| 80 | + $event->call, |
| 81 | + $method_desc->request_type |
| 82 | + ); |
| 83 | + |
| 84 | + try { |
| 85 | + $this->processCall( |
| 86 | + $method_desc, |
| 87 | + $server_reader, |
| 88 | + $server_writer, |
| 89 | + $context |
| 90 | + ); |
| 91 | + } catch (\Exception $e) { |
| 92 | + $context->setStatus(Status::status( |
| 93 | + STATUS_INTERNAL, |
| 94 | + $e->getMessage() |
| 95 | + )); |
| 96 | + $server_writer->finish(); |
160 | 97 | }
|
| 98 | + } catch (\Exception $e) { |
| 99 | + fwrite(STDERR, "ERROR: " . $e->getMessage() . PHP_EOL); |
| 100 | + exit(1); |
| 101 | + } |
| 102 | + } |
161 | 103 |
|
162 |
| - // Dispatch to actual server logic |
163 |
| - $response = $service->$method($request); |
164 |
| - $this->sendOkResponse($response); |
165 |
| - $this->call = null; |
| 104 | + private function processCall( |
| 105 | + MethodDescriptor $method_desc, |
| 106 | + ServerCallReader $server_reader, |
| 107 | + ServerCallWriter $server_writer, |
| 108 | + ServerContext $context |
| 109 | + ) { |
| 110 | + // Dispatch to actual server logic |
| 111 | + switch ($method_desc->call_type) { |
| 112 | + case MethodDescriptor::UNARY_CALL: |
| 113 | + $request = $server_reader->read(); |
| 114 | + $response = |
| 115 | + call_user_func( |
| 116 | + array($method_desc->service, $method_desc->method_name), |
| 117 | + $request ?? new $method_desc->request_type, |
| 118 | + $context |
| 119 | + ); |
| 120 | + $server_writer->finish($response); |
| 121 | + break; |
| 122 | + case MethodDescriptor::SERVER_STREAMING_CALL: |
| 123 | + $request = $server_reader->read(); |
| 124 | + call_user_func( |
| 125 | + array($method_desc->service, $method_desc->method_name), |
| 126 | + $request ?? new $method_desc->request_type, |
| 127 | + $server_writer, |
| 128 | + $context |
| 129 | + ); |
| 130 | + break; |
| 131 | + case MethodDescriptor::CLIENT_STREAMING_CALL: |
| 132 | + $response = call_user_func( |
| 133 | + array($method_desc->service, $method_desc->method_name), |
| 134 | + $server_reader, |
| 135 | + $context |
| 136 | + ); |
| 137 | + $server_writer->finish($response); |
| 138 | + break; |
| 139 | + case MethodDescriptor::BIDI_STREAMING_CALL: |
| 140 | + call_user_func( |
| 141 | + array($method_desc->service, $method_desc->method_name), |
| 142 | + $server_reader, |
| 143 | + $server_writer, |
| 144 | + $context |
| 145 | + ); |
| 146 | + break; |
| 147 | + default: |
| 148 | + throw new \Exception(); |
166 | 149 | }
|
167 | 150 | }
|
168 | 151 | }
|
0 commit comments