Skip to content

Commit 609bc28

Browse files
authored
Pipe: Fixed the hardlink bug of plugin meta (#16937)
1 parent e9c614e commit 609bc28

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,8 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
215215
final String className = pipePluginMeta.getClassName();
216216
final String jarName = pipePluginMeta.getJarName();
217217

218-
// try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
219-
dropPipePlugin(new DropPipePluginPlan(pluginName));
220-
221-
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
222-
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
223-
224218
if (createPipePluginPlan.getJarFile() != null) {
225-
pipePluginExecutableManager.savePluginToInstallDir(
226-
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
227-
computeFromPluginClass(pluginName, className);
219+
savePipePluginWithRollback(createPipePluginPlan);
228220
} else {
229221
final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
230222
if (Objects.nonNull(existed)) {
@@ -238,6 +230,12 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
238230
}
239231
}
240232

233+
// try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
234+
dropPipePlugin(new DropPipePluginPlan(pluginName));
235+
236+
pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
237+
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
238+
241239
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
242240
} catch (final Exception e) {
243241
final String errorMessage =
@@ -250,6 +248,23 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
250248
}
251249
}
252250

251+
private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
252+
throws Exception {
253+
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
254+
final String pluginName = pipePluginMeta.getPluginName();
255+
final String className = pipePluginMeta.getClassName();
256+
final String jarName = pipePluginMeta.getJarName();
257+
try {
258+
pipePluginExecutableManager.savePluginToInstallDir(
259+
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
260+
computeFromPluginClass(pluginName, className);
261+
} catch (final Exception e) {
262+
// We need to rollback if the creation has failed
263+
pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
264+
throw e;
265+
}
266+
}
267+
253268
private void computeFromPluginClass(final String pluginName, final String className)
254269
throws Exception {
255270
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
3232
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
3333
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
34+
import org.apache.iotdb.pipe.api.exception.PipeException;
35+
import org.apache.iotdb.rpc.TSStatusCode;
3436

3537
import org.apache.thrift.TException;
3638
import org.apache.tsfile.common.conf.TSFileConfig;
@@ -149,10 +151,18 @@ public void testManagement() {
149151
new CreatePipePluginPlan(
150152
new PipePluginMeta(pluginName, "org.apache.iotdb.TestJar", false, "test.jar", "???"),
151153
new Binary("123", TSFileConfig.STRING_CHARSET));
152-
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
153154

154-
// Drop pipe plugin test plugin
155-
pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false);
155+
// Shall fail due to validation
156+
Assert.assertEquals(
157+
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
158+
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan).getCode());
159+
160+
// Drop pipe plugin test plugin, validation failure
161+
Assert.assertThrows(
162+
PipeException.class,
163+
() -> pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false));
164+
165+
// Idempotent
156166
DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
157167
pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
158168
}

0 commit comments

Comments
 (0)