|
19 | 19 |
|
20 | 20 | package org.apache.iotdb.db.it; |
21 | 21 |
|
| 22 | +import org.apache.iotdb.isession.ISession; |
| 23 | +import org.apache.iotdb.isession.SessionDataSet; |
22 | 24 | import org.apache.iotdb.it.env.EnvFactory; |
23 | 25 | import org.apache.iotdb.it.framework.IoTDBTestRunner; |
24 | 26 | import org.apache.iotdb.itbase.category.ClusterIT; |
25 | 27 | import org.apache.iotdb.itbase.category.LocalStandaloneIT; |
| 28 | +import org.apache.iotdb.rpc.IoTDBConnectionException; |
| 29 | +import org.apache.iotdb.rpc.StatementExecutionException; |
26 | 30 |
|
| 31 | +import org.apache.tsfile.enums.TSDataType; |
| 32 | +import org.apache.tsfile.write.record.Tablet; |
| 33 | +import org.apache.tsfile.write.schema.IMeasurementSchema; |
| 34 | +import org.apache.tsfile.write.schema.MeasurementSchema; |
27 | 35 | import org.junit.AfterClass; |
| 36 | +import org.junit.Assert; |
28 | 37 | import org.junit.BeforeClass; |
29 | 38 | import org.junit.Test; |
30 | 39 | import org.junit.experimental.categories.Category; |
|
34 | 43 | import java.sql.ResultSet; |
35 | 44 | import java.sql.SQLException; |
36 | 45 | import java.sql.Statement; |
| 46 | +import java.util.Collections; |
| 47 | +import java.util.List; |
37 | 48 | import java.util.Locale; |
38 | 49 |
|
39 | 50 | import static org.junit.Assert.assertEquals; |
@@ -181,4 +192,48 @@ public void testFlushNotExistGroupNoData() { |
181 | 192 | fail(e.getMessage()); |
182 | 193 | } |
183 | 194 | } |
| 195 | + |
| 196 | + @Test |
| 197 | + public void testStreamingQueryMemTableWithOverlappedData() |
| 198 | + throws IoTDBConnectionException, StatementExecutionException { |
| 199 | + String device = "root.stream.d1"; |
| 200 | + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { |
| 201 | + session.open(); |
| 202 | + generateTimeRangeWithTimestamp(session, device, 1, 10); |
| 203 | + |
| 204 | + generateTimeRangeWithTimestamp(session, device, 500000, 520000); |
| 205 | + session.executeNonQueryStatement("flush"); |
| 206 | + generateTimeRangeWithTimestamp(session, device, 100000, 350000); |
| 207 | + |
| 208 | + SessionDataSet sessionDataSet = |
| 209 | + session.executeQueryStatement("select count(*) from root.stream.d1"); |
| 210 | + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); |
| 211 | + long count = 0; |
| 212 | + while (iterator.next()) { |
| 213 | + count = iterator.getLong(1); |
| 214 | + } |
| 215 | + Assert.assertEquals(10 + 20001 + 250001, count); |
| 216 | + } |
| 217 | + } |
| 218 | + |
| 219 | + private static void generateTimeRangeWithTimestamp( |
| 220 | + ISession session, String device, long start, long end) |
| 221 | + throws IoTDBConnectionException, StatementExecutionException { |
| 222 | + List<IMeasurementSchema> measurementSchemas = |
| 223 | + Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64)); |
| 224 | + Tablet tablet = new Tablet(device, measurementSchemas); |
| 225 | + for (long currentTime = start; currentTime <= end; currentTime++) { |
| 226 | + int rowIndex = tablet.getRowSize(); |
| 227 | + if (rowIndex == tablet.getMaxRowNumber()) { |
| 228 | + session.insertTablet(tablet); |
| 229 | + tablet.reset(); |
| 230 | + rowIndex = 0; |
| 231 | + } |
| 232 | + tablet.addTimestamp(rowIndex, currentTime); |
| 233 | + tablet.addValue(rowIndex, 0, currentTime); |
| 234 | + } |
| 235 | + if (tablet.getRowSize() > 0) { |
| 236 | + session.insertTablet(tablet); |
| 237 | + } |
| 238 | + } |
184 | 239 | } |
0 commit comments