@@ -91,17 +91,21 @@ public class MetricResolver {
9191 final protected BalancerProperties balancerProperties ;
9292 final protected CacheProperties cacheProperties ;
9393 final protected HealthChecker healthChecker ;
94+ final protected Path assignmentFile ;
95+ final protected FileSystem fs ;
9496
9597 private enum BalanceType {
9698 HIGH_LOW , HIGH_AVG , AVG_LOW
9799 }
98100
99101 public MetricResolver (CuratorFramework curatorFramework , BalancerProperties balancerProperties , CacheProperties cacheProperties ,
100- HealthChecker healthChecker ) {
102+ HealthChecker healthChecker ) throws Exception {
101103 this .curatorFramework = curatorFramework ;
102104 this .balancerProperties = balancerProperties ;
103105 this .cacheProperties = cacheProperties ;
104106 this .healthChecker = healthChecker ;
107+ this .assignmentFile = new Path (balancerProperties .getAssignmentFile ());
108+ this .fs = getFileSystem (this .assignmentFile );
105109 }
106110
107111 public void start () {
@@ -916,20 +920,16 @@ protected FileSystem getFileSystem(Path path) throws IOException {
916920
917921 protected void createAssignmentFile () {
918922 try {
919- log .info ("Creating assignment file: " + balancerProperties .getAssignmentFile ());
920- Path assignmentFile = new Path (balancerProperties .getAssignmentFile ());
921- FileSystem fs = getFileSystem (assignmentFile );
922- fs .create (assignmentFile );
923+ log .info ("Creating assignment file: " + this .assignmentFile );
924+ this .fs .create (this .assignmentFile );
923925 } catch (Exception e ) {
924926 log .error (e .getMessage (), e );
925927 }
926928 }
927929
928930 protected long getLastWrittenToHdfsTimestamp () {
929931 try {
930- Path assignmentFile = new Path (balancerProperties .getAssignmentFile ());
931- FileSystem fs = getFileSystem (assignmentFile );
932- FileStatus fileStatus = fs .getFileStatus (assignmentFile );
932+ FileStatus fileStatus = this .fs .getFileStatus (this .assignmentFile );
933933 return fileStatus .getModificationTime ();
934934 } catch (FileNotFoundException e ) {
935935 createAssignmentFile ();
@@ -960,16 +960,15 @@ private void readAssignmentsFromHdfs(boolean checkIfNecessary) {
960960 // proceed with reading from HDFS
961961
962962 Map <String ,TimelyBalancedHost > assignedMetricToHostMap = new TreeMap <>();
963+ CsvReader reader = null ;
963964 try {
964965 boolean acquired = false ;
965966 while (!acquired ) {
966967 acquired = assignmentsIPRWLock .readLock ().acquire (60 , TimeUnit .SECONDS );
967968 }
968969 balancerLock .writeLock ().lock ();
969- Path assignmentFile = new Path (balancerProperties .getAssignmentFile ());
970- FileSystem fs = getFileSystem (assignmentFile );
971- FSDataInputStream iStream = fs .open (assignmentFile );
972- CsvReader reader = new CsvReader (iStream , ',' , Charset .forName ("UTF-8" ));
970+ FSDataInputStream iStream = this .fs .open (this .assignmentFile );
971+ reader = new CsvReader (iStream , ',' , Charset .forName ("UTF-8" ));
973972 reader .setUseTextQualifier (false );
974973
975974 // skip the headers
@@ -1031,6 +1030,9 @@ private void readAssignmentsFromHdfs(boolean checkIfNecessary) {
10311030 } catch (Exception e ) {
10321031 log .error (e .getMessage (), e );
10331032 } finally {
1033+ if (reader != null ) {
1034+ reader .close ();
1035+ }
10341036 balancerLock .writeLock ().unlock ();
10351037 try {
10361038 assignmentsIPRWLock .readLock ().release ();
@@ -1052,12 +1054,10 @@ private void writeAssignmentsToHdfs() {
10521054 nonCachedMetricsLocalLock .readLock ().lock ();
10531055 try {
10541056 if (!metricToHostMap .isEmpty ()) {
1055- Path assignmentFile = new Path (balancerProperties .getAssignmentFile ());
1056- FileSystem fs = getFileSystem (assignmentFile );
1057- if (!fs .exists (assignmentFile .getParent ())) {
1058- fs .mkdirs (assignmentFile .getParent ());
1057+ if (!this .fs .exists (this .assignmentFile .getParent ())) {
1058+ this .fs .mkdirs (this .assignmentFile .getParent ());
10591059 }
1060- FSDataOutputStream oStream = fs .create (assignmentFile , true );
1060+ FSDataOutputStream oStream = this . fs .create (this . assignmentFile , true );
10611061 writer = new CsvWriter (oStream , ',' , Charset .forName ("UTF-8" ));
10621062 writer .setUseTextQualifier (false );
10631063 writer .write ("metric" );
@@ -1080,7 +1080,7 @@ private void writeAssignmentsToHdfs() {
10801080 if (!assignmentsLastUpdatedInHdfs .get ().succeeded ()) {
10811081 assignmentsLastUpdatedInHdfs .forceSet (now );
10821082 }
1083- log .debug ("Wrote {} assignments to hdfs lastHdfsUpdate = lastLocalUpdate ({})" , metricToHostMap .size (),
1083+ log .info ("Wrote {} assignments to hdfs lastHdfsUpdate = lastLocalUpdate ({})" , metricToHostMap .size (),
10841084 new Date (assignmentsLastUpdatedLocal .get ()));
10851085 // remove metric from metricMap (ArrivalRate) if not being cached
10861086 metricMap .entrySet ().removeIf (e -> !metricToHostMap .containsKey (e .getKey ()));
0 commit comments