Skip to content

Commit 9159d33

Browse files
committed
Added ClientSessions to GridFS
JAVA-2623 JAVA-2624
1 parent 0bc1601 commit 9159d33

17 files changed

+2785
-589
lines changed

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSBucket.java

Lines changed: 346 additions & 16 deletions
Large diffs are not rendered by default.

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSBucketImpl.java

Lines changed: 299 additions & 80 deletions
Large diffs are not rendered by default.

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSDownloadStreamImpl.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import com.mongodb.MongoGridFSException;
2020
import com.mongodb.async.AsyncBatchCursor;
2121
import com.mongodb.async.SingleResultCallback;
22+
import com.mongodb.async.client.FindIterable;
2223
import com.mongodb.async.client.MongoCollection;
2324
import com.mongodb.client.gridfs.model.GridFSFile;
2425
import com.mongodb.diagnostics.logging.Logger;
2526
import com.mongodb.diagnostics.logging.Loggers;
27+
import com.mongodb.session.ClientSession;
2628
import org.bson.Document;
2729
import org.bson.types.Binary;
2830

@@ -37,6 +39,7 @@
3739

3840
final class GridFSDownloadStreamImpl implements GridFSDownloadStream {
3941
private static final Logger LOGGER = Loggers.getLogger("client.gridfs");
42+
private final ClientSession clientSession;
4043
private final GridFSFindIterable fileInfoIterable;
4144
private final MongoCollection<Document> chunksCollection;
4245
private final ConcurrentLinkedQueue<Document> resultsQueue = new ConcurrentLinkedQueue<Document>();
@@ -59,7 +62,9 @@ final class GridFSDownloadStreamImpl implements GridFSDownloadStream {
5962
private byte[] buffer = null;
6063

6164

62-
GridFSDownloadStreamImpl(final GridFSFindIterable fileInfoIterable, final MongoCollection<Document> chunksCollection) {
65+
GridFSDownloadStreamImpl(final ClientSession clientSession, final GridFSFindIterable fileInfoIterable,
66+
final MongoCollection<Document> chunksCollection) {
67+
this.clientSession = clientSession;
6368
this.fileInfoIterable = notNull("file information", fileInfoIterable);
6469
this.chunksCollection = notNull("chunks collection", chunksCollection);
6570
}
@@ -144,10 +149,17 @@ private void checkAndFetchResults(final int amountRead, final ByteBuffer dst, fi
144149
} else if (hasResultsToProcess()) {
145150
processResults(amountRead, dst, callback);
146151
} else if (cursor == null) {
147-
chunksCollection.find(new Document("files_id", fileInfo.getId())
148-
.append("n", new Document("$gte", chunkIndex)))
149-
.batchSize(batchSize).sort(new Document("n", 1))
150-
.batchCursor(new SingleResultCallback<AsyncBatchCursor<Document>>() {
152+
Document filter = new Document("files_id", fileInfo.getId())
153+
.append("n", new Document("$gte", chunkIndex));
154+
155+
FindIterable<Document> findIterable;
156+
if (clientSession != null) {
157+
findIterable = chunksCollection.find(clientSession, filter);
158+
} else {
159+
findIterable = chunksCollection.find(filter);
160+
}
161+
findIterable.batchSize(batchSize).sort(new Document("n", 1)).batchCursor(
162+
new SingleResultCallback<AsyncBatchCursor<Document>>() {
151163
@Override
152164
public void onResult(final AsyncBatchCursor<Document> result, final Throwable t) {
153165
if (t != null) {

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSIndexCheckImpl.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package com.mongodb.async.client.gridfs;
1818

1919
import com.mongodb.async.SingleResultCallback;
20+
import com.mongodb.async.client.FindIterable;
21+
import com.mongodb.async.client.ListIndexesIterable;
2022
import com.mongodb.async.client.MongoCollection;
2123
import com.mongodb.client.gridfs.model.GridFSFile;
2224
import com.mongodb.client.model.IndexOptions;
25+
import com.mongodb.session.ClientSession;
2326
import org.bson.Document;
2427

2528
import java.util.ArrayList;
@@ -29,17 +32,28 @@
2932

3033
final class GridFSIndexCheckImpl implements GridFSIndexCheck {
3134
private static final Document PROJECTION = new Document("_id", 1);
35+
private final ClientSession clientSession;
3236
private final MongoCollection<GridFSFile> filesCollection;
3337
private final MongoCollection<Document> chunksCollection;
3438

35-
GridFSIndexCheckImpl(final MongoCollection<GridFSFile> filesCollection, final MongoCollection<Document> chunksCollection) {
39+
GridFSIndexCheckImpl(final ClientSession clientSession, final MongoCollection<GridFSFile> filesCollection,
40+
final MongoCollection<Document> chunksCollection) {
41+
this.clientSession = clientSession;
3642
this.filesCollection = notNull("files collection", filesCollection);
3743
this.chunksCollection = notNull("chunks collection", chunksCollection);
3844
}
3945

4046
@Override
4147
public void checkAndCreateIndex(final SingleResultCallback<Void> callback) {
42-
filesCollection.withDocumentClass(Document.class).withReadPreference(primary()).find().projection(PROJECTION).first(
48+
MongoCollection<Document> collection = filesCollection.withDocumentClass(Document.class).withReadPreference(primary());
49+
FindIterable<Document> findIterable;
50+
if (clientSession != null) {
51+
findIterable = collection.find(clientSession);
52+
} else {
53+
findIterable = collection.find();
54+
}
55+
56+
findIterable.projection(PROJECTION).first(
4357
new SingleResultCallback<Document>() {
4458
@Override
4559
public void onResult(final Document result, final Throwable t) {
@@ -55,7 +69,14 @@ public void onResult(final Document result, final Throwable t) {
5569
}
5670

5771
private <T> void hasIndex(final MongoCollection<T> collection, final Document index, final SingleResultCallback<Boolean> callback) {
58-
collection.listIndexes().into(new ArrayList<Document>(), new SingleResultCallback<ArrayList<Document>>() {
72+
ListIndexesIterable<Document> listIndexesIterable;
73+
if (clientSession != null) {
74+
listIndexesIterable = collection.listIndexes(clientSession);
75+
} else {
76+
listIndexesIterable = collection.listIndexes();
77+
}
78+
79+
listIndexesIterable.into(new ArrayList<Document>(), new SingleResultCallback<ArrayList<Document>>() {
5980
@Override
6081
public void onResult(final ArrayList<Document> indexes, final Throwable t) {
6182
if (t != null) {
@@ -83,7 +104,7 @@ public void onResult(final Boolean result, final Throwable t) {
83104
if (t != null) {
84105
callback.onResult(null, t);
85106
} else if (!result) {
86-
filesCollection.createIndex(filesIndex, new SingleResultCallback<String>() {
107+
SingleResultCallback<String> createIndexCallback = new SingleResultCallback<String>() {
87108
@Override
88109
public void onResult(final String result, final Throwable t) {
89110
if (t != null) {
@@ -92,7 +113,12 @@ public void onResult(final String result, final Throwable t) {
92113
checkChunksIndex(callback);
93114
}
94115
}
95-
});
116+
};
117+
if (clientSession != null) {
118+
filesCollection.createIndex(clientSession, filesIndex, createIndexCallback);
119+
} else {
120+
filesCollection.createIndex(filesIndex, createIndexCallback);
121+
}
96122
} else {
97123
checkChunksIndex(callback);
98124
}
@@ -108,7 +134,7 @@ public void onResult(final Boolean result, final Throwable t) {
108134
if (t != null) {
109135
callback.onResult(null, t);
110136
} else if (!result) {
111-
chunksCollection.createIndex(chunksIndex, new IndexOptions().unique(true), new SingleResultCallback<String>() {
137+
SingleResultCallback<String> createIndexCallback = new SingleResultCallback<String>() {
112138
@Override
113139
public void onResult(final String result, final Throwable t) {
114140
if (t != null) {
@@ -117,7 +143,12 @@ public void onResult(final String result, final Throwable t) {
117143
callback.onResult(null, null);
118144
}
119145
}
120-
});
146+
};
147+
if (clientSession != null) {
148+
chunksCollection.createIndex(clientSession, chunksIndex, new IndexOptions().unique(true), createIndexCallback);
149+
} else {
150+
chunksCollection.createIndex(chunksIndex, new IndexOptions().unique(true), createIndexCallback);
151+
}
121152
} else {
122153
callback.onResult(null, null);
123154
}

driver-async/src/main/com/mongodb/async/client/gridfs/GridFSUploadStreamImpl.java

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.mongodb.client.result.DeleteResult;
2424
import com.mongodb.diagnostics.logging.Logger;
2525
import com.mongodb.diagnostics.logging.Loggers;
26+
import com.mongodb.session.ClientSession;
2627
import org.bson.BsonValue;
2728
import org.bson.Document;
2829
import org.bson.types.Binary;
@@ -39,6 +40,7 @@
3940

4041
final class GridFSUploadStreamImpl implements GridFSUploadStream {
4142
private static final Logger LOGGER = Loggers.getLogger("client.gridfs");
43+
private final ClientSession clientSession;
4244
private final MongoCollection<GridFSFile> filesCollection;
4345
private final MongoCollection<Document> chunksCollection;
4446
private final BsonValue fileId;
@@ -63,9 +65,10 @@ final class GridFSUploadStreamImpl implements GridFSUploadStream {
6365
private int chunkIndex;
6466
/* accessed only when writing */
6567

66-
GridFSUploadStreamImpl(final MongoCollection<GridFSFile> filesCollection, final MongoCollection<Document> chunksCollection,
67-
final BsonValue fileId, final String filename, final int chunkSizeBytes, final Document metadata,
68-
final GridFSIndexCheck indexCheck) {
68+
GridFSUploadStreamImpl(final ClientSession clientSession, final MongoCollection<GridFSFile> filesCollection,
69+
final MongoCollection<Document> chunksCollection, final BsonValue fileId, final String filename,
70+
final int chunkSizeBytes, final Document metadata, final GridFSIndexCheck indexCheck) {
71+
this.clientSession = clientSession;
6972
this.filesCollection = notNull("files collection", filesCollection);
7073
this.chunksCollection = notNull("chunks collection", chunksCollection);
7174
this.fileId = notNull("File Id", fileId);
@@ -99,13 +102,20 @@ public void abort(final SingleResultCallback<Void> callback) {
99102
if (!takeWritingLock(errHandlingCallback)) {
100103
return;
101104
}
102-
chunksCollection.deleteMany(new Document("files_id", fileId), new SingleResultCallback<DeleteResult>() {
105+
106+
SingleResultCallback<DeleteResult> deleteCallback = new SingleResultCallback<DeleteResult>() {
103107
@Override
104108
public void onResult(final DeleteResult result, final Throwable t) {
105109
releaseWritingLock();
106110
errHandlingCallback.onResult(null, t);
107111
}
108-
});
112+
};
113+
114+
if (clientSession != null) {
115+
chunksCollection.deleteMany(clientSession, new Document("files_id", fileId), deleteCallback);
116+
} else {
117+
chunksCollection.deleteMany(new Document("files_id", fileId), deleteCallback);
118+
}
109119
}
110120

111121
@Override
@@ -167,21 +177,27 @@ public void onResult(final Void result, final Throwable t) {
167177
GridFSFile gridFSFile = new GridFSFile(fileId, filename, lengthInBytes, chunkSizeBytes, new Date(),
168178
toHex(md5.digest()), metadata);
169179

170-
filesCollection.insertOne(gridFSFile, new SingleResultCallback<Void>() {
180+
SingleResultCallback<Void> insertCallback = new SingleResultCallback<Void>() {
171181
@Override
172182
public void onResult(final Void result, final Throwable t) {
173183
buffer = null;
174184
releaseWritingLock();
175185
errHandlingCallback.onResult(result, t);
176186
}
177-
});
187+
};
188+
189+
if (clientSession != null) {
190+
filesCollection.insertOne(clientSession, gridFSFile, insertCallback);
191+
} else {
192+
filesCollection.insertOne(gridFSFile, insertCallback);
193+
}
178194
}
179195
}
180196
});
181197
}
182198

183199
private void write(final int amount, final ByteBuffer src, final SingleResultCallback<Integer> callback) {
184-
if (!takeWritingLock(callback)){
200+
if (!takeWritingLock(callback)) {
185201
return;
186202
}
187203

@@ -234,21 +250,25 @@ private void writeChunk(final SingleResultCallback<Void> callback) {
234250
if (md5 == null) {
235251
callback.onResult(null, new MongoGridFSException("No MD5 message digest available, cannot upload file"));
236252
} else if (bufferOffset > 0) {
237-
chunksCollection.insertOne(new Document("files_id", fileId).append("n", chunkIndex).append("data", getData()),
238-
new SingleResultCallback<Void>() {
239-
@Override
240-
public void onResult(final Void result, final Throwable t) {
241-
if (t != null) {
242-
callback.onResult(null, t);
243-
} else {
244-
md5.update(buffer);
245-
chunkIndex++;
246-
bufferOffset = 0;
247-
callback.onResult(null, null);
248-
}
249-
}
253+
Document insertDocument = new Document("files_id", fileId).append("n", chunkIndex).append("data", getData());
254+
SingleResultCallback<Void> insertCallback = new SingleResultCallback<Void>() {
255+
@Override
256+
public void onResult(final Void result, final Throwable t) {
257+
if (t != null) {
258+
callback.onResult(null, t);
259+
} else {
260+
md5.update(buffer);
261+
chunkIndex++;
262+
bufferOffset = 0;
263+
callback.onResult(null, null);
250264
}
251-
);
265+
}
266+
};
267+
if (clientSession != null) {
268+
chunksCollection.insertOne(clientSession, insertDocument, insertCallback);
269+
} else {
270+
chunksCollection.insertOne(insertDocument, insertCallback);
271+
}
252272
} else {
253273
callback.onResult(null, null);
254274
}

0 commit comments

Comments
 (0)