991010 * @license https://github.com/opencodeco/hyperf-metric/blob/main/LICENSE
1111 */
12+
1213namespace Jaeger ;
1314
14- use Hyperf \Context \ApplicationContext ;
15- use Hyperf \Contract \StdoutLoggerInterface ;
1615use Hyperf \Coordinator \Constants ;
1716use Hyperf \Coordinator \CoordinatorManager ;
1817use Hyperf \Coroutine \Coroutine ;
1918use Hyperf \Engine \Channel ;
2019use Psr \Log \LoggerInterface ;
21- use Psr \Log \NullLogger ;
2220use Socket ;
2321use Thrift \Exception \TTransportException ;
2422use Thrift \Transport \TTransport ;
2523use Throwable ;
2624
25+ use function Hyperf \Support \env ;
26+
2727class ThriftUdpTransport extends TTransport
2828{
2929 private null |Socket $ socket = null ;
@@ -33,10 +33,8 @@ class ThriftUdpTransport extends TTransport
3333 public function __construct (
3434 private string $ host ,
3535 private int $ port ,
36- private ?LoggerInterface $ logger = null
37- ) {
38- $ this ->logger = $ logger ?? new NullLogger ();
39- }
36+ private LoggerInterface $ logger
37+ ) {}
4038
4139 /**
4240 * Whether this transport is open.
@@ -120,9 +118,17 @@ public function write($buf)
120118 $ this ->loop ();
121119 }
122120
123- $ this ->chan ->push (function () use ($ buf ) {
121+ $ pushed = $ this ->chan ->push (function () use ($ buf ) {
124122 $ this ->doWrite ($ buf );
125- });
123+ }, (float ) env ('TRACE_THRIFT_UDP_TIMEOUT ' , 0.1 ));
124+
125+ if (! $ pushed ) {
126+ $ this ->logger ->error ('ThriftUdpTransport error: ' . match (true ) {
127+ $ this ->chan ->isTimeout () => 'Channel Timeout ' ,
128+ $ this ->chan ->isClosing () => 'Channel Close ' ,
129+ default => 'Channel Error '
130+ });
131+ }
126132 }
127133
128134 private function doOpen (): void
@@ -151,7 +157,14 @@ private function loop(): void
151157 $ this ->chan = new Channel (1 );
152158 Coroutine::create (function () {
153159 while (true ) {
154- $ this ->doOpen ();
160+ try {
161+ $ this ->doOpen ();
162+ } catch (Throwable $ e ) {
163+ $ this ->chan ->close ();
164+ $ this ->chan = null ;
165+ throw $ e ;
166+ }
167+
155168 while (true ) {
156169 try {
157170 $ closure = $ this ->chan ->pop ();
@@ -160,13 +173,7 @@ private function loop(): void
160173 }
161174 $ closure ->call ($ this );
162175 } catch (Throwable $ e ) {
163- if (ApplicationContext::hasContainer ()) {
164- if (ApplicationContext::getContainer ()->has (StdoutLoggerInterface::class)) {
165- ApplicationContext::getContainer ()
166- ->get (StdoutLoggerInterface::class)
167- ->error ('ThriftUdpTransport error: ' . $ e ->getMessage ());
168- }
169- }
176+ $ this ->logger ->error ('ThriftUdpTransport error: ' . $ e ->getMessage ());
170177 @socket_close ($ this ->socket );
171178 $ this ->socket = null ;
172179 break ;
0 commit comments