1717
1818import com .datastax .oss .driver .api .core .ConsistencyLevel ;
1919import com .datastax .oss .driver .api .core .CqlSession ;
20+ import com .datastax .oss .driver .api .core .cql .BoundStatement ;
21+ import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
2022import com .datastax .oss .driver .api .core .cql .ResultSet ;
2123import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
24+ import com .datastax .oss .driver .api .core .type .codec .TypeCodec ;
25+ import com .datastax .oss .driver .api .core .type .codec .registry .CodecRegistry ;
2226import com .datastax .oss .driver .api .querybuilder .QueryBuilder ;
23- import com .datastax .oss .driver .api .querybuilder .insert .RegularInsert ;
27+ import com .datastax .oss .driver .api .querybuilder .insert .Insert ;
2428import jakarta .nosql .column .ColumnDeleteQuery ;
2529import jakarta .nosql .column .ColumnEntity ;
2630import jakarta .nosql .column .ColumnQuery ;
2731
2832import java .time .Duration ;
2933import java .util .Map ;
34+ import java .util .Objects ;
3035import java .util .concurrent .Executor ;
3136import java .util .stream .Collectors ;
3237import java .util .stream .Stream ;
@@ -52,7 +57,7 @@ class DefaultCassandraColumnFamilyManager implements CassandraColumnFamilyManage
5257 @ Override
5358 public ColumnEntity insert (ColumnEntity entity ) {
5459 requireNonNull (entity , "entity is required" );
55- final RegularInsert insert = QueryUtils .insert (entity , keyspace , session , null );
60+ final Insert insert = QueryUtils .insert (entity , keyspace , session , null );
5661 session .execute (insert .build ());
5762 return entity ;
5863 }
@@ -61,7 +66,7 @@ public ColumnEntity insert(ColumnEntity entity) {
6166 public ColumnEntity insert (ColumnEntity entity , Duration duration ) {
6267 requireNonNull (entity , "entity is required" );
6368 requireNonNull (duration , "duration is required" );
64- final RegularInsert insert = QueryUtils .insert (entity , keyspace , session , duration );
69+ final Insert insert = QueryUtils .insert (entity , keyspace , session , duration );
6570 session .execute (insert .build ());
6671 return entity ;
6772 }
@@ -95,8 +100,41 @@ public Iterable<ColumnEntity> insert(Iterable<ColumnEntity> entities, Duration d
95100 }
96101
97102 @ Override
98- public void delete (ColumnDeleteQuery columnDeleteQuery ) {
103+ public ColumnEntity save (ColumnEntity entity , ConsistencyLevel level ) {
104+ requireNonNull (entity , "entities is required" );
105+ requireNonNull (level , "level is required" );
106+
107+ final Insert insert = QueryUtils .insert (entity , keyspace , session , null );
108+ session .execute (insert .build ().setConsistencyLevel (level ));
109+ return entity ;
110+ }
111+
112+ @ Override
113+ public ColumnEntity save (ColumnEntity entity , Duration ttl , ConsistencyLevel level ) {
114+ requireNonNull (entity , "entity is required" );
115+ requireNonNull (level , "level is required" );
116+ requireNonNull (ttl , "ttl is required" );
99117
118+ final Insert insert = QueryUtils .insert (entity , keyspace , session , ttl );
119+ session .execute (insert .build ().setConsistencyLevel (level ));
120+ return entity ;
121+ }
122+
123+ @ Override
124+ public Iterable <ColumnEntity > save (Iterable <ColumnEntity > entities , ConsistencyLevel level ) {
125+ requireNonNull (entities , "entities is required" );
126+ requireNonNull (level , "level is required" );
127+ return StreamSupport .stream (entities .spliterator (), false ).map (c -> this .save (c , level ))
128+ .collect (Collectors .toList ());
129+ }
130+
131+ @ Override
132+ public Iterable <ColumnEntity > save (Iterable <ColumnEntity > entities , Duration ttl , ConsistencyLevel level ) {
133+ requireNonNull (entities , "entities is required" );
134+ requireNonNull (level , "level is required" );
135+ requireNonNull (ttl , "ttl is required" );
136+ return StreamSupport .stream (entities .spliterator (), false ).map (c -> this .save (c , ttl , level ))
137+ .collect (Collectors .toList ());
100138 }
101139
102140 @ Override
@@ -106,6 +144,13 @@ public Stream<ColumnEntity> select(ColumnQuery query) {
106144 return executor .execute (keyspace , query , this );
107145 }
108146
147+ @ Override
148+ public Stream <ColumnEntity > select (ColumnQuery query , ConsistencyLevel level ) throws NullPointerException {
149+ requireNonNull (query , "query is required" );
150+ QueryExecutor executor = QueryExecutor .of (query );
151+ return executor .execute (keyspace , query , level , this );
152+ }
153+
109154 @ Override
110155 public long count (String columnFamily ) {
111156 requireNonNull (columnFamily , "columnFamily is required" );
@@ -119,66 +164,64 @@ public void close() {
119164 session .close ();
120165 }
121166
122- CqlSession getSession () {
123- return session ;
124- }
125-
126167 @ Override
127- public String toString () {
128- final StringBuilder sb = new StringBuilder ("CassandraColumnFamilyManager{" );
129- sb .append ("session=" ).append (session );
130- sb .append ('}' );
131- return sb .toString ();
132- }
168+ public void delete (ColumnDeleteQuery query , ConsistencyLevel level ) throws NullPointerException {
133169
134- @ Override
135- public ColumnEntity save (ColumnEntity entity , ConsistencyLevel level ) throws NullPointerException {
136- return null ;
137170 }
138171
139172 @ Override
140- public ColumnEntity save (ColumnEntity entity , Duration ttl , ConsistencyLevel level ) throws NullPointerException {
141- return null ;
142- }
173+ public void delete (ColumnDeleteQuery columnDeleteQuery ) {
143174
144- @ Override
145- public Iterable <ColumnEntity > save (Iterable <ColumnEntity > entities , ConsistencyLevel level ) throws NullPointerException {
146- return null ;
147175 }
148176
149- @ Override
150- public Iterable <ColumnEntity > save (Iterable <ColumnEntity > entities , Duration ttl , ConsistencyLevel level ) throws NullPointerException {
151- return null ;
152- }
153177
154178 @ Override
155- public void delete (ColumnDeleteQuery query , ConsistencyLevel level ) throws NullPointerException {
156-
179+ public Stream <ColumnEntity > cql (String query ) {
180+ Objects .requireNonNull (query , "query is required" );
181+ final ResultSet resultSet = session .execute (query );
182+ return resultSet .all ().stream ().map (CassandraConverter ::toDocumentEntity );
157183 }
158184
159185 @ Override
160- public Stream <ColumnEntity > select (ColumnQuery query , ConsistencyLevel level ) throws NullPointerException {
161- return null ;
186+ public Stream <ColumnEntity > cql (String query , Map <String , Object > values ) {
187+ Objects .requireNonNull (query , "query is required" );
188+ Objects .requireNonNull (values , "values is required" );
189+ final PreparedStatement prepare = session .prepare (query );
190+ BoundStatement statement = prepare .bind ();
191+ for (Map .Entry <String , Object > entry : values .entrySet ()) {
192+ final TypeCodec <Object > codec = CodecRegistry .DEFAULT .codecFor ((Class <Object >) entry .getValue ().getClass ());
193+ statement = statement .set (entry .getKey (), entry .getValue (), codec );
194+ }
195+ final ResultSet resultSet = session .execute (statement );
196+ return resultSet .all ().stream ().map (CassandraConverter ::toDocumentEntity );
162197 }
163198
164199 @ Override
165- public Stream <ColumnEntity > cql (String query ) throws NullPointerException {
166- return null ;
200+ public Stream <ColumnEntity > execute (SimpleStatement statement ) {
201+ Objects .requireNonNull (statement , "statement is required" );
202+ final ResultSet resultSet = session .execute (statement );
203+ return resultSet .all ().stream ().map (CassandraConverter ::toDocumentEntity );
167204 }
168205
169206 @ Override
170- public Stream <ColumnEntity > cql (String query , Map <String , Object > values ) throws NullPointerException {
171- return null ;
207+ public CassandraPreparedStatement nativeQueryPrepare (String query ) {
208+ Objects .requireNonNull (query , "query is required" );
209+ final PreparedStatement prepare = session .prepare (query );
210+ return new CassandraPreparedStatement (prepare , session );
172211 }
173212
174- @ Override
175- public Stream < ColumnEntity > execute ( SimpleStatement statement ) throws NullPointerException {
176- return null ;
213+
214+ CqlSession getSession () {
215+ return session ;
177216 }
178217
218+
179219 @ Override
180- public CassandraPreparedStatement nativeQueryPrepare (String query ) throws NullPointerException {
181- return null ;
220+ public String toString () {
221+ final StringBuilder sb = new StringBuilder ("CassandraColumnFamilyManager{" );
222+ sb .append ("session=" ).append (session );
223+ sb .append ('}' );
224+ return sb .toString ();
182225 }
183226}
184227
0 commit comments