Skip to content

Commit e23dea7

Browse files
authored
Merge pull request #958: [proxima-direct-io-cassandra] unsynchronize CassandraWriter
2 parents 05bcd6f + 0acb4cb commit e23dea7

File tree

3 files changed

+10
-4
lines changed

3 files changed

+10
-4
lines changed

direct/core/src/main/java/cz/o2/proxima/direct/core/DirectDataOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package cz.o2.proxima.direct.core;
1717

18+
import cz.o2.proxima.core.annotations.DeclaredThreadSafe;
1819
import cz.o2.proxima.core.annotations.Internal;
1920
import cz.o2.proxima.core.functional.Factory;
2021
import cz.o2.proxima.core.functional.UnaryFunction;
@@ -360,7 +361,11 @@ private void cacheOrRetrieveWriterFor(DirectAttributeFamilyDescriptor af) {
360361
// store writer of this family to all attributes
361362
for (AttributeDescriptor<?> a : af.getAttributes()) {
362363
if (a.getTransactionMode() == TransactionMode.NONE) {
363-
writers.put(a, OnlineAttributeWriters.synchronizedWriter(familyWriter));
364+
if (familyWriter.getClass().getAnnotation(DeclaredThreadSafe.class) == null) {
365+
writers.put(a, OnlineAttributeWriters.synchronizedWriter(familyWriter));
366+
} else {
367+
writers.put(a, familyWriter);
368+
}
364369
} else {
365370
writers.put(a, maybeTransactionalWriter);
366371
}

direct/core/src/main/java/cz/o2/proxima/direct/core/OnlineAttributeWriters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public synchronized Factory<? extends OnlineAttributeWriter> asFactory() {
4141
}
4242

4343
@Override
44-
public synchronized URI getUri() {
44+
public URI getUri() {
4545
return delegate.getUri();
4646
}
4747

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraWriter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.driver.core.BoundStatement;
1919
import com.datastax.driver.core.Session;
20+
import cz.o2.proxima.core.annotations.DeclaredThreadSafe;
2021
import cz.o2.proxima.core.storage.StreamElement;
2122
import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
2223
import cz.o2.proxima.direct.core.CommitCallback;
@@ -27,6 +28,7 @@
2728

2829
/** A {@link OnlineAttributeWriter} implementation for Cassandra. */
2930
@Slf4j
31+
@DeclaredThreadSafe
3032
class CassandraWriter extends AbstractOnlineAttributeWriter implements OnlineAttributeWriter {
3133

3234
private final CassandraDBAccessor accessor;
@@ -39,8 +41,7 @@ class CassandraWriter extends AbstractOnlineAttributeWriter implements OnlineAtt
3941
}
4042

4143
@Override
42-
public synchronized void write(StreamElement data, CommitCallback statusCallback) {
43-
44+
public void write(StreamElement data, CommitCallback statusCallback) {
4445
try {
4546
Session session = accessor.ensureSession();
4647
Optional<BoundStatement> cql = accessor.getCqlFactory().getWriteStatement(data, session);

0 commit comments

Comments
 (0)