Skip to content

Commit 34f87b5

Browse files
committed
Javadoc, Dependencyies, test
1 parent 8c1d8a1 commit 34f87b5

17 files changed

+262
-36
lines changed

pom.xml

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1414
<maven.plugin.compiler.source>11</maven.plugin.compiler.source>
1515
<maven.plugin.compiler.target>11</maven.plugin.compiler.target>
16+
<maven.skip.test>false</maven.skip.test>
1617
<!-- Beam -->
1718
<beam.version>2.48.0</beam.version>
1819
<!-- Astra -->
19-
<astra-sdk.version>0.6</astra-sdk.version>
20+
<astra-sdk.version>0.6.2</astra-sdk.version>
2021
<cassandra-driver4x.version>4.16.0</cassandra-driver4x.version>
2122
<!-- Third Parties -->
2223
<slf4j.version>2.0.7</slf4j.version>
23-
<logback.version>1.4.7</logback.version>
24+
<logback.version>1.4.8</logback.version>
2425
<auto-value.version>1.10.1</auto-value.version>
2526
<junit.version>4.13.2</junit.version>
2627
<hamcrest.version>2.2</hamcrest.version>
@@ -31,9 +32,9 @@
3132
<version.maven.plugin.exec>1.6.0</version.maven.plugin.exec>
3233
<version.maven.plugin.gpg>3.0.1</version.maven.plugin.gpg>
3334
<version.maven.plugin.jar>3.3.0</version.maven.plugin.jar>
34-
<version.maven.plugin.javadoc>3.4.1</version.maven.plugin.javadoc>
35+
<version.maven.plugin.javadoc>3.5.0</version.maven.plugin.javadoc>
3536
<version.maven.plugin.license>2.0.0</version.maven.plugin.license>
36-
<version.maven.plugin.nexus>1.6.8</version.maven.plugin.nexus>
37+
<version.maven.plugin.nexus>1.6.13</version.maven.plugin.nexus>
3738
<version.maven.plugin.release>2.5.2</version.maven.plugin.release>
3839
<version.maven.plugin.resources>3.3.1</version.maven.plugin.resources>
3940
<version.maven.plugin.shade>3.4.1</version.maven.plugin.shade>
@@ -185,7 +186,6 @@
185186
</configuration>
186187
</plugin>
187188

188-
<!--
189189
<plugin>
190190
<groupId>org.apache.maven.plugins</groupId>
191191
<artifactId>maven-javadoc-plugin</artifactId>
@@ -199,18 +199,16 @@
199199
</execution>
200200
</executions>
201201
</plugin>
202-
-->
203202

204203
<plugin>
205204
<groupId>org.apache.maven.plugins</groupId>
206205
<artifactId>maven-surefire-plugin</artifactId>
207206
<version>${version.maven.plugin.surefire}</version>
208207
<configuration>
209-
<skipTests>true</skipTests>
208+
<skipTests>${maven.skip.test}</skipTests>
210209
</configuration>
211210
</plugin>
212211

213-
214212
<plugin>
215213
<groupId>org.apache.maven.plugins</groupId>
216214
<artifactId>maven-dependency-plugin</artifactId>

src/main/java/org/apache/beam/sdk/io/astra/db/AstraDbIO.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
141141

142142
abstract @Nullable Coder<T> coder();
143143

144+
/**
145+
* Expose split count.
146+
*
147+
* @return
148+
* minimal split count
149+
*/
144150
public abstract @Nullable ValueProvider<Integer> minNumberOfSplits();
145151

146152
abstract @Nullable SerializableFunction<CqlSession, AstraDbMapper<T>> mapperFactoryFn();
@@ -853,6 +859,9 @@ public PCollection<T> expand(PCollection<Read<T>> input) {
853859
}
854860
}
855861

862+
/**
863+
* Closing Open Connections.
864+
*/
856865
public static void close() {
857866
CqlSessionHolder.cleanup();
858867
}

src/main/java/org/apache/beam/sdk/io/astra/db/CqlSessionHolder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public static synchronized CqlSession getCqlSession(AstraDbIO.Write<?> write) {
9696
* token (or clientSecret)
9797
* @param secureConnectBundle
9898
* read scb as stream
99+
* @param keyspace
100+
* list of keyspaces
99101
* @return
100102
* cassandra cluster
101103
*/
@@ -172,6 +174,9 @@ private static synchronized void init() {
172174
}
173175
}
174176

