4949import java .sql .Statement ;
5050import java .sql .Timestamp ;
5151import java .time .Instant ;
52+ import java .time .format .DateTimeFormatter ;
5253import java .util .ArrayList ;
54+ import java .util .Comparator ;
5355import java .util .HashMap ;
5456import java .util .List ;
5557import java .util .Map ;
5961import java .util .concurrent .ConcurrentHashMap ;
6062import java .util .concurrent .TimeUnit ;
6163import java .util .function .Supplier ;
64+ import java .util .stream .Collectors ;
6265
6366/**
6467 * An implementation of topology service for Aurora RDS. It uses
@@ -89,7 +92,6 @@ public class AuroraTopologyService implements ITopologyService {
8992 public static final CacheMap <String , List <HostInfo >> topologyCache = new CacheMap <>();
9093 public static final CacheMap <String , Set <String >> downHostCache = new CacheMap <>();
9194 public static final CacheMap <String , HostInfo > lastUsedReaderCache = new CacheMap <>();
92- public static final CacheMap <String , Boolean > multiWriterClusterCache = new CacheMap <>();
9395
9496 protected String clusterId ;
9597 protected HostInfo clusterInstanceTemplate ;
@@ -190,9 +192,6 @@ public List<HostInfo> getTopology(JdbcConnection conn, boolean forceUpdate)
190192 ClusterTopologyInfo latestTopologyInfo = queryForTopology (conn );
191193
192194 if (latestTopologyInfo != null ) {
193- multiWriterClusterCache .put (
194- this .clusterId , latestTopologyInfo .getMultiWriterCluster (), this .refreshRateNanos );
195-
196195 downHostCache .get (this .clusterId , ConcurrentHashMap .newKeySet (), this .refreshRateNanos ).clear ();
197196
198197 if (!Util .isNullOrEmpty (latestTopologyInfo .getHosts ())) {
@@ -233,7 +232,7 @@ protected ClusterTopologyInfo queryForTopology(JdbcConnection conn) throws SQLEx
233232
234233 return topologyInfo != null
235234 ? topologyInfo
236- : new ClusterTopologyInfo (new ArrayList <>(), false );
235+ : new ClusterTopologyInfo (new ArrayList <>());
237236 }
238237
239238 /**
@@ -257,33 +256,35 @@ private boolean gatherPerfMetrics(PropertySet props) {
257256 */
258257 private ClusterTopologyInfo processQueryResults (ResultSet resultSet )
259258 throws SQLException {
260- int writerCount = 0 ;
261259
262260 List <HostInfo > hosts = new ArrayList <>();
261+ List <HostInfo > writers = new ArrayList <>();
263262 while (resultSet .next ()) {
263+ HostInfo currentHost = createHost (resultSet );
264+
264265 if (!WRITER_SESSION_ID .equalsIgnoreCase (resultSet .getString (FIELD_SESSION_ID ))) {
265- hosts .add (createHost ( resultSet ) );
266+ hosts .add (currentHost );
266267 continue ;
267268 }
268269
269- if (writerCount == 0 ) {
270- // store the first writer to its expected position [0]
271- hosts .add (
272- FailoverConnectionPlugin .WRITER_CONNECTION_INDEX ,
273- createHost (resultSet ));
274- } else {
275- // append other writers, if any, to the end of the host list
276- hosts .add (createHost (resultSet ));
277- }
278- writerCount ++;
270+ writers .add (currentHost );
279271 }
280272
281- if (writerCount == 0 ) {
273+ int writersCount = writers .size ();
274+
275+ if (writersCount == 0 ) {
282276 this .log .logError (Messages .getString ("AuroraTopologyService.3" ));
283277 hosts .clear ();
278+ } else if (writersCount == 1 ) {
279+ hosts .add (FailoverConnectionPlugin .WRITER_CONNECTION_INDEX , writers .get (0 ));
280+ } else {
281+ // Store the first writer to its expected position [0]. If there are other writers or stale records, ignore them.
282+ List <HostInfo > sortedWriters = writers .stream ()
283+ .sorted (Comparator .comparing (HostInfo ::getLastUpdatedTime ).reversed ()).collect (Collectors .toList ());
284+ hosts .add (FailoverConnectionPlugin .WRITER_CONNECTION_INDEX , sortedWriters .get (0 ));
284285 }
285286
286- return new ClusterTopologyInfo (hosts , writerCount > 1 );
287+ return new ClusterTopologyInfo (hosts );
287288 }
288289
289290 private HostInfo createHost (ResultSet resultSet ) throws SQLException {
@@ -351,7 +352,8 @@ private Map<String, String> getPropertiesFromTopology(ResultSet resultSet)
351352 }
352353
353354 private String convertTimestampToString (Timestamp timestamp ) {
354- return timestamp == null ? null : timestamp .toString ();
355+ DateTimeFormatter formatter = DateTimeFormatter .ISO_LOCAL_DATE_TIME ;
356+ return timestamp == null ? null : formatter .format (timestamp .toLocalDateTime ());
355357 }
356358
357359 /**
@@ -464,16 +466,6 @@ public void removeFromDownHostList(HostInfo host) {
464466 .remove (host .getHostPortPair ());
465467 }
466468
467- /**
468- * Check if cached topology belongs to multi-writer cluster.
469- *
470- * @return True, if it's multi-writer cluster.
471- */
472- @ Override
473- public boolean isMultiWriterCluster () {
474- return multiWriterClusterCache .get (this .clusterId , false , this .refreshRateNanos );
475- }
476-
477469 /**
478470 * Set new topology refresh rate. Different service instances may have different topology refresh
479471 * rate while sharing the same topology cache.
@@ -490,7 +482,6 @@ public void setRefreshRate(int refreshRateMillis) {
490482 public void clearAll () {
491483 topologyCache .clear ();
492484 downHostCache .clear ();
493- multiWriterClusterCache .clear ();
494485 lastUsedReaderCache .clear ();
495486 }
496487
@@ -499,20 +490,16 @@ public void clearAll() {
499490 public void clear () {
500491 topologyCache .remove (this .clusterId );
501492 downHostCache .remove (this .clusterId );
502- multiWriterClusterCache .remove (this .clusterId );
503493 lastUsedReaderCache .remove (this .clusterId );
504494 }
505495
506496 private static class ClusterTopologyInfo {
507497 private List <HostInfo > hosts ;
508- private boolean isMultiWriterCluster ;
509498
510- ClusterTopologyInfo (List <HostInfo > hosts , boolean isMultiWriterCluster ) {
499+ ClusterTopologyInfo (List <HostInfo > hosts ) {
511500 this .hosts = hosts ;
512- this .isMultiWriterCluster = isMultiWriterCluster ;
513501 }
514502
515503 List <HostInfo > getHosts () { return this .hosts ; }
516- boolean getMultiWriterCluster () { return this .isMultiWriterCluster ; }
517504 }
518505}
0 commit comments