Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,16 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
final String className = pipePluginMeta.getClassName();
final String jarName = pipePluginMeta.getJarName();

// try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
dropPipePlugin(new DropPipePluginPlan(pluginName));

pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());

if (createPipePluginPlan.getJarFile() != null) {
pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
computeFromPluginClass(pluginName, className);
try {
pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
computeFromPluginClass(pluginName, className);
} catch (final Exception e) {
// We need to rollback if the creation has failed
pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
throw e;
}
} else {
final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (Objects.nonNull(existed)) {
Expand All @@ -238,6 +238,12 @@ public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan
}
}

// try to drop the old pipe plugin if exists to reduce the effect of the inconsistency
dropPipePlugin(new DropPipePluginPlan(pluginName));

pipePluginMetaKeeper.addPipePluginMeta(pluginName, pipePluginMeta);
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());

return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (final Exception e) {
final String errorMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.persistence.pipe.PipeInfo;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.apache.tsfile.common.conf.TSFileConfig;
Expand Down Expand Up @@ -149,10 +151,18 @@ public void testManagement() {
new CreatePipePluginPlan(
new PipePluginMeta(pluginName, "org.apache.iotdb.TestJar", false, "test.jar", "???"),
new Binary("123", TSFileConfig.STRING_CHARSET));
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);

// Drop pipe plugin test plugin
pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false);
// Shall fail due to validation
Assert.assertEquals(
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(),
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan).getCode());

// Drop pipe plugin test plugin, validation failure
Assert.assertThrows(
PipeException.class,
() -> pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName, false));

// Idempotent
DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
}
Expand Down
Loading