1+ package org .jetbrains .ktor .benchmarks ;
2+
3+ import io .netty .channel .Channel ;
4+ import io .netty .channel .EventLoopGroup ;
5+ import io .netty .channel .nio .NioEventLoopGroup ;
6+ import io .netty .channel .socket .DatagramChannel ;
7+ import io .netty .channel .socket .ServerSocketChannel ;
8+ import io .netty .channel .socket .SocketChannel ;
9+ import io .netty .channel .socket .nio .NioDatagramChannel ;
10+ import io .netty .channel .socket .nio .NioServerSocketChannel ;
11+ import io .netty .channel .socket .nio .NioSocketChannel ;
12+ import io .netty .util .concurrent .DefaultThreadFactory ;
13+ import io .netty .util .concurrent .Future ;
14+ import io .netty .util .concurrent .ThreadPerTaskExecutor ;
15+ import reactor .core .publisher .Mono ;
16+ import reactor .netty .FutureMono ;
17+ import reactor .netty .resources .LoopResources ;
18+
19+ import java .time .Duration ;
20+ import java .util .concurrent .TimeUnit ;
21+ import java .util .concurrent .atomic .AtomicBoolean ;
22+ import java .util .concurrent .atomic .AtomicReference ;
23+
24+ /**
25+ * Copied from GitHub issue comment: https://github.com/r2dbc/r2dbc-pool/issues/190#issuecomment-1566845190
26+ */
27+ public class NioClientEventLoopResources implements LoopResources {
28+ public static final String THREAD_PREFIX = "prefix-" ;
29+ final int threads ;
30+ final AtomicReference <EventLoopGroup > loops = new AtomicReference <>();
31+ final AtomicBoolean running ;
32+
33+ NioClientEventLoopResources (int threads ) {
34+ this .running = new AtomicBoolean (true );
35+ this .threads = threads ;
36+ }
37+
38+
39+ @ Override
40+ @ SuppressWarnings ("unchecked" )
41+ public Mono <Void > disposeLater (Duration quietPeriod , Duration timeout ) {
42+ return Mono .defer (() -> {
43+ long quietPeriodMillis = quietPeriod .toMillis ();
44+ long timeoutMillis = timeout .toMillis ();
45+ EventLoopGroup serverLoopsGroup = loops .get ();
46+ Mono <?> slMono = Mono .empty ();
47+ if (running .compareAndSet (true , false )) {
48+ if (serverLoopsGroup != null ) {
49+ slMono = FutureMono .from ((Future ) serverLoopsGroup .shutdownGracefully (
50+ quietPeriodMillis , timeoutMillis , TimeUnit .MILLISECONDS ));
51+ }
52+ }
53+ return Mono .when (slMono );
54+ });
55+ }
56+
57+ @ Override
58+ public boolean isDisposed () {
59+ return !running .get ();
60+ }
61+
62+ @ Override
63+ public EventLoopGroup onClient (boolean useNative ) {
64+ return cacheLoops ();
65+ }
66+
67+ @ Override
68+ public EventLoopGroup onServer (boolean useNative ) {
69+ throw new UnsupportedOperationException ("This event loop is designed only for client DB calls." );
70+ }
71+
72+ @ Override
73+ public EventLoopGroup onServerSelect (boolean useNative ) {
74+ throw new UnsupportedOperationException ("This event loop is designed only for client DB calls." );
75+ }
76+
77+ @ Override
78+ public <CHANNEL extends Channel > CHANNEL onChannel (Class <CHANNEL > channelType , EventLoopGroup group ) {
79+ if (channelType .equals (SocketChannel .class )) {
80+ return (CHANNEL ) new NioSocketChannel ();
81+ }
82+ if (channelType .equals (ServerSocketChannel .class )) {
83+ return (CHANNEL ) new NioServerSocketChannel ();
84+ }
85+ if (channelType .equals (DatagramChannel .class )) {
86+ return (CHANNEL ) new NioDatagramChannel ();
87+ }
88+ throw new IllegalArgumentException ("Unsupported channel type: " + channelType .getSimpleName ());
89+ }
90+
91+ @ Override
92+ public <CHANNEL extends Channel > Class <? extends CHANNEL > onChannelClass (Class <CHANNEL > channelType ,
93+ EventLoopGroup group ) {
94+ if (channelType .equals (SocketChannel .class )) {
95+ return (Class <? extends CHANNEL >) NioSocketChannel .class ;
96+ }
97+ if (channelType .equals (ServerSocketChannel .class )) {
98+ return (Class <? extends CHANNEL >) NioServerSocketChannel .class ;
99+ }
100+ if (channelType .equals (DatagramChannel .class )) {
101+ return (Class <? extends CHANNEL >) NioDatagramChannel .class ;
102+ }
103+ throw new IllegalArgumentException ("Unsupported channel type: " + channelType .getSimpleName ());
104+ }
105+
106+ @ SuppressWarnings ("FutureReturnValueIgnored" )
107+ EventLoopGroup cacheLoops () {
108+ EventLoopGroup eventLoopGroup = loops .get ();
109+ if (null == eventLoopGroup ) {
110+ EventLoopGroup newEventLoopGroup = createNewEventLoopGroup ();
111+ if (!loops .compareAndSet (null , newEventLoopGroup )) {
112+ //"FutureReturnValueIgnored" this is deliberate
113+ newEventLoopGroup .shutdownGracefully (0 , 0 , TimeUnit .MILLISECONDS );
114+ }
115+ eventLoopGroup = cacheLoops ();
116+ }
117+ return eventLoopGroup ;
118+ }
119+
120+ private NioEventLoopGroup createNewEventLoopGroup () {
121+ return new NioEventLoopGroup (threads , new ThreadPerTaskExecutor (new DefaultThreadFactory (THREAD_PREFIX )));
122+ }
123+ }
0 commit comments