Skip to content

Commit 2b01c6b

Browse files
authored
Merge pull request #16079 from cdapio/sidhdirenge-handle-missing-plugin
[CDAP-21224] Ignore PluginNotFoundException during AppSpec deserialization
2 parents 8e6f5a4 + b7fe2d1 commit 2b01c6b

File tree

9 files changed

+157
-18
lines changed

9 files changed

+157
-18
lines changed

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/adapters/AppSpecDeserializationContext.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@
3434
import java.io.IOException;
3535
import java.util.ArrayList;
3636
import java.util.Collection;
37+
import java.util.Collections;
38+
import java.util.HashSet;
3739
import java.util.Optional;
40+
import java.util.Set;
3841
import java.util.concurrent.ExecutionException;
3942
import javax.annotation.Nonnull;
4043

@@ -49,8 +52,10 @@ public class AppSpecDeserializationContext {
4952
private static final Gson GSON = new Gson();
5053
private ArtifactId rootArtifact;
5154
private String namespace;
55+
private String appName;
5256
private final StructuredTable pluginDataTable;
5357
private final StructuredTable universalPluginDataTable;
58+
private Set<String> missingPlugins;
5459
private final LoadingCache<PluginKey, PluginClass> pluginCache;
5560

5661
/**
@@ -65,6 +70,7 @@ public AppSpecDeserializationContext(String namespace, StructuredTable pluginDat
6570
this.namespace = namespace;
6671
this.pluginDataTable = pluginDataTable;
6772
this.universalPluginDataTable = universalPluginDataTable;
73+
this.missingPlugins = new HashSet<>();
6874
this.pluginCache = CacheBuilder.newBuilder().maximumSize(10)
6975
.build(new CacheLoader<PluginKey, PluginClass>() {
7076
@Override
@@ -82,7 +88,7 @@ private PluginClass loadPlugin(PluginKey pluginKey) throws PluginNotExistsExcept
8288
row = fetchFromUniversalPluginDataTable(pluginKey);
8389
if (!row.isPresent()) {
8490
throw new PluginNotExistsException(
85-
new io.cdap.cdap.proto.id.ArtifactId(pluginKey.getParentNamespace(),
91+
new io.cdap.cdap.proto.id.ArtifactId(pluginKey.getArtifactNamespace(),
8692
pluginKey.getArtifactName(), pluginKey.getArtifactVersion()),
8793
pluginKey.getPluginType(), pluginKey.getPluginName());
8894
}
@@ -174,6 +180,36 @@ public void setRootArtifact(ArtifactId rootArtifact) {
174180
this.rootArtifact = rootArtifact;
175181
}
176182

183+
/**
184+
* Appends a plugin that could not be resolved during deserialization.
185+
*/
186+
public void appendMissingPlugin(String pluginInfo) {
187+
if (pluginInfo != null) {
188+
this.missingPlugins.add(pluginInfo);
189+
}
190+
}
191+
192+
/**
193+
* Returns a read-only view of the plugins that were missing during the deserialization process.
194+
*/
195+
public Set<String> getMissingPlugins() {
196+
return Collections.unmodifiableSet(this.missingPlugins);
197+
}
198+
199+
/**
200+
* Sets the application name.
201+
*/
202+
public void setAppName(String appName) {
203+
this.appName = appName;
204+
}
205+
206+
/**
207+
* Gets the application name, set externally.
208+
*/
209+
public String getAppName() {
210+
return appName;
211+
}
212+
177213
/**
178214
* Retrieves the {@link PluginClass} for the given key, using an internal cache.
179215
*

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/adapters/ApplicationMetaAdapter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import io.cdap.cdap.internal.app.ApplicationSpecificationCodec;
2626
import io.cdap.cdap.internal.app.store.ApplicationMeta;
2727
import io.cdap.cdap.spi.data.StructuredTable;
28+
import java.util.Set;
2829
import javax.annotation.Nullable;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2932

3033
/**
3134
* Manages Gson instances specifically configured for ApplicationMeta serialization and
@@ -38,6 +41,7 @@ public final class ApplicationMetaAdapter {
3841

3942
private static final Gson GSON_INSTANCE_REDUCTION_ENABLED = buildGsonInternal(true);
4043
private static final Gson GSON_INSTANCE_REDUCTION_DISABLED = buildGsonInternal(false);
44+
private static final Logger LOG = LoggerFactory.getLogger(ApplicationMetaAdapter.class);
4145

4246
private ApplicationMetaAdapter() {
4347
}
@@ -55,7 +59,13 @@ public static <T> T fromJson(String jsonString, Class<T> classOfT, String namesp
5559
AppSpecDeserializationContextHolder.setContext(operationContext);
5660
try {
5761
Gson gson = getGson(appSpecReductionEnabled);
58-
return gson.fromJson(jsonString, classOfT);
62+
T result = gson.fromJson(jsonString, classOfT);
63+
Set<String> missingPlugins = operationContext.getMissingPlugins();
64+
if (!missingPlugins.isEmpty()) {
65+
LOG.trace("For application {} : {}", operationContext.getAppName(),
66+
operationContext.getMissingPlugins());
67+
}
68+
return result;
5969
} finally {
6070
AppSpecDeserializationContextHolder.clearContext();
6171
}

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/adapters/ApplicationMetaCodec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ public ApplicationMeta deserialize(JsonElement json, Type typeOfT,
8787
"ArtifactId in ApplicationSpecification was null for app: " + id);
8888
}
8989
}
90+
91+
JsonElement appNameJson = specJson.get("name");
92+
appSpecDeserializationContext.setAppName(appNameJson == null ? null : appNameJson.getAsString());
93+
9094
ApplicationSpecification spec = context.deserialize(specJson, ApplicationSpecification.class);
9195
return new ApplicationMeta(id, spec, null, null);
9296
}

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/adapters/PluginDeserializer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,17 @@
3434
import java.lang.reflect.Type;
3535
import java.util.Collections;
3636
import java.util.List;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
3739

3840
/**
3941
* Gson deserializer for {@link Plugin}s. Uses {@link AppSpecDeserializationContext} to enrich
4042
* {@link PluginClass} data if initially minimal.
4143
*/
4244
public class PluginDeserializer implements JsonDeserializer<Plugin> {
4345

46+
private static final Logger LOG = LoggerFactory.getLogger(PluginDeserializer.class);
47+
4448
/**
4549
* Deserializes JSON to {@link Plugin}. If {@link PluginClass#getClassName()} is empty, attempts
4650
* to enrich it via {@link #resolvePluginClassData(List, ArtifactId, PluginClass)}.
@@ -105,8 +109,10 @@ private PluginClass resolvePluginClassData(List<ArtifactId> parents, ArtifactId
105109
exception = e;
106110
}
107111
}
108-
throw new RuntimeException(
109-
"No data found in plugin data table OR universal plugin data table for key: " + exception);
112+
if (exception != null) {
113+
appSpecDeserializationContext.appendMissingPlugin(exception.getMessage());
114+
}
115+
return pluginClass;
110116
}
111117

112118
/**

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/ProvisionerStore.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ List<ProvisioningTaskInfo> listTaskInfo() throws IOException {
7272
StructuredRow row = iterator.next();
7373
result.add(ProvisioningTaskInfoAdapter.fromJson(
7474
row.getString(StoreDefinition.ProvisionerStore.PROVISIONER_TASK_INFO_FIELD),
75-
context));
75+
row.getString(StoreDefinition.ProvisionerStore.NAMESPACE_FIELD), context));
7676
}
7777
}
7878
return result;
@@ -176,6 +176,9 @@ private ProvisioningTaskInfo fetchTaskInfo(StructuredTableContext context,
176176
createPrimaryKey(key.getProgramRunId(), key.getType()));
177177
String taskInfoJson = row.map(structuredRow -> structuredRow.getString(
178178
StoreDefinition.ProvisionerStore.PROVISIONER_TASK_INFO_FIELD)).orElse(null);
179-
return ProvisioningTaskInfoAdapter.fromJson(taskInfoJson, context);
179+
String namespace = row.map(
180+
structuredRow -> structuredRow.getString(StoreDefinition.ProvisionerStore.NAMESPACE_FIELD))
181+
.orElse(null);
182+
return ProvisioningTaskInfoAdapter.fromJson(taskInfoJson, namespace, context);
180183
}
181184
}

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/adapters/ProgramDescriptorDeserializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public ProgramDescriptor deserialize(JsonElement json, Type typeOfT,
6767
+ programId.getApplication());
6868
}
6969
}
70+
JsonElement appNameJson = specJson.get("name");
71+
appSpecDeserializationContext.setAppName(appNameJson != null ? appNameJson.getAsString() : null);
7072
ApplicationSpecification spec = context.deserialize(specJson, ApplicationSpecification.class);
7173
return new ProgramDescriptor(programId, spec);
7274
}

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/provision/adapters/ProvisioningTaskInfoAdapter.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import io.cdap.cdap.spi.data.StructuredTableContext;
3939
import io.cdap.cdap.spi.data.TableNotFoundException;
4040
import io.cdap.cdap.store.StoreDefinition;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
4143

4244
/**
4345
* Manages Gson instances specifically configured for {@link ProvisioningTaskInfo} serialization and
@@ -48,6 +50,7 @@
4850
*/
4951
public final class ProvisioningTaskInfoAdapter {
5052

53+
private static final Logger LOG = LoggerFactory.getLogger(ProvisioningTaskInfo.class);
5154
private static final Gson GSON_INSTANCE_REDUCTION_ENABLED = buildGsonInternal(true);
5255
private static final Gson GSON_INSTANCE_REDUCTION_DISABLED = buildGsonInternal(false);
5356

@@ -80,16 +83,23 @@ private static boolean isAppSpecReductionEnabled(StructuredTableContext context)
8083
* {@link AppSpecDeserializationContext} lifecycle via
8184
* {@link AppSpecDeserializationContextHolder}.
8285
*/
83-
public static ProvisioningTaskInfo fromJson(String jsonString, StructuredTableContext context) {
84-
boolean appSpecReductionEnabled = isAppSpecReductionEnabled(context);
85-
if (appSpecReductionEnabled) {
86-
AppSpecDeserializationContext operationContext = new AppSpecDeserializationContext(null,
87-
getPluginDataTable(context), getUniversalPluginDataTable(context));
88-
AppSpecDeserializationContextHolder.setContext(operationContext);
86+
public static ProvisioningTaskInfo fromJson(String jsonString, String namespace,
87+
StructuredTableContext context) {
88+
boolean reductionEnabled = isAppSpecReductionEnabled(context);
89+
AppSpecDeserializationContext opContext =
90+
reductionEnabled ? new AppSpecDeserializationContext(namespace, getPluginDataTable(context),
91+
getUniversalPluginDataTable(context)) : null;
92+
if (opContext != null) {
93+
AppSpecDeserializationContextHolder.setContext(opContext);
8994
}
9095
try {
91-
Gson gson = getGson(appSpecReductionEnabled);
92-
return gson.fromJson(jsonString, ProvisioningTaskInfo.class);
96+
Gson gson = getGson(reductionEnabled);
97+
ProvisioningTaskInfo taskInfo = gson.fromJson(jsonString, ProvisioningTaskInfo.class);
98+
if (opContext != null && !opContext.getMissingPlugins().isEmpty()) {
99+
LOG.trace("Missing plugins for application {}: {}", opContext.getAppName(),
100+
opContext.getMissingPlugins());
101+
}
102+
return taskInfo;
93103
} finally {
94104
AppSpecDeserializationContextHolder.clearContext();
95105
}

cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.LinkedHashMap;
6868
import java.util.List;
6969
import java.util.Map;
70+
import java.util.Objects;
7071
import java.util.Set;
7172
import java.util.TreeSet;
7273
import java.util.UUID;
@@ -1673,7 +1674,7 @@ public void testCreateAndGetApplication() {
16731674
ArtifactId artifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
16741675
ApplicationReference appRef = new ApplicationReference(NamespaceId.DEFAULT, appName);
16751676
ApplicationId appId = appRef.app(appName + "_version_" + 1);
1676-
ApplicationSpecification spec = createDummyAppSpecWithWorkflow(appId, artifactId);
1677+
ApplicationSpecification spec = createDummyAppSpecWithWorkflow(appId, artifactId, false);
16771678
ApplicationMeta meta = new ApplicationMeta(spec.getName(), spec,
16781679
new ChangeDetail(null, null, null,
16791680
creationTimeMillis + 1, true));
@@ -1698,6 +1699,33 @@ public void testCreateAndGetApplication() {
16981699
Assert.assertEquals(meta.toString(), gotMeta[0].toString());
16991700
}
17001701

1702+
@Test
1703+
public void testCreateAndGetApplicationWithMissingPlugins() {
1704+
String appName = "application1";
1705+
ArtifactId artifactId = NamespaceId.DEFAULT.artifact("testArtifact", "1.0").toApiArtifactId();
1706+
ApplicationReference appRef = new ApplicationReference(NamespaceId.DEFAULT, appName);
1707+
ApplicationId appId = appRef.app(appName + "_version_" + 1);
1708+
ApplicationSpecification spec = createDummyAppSpecWithWorkflow(appId, artifactId, true);
1709+
ApplicationMeta meta = new ApplicationMeta(spec.getName(), spec,
1710+
new ChangeDetail(null, null, null,
1711+
creationTimeMillis + 1, true));
1712+
1713+
// Deploy the first version
1714+
TransactionRunners.run(transactionRunner, context -> {
1715+
AppMetadataStore metaStore = AppMetadataStore.create(context);
1716+
StructuredTable pluginDataTable = metaStore.getPluginDataTable();
1717+
Plugins.addFormatTextPluginToTable(pluginDataTable);
1718+
metaStore.createLatestApplicationVersion(appId, meta);
1719+
});
1720+
1721+
// Verify latest version
1722+
TransactionRunners.run(transactionRunner, context -> {
1723+
AppMetadataStore metaStore = AppMetadataStore.create(context);
1724+
ApplicationMeta gotMeta = metaStore.getApplication(appId);
1725+
Assert.assertEquals(meta.toString(), Objects.requireNonNull(gotMeta).toString());
1726+
});
1727+
}
1728+
17011729
@Test
17021730
public void testDeleteCompletedRunsStartedBefore() throws Exception {
17031731
// Map an iterator to one of 15 different program+workflow permutations. Used to ensure
@@ -1839,8 +1867,10 @@ private ApplicationSpecification createDummyAppSpec(String appName, String appVe
18391867
}
18401868

18411869
private ApplicationSpecification createDummyAppSpecWithWorkflow(ApplicationId appId,
1842-
ArtifactId artifactId) {
1843-
Map<String, Plugin> plugins = Plugins.createDummyPlugins();
1870+
ArtifactId artifactId, boolean isAppSpecReductionEnabled) {
1871+
Map<String, Plugin> plugins =
1872+
isAppSpecReductionEnabled ? Plugins.createDummyPluginsForReducedAppSpec()
1873+
: Plugins.createDummyPlugins();
18441874
ImmutableList<WorkflowNode> nodes = ImmutableList.of(
18451875
new WorkflowActionNode("mr1",
18461876
new ScheduleProgramInfo(SchedulableProgramType.MAPREDUCE, "mr1")),
@@ -1858,7 +1888,7 @@ private ApplicationSpecification createDummyAppSpecWithWorkflow(ApplicationId ap
18581888
ImmutableMap.of(appId.workflow("wf1").getProgram(), wfSpec), Collections.emptyMap(),
18591889
Collections.emptyMap(),
18601890
Collections.emptyMap(),
1861-
Collections.emptyMap());
1891+
plugins);
18621892
}
18631893

18641894
private void runConcurrentOperation(String name, int numThreads, Runnable runnable) throws Exception {

cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/plugin/Plugins.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,44 @@ public static Map<String, Plugin> createDummyPlugins() {
9595
return plugins;
9696
}
9797

98+
public static Map<String, Plugin> createDummyPluginsForReducedAppSpec() {
99+
Map<String, Plugin> plugins = new HashMap<>();
100+
101+
Gson gson = new Gson();
102+
Map<String, String> childProperties = ImmutableMap.of("child1", "childVal1", "child2",
103+
"${secure(acc)}", "child3", "val3");
104+
Map<String, String> properties = ImmutableMap.of("key2", gson.toJson(childProperties), "key1",
105+
"val1");
106+
107+
PluginClass wranglerPluginClass = new PluginClass("transform", "Wrangler", null, null,
108+
Collections.emptyMap(), new Requirements(Collections.emptySet()), "");
109+
Plugin wranglerPlugin = new Plugin(Collections.emptyList(),
110+
NamespaceId.DEFAULT.artifact("wrangler-transform", "1.0").toApiArtifactId(),
111+
wranglerPluginClass, PluginProperties.builder().addAll(properties).build());
112+
plugins.put("p1", wranglerPlugin);
113+
114+
PluginClass directivePluginClass = new PluginClass("directive", "now", null, null,
115+
Collections.emptyMap(), new Requirements(Collections.emptySet()), "");
116+
ArtifactId parent = NamespaceId.DEFAULT.artifact("Wrangler", "1.0").toApiArtifactId();
117+
Plugin directivePlugin = new Plugin(Collections.singleton(parent),
118+
NamespaceId.DEFAULT.artifact("wrangler", "1.0").toApiArtifactId(), directivePluginClass,
119+
PluginProperties.builder().addAll(properties).build());
120+
plugins.put("p2", directivePlugin);
121+
122+
PluginClass gCloudFormatTextPluginClass = PluginClass.builder()
123+
.setClassName("io.cdap.plugin.format.text.input.TextInputFormatProvider").setName("text")
124+
.setRequirements(Requirements.EMPTY).setType("validatingInputFormat")
125+
.setDescription("Plugin for reading files in text format.").setConfigFieldName("conf")
126+
.build();
127+
parent = NamespaceId.DEFAULT.artifact("google-cloud", "1.0").toApiArtifactId();
128+
Plugin gCloudFormatTextPlugin = new Plugin(Collections.singleton(parent),
129+
NamespaceId.DEFAULT.artifact("format-text", "1.0").toApiArtifactId(),
130+
gCloudFormatTextPluginClass, PluginProperties.builder().addAll(properties).build());
131+
plugins.put("p3", gCloudFormatTextPlugin);
132+
133+
return plugins;
134+
}
135+
98136
private static void addPluginEntryToTable(StructuredTable pluginDataTable,
99137
String parentContextName, String pluginType, String pluginName, String artifactName,
100138
String pluginDataJson) throws IOException {

0 commit comments

Comments
 (0)