Skip to content

Commit 698cd85

Browse files
committed
Merge pull request #36 from chzyer/feature/v6
fix
2 parents 931f7a3 + 72c55cf commit 698cd85

File tree

5 files changed

+63
-6
lines changed

5 files changed

+63
-6
lines changed

src/com/qiniu/auth/Client.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ public Client(HttpClient client) {
3232
mClient = client;
3333
}
3434

35+
public void close() {
36+
mClient.getConnectionManager().closeExpiredConnections();
37+
mClient.getConnectionManager().shutdown();
38+
}
39+
3540
public static ClientExecutor get(String url, CallRet ret) {
3641
Client client = Client.defaultClient();
3742
return client.get(client.makeClientExecutor(), url, ret);
@@ -78,6 +83,7 @@ protected HttpResponse roundtrip(HttpRequestBase httpRequest) throws IOException
7883
public class ClientExecutor extends AsyncTask<Object, Object, Object> implements ICancel {
7984
HttpRequestBase mHttpRequest;
8085
CallRet mRet;
86+
boolean failed;
8187
public void setup(HttpRequestBase httpRequest, CallRet ret) {
8288
mHttpRequest = httpRequest;
8389
mRet = ret;
@@ -109,18 +115,29 @@ protected Object doInBackground(Object... objects) {
109115

110116
@Override
111117
protected void onProgressUpdate(Object... values) {
118+
if (failed) return;
119+
if (values.length == 1 && values[0] instanceof Exception) {
120+
mRet.onFailure((Exception) values[0]);
121+
failed = true;
122+
return;
123+
}
112124
mRet.onProcess((Long) values[0], (Long) values[1]);
113125
}
114126

115127
@Override
116128
protected void onPostExecute(Object o) {
117-
mClient.getConnectionManager().closeIdleConnections(30, TimeUnit.SECONDS);
129+
if (failed) return;
118130
if (o instanceof Exception) {
119131
mRet.onFailure((Exception) o);
120132
return;
121133
}
122134
mRet.onSuccess((byte[]) o);
123135
}
136+
137+
public void onFailure(Exception ex) {
138+
publishProgress(ex);
139+
cancel(true);
140+
}
124141
};
125142

126143
public static Client defaultClient() {

src/com/qiniu/io/IO.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@ public class IO {
2222
public static String UNDEFINED_KEY = null;
2323
private static Client mClient;
2424
private static String mUptoken;
25+
private static long mClientUseTime;
2526
public IO(Client client, String uptoken) {
2627
mClient = client;
2728
mUptoken = uptoken;
2829
}
2930

3031
private static Client defaultClient() {
32+
if (mClient != null && System.currentTimeMillis() - mClientUseTime > 3 * 60 * 1000) { // 1 minute
33+
mClient.close();
34+
mClient = null;
35+
}
3136
if (mClient == null) {
3237
mClient = Client.defaultClient();
3338
}
39+
mClientUseTime = System.currentTimeMillis();
3440
return mClient;
3541
}
3642

@@ -68,6 +74,11 @@ public void put(String key, InputStreamAt isa, PutExtra extra, JSONObjectRet ret
6874
public void onProcess(long current, long total) {
6975
executor.upload(current, total);
7076
}
77+
78+
@Override
79+
public void onFailure(Exception ex) {
80+
executor.onFailure(ex);
81+
}
7182
});
7283
client.call(executor, Conf.UP_HOST, m, ret);
7384
}

src/com/qiniu/utils/IOnProcess.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22

33
public interface IOnProcess {
44
public void onProcess(long current, long total);
5+
public void onFailure(Exception ex);
56
}

src/com/qiniu/utils/InputStreamAt.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import org.apache.http.entity.AbstractHttpEntity;
88

99
import java.io.*;
10-
import java.util.Arrays;
1110
import java.util.zip.CRC32;
1211

1312
public class InputStreamAt implements Closeable {
@@ -151,11 +150,19 @@ protected byte[] fileStreamRead(long offset, int length) throws IOException {
151150
}
152151

153152
if (totalRead != data.length) {
154-
data = Arrays.copyOfRange(data, 0, totalRead);
153+
data = copyOfRange(data, 0, totalRead);
155154
}
156155
return data;
157156
}
158157

158+
public static byte[] copyOfRange(byte[] original, int from, int to) {
159+
int newLength = to - from;
160+
if (newLength < 0) throw new IllegalArgumentException(from + " > " + to);
161+
byte[] copy = new byte[newLength];
162+
System.arraycopy(original, from, copy, 0, Math.min(original.length - from, newLength));
163+
return copy;
164+
}
165+
159166
@Override
160167
public synchronized void close(){
161168
if (mClosed) return;

src/com/qiniu/utils/MultipartEntity.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.OutputStream;
99
import java.util.ArrayList;
1010
import java.util.Random;
11+
import java.util.concurrent.*;
1112

1213
public class MultipartEntity extends AbstractHttpEntity {
1314
private String mBoundary;
@@ -33,7 +34,7 @@ public void addFile(String field, String contentType, String fileName, InputStre
3334

3435
@Override
3536
public boolean isRepeatable() {
36-
return false;
37+
return true;
3738
}
3839

3940
@Override
@@ -55,6 +56,7 @@ public InputStream getContent() throws IOException, IllegalStateException {
5556

5657
@Override
5758
public void writeTo(OutputStream outputStream) throws IOException {
59+
writed = 0;
5860
outputStream.write(mData.toString().getBytes());
5961
outputStream.flush();
6062
writed += mData.toString().getBytes().length;
@@ -80,6 +82,7 @@ public boolean isStreaming() {
8082
public void setProcessNotify(IOnProcess ret) {
8183
mNotify = ret;
8284
}
85+
ExecutorService executor = Executors.newFixedThreadPool(1);
8386

8487
class FileInfo {
8588

@@ -112,12 +115,18 @@ public void writeTo(OutputStream outputStream) throws IOException {
112115

113116
int blockSize = (int) (getContentLength() / 100);
114117
if (blockSize > 256 * 1024) blockSize = 256 * 1024;
115-
if (blockSize < 16 * 1024) blockSize = 16 * 1024;
118+
if (blockSize < 32 * 1024) blockSize = 32 * 1024;
116119
long index = 0;
117120
long length = mIsa.length();
118121
while (index < length) {
119122
int readLength = (int) StrictMath.min((long) blockSize, mIsa.length() - index);
120-
outputStream.write(mIsa.read(index, readLength));
123+
int timeout = readLength * 2;
124+
try {
125+
write(timeout, outputStream, mIsa.read(index, readLength));
126+
} catch (Exception e) {
127+
mNotify.onFailure(e);
128+
return;
129+
}
121130
index += blockSize;
122131
outputStream.flush();
123132
writed += readLength;
@@ -130,6 +139,18 @@ public void writeTo(OutputStream outputStream) throws IOException {
130139
}
131140
}
132141

142+
private void write(int timeout, final OutputStream outputStream, final byte[] data) throws InterruptedException, ExecutionException, TimeoutException {
143+
Callable<Object> readTask = new Callable<Object>() {
144+
@Override
145+
public Object call() throws Exception {
146+
outputStream.write(data);
147+
return null;
148+
}
149+
};
150+
Future<Object> future = executor.submit(readTask);
151+
future.get(timeout, TimeUnit.MILLISECONDS);
152+
}
153+
133154
private static String getRandomString(int length) {
134155
String base = "abcdefghijklmnopqrstuvwxyz0123456789";
135156
Random random = new Random();

0 commit comments

Comments
 (0)