177+
/**
178+
* Close all sessions.
179+
*/
175180
public static void cleanup() {
176181
cacheSessions.values().stream()
177182
.filter(s->!s.isClosed())

src/main/java/org/apache/beam/sdk/io/astra/db/ExecuteCqlFn.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,60 @@
2929
*/
3030
public abstract class ExecuteCqlFn<T> extends DoFn<String, Row> {
3131

32+
/**
33+
* The Cassandra session.
34+
*/
3235
CqlSession cqlSession;
3336

37+
/**
38+
* Constructor with Session.
39+
*
40+
* @param session the Cassandra session.
41+
*/
3442
public ExecuteCqlFn(CqlSession session) {
3543
this.cqlSession = session;
3644
}
3745

46+
/**
47+
* Constructor with read.
48+
*
49+
* @param read reader.
50+
*/
3851
public ExecuteCqlFn(AstraDbIO.Read<?> read) {
3952
this.cqlSession = CqlSessionHolder.getCqlSession(
4053
read.token(), read.secureConnectBundle(), read.keyspace());
4154
}
4255

56+
/**
57+
* Constructor with write.
58+
*
59+
* @param write writer
60+
*/
4361
public ExecuteCqlFn(AstraDbIO.Write<?> write) {
4462
this.cqlSession = CqlSessionHolder.getCqlSession(
4563
write.token(), write.secureConnectBundle(), write.keyspace());
4664
}
4765

66+
/**
67+
* Executing query as implementation of the DoFn.
68+
* @param query
69+
* current query
70+
* @param receiver
71+
* query responses
72+
*/
4873
@ProcessElement
4974
public void processElement(@Element String query, OutputReceiver<T> receiver) {
5075
cqlSession.execute(query).forEach(row -> receiver.output(mapRow(row)));
5176
}
5277

78+
/**
79+
* Show be implemented to map row.
80+
*
81+
* @param row
82+
* target row
83+
* @return
84+
* row mapper
85+
*/
5386
public abstract T mapRow(Row row);
5487

5588

src/main/java/org/apache/beam/sdk/io/astra/db/ExecuteCqlSimpleFn.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,41 @@
2828
*/
2929
public class ExecuteCqlSimpleFn extends ExecuteCqlFn<Row> {
3030

31+
/**
32+
* Constructor with Session.
33+
*
34+
* @param session the Cassandra session.
35+
*/
3136
public ExecuteCqlSimpleFn(CqlSession session) {
3237
super(session);
3338
}
3439

40+
/**
41+
* Constructor with read.
42+
*
43+
* @param read the Cassandra session.
44+
*/
3545
public ExecuteCqlSimpleFn(AstraDbIO.Read<?> read) {
3646
super(read);
3747
}
3848

49+
/**
50+
* Constructor with Write.
51+
*
52+
* @param write the Cassandra session.
53+
*/
3954
public ExecuteCqlSimpleFn(AstraDbIO.Write<?> write) {
4055
super(write);
4156
}
4257

58+
/**
59+
* No Mapping.
60+
*
61+
* @param row
62+
* cassandra row
63+
* @return
64+
* row
65+
*/
4366
@Override
4467
public Row mapRow(Row row) {
4568
return row;

src/main/java/org/apache/beam/sdk/io/astra/db/mapping/AstraDbMapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@
5151
*/
5252
public interface AstraDbMapper<T> {
5353

54+
/**
55+
* Produce object out of the row
56+
* @param row
57+
* cassandra row
58+
* @return
59+
* entity
60+
*/
5461
@GetEntity
5562
T mapRow(Row row);
5663

src/main/java/org/apache/beam/sdk/io/astra/db/mapping/BeamRowDbMapper.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,15 @@ public class BeamRowDbMapper implements AstraDbMapper<Row>, Serializable {
8686
*/
8787
private final Set<String> primaryKeysColumnNames;
8888

89-
List<ColumnDefinition> columnDefinitions;
89+
/**
90+
* List of columns of the current table.
91+
*/
92+
private List<ColumnDefinition> columnDefinitions;
9093

91-
Schema beamSchema;
94+
/**
95+
* Associated bean schema
96+
*/
97+
private Schema beamSchema;
9298

9399
/**
94100
* Constructor used by AstraDbRowMapperFactory.

src/main/java/org/apache/beam/sdk/io/astra/db/transforms/split/AstraSplitFn.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ public AstraSplitFn() {
5353
LOG.info("Split into Token Ranges.");
5454
}
5555

56+
/**
57+
* Implement splits in token ranges.
58+
* @param read
59+
* current read
60+
* @param outputReceiver
61+
* splits
62+
*/
5663
@ProcessElement
5764
public void process(@Element AstraDbIO.Read<T> read, OutputReceiver<AstraDbIO.Read<T>> outputReceiver) {
5865
getRingRanges(read)

src/main/java/org/apache/beam/sdk/io/astra/db/transforms/split/AstraTokenFactory.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,60 @@
4444
import java.math.BigInteger;
4545
import java.util.Set;
4646

47+
/**
48+
* Produce Token Ranges on Mumur3 (used in AStra)
49+
*/
4750
public class AstraTokenFactory extends Murmur3TokenFactory {
4851

52+
/** Total number of tokens in the ring. */
4953
public static final BigInteger TOTAL_TOKEN_COUNT =
5054
BigInteger.valueOf(Long.MAX_VALUE).subtract(BigInteger.valueOf(Long.MIN_VALUE));
5155

56+
/**
57+
* Get total number of token for the ring.
58+
*
59+
* @return
60+
* token number
61+
*/
5262
@NonNull
5363
public BigInteger totalTokenCount() {
5464
return TOTAL_TOKEN_COUNT;
5565
}
5666

67+
/**
68+
* Compute a range of token.
69+
*
70+
* @param start
71+
* start token
72+
* @param end
73+
* end token
74+
* @param replicas
75+
* replicas endpoints
76+
* @return
77+
* a token range
78+
*/
5779
@NonNull
5880
public AstraTokenRange range(@NonNull Token start, @NonNull Token end, @NonNull Set<AstraTokenRangeEndpoint> replicas) {
5981
return new AstraTokenRange(((Murmur3Token) start), (Murmur3Token) end, replicas);
6082
}
6183

84+
/**
85+
* Create Splitter.
86+
*
87+
* @return
88+
* token range splitter
89+
*/
6290
@NonNull
6391
public TokenRangeSplitter splitter() {
6492
return new AstraTokenRangeSplitter();
6593
}
6694

95+
/**
96+
* Create token clusterer.
97+
*
98+
* @return
99+
* clusterer.
100+
*/
67101
@NonNull
68102
public TokenRangeClusterer clusterer() {
69103
return new TokenRangeClusterer(this);

src/main/java/org/apache/beam/sdk/io/astra/db/transforms/split/AstraTokenRange.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,33 +50,74 @@
5050
*/
5151
public class AstraTokenRange extends Murmur3TokenRange implements Serializable {
5252

53+
/** Set of replicas. */
5354
private final Set<AstraTokenRangeEndpoint> replicas;
5455

56+
/**
57+
* Constructor without replicas.
58+
* @param start
59+
* start token
60+
* @param end
61+
* end token
62+
*/
5563
public AstraTokenRange(@NonNull Murmur3Token start, @NonNull Murmur3Token end) {
5664
super(start, end);
5765
this.replicas = ImmutableSet.of();
5866
}
5967

68+
/**
69+
* Full constructor.
70+
* @param start
71+
* start token
72+
* @param end
73+
* end token
74+
* @param replicas
75+
* replicas
76+
*/
6077
public AstraTokenRange(@NonNull Murmur3Token start, @NonNull Murmur3Token end, @NonNull Set<AstraTokenRangeEndpoint> replicas) {
6178
super(start, end);
6279
this.replicas = ImmutableSet.copyOf(replicas);
6380
}
6481

82+
/**
83+
* Start token
84+
*
85+
* @return
86+
* getter for start token
87+
*/
6588
@NonNull
6689
public Murmur3Token getStart() {
6790
return (Murmur3Token) super.getStart();
6891
}
6992

93+
/**
94+
* End token.
95+
*
96+
* @return
97+
* getter for end token
98+
*/
7099
@NonNull
71100
public Murmur3Token getEnd() {
72101
return (Murmur3Token) super.getEnd();
73102
}
74103

104+
/**
105+
* List of replicas.
106+
*
107+
* @return
108+
* replicas list
109+
*/
75110
@NonNull
76111
public Set<AstraTokenRangeEndpoint> replicas() {
77112
return replicas;
78113
}
79114

115+
/**
116+
* Size of the range.
117+
*
118+
* @return
119+
* size
120+
*/
80121
@NonNull
81122
public BigInteger size() {
82123
BigInteger left = BigInteger.valueOf(getStart().getValue());
@@ -88,6 +129,12 @@ public BigInteger size() {
88129
}
89130
}
90131

132+
/**
133+
* Fraction of ring range.
134+
*
135+
* @return
136+
* fracction of the range
137+
*/
91138
public double fraction() {
92139
return size().doubleValue() / AstraTokenFactory.TOTAL_TOKEN_COUNT.doubleValue();
93140
}

0 commit comments

Comments
 (0)