Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<aws.sdk.version>1.11.133</aws.sdk.version>
<bigquery.connector.hadoop2.version>0.10.2-hadoop2</bigquery.connector.hadoop2.version>
<bouncycastle.version>1.56</bouncycastle.version>
<cdap.version>6.11.0</cdap.version>
<cdap.version>6.11.1-SNAPSHOT</cdap.version>
<chlorine.version>1.1.5</chlorine.version>
<commons.validator.version>1.6</commons.validator.version>
<commons-io.version>2.5</commons-io.version>
Expand Down
5 changes: 5 additions & 0 deletions wrangler-proto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
<artifactId>cdap-etl-api</artifactId>
<version>${cdap.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-proto</artifactId>
<version>${cdap.version}</version>
</dependency>
<dependency>
<groupId>io.cdap.wrangler</groupId>
<artifactId>wrangler-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.wrangler.proto.id;

import io.cdap.cdap.proto.id.SystemAppEntityId;

/**
* Uniquely identifies a dataprep.workspace entity
*/
public class WorkspaceEntityId extends SystemAppEntityId {
public WorkspaceEntityId(String namespace, String workspaceId) {
super(namespace, "dataprep", "workspace", workspaceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import io.cdap.cdap.etl.proto.connection.SampleResponse;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.cdap.proto.element.EntityType;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.wrangler.PropertyIds;
import io.cdap.wrangler.RequestExtractor;
import io.cdap.wrangler.api.DirectiveConfig;
Expand All @@ -58,6 +61,7 @@
import io.cdap.wrangler.parser.MigrateToV2;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.proto.BadRequestException;
import io.cdap.wrangler.proto.id.WorkspaceEntityId;
import io.cdap.wrangler.proto.recipe.v2.Recipe;
import io.cdap.wrangler.proto.recipe.v2.RecipeId;
import io.cdap.wrangler.proto.workspace.v2.Artifact;
Expand Down Expand Up @@ -85,6 +89,8 @@
import io.cdap.wrangler.utils.SchemaConverter;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -121,6 +127,8 @@ public class WorkspaceHandler extends AbstractDirectiveHandler {
private WorkspaceStore wsStore;
private RecipeStore recipeStore;
private ConnectionDiscoverer discoverer;
private ContextAccessEnforcer contextAccessEnforcer;
private boolean authEnforcementEnabled;

// Injected by CDAP
@SuppressWarnings("unused")
Expand All @@ -132,6 +140,8 @@ public void initialize(SystemHttpServiceContext context) throws Exception {
wsStore = new WorkspaceStore(context);
recipeStore = new RecipeStore(context);
discoverer = new ConnectionDiscoverer(context);
contextAccessEnforcer = context.getContextAccessEnforcer();
authEnforcementEnabled = Feature.WRANGLER_WORKSPACE_AUTH_CHECK.isEnabled(context);
}

@POST
Expand All @@ -144,6 +154,11 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res
throw new BadRequestException("Creating workspace in system namespace is currently not supported");
}

WorkspaceId wsId = new WorkspaceId(ns);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.CREATE);
}
WorkspaceCreationRequest creationRequest =
GSON.fromJson(StandardCharsets.UTF_8.decode(request.getContent()).toString(), WorkspaceCreationRequest.class);

Expand Down Expand Up @@ -171,7 +186,6 @@ public void createWorkspace(HttpServiceRequest request, HttpServiceResponder res
return new StageSpec(plugin.getSchema(), pluginSpec);
}).collect(Collectors.toSet()), detail.getSupportedSampleTypes(), sampleRequest);

WorkspaceId wsId = new WorkspaceId(ns);
long now = System.currentTimeMillis();
Workspace workspace = Workspace.builder(generateWorkspaceName(wsId, creationRequest.getSampleRequest().getPath()),
wsId.getWorkspaceId())
Expand All @@ -190,6 +204,10 @@ public void listWorkspaces(HttpServiceRequest request, HttpServiceResponder resp
if (ns.getName().equalsIgnoreCase(NamespaceId.SYSTEM.getNamespace())) {
throw new BadRequestException("Listing workspaces in system namespace is currently not supported");
}
if (authEnforcementEnabled) {
contextAccessEnforcer.enforceOnParent(EntityType.SYSTEM_APP_ENTITY, new NamespaceId(ns.getName()),
StandardPermission.LIST);
}
responder.sendString(GSON.toJson(new ServiceResponse<>(wsStore.listWorkspaces(ns))));
});
}
Expand All @@ -204,7 +222,12 @@ public void getWorkspace(HttpServiceRequest request, HttpServiceResponder respon
if (ns.getName().equalsIgnoreCase(NamespaceId.SYSTEM.getNamespace())) {
throw new BadRequestException("Getting workspace in system namespace is currently not supported");
}
responder.sendString(GSON.toJson(wsStore.getWorkspace(new WorkspaceId(ns, workspaceId))));
WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.GET);
}
responder.sendString(GSON.toJson(wsStore.getWorkspace(wsId)));
});
}

