Skip to content

Commit 763688a

Browse files
committed
Merge pull request #26 from gluster/buffering_writes
Configurable buffered writes in memory.
2 parents 855cdc0 + 17ebcaf commit 763688a

File tree

4 files changed

+65
-15
lines changed

4 files changed

+65
-15
lines changed

conf/core-site.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@
3535
<value>Off</value>
3636
</property>
3737

38+
<property>
39+
<name>fs.glusterfs.write.buffer.size</name>
40+
<value>1024</value>
41+
</property>
3842
</configuration>

src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFUSEOutputStream.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,29 @@
2121

2222
import java.io.*;
2323

24-
import org.apache.hadoop.fs.FSOutputSummer;
25-
import org.apache.hadoop.fs.FileSystem;
26-
2724
public class GlusterFUSEOutputStream extends OutputStream{
2825
File f;
2926
long pos;
3027
boolean closed;
3128
OutputStream fuseOutputStream;
32-
29+
org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(GlusterFUSEOutputStream.class);
30+
3331
public GlusterFUSEOutputStream(String file, boolean append) throws IOException{
32+
this(file,append,0);
33+
}
34+
35+
/**
36+
* @param bufferSize : Size of buffer in bytes (if 0, then no buffer will be used).
37+
*/
38+
public GlusterFUSEOutputStream(String file, boolean append, int bufferSize) throws IOException{
3439
this.f=new File(file); /* not needed ? */
3540
this.pos=0;
36-
this.fuseOutputStream=new FileOutputStream(file, append);
41+
fuseOutputStream=new FileOutputStream(file, append) ;
42+
if(bufferSize > 0)
43+
fuseOutputStream = new BufferedOutputStream(fuseOutputStream, bufferSize);
3744
this.closed=false;
3845
}
39-
46+
4047
public long getPos() throws IOException{
4148
return pos;
4249
}

src/main/java/org/apache/hadoop/fs/glusterfs/GlusterFileSystem.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@
2323
*/
2424
package org.apache.hadoop.fs.glusterfs;
2525

26-
import java.io.*;
27-
import java.net.*;
28-
26+
import java.io.File;
27+
import java.io.FileNotFoundException;
28+
import java.io.IOException;
29+
import java.net.InetAddress;
30+
import java.net.URI;
2931
import java.util.StringTokenizer;
3032
import java.util.TreeMap;
31-
import java.util.regex.*;
3233

3334
import org.apache.hadoop.conf.Configuration;
35+
import org.apache.hadoop.fs.BlockLocation;
3436
import org.apache.hadoop.fs.FSDataInputStream;
3537
import org.apache.hadoop.fs.FSDataOutputStream;
36-
import org.apache.hadoop.fs.FileSystem;
3738
import org.apache.hadoop.fs.FileStatus;
39+
import org.apache.hadoop.fs.FileSystem;
3840
import org.apache.hadoop.fs.FileUtil;
3941
import org.apache.hadoop.fs.Path;
40-
import org.apache.hadoop.fs.BlockLocation;
4142
import org.apache.hadoop.fs.permission.FsPermission;
4243
import org.apache.hadoop.util.Progressable;
4344
import org.apache.hadoop.util.Shell;
4445
import org.apache.hadoop.util.StringUtils;
45-
4646
import org.slf4j.Logger;
4747
import org.slf4j.LoggerFactory;
4848

@@ -65,6 +65,7 @@ public class GlusterFileSystem extends FileSystem{
6565
private Path workingDir=null;
6666
private String glusterMount=null;
6767
private boolean mounted=false;
68+
private int writeBufferSize = 0;
6869

6970

7071
/* for quick IO */
@@ -111,6 +112,7 @@ public void initialize(URI uri,Configuration conf) throws IOException{
111112
String remoteGFSServer=null;
112113
String needQuickRead=null;
113114
boolean autoMount=true;
115+
114116

115117
if(this.mounted)
116118
return;
@@ -123,7 +125,8 @@ public void initialize(URI uri,Configuration conf) throws IOException{
123125
remoteGFSServer=conf.get("fs.glusterfs.server", null);
124126
needQuickRead=conf.get("quick.slave.io", null);
125127
autoMount=conf.getBoolean("fs.glusterfs.automount", true);
126-
128+
writeBufferSize = conf.getInt("fs.glusterfs.write.buffer.size", 0);
129+
LOG.info("Gluster Output Buffering size configured to " + writeBufferSize + " bytes.");
127130
/*
128131
* bail out if we do not have enough information to do a FUSE mount
129132
*/
@@ -421,7 +424,7 @@ public FSDataOutputStream create(Path path,FsPermission permission,boolean overw
421424
parent=path.getParent();
422425
mkdirs(parent);
423426

424-
glusterFileStream=new FSDataOutputStream(new GlusterFUSEOutputStream(f.getPath(), false));
427+
glusterFileStream=new FSDataOutputStream(new GlusterFUSEOutputStream(f.getPath(), false, writeBufferSize));
425428

426429
return glusterFileStream;
427430
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.gluster.test;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.TreeMap;
6+
7+
import junit.framework.Assert;
8+
9+
import org.apache.hadoop.fs.glusterfs.GlusterFSBrickClass;
10+
import org.apache.hadoop.fs.glusterfs.GlusterFUSEInputStream;
11+
import org.apache.hadoop.fs.glusterfs.GlusterFUSEOutputStream;
12+
import org.junit.Test;
13+
14+
public class TestGlusterFuseOutputStream {
15+
16+
@Test
17+
public void testOutputStream() throws IOException{
18+
File out = new File("/tmp/testGlusterFuseOutputStream"+System.currentTimeMillis());
19+
out.createNewFile();
20+
//Create a 3 byte FuseOutputStream
21+
final GlusterFUSEOutputStream stream = new GlusterFUSEOutputStream(out.getAbsolutePath(), true, 3);
22+
23+
stream.write("ab".getBytes());
24+
System.out.println(out.length());
25+
26+
Assert.assertEquals(0, out.length());
27+
28+
stream.flush();
29+
stream.close();
30+
31+
System.out.println(out.length());
32+
//Confirm that the buffer held 100 bytes.
33+
Assert.assertEquals(2, out.length());
34+
out.delete();
35+
}
36+
}

0 commit comments

Comments
 (0)