Skip to content

Commit a03939d

Browse files
committed
add fts case
1 parent 78f5292 commit a03939d

File tree

1 file changed

+270
-4
lines changed

1 file changed

+270
-4
lines changed

src/test/java/com/alipay/oceanbase/rpc/ObTableFullTextIndexTest.java

Lines changed: 270 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
import org.junit.Test;
1919

2020
import java.nio.ByteBuffer;
21-
import java.sql.Connection;
22-
import java.sql.SQLException;
23-
import java.sql.Statement;
24-
import java.sql.Timestamp;
21+
import java.sql.*;
22+
import java.util.ArrayList;
2523
import java.util.List;
2624
import java.util.Locale;
2725
import java.util.Map;
26+
import java.util.concurrent.CountDownLatch;
2827

2928
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
3029
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
@@ -602,6 +601,12 @@ private void executeSQL(String createSQL) throws SQLException {
602601
statement.execute(createSQL);
603602
}
604603

604+
private ResultSet executeQuery(String sql) throws SQLException {
605+
Connection connection = ObTableClientTestUtil.getConnection();
606+
Statement statement = connection.createStatement();
607+
return statement.executeQuery(sql);
608+
}
609+
605610
@Test
606611
public void testQueryWithLimitOffset() throws Exception {
607612
try {
@@ -758,4 +763,265 @@ public void testBatch() throws Exception {
758763
executeSQL(truncatePartTableSQL);
759764
}
760765
}
766+
767+
@Test
768+
public void testConcurrentMixedBatch() throws Exception {
769+
try {
770+
executeSQL(truncatePartTableSQL);
771+
772+
int threadCount = 5;
773+
int batchSize = 6;
774+
CountDownLatch latch = new CountDownLatch(threadCount);
775+
Exception[] threadExceptions = new Exception[threadCount];
776+
777+
// 创建线程并发执行
778+
Thread[] threads = new Thread[threadCount];
779+
for (int t = 0; t < threadCount; t++) {
780+
final int threadId = t;
781+
threads[t] = new Thread(() -> {
782+
try {
783+
// 第一步:每个线程先插入自己的初始数据
784+
int baseId = threadId * batchSize;
785+
BatchOperation initBatch = client.batchOperation(partTableName);
786+
for (int i = 0; i < batchSize; i++) {
787+
int id = baseId + i;
788+
Insert insert = new Insert();
789+
insert.setRowKey(row(colVal(idCol, id)))
790+
.addMutateRow(row(
791+
colVal(c2Col, id),
792+
colVal(txtCol, "Initial text " + id)
793+
));
794+
initBatch.addOperation(insert);
795+
}
796+
BatchOperationResult initRes = initBatch.execute();
797+
Assert.assertEquals(batchSize, initRes.size());
798+
799+
// 第二步:执行混合batch操作
800+
BatchOperation mixedBatch = client.batchOperation(partTableName);
801+
802+
// 1. InsertOrUpdate
803+
InsertOrUpdate insup = new InsertOrUpdate();
804+
insup.setRowKey(row(colVal(idCol, baseId)))
805+
.addMutateRow(row(
806+
colVal(c2Col, baseId + 100),
807+
colVal(txtCol, "Updated by insertOrUpdate " + baseId)
808+
));
809+
mixedBatch.addOperation(insup);
810+
811+
// 2. Update
812+
Update upd = new Update();
813+
upd.setRowKey(row(colVal(idCol, baseId + 1)))
814+
.addMutateRow(row(
815+
colVal(c2Col, baseId + 101),
816+
colVal(txtCol, "Updated by update " + (baseId + 1))
817+
));
818+
mixedBatch.addOperation(upd);
819+
820+
// 3. Replace
821+
Replace replace = new Replace();
822+
replace.setRowKey(row(colVal(idCol, baseId + 2)))
823+
.addMutateRow(row(
824+
colVal(c2Col, baseId + 102),
825+
colVal(txtCol, "Updated by replace " + (baseId + 2))
826+
));
827+
mixedBatch.addOperation(replace);
828+
829+
// 4. Increment
830+
Increment increment = new Increment();
831+
increment.setRowKey(row(colVal(idCol, baseId + 3)))
832+
.addMutateRow(row(colVal(c2Col, 100)));
833+
mixedBatch.addOperation(increment);
834+
835+
// 5. Append
836+
Append append = new Append();
837+
append.setRowKey(row(colVal(idCol, baseId + 4)))
838+
.addMutateRow(row(
839+
colVal(txtCol, " Appended text by thread " + threadId)
840+
));
841+
mixedBatch.addOperation(append);
842+
843+
// 执行混合batch操作
844+
BatchOperationResult mixedRes = mixedBatch.execute();
845+
Assert.assertEquals(5, mixedRes.size());
846+
847+
// 验证结果
848+
BatchOperation getBatch = client.batchOperation(partTableName);
849+
for (int i = 0; i < batchSize - 1; i++) { // 最后一行不验证
850+
int id = baseId + i;
851+
TableQuery query = client.query(partTableName)
852+
.setRowKey(row(colVal(idCol, id)));
853+
getBatch.addOperation(query);
854+
}
855+
BatchOperationResult getRes = getBatch.execute();
856+
Assert.assertEquals(batchSize - 1, getRes.size());
857+
858+
// 验证每种操作的结果
859+
for (int i = 0; i < getRes.size(); i++) {
860+
Row row = getRes.get(i).getOperationRow();
861+
int id = baseId + i;
862+
Assert.assertEquals(id, row.get(idCol));
863+
864+
switch (i) {
865+
case 0: // InsertOrUpdate
866+
Assert.assertEquals(baseId + 100, row.get(c2Col));
867+
Assert.assertEquals("Updated by insertOrUpdate " + id, row.get(txtCol));
868+
break;
869+
case 1: // Update
870+
Assert.assertEquals(baseId + 101, row.get(c2Col));
871+
Assert.assertEquals("Updated by update " + id, row.get(txtCol));
872+
break;
873+
case 2: // Replace
874+
Assert.assertEquals(baseId + 102, row.get(c2Col));
875+
Assert.assertEquals("Updated by replace " + id, row.get(txtCol));
876+
break;
877+
case 3: // Increment
878+
Assert.assertEquals(id + 100, row.get(c2Col));
879+
Assert.assertEquals("Initial text " + id, row.get(txtCol));
880+
break;
881+
case 4: // Append
882+
Assert.assertEquals(id, row.get(c2Col));
883+
Assert.assertEquals("Initial text " + id + " Appended text by thread " + threadId,
884+
row.get(txtCol));
885+
break;
886+
}
887+
}
888+
889+
} catch (Exception e) {
890+
threadExceptions[threadId] = e;
891+
} finally {
892+
latch.countDown();
893+
}
894+
});
895+
threads[t].start();
896+
}
897+
898+
// 等待所有线程完成
899+
latch.await();
900+
901+
// 检查线程异常
902+
for (int t = 0; t < threadCount; t++) {
903+
if (threadExceptions[t] != null) {
904+
throw threadExceptions[t];
905+
}
906+
}
907+
908+
// 验证最终的总行数
909+
QueryResultSet countResult = client.query(partTableName).execute();
910+
int totalRows = 0;
911+
while (countResult.next()) {
912+
totalRows++;
913+
}
914+
Assert.assertEquals(threadCount * batchSize, totalRows);
915+
916+
} catch (Exception e) {
917+
e.printStackTrace();
918+
Assert.fail();
919+
} finally {
920+
executeSQL(truncatePartTableSQL);
921+
}
922+
}
923+
924+
@Test
925+
public void testTTLExpiration() throws Exception {
926+
try {
927+
executeSQL(truncateTTLTableSQL);
928+
executeSQL("ALTER SYSTEM SET enable_kv_ttl = true");
929+
930+
int batchSize = 100;
931+
int baseId = 1000;
932+
List<Integer> expiredIds = new ArrayList<>();
933+
List<Integer> activeIds = new ArrayList<>();
934+
935+
// Insert expired data
936+
Timestamp expiredTs = new Timestamp(System.currentTimeMillis() - 20000); // 20 seconds ago
937+
BatchOperation expiredBatch = client.batchOperation(ttlTableName);
938+
for (int i = 0; i < batchSize; i++) {
939+
int id = baseId + i;
940+
expiredIds.add(id);
941+
Insert insert = new Insert();
942+
insert.setRowKey(row(colVal(idCol, id)))
943+
.addMutateRow(row(
944+
colVal(expireTsCol, expiredTs),
945+
colVal(txtCol, "Expired text " + id)
946+
));
947+
expiredBatch.addOperation(insert);
948+
}
949+
BatchOperationResult expiredRes = expiredBatch.execute();
950+
Assert.assertEquals(batchSize, expiredRes.size());
951+
952+
// Insert non-expired data
953+
Timestamp activeTs = new Timestamp(System.currentTimeMillis() + 30000); // 30 seconds in future
954+
BatchOperation activeBatch = client.batchOperation(ttlTableName);
955+
for (int i = 0; i < batchSize; i++) {
956+
int id = baseId + batchSize + i;
957+
activeIds.add(id);
958+
Insert insert = new Insert();
959+
insert.setRowKey(row(colVal(idCol, id)))
960+
.addMutateRow(row(
961+
colVal(expireTsCol, activeTs),
962+
colVal(txtCol, "Active text " + id)
963+
));
964+
activeBatch.addOperation(insert);
965+
}
966+
BatchOperationResult activeRes = activeBatch.execute();
967+
Assert.assertEquals(batchSize, activeRes.size());
968+
969+
// Verify data after TTL task completion
970+
for (Integer id : expiredIds) {
971+
Map<String, Object> getRes = client.get(ttlTableName, new Object[] { id }, null);
972+
Assert.assertTrue("Expired row " + id + " should be deleted", getRes.isEmpty());
973+
}
974+
975+
for (Integer id : activeIds) {
976+
Map<String, Object> getRes = client.get(ttlTableName, new Object[] { id }, null);
977+
Assert.assertFalse("Non-expired row " + id + " should exist", getRes.isEmpty());
978+
Assert.assertEquals("Active text " + id, getRes.get(txtCol));
979+
Assert.assertEquals(activeTs, getRes.get(expireTsCol));
980+
}
981+
// Trigger TTL cleanup
982+
executeSQL("ALTER SYSTEM trigger ttl");
983+
984+
// Wait for TTL task to complete
985+
boolean taskCompleted = false;
986+
int maxRetries = 30;
987+
int retryCount = 0;
988+
989+
while (!taskCompleted && retryCount < maxRetries) {
990+
ResultSet rs = executeQuery("select * from oceanbase.dba_ob_kv_ttl_tasks where TABLE_NAME = '\" + ttlTableName + \"'");
991+
boolean hasRunningTask = false;
992+
993+
System.out.println("\nTTL tasks status check #" + (retryCount + 1) + ":");
994+
while (rs.next()) {
995+
String status = rs.getString("STATUS");
996+
System.out.println("Task ID: " + rs.getLong("TASK_ID") +
997+
", Status: " + status +
998+
", Table: " + rs.getString("TABLE_NAME"));
999+
1000+
if (!"FINISHED".equalsIgnoreCase(status)) {
1001+
hasRunningTask = true;
1002+
}
1003+
}
1004+
1005+
if (!hasRunningTask) {
1006+
taskCompleted = true;
1007+
} else {
1008+
Thread.sleep(2000);
1009+
retryCount++;
1010+
}
1011+
}
1012+
1013+
if (!taskCompleted) {
1014+
System.err.println("Warning: TTL tasks did not complete within expected time");
1015+
Assert.fail();
1016+
}
1017+
1018+
} catch (Exception e) {
1019+
e.printStackTrace();
1020+
Assert.fail();
1021+
} finally {
1022+
// Only disable TTL and truncate table after tasks complete
1023+
executeSQL("ALTER SYSTEM SET enable_kv_ttl = false");
1024+
executeSQL(truncateTTLTableSQL);
1025+
}
1026+
}
7611027
}

0 commit comments

Comments
 (0)