Skip to content

Commit 062af7c

Browse files
wenyanshi-123CRZbulabula
authored andcommitted
Fix compatibility issues when loading snapshot in pipe. (#16580)
1 parent ed2b216 commit 062af7c

File tree

4 files changed

+22
-20
lines changed

4 files changed

+22
-20
lines changed

integration-test/src/test/java/org/apache/iotdb/ainode/it/AINodeConcurrentInferenceIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public static void tearDown() throws Exception {
6565

6666
private static void prepareDataForTreeModel() throws SQLException {
6767
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
68-
Statement statement = connection.createStatement()) {
68+
Statement statement = connection.createStatement()) {
6969
statement.execute("CREATE DATABASE root.AI");
7070
statement.execute("CREATE TIMESERIES root.AI.s WITH DATATYPE=DOUBLE, ENCODING=RLE");
7171
for (int i = 0; i < 2880; i++) {
@@ -79,7 +79,7 @@ private static void prepareDataForTreeModel() throws SQLException {
7979

8080
private static void prepareDataForTableModel() throws SQLException {
8181
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
82-
Statement statement = connection.createStatement()) {
82+
Statement statement = connection.createStatement()) {
8383
statement.execute("CREATE DATABASE root");
8484
statement.execute("CREATE TABLE root.AI (s DOUBLE FIELD)");
8585
for (int i = 0; i < 2880; i++) {
@@ -99,7 +99,7 @@ public void concurrentGPUCallInferenceTest() throws SQLException, InterruptedExc
9999
private void concurrentGPUCallInferenceTest(String modelId)
100100
throws SQLException, InterruptedException {
101101
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
102-
Statement statement = connection.createStatement()) {
102+
Statement statement = connection.createStatement()) {
103103
final int threadCnt = 10;
104104
final int loop = 100;
105105
final int predictLength = 512;
@@ -134,7 +134,7 @@ public void concurrentGPUForecastTest() throws SQLException, InterruptedExceptio
134134
public void concurrentGPUForecastTest(String modelId, String selectSql)
135135
throws SQLException, InterruptedException {
136136
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
137-
Statement statement = connection.createStatement()) {
137+
Statement statement = connection.createStatement()) {
138138
final int threadCnt = 10;
139139
final int loop = 100;
140140
final int predictLength = 512;
@@ -164,7 +164,7 @@ private void checkModelOnSpecifiedDevice(Statement statement, String modelId, St
164164
for (int retry = 0; retry < 200; retry++) {
165165
Set<String> foundDevices = new HashSet<>();
166166
try (final ResultSet resultSet =
167-
statement.executeQuery(String.format("SHOW LOADED MODELS '%s'", device))) {
167+
statement.executeQuery(String.format("SHOW LOADED MODELS '%s'", device))) {
168168
while (resultSet.next()) {
169169
String deviceId = resultSet.getString("DeviceId");
170170
String loadedModelId = resultSet.getString("ModelId");

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningQueue.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,21 @@ public synchronized void tryListenToSnapshots(
157157
: null,
158158
snapshotPathInfo.getRight());
159159
if (type == CNSnapshotFileType.USER_ROLE) {
160-
long userId = Long.parseLong(snapshotPath.toFile().getName().split("_")[0]);
161-
try {
162-
curEvent.setAuthUserName(
163-
ConfigNode.getInstance()
164-
.getConfigManager()
165-
.getPermissionManager()
166-
.getUserName(userId));
167-
} catch (AuthException e) {
168-
LOGGER.warn("Failed to collect user name for user id {}", userId, e);
160+
String userName = snapshotPath.toFile().getName().split("_")[0];
161+
long userId;
162+
if (userName.matches("\\d+")) {
163+
userId = Long.parseLong(userName);
164+
try {
165+
curEvent.setAuthUserName(
166+
ConfigNode.getInstance()
167+
.getConfigManager()
168+
.getPermissionManager()
169+
.getUserName(userId));
170+
} catch (AuthException e) {
171+
LOGGER.warn("Failed to collect user name for user id {}", userId, e);
172+
}
173+
} else {
174+
curEvent.setAuthUserName(userName);
169175
}
170176
}
171177
events.add(curEvent);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/udf/UDTFForecast.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,8 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati
125125
Arrays.stream(
126126
parameters.getStringOrDefault(OPTIONS_PARAMETER_NAME, DEFAULT_OPTIONS).split(","))
127127
.map(s -> s.split("="))
128-
.filter(arr -> arr.length == 2 && !arr[0].isEmpty()) // 防御性检查
129-
.collect(
130-
Collectors.toMap(
131-
arr -> arr[0].trim(), arr -> arr[1].trim(), (v1, v2) -> v2 // 如果 key 重复,保留后一个
132-
));
128+
.filter(arr -> arr.length == 2 && !arr[0].isEmpty())
129+
.collect(Collectors.toMap(arr -> arr[0].trim(), arr -> arr[1].trim(), (v1, v2) -> v2));
133130
this.inputRows = new LinkedList<>();
134131
List<TSDataType> tsDataTypeList = new ArrayList<>(this.types.size() - 1);
135132
for (int i = 0; i < this.types.size(); i++) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/auth/user/BasicUserManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import static org.apache.iotdb.commons.auth.entity.User.INTERNAL_USER_END_ID;
4343

44-
4544
/** This class stores information of each user. */
4645
public abstract class BasicUserManager extends BasicRoleManager {
4746

0 commit comments

Comments
 (0)