26
26
import java .io .IOException ;
27
27
import java .net .URI ;
28
28
import java .util .Arrays ;
29
+ import java .util .Enumeration ;
30
+ import java .util .Hashtable ;
29
31
30
32
import org .apache .hadoop .conf .Configuration ;
31
33
import org .apache .hadoop .fs .BlockLocation ;
@@ -48,33 +50,74 @@ public class GlusterVolume extends RawLocalFileSystem{
48
50
*/
49
51
public static final int OVERRIDE_WRITE_BUFFER_SIZE = 1024 * 4 ;
50
52
public static final int OPTIMAL_WRITE_BUFFER_SIZE = 1024 * 128 ;
53
+ public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024 ;
51
54
52
- public static final URI NAME = URI . create ( "glusterfs:///" ) ;
53
-
54
- protected String root = null ;
55
-
56
-
55
+ protected URI NAME = null ;
56
+
57
+ protected Hashtable < String , String > volumes = new Hashtable < String , String >() ;
58
+ protected String default_volume = null ;
59
+ private Path workingDir ;
57
60
58
61
protected static GlusterFSXattr attr = null ;
59
62
60
- public GlusterVolume (){
61
- }
63
+ public GlusterVolume (){}
62
64
63
65
public GlusterVolume (Configuration conf ){
64
66
this ();
65
67
this .setConf (conf );
66
68
}
69
+
67
70
public URI getUri () { return NAME ; }
68
71
72
+ public void initialize (URI uri , Configuration conf ) throws IOException {
73
+ /* we only really care about the URI up to the path, so strip other things off */
74
+ String auth = uri .getAuthority ();
75
+ if (auth ==null )
76
+ auth = "" ;
77
+ this .NAME = URI .create (uri .getScheme () + "://" + auth + "/" ) ;
78
+ super .initialize (this .NAME , conf );
79
+ }
80
+
81
+ /*
82
+ * This expands URIs to include 'default' values.
83
+ * For instance, glusterfs:///path would expand to glusterfs://defaultvolume/path
84
+ */
85
+ protected URI canonicalizeUri (URI uri ) {
86
+ String auth = uri .getAuthority ();
87
+ if (auth ==null )
88
+ auth = default_volume ;
89
+ return URI .create (uri .getScheme () + "://" + auth + "/" + uri .getPath ()) ;
90
+ }
91
+
92
+ /* check if a path is on the same volume as this instance */
93
+ public boolean sameVolume (Path p ){
94
+ URI thisUri = canonicalizeUri (this .NAME );
95
+ URI thatUri = canonicalizeUri (p .toUri ());
96
+ if (!thatUri .getScheme ().equalsIgnoreCase (thisUri .getScheme ())) return false ;
97
+ if ((thatUri .getAuthority ()==null && thisUri .getAuthority ()==null )) return true ;
98
+ return (thatUri .getAuthority ()!=null && thatUri .getAuthority ().equalsIgnoreCase (thisUri .getAuthority ()));
99
+
100
+ }
101
+
69
102
public void setConf (Configuration conf ){
70
103
log .info ("Initializing gluster volume.." );
71
104
super .setConf (conf );
72
105
String getfattrcmd = null ;
73
106
if (conf !=null ){
74
107
75
108
try {
76
- root =conf .get ("fs.glusterfs.mount" , null );
77
- log .info ("Root of Gluster file system is " + root );
109
+ String [] v =conf .get ("fs.glusterfs.volumes" , "" ).split ("," );
110
+ default_volume = v [0 ];
111
+ for (int i =0 ;i <v .length ;i ++){
112
+ String vol = conf .get ("fs.glusterfs.volume.fuse." + v [i ] , null );
113
+
114
+ if (vol ==null ){
115
+ log .error ("Could not find property: fs.glusterfs.fuse." + v [i ]);
116
+ throw new RuntimeException ("Could not find mount point for volume: " + v [i ]);
117
+ }
118
+ volumes .put (v [i ],vol );
119
+ log .info ("Gluster volume: " + v [i ] + " at : " + volumes .get (v [i ]));
120
+ }
78
121
getfattrcmd = conf .get ("fs.glusterfs.getfattrcmd" , null );
79
122
if (getfattrcmd !=null ){
80
123
attr = new GlusterFSXattr (getfattrcmd );
@@ -90,12 +133,15 @@ public void setConf(Configuration conf){
90
133
mapredSysDirectory = new Path (conf .get ("mapred.system.dir" , "glusterfs:///mapred/system" ));
91
134
}
92
135
93
- if (!exists (mapredSysDirectory )){
136
+ if (sameVolume ( mapredSysDirectory ) && !exists (mapredSysDirectory ) ){
94
137
mkdirs (mapredSysDirectory );
95
138
}
96
139
//Working directory setup
140
+
97
141
Path workingDirectory = getInitialWorkingDirectory ();
98
- mkdirs (workingDirectory );
142
+ if (sameVolume (workingDirectory ) && !exists (workingDirectory ) ){
143
+ mkdirs (workingDirectory );
144
+ }
99
145
setWorkingDirectory (workingDirectory );
100
146
log .info ("Working directory is : " + getWorkingDirectory ());
101
147
@@ -107,6 +153,18 @@ public void setConf(Configuration conf){
107
153
conf .setInt ("io.file.buffer.size" , OPTIMAL_WRITE_BUFFER_SIZE );
108
154
}
109
155
log .info ("Write buffer size : " +conf .getInt ("io.file.buffer.size" ,-1 )) ;
156
+
157
+ /**
158
+ * Default block size
159
+ */
160
+
161
+ Long defaultBlockSize =conf .getLong ("fs.local.block.size" , -1 );
162
+
163
+ if (defaultBlockSize == -1 ) {
164
+ conf .setInt ("fs.local.block.size" , DEFAULT_BLOCK_SIZE );
165
+ }
166
+ log .info ("Default block size : " +conf .getInt ("fs.local.block.size" ,-1 )) ;
167
+
110
168
}
111
169
catch (Exception e ){
112
170
throw new RuntimeException (e );
@@ -117,25 +175,67 @@ public void setConf(Configuration conf){
117
175
118
176
public File pathToFile (Path path ) {
119
177
checkPath (path );
178
+
120
179
if (!path .isAbsolute ()) {
121
180
path = new Path (getWorkingDirectory (), path );
122
181
}
123
- return new File (root + path .toUri ().getPath ());
182
+
183
+ String volume = path .toUri ().getAuthority ();
184
+ String scheme = path .toUri ().getScheme ();
185
+
186
+ if (scheme ==null || "" .equals (scheme )){
187
+ return pathToFile (path .makeQualified (this ));
188
+
189
+ }else if (volume ==null ){
190
+ volume = default_volume ;
191
+ }
192
+
193
+ return new File (this .volumes .get (volume ) + "/" + path .toUri ().getPath ());
124
194
}
125
-
126
- /**
127
- * Note this method doesn't override anything in hadoop 1.2.0 and
128
- * below.
129
- */
195
+
130
196
protected Path getInitialWorkingDirectory () {
131
- /* apache's unit tests use a default working direcotry like this: */
132
- return new Path (this .NAME + "user/" + System .getProperty ("user.name" ));
133
- /* The super impl returns the users home directory in unix */
134
- //return super.getInitialWorkingDirectory();
197
+ /* initial home directory is always on the default volume */
198
+ return new Path ("glusterfs:///user/" + System .getProperty ("user.name" ));
135
199
}
200
+
201
+ private Path makeAbsolute (Path f ) {
202
+ if (f .isAbsolute ()) {
203
+ return f ;
204
+ } else {
205
+ return new Path (workingDir , f );
206
+ }
207
+ }
208
+
209
+ /**
210
+ * Set the working directory to the given directory.
211
+ */
136
212
213
+ public void setWorkingDirectory (Path newDir ) {
214
+ workingDir = makeAbsolute (newDir );
215
+ // avoid checking the path as the working directory may not exist on this volume.
216
+ //checkPath(workingDir);
217
+
218
+ }
219
+
137
220
public Path fileToPath (File path ) {
138
- return new Path (NAME .toString () + path .toURI ().getRawPath ().substring (root .length ()));
221
+ Enumeration <String > all = volumes .keys ();
222
+ String rawPath = path .toURI ().getRawPath ();
223
+ String volume = null ;
224
+ String root = null ;
225
+
226
+ while (volume ==null && all .hasMoreElements ()){
227
+ String nextVolume = all .nextElement ();
228
+ String nextPath = volumes .get (nextVolume );
229
+ if (rawPath .startsWith (nextPath )){
230
+ volume = nextVolume ;
231
+ root = nextPath ;
232
+ }
233
+ }
234
+
235
+ if (default_volume .equalsIgnoreCase (volume ))
236
+ volume = "" ;
237
+
238
+ return new Path ("glusterfs://" + volume + "/" + rawPath .substring (root .length ()));
139
239
}
140
240
141
241
public boolean rename (Path src , Path dst ) throws IOException {
@@ -205,7 +305,15 @@ public FileStatus[] listStatus(Path f) throws IOException {
205
305
}
206
306
207
307
public FileStatus getFileStatus (Path f ) throws IOException {
208
- File path = pathToFile (f );
308
+
309
+ File path = null ;
310
+
311
+ try {
312
+ path = pathToFile (f );
313
+ }catch (IllegalArgumentException ex ){
314
+ throw new FileNotFoundException ( "File " + f + " does not exist on this volume." + ex );
315
+ }
316
+
209
317
if (path .exists ()) {
210
318
return new GlusterFileStatus (pathToFile (f ), getDefaultBlockSize (), this );
211
319
} else {
@@ -249,7 +357,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file,long start,long len
249
357
}
250
358
251
359
public String toString (){
252
- return "Gluster Volume mounted at : " + root ;
360
+ return "Gluster volume : " + this . NAME ;
253
361
}
254
362
255
363
}
0 commit comments