@@ -66,7 +66,7 @@ public void start() {
66
66
*/
67
67
@ Override
68
68
public void requestStart (){
69
- _myPort .get (). requestStart ();
69
+ _myPort .requestStart ();
70
70
}
71
71
72
72
/**
@@ -78,7 +78,7 @@ public void requestStart(){
78
78
*/
79
79
@ Override
80
80
public void requestDone (){
81
- _myPort .get (). requestDone ();
81
+ _myPort .requestDone ();
82
82
}
83
83
84
84
/**
@@ -87,7 +87,7 @@ public void requestDone(){
87
87
@ Override
88
88
public void requestEnsureConnection (){
89
89
checkMaster ( false , true );
90
- _myPort .get (). requestEnsureConnection ();
90
+ _myPort .requestEnsureConnection ();
91
91
}
92
92
93
93
void _checkClosed (){
@@ -133,8 +133,7 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
133
133
_checkClosed ();
134
134
checkMaster ( false , true );
135
135
136
- MyPort mp = _myPort .get ();
137
- DBPort port = mp .get ( true , ReadPreference .primary (), hostNeeded );
136
+ DBPort port = _myPort .get (true , ReadPreference .primary (), hostNeeded );
138
137
139
138
try {
140
139
port .checkAuth ( db .getMongo () );
@@ -147,7 +146,7 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
147
146
}
148
147
}
149
148
catch ( IOException ioe ){
150
- mp .error ( port , ioe );
149
+ _myPort .error (port , ioe );
151
150
_error ( ioe , false );
152
151
153
152
if ( concern .raiseNetworkErrors () )
@@ -162,11 +161,11 @@ public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddr
162
161
throw me ;
163
162
}
164
163
catch ( RuntimeException re ){
165
- mp .error ( port , re );
164
+ _myPort .error (port , re );
166
165
throw re ;
167
166
}
168
167
finally {
169
- mp .done ( port );
168
+ _myPort .done (port );
170
169
m .doneWithMessage ();
171
170
}
172
171
}
@@ -236,8 +235,7 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
236
235
if (!secondaryOk || getReplicaSetStatus () == null )
237
236
checkMaster ( false , !secondaryOk );
238
237
239
- final MyPort mp = _myPort .get ();
240
- final DBPort port = mp .get ( false , readPref , hostNeeded );
238
+ final DBPort port = _myPort .get (false , readPref , hostNeeded );
241
239
242
240
Response res = null ;
243
241
boolean retry = false ;
@@ -248,7 +246,7 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
248
246
throw new MongoException ( "ids don't match" );
249
247
}
250
248
catch ( IOException ioe ){
251
- mp .error ( port , ioe );
249
+ _myPort .error (port , ioe );
252
250
retry = retries > 0 && !coll ._name .equals ( "$cmd" )
253
251
&& !(ioe instanceof SocketTimeoutException ) && _error ( ioe , secondaryOk );
254
252
if ( !retry ){
@@ -257,10 +255,10 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
257
255
}
258
256
}
259
257
catch ( RuntimeException re ){
260
- mp .error ( port , re );
258
+ _myPort .error (port , re );
261
259
throw re ;
262
260
} finally {
263
- mp .done ( port );
261
+ _myPort .done (port );
264
262
}
265
263
266
264
if (retry )
@@ -372,27 +370,29 @@ class MyPort {
372
370
373
371
DBPort get ( boolean keep , ReadPreference readPref , ServerAddress hostNeeded ){
374
372
375
- if ( hostNeeded != null ){
376
- if (_requestPort != null && _requestPort .serverAddress ().equals (hostNeeded )) {
377
- return _requestPort ;
373
+ DBPort requestPort = getPinnedRequestPort ();
374
+
375
+ if ( hostNeeded != null ) {
376
+ if (requestPort != null && requestPort .serverAddress ().equals (hostNeeded )) {
377
+ return requestPort ;
378
378
}
379
379
380
380
// asked for a specific host
381
381
return _portHolder .get ( hostNeeded ).get ();
382
382
}
383
383
384
- if ( _requestPort != null ){
384
+ if ( requestPort != null ){
385
385
// we are within a request, and have a port, should stick to it
386
- if ( _requestPort .getPool () == _masterPortPool || !keep ) {
386
+ if ( requestPort .getPool () == _masterPortPool || !keep ) {
387
387
// if keep is false, it's a read, so we use port even if master changed
388
- return _requestPort ;
388
+ return requestPort ;
389
389
}
390
390
391
391
// it's write and master has changed
392
392
// we fall back on new master and try to go on with request
393
393
// this may not be best behavior if spec of request is to stick with same server
394
- _requestPort .getPool ().done (_requestPort );
395
- _requestPort = null ;
394
+ requestPort .getPool ().done (requestPort );
395
+ pinnedRequestStatusThreadLocal . get (). requestPort = null ;
396
396
}
397
397
398
398
DBPort p ;
@@ -414,17 +414,19 @@ DBPort get( boolean keep , ReadPreference readPref, ServerAddress hostNeeded ){
414
414
p = _portHolder .get (node .getServerAddress ()).get ();
415
415
}
416
416
417
- if ( _inRequest ) {
418
- // if within request, remember port to stick to same server
419
- _requestPort = p ;
417
+ // if within request, remember port to stick to same server
418
+ if ( pinnedRequestStatusThreadLocal . get () != null ) {
419
+ pinnedRequestStatusThreadLocal . get (). requestPort = p ;
420
420
}
421
421
422
422
return p ;
423
423
}
424
424
425
- void done ( DBPort p ){
425
+ void done ( DBPort p ) {
426
+ DBPort requestPort = getPinnedRequestPort ();
427
+
426
428
// keep request port
427
- if ( p != _requestPort ) {
429
+ if (p != requestPort ) {
428
430
p .getPool ().done (p );
429
431
}
430
432
}
@@ -436,7 +438,7 @@ void done( DBPort p ){
436
438
*/
437
439
void error ( DBPort p , Exception e ){
438
440
p .close ();
439
- _requestPort = null ;
441
+ pinnedRequestStatusThreadLocal . remove () ;
440
442
441
443
// depending on type of error, may need to close other connections in pool
442
444
boolean recoverable = p .getPool ().gotError (e );
@@ -449,28 +451,39 @@ void error( DBPort p , Exception e ){
449
451
}
450
452
451
453
void requestEnsureConnection (){
452
- if ( ! _inRequest )
454
+ if ( pinnedRequestStatusThreadLocal . get () == null )
453
455
return ;
454
456
455
- if ( _requestPort != null )
457
+ if ( getPinnedRequestPort () != null )
456
458
return ;
457
459
458
- _requestPort = _masterPortPool .get ();
460
+ pinnedRequestStatusThreadLocal . get (). requestPort = _masterPortPool .get ();
459
461
}
460
462
461
463
void requestStart (){
462
- _inRequest = true ;
464
+ pinnedRequestStatusThreadLocal . set ( new PinnedRequestStatus ()) ;
463
465
}
464
466
465
467
void requestDone (){
466
- if ( _requestPort != null )
467
- _requestPort .getPool ().done ( _requestPort );
468
- _requestPort = null ;
469
- _inRequest = false ;
468
+ DBPort requestPort = getPinnedRequestPort ();
469
+ if ( requestPort != null )
470
+ requestPort .getPool ().done ( requestPort );
471
+ pinnedRequestStatusThreadLocal .remove ();
472
+ }
473
+
474
+ PinnedRequestStatus getPinnedRequestStatus () {
475
+ return pinnedRequestStatusThreadLocal .get ();
476
+ }
477
+
478
+ DBPort getPinnedRequestPort () {
479
+ return pinnedRequestStatusThreadLocal .get () != null ? pinnedRequestStatusThreadLocal .get ().requestPort : null ;
470
480
}
471
481
472
- DBPort _requestPort ;
473
- boolean _inRequest ;
482
+ private final ThreadLocal <PinnedRequestStatus > pinnedRequestStatusThreadLocal = new ThreadLocal <PinnedRequestStatus >();
483
+ }
484
+
485
+ static class PinnedRequestStatus {
486
+ DBPort requestPort ;
474
487
}
475
488
476
489
void checkMaster ( boolean force , boolean failIfNoMaster ){
@@ -566,10 +579,6 @@ public void close(){
566
579
_connectionStatus = null ;
567
580
} catch (final Throwable t ) { /* nada */ }
568
581
}
569
-
570
- // below this will remove the myport for this thread only
571
- // client using thread pool in web framework may need to call close() from all threads
572
- _myPort .remove ();
573
582
}
574
583
575
584
/**
@@ -598,15 +607,14 @@ public boolean isOpen(){
598
607
599
608
@ Override
600
609
public CommandResult authenticate (MongoCredential credentials ) {
601
- final MyPort mp = _myPort .get ();
602
- final DBPort port = mp .get (false , ReadPreference .primaryPreferred (), null );
610
+ final DBPort port = _myPort .get (false , ReadPreference .primaryPreferred (), null );
603
611
604
612
try {
605
613
CommandResult result = port .authenticate (_mongo , credentials );
606
614
_mongo .getAuthority ().getCredentialsStore ().add (credentials );
607
615
return result ;
608
616
} finally {
609
- mp .done (port );
617
+ _myPort .done (port );
610
618
}
611
619
}
612
620
@@ -621,7 +629,7 @@ public int getMaxBsonObjectSize() {
621
629
622
630
// expose for unit testing
623
631
MyPort getMyPort () {
624
- return _myPort . get () ;
632
+ return _myPort ;
625
633
}
626
634
627
635
private volatile DBPortPool _masterPortPool ;
@@ -634,10 +642,5 @@ MyPort getMyPort() {
634
642
private volatile int _maxBsonObjectSize ;
635
643
private volatile Boolean _isMongosDirectConnection ;
636
644
637
- private ThreadLocal <MyPort > _myPort = new ThreadLocal <MyPort >(){
638
- protected MyPort initialValue (){
639
- return new MyPort ();
640
- }
641
- };
642
-
645
+ MyPort _myPort = new MyPort ();
643
646
}
0 commit comments