3030import com .datastax .oss .driver .api .core .metadata .schema .TableMetadata ;
3131import com .datastax .oss .driver .api .core .type .DataType ;
3232import com .datastax .oss .driver .api .core .type .DataTypes ;
33+ import com .datastax .oss .driver .api .core .type .UserDefinedType ;
3334import com .datastax .oss .driver .api .core .type .codec .registry .CodecRegistry ;
3435import com .datastax .oss .driver .api .core .type .reflect .GenericType ;
3536import com .datastax .oss .driver .api .querybuilder .SchemaBuilder ;
@@ -61,19 +62,26 @@ public final class CassandraChatMemoryConfig {
6162 // todo – make configurable
6263 public static final String DEFAULT_EXCHANGE_ID_NAME = "message_timestamp" ;
6364
64- public static final String DEFAULT_ASSISTANT_COLUMN_NAME = "assistant" ;
65-
66- public static final String DEFAULT_USER_COLUMN_NAME = "user" ;
65+ public static final String DEFAULT_MESSAGES_COLUMN_NAME = "messages" ;
6766
6867 private static final Logger logger = LoggerFactory .getLogger (CassandraChatMemoryConfig .class );
6968
7069 final CqlSession session ;
7170
7271 final Schema schema ;
7372
74- final String assistantColumn ;
73+ final String messageUDT = "ai_chat_message" ;
74+
75+ final String messagesColumn ;
76+
77+ // todo – make configurable
78+ final String messageUdtTimestampColumn = "msg_timestamp" ;
79+
80+ // todo – make configurable
81+ final String messageUdtTypeColumn = "msg_type" ;
7582
76- final String userColumn ;
83+ // todo – make configurable
84+ final String messageUdtContentColumn = "msg_content" ;
7785
7886 final SessionIdToPrimaryKeysTranslator primaryKeyTranslator ;
7987
@@ -84,8 +92,7 @@ public final class CassandraChatMemoryConfig {
8492 private CassandraChatMemoryConfig (Builder builder ) {
8593 this .session = builder .session ;
8694 this .schema = new Schema (builder .keyspace , builder .table , builder .partitionKeys , builder .clusteringKeys );
87- this .assistantColumn = builder .assistantColumn ;
88- this .userColumn = builder .userColumn ;
95+ this .messagesColumn = builder .messagesColumn ;
8996 this .timeToLiveSeconds = builder .timeToLiveSeconds ;
9097 this .disallowSchemaChanges = builder .disallowSchemaChanges ;
9198 this .primaryKeyTranslator = builder .primaryKeyTranslator ;
@@ -109,6 +116,7 @@ void dropKeyspace() {
109116 void ensureSchemaExists () {
110117 if (!this .disallowSchemaChanges ) {
111118 SchemaUtil .ensureKeyspaceExists (this .session , this .schema .keyspace );
119+ ensureMessageTypeExist ();
112120 ensureTableExists ();
113121 ensureTableColumnsExist ();
114122 SchemaUtil .checkSchemaAgreement (this .session );
@@ -129,17 +137,35 @@ void checkSchemaValid() {
129137 .getTable (this .schema .table )
130138 .isPresent (), "table %s does not exist" );
131139
140+ Preconditions .checkState (this .session .getMetadata ()
141+ .getKeyspace (this .schema .keyspace ())
142+ .get ()
143+ .getUserDefinedType (messageUDT )
144+ .isPresent (), "table %s does not exist" );
145+
146+ UserDefinedType udt = this .session .getMetadata ()
147+ .getKeyspace (this .schema .keyspace ())
148+ .get ()
149+ .getUserDefinedType (messageUDT )
150+ .get ();
151+
152+ Preconditions .checkState (udt .contains (this .messageUdtTimestampColumn ), "field %s does not exist" ,
153+ this .messageUdtTimestampColumn );
154+
155+ Preconditions .checkState (udt .contains (this .messageUdtTypeColumn ), "field %s does not exist" ,
156+ this .messageUdtTypeColumn );
157+
158+ Preconditions .checkState (udt .contains (this .messageUdtContentColumn ), "field %s does not exist" ,
159+ this .messageUdtContentColumn );
160+
132161 TableMetadata tableMetadata = this .session .getMetadata ()
133162 .getKeyspace (this .schema .keyspace )
134163 .get ()
135164 .getTable (this .schema .table )
136165 .get ();
137166
138- Preconditions .checkState (tableMetadata .getColumn (this .assistantColumn ).isPresent (), "column %s does not exist" ,
139- this .assistantColumn );
140-
141- Preconditions .checkState (tableMetadata .getColumn (this .userColumn ).isPresent (), "column %s does not exist" ,
142- this .userColumn );
167+ Preconditions .checkState (tableMetadata .getColumn (this .messagesColumn ).isPresent (), "column %s does not exist" ,
168+ this .messagesColumn );
143169 }
144170
145171 private void ensureTableExists () {
@@ -159,9 +185,11 @@ private void ensureTableExists() {
159185
160186 String lastClusteringColumn = this .schema .clusteringKeys .get (this .schema .clusteringKeys .size () - 1 ).name ();
161187
162- CreateTableWithOptions createTableWithOptions = createTable .withColumn (this .userColumn , DataTypes .TEXT )
188+ CreateTableWithOptions createTableWithOptions = createTable
189+ .withColumn (this .messagesColumn , DataTypes .frozenListOf (SchemaBuilder .udt (messageUDT , true )))
163190 .withClusteringOrder (lastClusteringColumn , ClusteringOrder .DESC )
164- // TODO replace w/ SchemaBuilder.unifiedCompactionStrategy() is available
191+ // TODO replace w/ SchemaBuilder.unifiedCompactionStrategy() when
192+ // available
165193 .withOption ("compaction" , Map .of ("class" , "UnifiedCompactionStrategy" ));
166194
167195 if (null != this .timeToLiveSeconds ) {
@@ -171,6 +199,18 @@ private void ensureTableExists() {
171199 }
172200 }
173201
202+ private void ensureMessageTypeExist () {
203+
204+ SimpleStatement stmt = SchemaBuilder .createType (messageUDT )
205+ .ifNotExists ()
206+ .withField (messageUdtTimestampColumn , DataTypes .TIMESTAMP )
207+ .withField (messageUdtTypeColumn , DataTypes .TEXT )
208+ .withField (messageUdtContentColumn , DataTypes .TEXT )
209+ .build ();
210+
211+ this .session .execute (stmt .setKeyspace (this .schema .keyspace ));
212+ }
213+
174214 private void ensureTableColumnsExist () {
175215
176216 TableMetadata tableMetadata = this .session .getMetadata ()
@@ -179,18 +219,12 @@ private void ensureTableColumnsExist() {
179219 .getTable (this .schema .table ())
180220 .get ();
181221
182- boolean addAssistantColumn = tableMetadata .getColumn (this .assistantColumn ).isEmpty ();
183- boolean addUserColumn = tableMetadata .getColumn (this .userColumn ).isEmpty ();
222+ if (tableMetadata .getColumn (this .messagesColumn ).isEmpty ()) {
223+
224+ SimpleStatement stmt = SchemaBuilder .alterTable (this .schema .keyspace (), this .schema .table ())
225+ .addColumn (this .messagesColumn , DataTypes .frozenListOf (SchemaBuilder .udt (messageUDT , true )))
226+ .build ();
184227
185- if (addAssistantColumn || addUserColumn ) {
186- AlterTableAddColumn alterTable = SchemaBuilder .alterTable (this .schema .keyspace (), this .schema .table ());
187- if (addAssistantColumn ) {
188- alterTable = alterTable .addColumn (this .assistantColumn , DataTypes .TEXT );
189- }
190- if (addUserColumn ) {
191- alterTable = alterTable .addColumn (this .userColumn , DataTypes .TEXT );
192- }
193- SimpleStatement stmt = ((AlterTableAddColumnEnd ) alterTable ).build ();
194228 logger .debug ("Executing {}" , stmt .getQuery ());
195229 this .session .execute (stmt );
196230 }
@@ -228,9 +262,7 @@ public static final class Builder {
228262 private List <SchemaColumn > clusteringKeys = List
229263 .of (new SchemaColumn (DEFAULT_EXCHANGE_ID_NAME , DataTypes .TIMESTAMP ));
230264
231- private String assistantColumn = DEFAULT_ASSISTANT_COLUMN_NAME ;
232-
233- private String userColumn = DEFAULT_USER_COLUMN_NAME ;
265+ private String messagesColumn = DEFAULT_MESSAGES_COLUMN_NAME ;
234266
235267 private Integer timeToLiveSeconds = null ;
236268
@@ -289,13 +321,8 @@ public Builder withClusteringKeys(List<SchemaColumn> clusteringKeys) {
289321 return this ;
290322 }
291323
292- public Builder withAssistantColumnName (String name ) {
293- this .assistantColumn = name ;
294- return this ;
295- }
296-
297- public Builder withUserColumnName (String name ) {
298- this .userColumn = name ;
324+ public Builder withMessagesColumnName (String name ) {
325+ this .messagesColumn = name ;
299326 return this ;
300327 }
301328
0 commit comments