Skip to content

Commit 00ceffe

Browse files
bobbai00MA77HEW820
authored and committed
Add URI generator, resolver and corresponding document open & create functions (#3211)
This PR refactors the storage key by representing it as a storage URI. It also adds the `openDocument` and `createDocument` functions, which operate on a given URI. ## Major Changes ### 1. VFS URI resource type definition, resolve and decode functions Two types of VFS resources are defined: ``` object VFSResourceType extends Enumeration { val RESULT: Value = Value("result") val MATERIALIZED_RESULT: Value = Value("materializedResult") } ``` Two defs are added to the `FileResolver`: - `resolve`: creates the URI pointing to the storage resource on the VFS ```java /** * Resolve a VFS resource to its URI. The URI can be used by the DocumentFactory to create or open the resource * * @param resourceType The type of the VFS resource. * @param workflowId Workflow identifier. * @param executionId Execution identifier. * @param operatorId Operator identifier. * @param portIdentity Optional port identifier. **Required** if `resourceType` is `RESULT` or `MATERIALIZED_RESULT`. * @return A VFS URI * @throws IllegalArgumentException if `resourceType` is `RESULT` but `portIdentity` is missing. */ def resolve( resourceType: VFSResourceType.Value, workflowId: WorkflowIdentity, executionId: ExecutionIdentity, operatorId: OperatorIdentity, portIdentity: Option[PortIdentity] = None ): URI ``` - `decodeVFSUri`: decodes a VFS URI into its components ```java /** * Parses a VFS URI and extracts its components * * @param uri The VFS URI to parse. * @return A `VFSUriComponents` object with the extracted data. * @throws IllegalArgumentException if the URI is malformed. */ def decodeVFSUri(uri: URI): ( WorkflowIdentity, ExecutionIdentity, OperatorIdentity, Option[PortIdentity], VFSResourceType.Value ) ``` ### 2. `createDocument` and `openDocument` functions added to the `DocumentFactory` The `createDocument` and `openDocument` defs create/open a storage resource pointed to by the URI - `DocumentFactory.createDocument` ```java /** * Create a document for storage specified by the uri. 
* This document is suitable for storing structural data, i.e. the schema is required to create such a document. * @param uri the location of the document * @param schema the schema of the data stored in the document * @return the created document */ def createDocument(uri: URI, schema: Schema): VirtualDocument[_] ``` - `DocumentFactory.openDocument` ```java /** * Open a document specified by the uri. * The document should store structural data, as both the document and its schema are returned * @param uri the uri of the document * @return the VirtualDocument, which is the handler of the data; the Schema, which is the schema of the data stored in the document */ def openDocument(uri: URI): (VirtualDocument[_], Schema) ```
1 parent 911fcec commit 00ceffe

File tree

42 files changed

+647
-424
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+647
-424
lines changed

core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/ScheduleGenerator.scala

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package edu.uci.ics.amber.engine.architecture.scheduling
22

3-
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
3+
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
4+
import edu.uci.ics.amber.core.storage.VFSResourceType.MATERIALIZED_RESULT
5+
import edu.uci.ics.amber.core.storage.result.ExecutionResourcesMapping
46
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
57
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
68
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
@@ -151,17 +153,18 @@ abstract class ScheduleGenerator(
151153
.removeLink(physicalLink)
152154

153155
// create cache writer and link
154-
val storageKey = OpResultStorage.createStorageKey(
156+
// create the uri of the materialization storage
157+
val storageUri = VFSURIFactory.createMaterializedResultURI(
158+
workflowContext.workflowId,
159+
workflowContext.executionId,
155160
physicalLink.fromOpId.logicalOpId,
156-
physicalLink.fromPortId,
157-
isMaterialized = true
161+
physicalLink.fromPortId
158162
)
163+
159164
val fromPortOutputMode =
160165
physicalPlan.getOperator(physicalLink.fromOpId).outputPorts(physicalLink.fromPortId)._1.mode
161166
val matWriterPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSinkPhysicalOp(
162-
workflowContext.workflowId,
163-
workflowContext.executionId,
164-
storageKey,
167+
storageUri,
165168
fromPortOutputMode
166169
)
167170
val sourceToWriterLink =
@@ -182,19 +185,15 @@ abstract class ScheduleGenerator(
182185
._3
183186
.toOption
184187
.get
185-
ResultStorage
186-
.getOpResultStorage(workflowContext.workflowId)
187-
.create(
188-
key = storageKey,
189-
mode = OpResultStorage.defaultStorageMode,
190-
schema = schema
191-
)
188+
// create the document
189+
DocumentFactory.createDocument(storageUri, schema)
190+
ExecutionResourcesMapping.addResourceUri(workflowContext.executionId, storageUri)
192191

193192
// create cache reader and link
194193
val matReaderPhysicalOp: PhysicalOp = SpecialPhysicalOpFactory.newSourcePhysicalOp(
195194
workflowContext.workflowId,
196195
workflowContext.executionId,
197-
storageKey
196+
storageUri
198197
)
199198
val readerToDestLink =
200199
PhysicalLink(

core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import edu.uci.ics.amber.operator.sink.ProgressiveSinkOpExec
1212
import edu.uci.ics.amber.operator.source.cache.CacheSourceOpExec
1313
import edu.uci.ics.amber.util.VirtualIdentityUtils
1414

15+
import java.net.URI
16+
1517
trait InitializeExecutorHandler {
1618
this: DataProcessorRPCHandlerInitializer =>
1719

@@ -26,15 +28,14 @@ trait InitializeExecutorHandler {
2628
case OpExecWithClassName(className, descString) =>
2729
ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
2830
case OpExecWithCode(code, _) => ExecFactory.newExecFromJavaCode(code)
29-
case OpExecSink(storageKey, workflowIdentity, outputMode) =>
31+
case OpExecSink(storageUri, workflowIdentity, outputMode) =>
3032
new ProgressiveSinkOpExec(
3133
workerIdx,
3234
outputMode,
33-
storageKey,
34-
workflowIdentity
35+
URI.create(storageUri)
3536
)
36-
case OpExecSource(storageKey, workflowIdentity) =>
37-
new CacheSourceOpExec(storageKey, workflowIdentity)
37+
case OpExecSource(storageUri, _) =>
38+
new CacheSourceOpExec(URI.create(storageUri))
3839
}
3940
EmptyReturn()
4041
}

core/amber/src/main/scala/edu/uci/ics/texera/web/ComputingUnitMaster.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package edu.uci.ics.texera.web
22

33
import com.fasterxml.jackson.module.scala.DefaultScalaModule
44
import com.typesafe.scalalogging.LazyLogging
5-
import edu.uci.ics.amber.core.storage.result.OpResultStorage
5+
import edu.uci.ics.amber.core.storage.DocumentFactory
66
import edu.uci.ics.amber.core.storage.util.mongo.MongoDatabaseManager
77
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
88
import edu.uci.ics.amber.engine.architecture.controller.ControllerConfig
@@ -179,9 +179,9 @@ class ComputingUnitMaster extends io.dropwizard.Application[Configuration] with
179179
val storageType = collection.get("storageType").asText()
180180
val collectionName = collection.get("storageKey").asText()
181181
storageType match {
182-
case OpResultStorage.ICEBERG =>
182+
case DocumentFactory.ICEBERG =>
183183
// rely on the server-side result cleanup logic.
184-
case OpResultStorage.MONGODB =>
184+
case DocumentFactory.MONGODB =>
185185
MongoDatabaseManager.dropCollection(collectionName)
186186
}
187187
})

core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package edu.uci.ics.texera.web.resource.dashboard.user.dataset
22

3-
import edu.uci.ics.amber.core.storage.{FileResolver, StorageConfig}
4-
import edu.uci.ics.amber.core.storage.model.DatasetFileDocument
3+
import edu.uci.ics.amber.core.storage.{DocumentFactory, FileResolver, StorageConfig}
54
import edu.uci.ics.amber.core.storage.util.dataset.{
65
GitVersionControlLocalFileStorage,
76
PhysicalFileNode
@@ -283,7 +282,7 @@ object DatasetResource {
283282
}
284283

285284
datasetOperation.filesToRemove.foreach { fileUri =>
286-
new DatasetFileDocument(fileUri).clear()
285+
DocumentFactory.openDocument(fileUri)._1.clear()
287286
}
288287
}
289288
)
@@ -820,7 +819,7 @@ class DatasetResource {
820819
val fileUri = FileResolver.resolve(decodedPathStr)
821820
val streamingOutput = new StreamingOutput() {
822821
override def write(output: OutputStream): Unit = {
823-
val inputStream = new DatasetFileDocument(fileUri).asInputStream()
822+
val inputStream = DocumentFactory.openReadonlyDocument(fileUri).asInputStream()
824823
try {
825824
val buffer = new Array[Byte](8192) // buffer size
826825
var bytesRead = inputStream.read(buffer)

core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import edu.uci.ics.amber.core.storage.StorageConfig
44
import edu.uci.ics.amber.engine.architecture.logreplay.{ReplayDestination, ReplayLogRecord}
55
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
66
import edu.uci.ics.amber.core.virtualidentity.{ChannelMarkerIdentity, ExecutionIdentity}
7+
import edu.uci.ics.amber.engine.common.AmberConfig
78
import edu.uci.ics.texera.dao.SqlServer
89
import edu.uci.ics.texera.web.auth.SessionUser
910
import edu.uci.ics.texera.dao.jooq.generated.Tables.{

core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionResultService.scala

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import akka.actor.Cancellable
44
import com.fasterxml.jackson.annotation.{JsonTypeInfo, JsonTypeName}
55
import com.fasterxml.jackson.databind.node.ObjectNode
66
import com.typesafe.scalalogging.LazyLogging
7-
import edu.uci.ics.amber.core.storage.StorageConfig
8-
import edu.uci.ics.amber.core.storage.result.OpResultStorage.MONGODB
7+
import edu.uci.ics.amber.core.storage.DocumentFactory.MONGODB
8+
import edu.uci.ics.amber.core.storage.VFSResourceType.{MATERIALIZED_RESULT, RESULT}
9+
import edu.uci.ics.amber.core.storage.model.VirtualDocument
10+
import edu.uci.ics.amber.core.storage.{DocumentFactory, StorageConfig, VFSURIFactory}
911
import edu.uci.ics.amber.core.storage.result._
1012
import edu.uci.ics.amber.core.tuple.Tuple
1113
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan}
@@ -19,7 +21,11 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
1921
import edu.uci.ics.amber.engine.common.client.AmberClient
2022
import edu.uci.ics.amber.engine.common.executionruntimestate.ExecutionMetadataStore
2123
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberRuntime}
22-
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity}
24+
import edu.uci.ics.amber.core.virtualidentity.{
25+
ExecutionIdentity,
26+
OperatorIdentity,
27+
WorkflowIdentity
28+
}
2329
import edu.uci.ics.amber.core.workflow.OutputPort.OutputMode
2430
import edu.uci.ics.amber.core.workflow.PortIdentity
2531
import edu.uci.ics.texera.web.SubscriptionManager
@@ -29,7 +35,10 @@ import edu.uci.ics.texera.web.model.websocket.event.{
2935
WebResultUpdateEvent
3036
}
3137
import edu.uci.ics.texera.web.model.websocket.request.ResultPaginationRequest
38+
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
39+
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
3240
import edu.uci.ics.texera.web.storage.{ExecutionStateStore, WorkflowStateStore}
41+
import org.jooq.types.UInteger
3342

3443
import java.util.UUID
3544
import scala.collection.mutable
@@ -60,6 +69,7 @@ object ExecutionResultService {
6069
*/
6170
private def convertWebResultUpdate(
6271
workflowIdentity: WorkflowIdentity,
72+
executionId: ExecutionIdentity,
6373
physicalOps: List[PhysicalOp],
6474
oldTupleCount: Int,
6575
newTupleCount: Int
@@ -85,10 +95,14 @@ object ExecutionResultService {
8595
}
8696
}
8797

88-
val storage =
89-
ResultStorage
90-
.getOpResultStorage(workflowIdentity)
91-
.get(OpResultStorage.createStorageKey(physicalOps.head.id.logicalOpId, PortIdentity()))
98+
val storageUri = VFSURIFactory.createResultURI(
99+
workflowIdentity,
100+
executionId,
101+
physicalOps.head.id.logicalOpId,
102+
PortIdentity()
103+
)
104+
val storage: VirtualDocument[Tuple] =
105+
DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]
92106
val webUpdate = webOutputMode match {
93107
case PaginationMode() =>
94108
val numTuples = storage.getCount
@@ -167,11 +181,11 @@ class ExecutionResultService(
167181
private var resultUpdateCancellable: Cancellable = _
168182

169183
def attachToExecution(
184+
executionId: ExecutionIdentity,
170185
stateStore: ExecutionStateStore,
171186
physicalPlan: PhysicalPlan,
172187
client: AmberClient
173188
): Unit = {
174-
175189
if (resultUpdateCancellable != null && !resultUpdateCancellable.isCancelled) {
176190
resultUpdateCancellable.cancel()
177191
}
@@ -188,7 +202,7 @@ class ExecutionResultService(
188202
2.seconds,
189203
resultPullingFrequency.seconds
190204
) {
191-
onResultUpdate(physicalPlan)
205+
onResultUpdate(executionId, physicalPlan)
192206
}
193207
}
194208
} else {
@@ -204,7 +218,7 @@ class ExecutionResultService(
204218
logger.info("Workflow execution terminated. Stop update results.")
205219
if (resultUpdateCancellable.cancel() || resultUpdateCancellable.isCancelled) {
206220
// immediately perform final update
207-
onResultUpdate(physicalPlan)
221+
onResultUpdate(executionId, physicalPlan)
208222
}
209223
}
210224
})
@@ -233,16 +247,20 @@ class ExecutionResultService(
233247
val oldInfo = oldState.resultInfo.getOrElse(opId, OperatorResultMetadata())
234248
buf(opId.id) = ExecutionResultService.convertWebResultUpdate(
235249
workflowIdentity,
250+
executionId,
236251
physicalPlan.getPhysicalOpsOfLogicalOp(opId),
237252
oldInfo.tupleCount,
238253
info.tupleCount
239254
)
240255
if (StorageConfig.resultStorageMode == MONGODB) {
241256
// using the first port for now. TODO: support multiple ports
242-
val storageKey = OpResultStorage.createStorageKey(opId, PortIdentity())
243-
val opStorage = ResultStorage
244-
.getOpResultStorage(workflowIdentity)
245-
.get(storageKey)
257+
val storageUri = VFSURIFactory.createResultURI(
258+
workflowIdentity,
259+
executionId,
260+
opId,
261+
PortIdentity()
262+
)
263+
val opStorage = DocumentFactory.openDocument(storageUri)._1
246264
opStorage match {
247265
case mongoDocument: MongoDocument[Tuple] =>
248266
val tableCatStats = mongoDocument.getCategoricalStats
@@ -278,14 +296,21 @@ class ExecutionResultService(
278296
def handleResultPagination(request: ResultPaginationRequest): TexeraWebSocketEvent = {
279297
// calculate from index (pageIndex starts from 1 instead of 0)
280298
val from = request.pageSize * (request.pageIndex - 1)
281-
299+
val latestExecutionId = getLatestExecutionId(workflowIdentity).getOrElse(
300+
throw new IllegalStateException("No execution is recorded")
301+
)
282302
// using the first port for now. TODO: support multiple ports
283-
val storageKey =
284-
OpResultStorage.createStorageKey(OperatorIdentity(request.operatorID), PortIdentity())
303+
val storageUri = VFSURIFactory.createResultURI(
304+
workflowIdentity,
305+
latestExecutionId,
306+
OperatorIdentity(request.operatorID),
307+
PortIdentity()
308+
)
285309
val paginationIterable = {
286-
ResultStorage
287-
.getOpResultStorage(workflowIdentity)
288-
.get(storageKey)
310+
DocumentFactory
311+
.openDocument(storageUri)
312+
._1
313+
.asInstanceOf[VirtualDocument[Tuple]]
289314
.getRange(from, from + request.pageSize)
290315
.to(Iterable)
291316
}
@@ -298,26 +323,24 @@ class ExecutionResultService(
298323
PaginatedResultEvent.apply(request, mappedResults, attributes)
299324
}
300325

301-
private def onResultUpdate(physicalPlan: PhysicalPlan): Unit = {
326+
private def onResultUpdate(executionId: ExecutionIdentity, physicalPlan: PhysicalPlan): Unit = {
302327
workflowStateStore.resultStore.updateState { _ =>
303328
val newInfo: Map[OperatorIdentity, OperatorResultMetadata] = {
304-
ResultStorage
305-
.getOpResultStorage(workflowIdentity)
306-
.getAllKeys
307-
.filter(!_.startsWith("materialized_"))
308-
.map(storageKey => {
309-
val count = ResultStorage
310-
.getOpResultStorage(workflowIdentity)
311-
.get(storageKey)
312-
.getCount
313-
.toInt
314-
315-
val (opId, storagePortId) = OpResultStorage.decodeStorageKey(storageKey)
329+
ExecutionResourcesMapping
330+
.getResourceURIs(executionId)
331+
.filter(uri => {
332+
val (_, _, _, _, resourceType) = VFSURIFactory.decodeURI(uri)
333+
resourceType != MATERIALIZED_RESULT
334+
})
335+
.map(uri => {
336+
val count = DocumentFactory.openDocument(uri)._1.getCount.toInt
337+
338+
val (_, _, opId, storagePortId, _) = VFSURIFactory.decodeURI(uri)
316339

317340
// Retrieve the mode of the specified output port
318341
val mode = physicalPlan
319342
.getPhysicalOpsOfLogicalOp(opId)
320-
.flatMap(_.outputPorts.get(storagePortId))
343+
.flatMap(_.outputPorts.get(storagePortId.get))
321344
.map(_._1.mode)
322345
.head
323346

core/amber/src/main/scala/edu/uci/ics/texera/web/service/ResultExportService.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import com.google.api.services.drive.Drive
77
import com.google.api.services.drive.model.{File, FileList, Permission}
88
import com.google.api.services.sheets.v4.Sheets
99
import com.google.api.services.sheets.v4.model.{Spreadsheet, SpreadsheetProperties, ValueRange}
10+
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
11+
import edu.uci.ics.amber.core.storage.VFSResourceType.RESULT
1012
import edu.uci.ics.amber.core.storage.model.VirtualDocument
11-
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
1213
import edu.uci.ics.amber.core.tuple.Tuple
1314
import edu.uci.ics.amber.engine.common.Utils.retry
1415
import edu.uci.ics.amber.util.PathUtils
@@ -22,6 +23,7 @@ import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowVersionRe
2223
import org.jooq.types.UInteger
2324
import edu.uci.ics.amber.util.ArrowUtils
2425
import edu.uci.ics.amber.core.workflow.PortIdentity
26+
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId
2527

2628
import java.io.{PipedInputStream, PipedOutputStream}
2729
import java.nio.charset.StandardCharsets
@@ -56,7 +58,6 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
5658
import ResultExportService._
5759

5860
private val cache = new mutable.HashMap[String, String]
59-
6061
def exportResult(
6162
user: User,
6263
request: ResultExportRequest
@@ -71,11 +72,17 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
7172

7273
// By now the workflow should finish running
7374
// Only supports external port 0 for now. TODO: support multiple ports
75+
val storageUri = VFSURIFactory.createResultURI(
76+
workflowIdentity,
77+
getLatestExecutionId(workflowIdentity).getOrElse(
78+
return ResultExportResponse("error", "The workflow contains no results")
79+
),
80+
OperatorIdentity(request.operatorId),
81+
PortIdentity()
82+
)
7483
val operatorResult: VirtualDocument[Tuple] =
75-
ResultStorage
76-
.getOpResultStorage(workflowIdentity)
77-
.get(OpResultStorage.createStorageKey(OperatorIdentity(request.operatorId), PortIdentity()))
78-
if (operatorResult == null) {
84+
DocumentFactory.openDocument(storageUri)._1.asInstanceOf[VirtualDocument[Tuple]]
85+
if (operatorResult.getCount == 0) {
7986
return ResultExportResponse("error", "The workflow contains no results")
8087
}
8188

0 commit comments

Comments
 (0)