88import com .datastax .oss .driver .api .core .cql .Row ;
99import lombok .AllArgsConstructor ;
1010import lombok .Data ;
11- import lombok .NoArgsConstructor ;
1211import lombok .NonNull ;
1312import lombok .extern .slf4j .Slf4j ;
1413
@@ -26,11 +25,11 @@ public class ClusteredCassandraTable extends AbstractCassandraTable<ClusteredCas
2625 /**
2726 * Prepared statements
2827 */
29- private final PreparedStatement findPartitionStatement ;
30- private final PreparedStatement deletePartitionStatement ;
31- private final PreparedStatement deleteRowStatement ;
32- private final PreparedStatement insertRowStatement ;
33- private final PreparedStatement findRowStatement ;
28+ private PreparedStatement findPartitionStatement ;
29+ private PreparedStatement deletePartitionStatement ;
30+ private PreparedStatement deleteRowStatement ;
31+ private PreparedStatement insertRowStatement ;
32+ private PreparedStatement findRowStatement ;
3433
3534 /**
3635 * Constructor with the mandatory parameters.
@@ -44,41 +43,50 @@ public class ClusteredCassandraTable extends AbstractCassandraTable<ClusteredCas
4443 */
4544 public ClusteredCassandraTable (@ NonNull CqlSession session , @ NonNull String keyspaceName , @ NonNull String tableName ) {
4645 super (session , keyspaceName , tableName );
47- createSchema ();
48- findPartitionStatement = session .prepare (
49- "select * from " + keyspaceName + "." + tableName
50- + " where " + PARTITION_ID + " = ? " );
51- deletePartitionStatement = session .prepare (
52- "delete from " + keyspaceName + "." + tableName
53- + " where " + PARTITION_ID + " = ? " );
54- findRowStatement = session .prepare (
55- "select * from " + keyspaceName + "." + tableName
56- + " where " + PARTITION_ID + " = ? "
57- + " and " + ROW_ID + " = ? " );
58- deleteRowStatement = session .prepare (
59- "delete from " + keyspaceName + "." + tableName
60- + " where " + PARTITION_ID + " = ? "
61- + " and " + ROW_ID + " = ? " );
62- insertRowStatement = session .prepare (
63- "insert into " + keyspaceName + "." + tableName
64- + " (" + PARTITION_ID + ", " + ROW_ID + ", " + BODY_BLOB + ") "
65- + " values (?, ?, ?)" );
6646 }
6747
48+ /**
49+ * Prepare statements on first request.
50+ */
51+ private synchronized void prepareStatements () {
52+ if (findPartitionStatement == null ) {
53+ findPartitionStatement = cqlSession .prepare (
54+ "select * from " + keyspaceName + "." + tableName
55+ + " where " + PARTITION_ID + " = ? " );
56+ deletePartitionStatement = cqlSession .prepare (
57+ "delete from " + keyspaceName + "." + tableName
58+ + " where " + PARTITION_ID + " = ? " );
59+ findRowStatement = cqlSession .prepare (
60+ "select * from " + keyspaceName + "." + tableName
61+ + " where " + PARTITION_ID + " = ? "
62+ + " and " + ROW_ID + " = ? " );
63+ deleteRowStatement = cqlSession .prepare (
64+ "delete from " + keyspaceName + "." + tableName
65+ + " where " + PARTITION_ID + " = ? "
66+ + " and " + ROW_ID + " = ? " );
67+ insertRowStatement = cqlSession .prepare (
68+ "insert into " + keyspaceName + "." + tableName
69+ + " (" + PARTITION_ID + ", " + ROW_ID + ", " + BODY_BLOB + ") "
70+ + " values (?, ?, ?)" );
71+ }
72+ }
73+
74+ /** {@inheritDoc} */
6875 @ Override
69- public void createSchema () {
76+ public void create () {
7077 cqlSession .execute ("CREATE TABLE IF NOT EXISTS " + keyspaceName + "." + tableName + " ("
7178 + PARTITION_ID + " text, "
7279 + ROW_ID + " timeuuid, "
7380 + BODY_BLOB + " text, "
7481 + "PRIMARY KEY ((" + PARTITION_ID + "), " + ROW_ID + ")) "
75- + "WITH CLUSTERING ORDER BY (" + ROW_ID + " DESC" );
82+ + "WITH CLUSTERING ORDER BY (" + ROW_ID + " DESC) " );
7683 log .info ("+ Table '{}' has been created (if needed)." , tableName );
7784 }
7885
7986 /** {@inheritDoc} */
8087 @ Override
8188 public void put (@ NonNull ClusteredCassandraTable .Record row ) {
89+ prepareStatements ();
8290 cqlSession .execute (insertRowStatement .bind (row .getPartitionId (), row .getRowId (), row .getBody ()));
8391 }
8492
@@ -100,6 +108,7 @@ public Record mapRow(@NonNull Row row) {
100108 * list of rows
101109 */
102110 public List <Record > findPartition (@ NonNull String partitionDd ) {
111+ prepareStatements ();
103112 return cqlSession .execute (findPartitionStatement .bind (partitionDd ))
104113 .all ().stream ()
105114 .map (this ::mapRow )
@@ -113,6 +122,7 @@ public List<Record> findPartition(@NonNull String partitionDd) {
113122 * current rows.
114123 */
115124 public void upsertPartition (List <Record > rows ) {
125+ prepareStatements ();
116126 if (rows != null && !rows .isEmpty ()) {
117127 BatchStatementBuilder batch = BatchStatement .builder (BatchType .LOGGED );
118128 String currentPartitionId = null ;
@@ -137,6 +147,7 @@ public void upsertPartition(List<Record> rows) {
137147 * record if exists
138148 */
139149 public Optional <Record > findById (String partition , UUID rowId ) {
150+ prepareStatements ();
140151 return Optional .ofNullable (cqlSession
141152 .execute (findRowStatement .bind (partition , rowId ))
142153 .one ()).map (this ::mapRow );
@@ -149,6 +160,7 @@ public Optional<Record> findById(String partition, UUID rowId) {
149160 * delete the whole partition
150161 */
151162 public void deletePartition (@ NonNull String partitionId ) {
163+ prepareStatements ();
152164 cqlSession .execute (deletePartitionStatement .bind (partitionId ));
153165 }
154166
@@ -161,6 +173,7 @@ public void deletePartition(@NonNull String partitionId) {
161173 * message id
162174 */
163175 public void delete (@ NonNull String partitionId , @ NonNull UUID rowId ) {
176+ prepareStatements ();
164177 cqlSession .execute (deleteRowStatement .bind (partitionId , rowId ));
165178 }
166179
@@ -175,6 +188,7 @@ public void delete(@NonNull String partitionId, @NonNull UUID rowId) {
175188 * body
176189 */
177190 public void insert (@ NonNull String partitionId , @ NonNull UUID rowId , @ NonNull String bodyBlob ) {
191+ prepareStatements ();
178192 cqlSession .execute (insertRowStatement .bind (partitionId ,rowId , bodyBlob ));
179193 }
180194
0 commit comments