Expand All @@ -221,11 +244,14 @@ public void updateWorkspace(HttpServiceRequest request, HttpServiceResponder res
if (ns.getName().equalsIgnoreCase(NamespaceId.SYSTEM.getNamespace())) {
throw new BadRequestException("Updating workspace in system namespace is currently not supported");
}

WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.UPDATE);
}
WorkspaceUpdateRequest updateRequest =
GSON.fromJson(StandardCharsets.UTF_8.decode(request.getContent()).toString(), WorkspaceUpdateRequest.class);

WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
Workspace newWorkspace = Workspace.builder(wsStore.getWorkspace(wsId))
.setDirectives(updateRequest.getDirectives())
.setInsights(updateRequest.getInsights())
Expand All @@ -250,6 +276,10 @@ public void resampleWorkspace(HttpServiceRequest request, HttpServiceResponder r
}

WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.UPDATE);
}
Workspace currentWorkspace = wsStore.getWorkspace(wsId);

String connectionName = currentWorkspace.getSampleSpec() == null ? null :
Expand Down Expand Up @@ -294,7 +324,12 @@ public void deleteWorkspace(HttpServiceRequest request, HttpServiceResponder res
if (ns.getName().equalsIgnoreCase(NamespaceId.SYSTEM.getNamespace())) {
throw new BadRequestException("Deleting workspace in system namespace is currently not supported");
}
wsStore.deleteWorkspace(new WorkspaceId(ns, workspaceId));
WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.DELETE);
}
wsStore.deleteWorkspace(wsId);
responder.sendStatus(HttpURLConnection.HTTP_OK);
});
}
Expand All @@ -312,6 +347,11 @@ public void upload(HttpServiceRequest request, HttpServiceResponder responder,
throw new BadRequestException("Uploading data in system namespace is currently not supported");
}

WorkspaceId id = new WorkspaceId(ns);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(id.getNamespace().getName(), id.getWorkspaceId()),
StandardPermission.CREATE);
}
String name = request.getHeader(PropertyIds.FILE_NAME);
if (name == null) {
throw new BadRequestException("Name must be provided in the 'file' header");
Expand All @@ -335,7 +375,6 @@ public void upload(HttpServiceRequest request, HttpServiceResponder responder,
sample.add(new Row(COLUMN_NAME, line));
}

WorkspaceId id = new WorkspaceId(ns);
long now = System.currentTimeMillis();
Workspace workspace = Workspace.builder(name, id.getWorkspaceId())
.setCreatedTimeMillis(now).setUpdatedTimeMillis(now).build();
Expand All @@ -360,9 +399,12 @@ public void execute(HttpServiceRequest request, HttpServiceResponder responder,
@PathParam("id") String workspaceId) {
respond(responder, namespace, ns -> {
validateNamespace(ns, "Executing directives in system namespace is currently not supported");

DirectiveExecutionResponse response = execute(ns, request, new WorkspaceId(ns, workspaceId),
null);
WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.USE);
}
DirectiveExecutionResponse response = execute(ns, request, wsId, null);
responder.sendJson(response);
});
}
Expand Down Expand Up @@ -406,6 +448,10 @@ public void specification(HttpServiceRequest request, HttpServiceResponder respo
composite.reload(namespace);

WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.GET);
}
WorkspaceDetail detail = wsStore.getWorkspaceDetail(wsId);
List<String> directives = new ArrayList<>(detail.getWorkspace().getDirectives());
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
Expand Down Expand Up @@ -448,12 +494,15 @@ public void applyRecipe(HttpServiceRequest request, HttpServiceResponder respond
@PathParam("recipe-id") String recipeIdString) {
respond(responder, namespace, ns -> {
validateNamespace(ns, "Executing directives in system namespace is currently not supported");

WorkspaceId wsId = new WorkspaceId(ns, workspaceId);
if (authEnforcementEnabled) {
contextAccessEnforcer.enforce(new WorkspaceEntityId(wsId.getNamespace().getName(), wsId.getWorkspaceId()),
StandardPermission.USE);
}
RecipeId recipeId = RecipeId.builder(ns).setRecipeId(recipeIdString).build();
Recipe recipe = recipeStore.getRecipeById(recipeId);

DirectiveExecutionResponse response = execute(ns, request, new WorkspaceId(ns, workspaceId),
recipe.getDirectives());
DirectiveExecutionResponse response = execute(ns, request, wsId, recipe.getDirectives());
responder.sendJson(response);
});
}
Expand Down
Loading