Skip to content

Commit 911fcec

Browse files
aglinxinyuanMA77HEW820
authored andcommitted
Introduce IF operator (#3090)
The `IF` operator evaluates a condition against a specified state variable and routes the input data to either the `True` or `False` branch accordingly. The condition port accepts a state variable, and users can define the name of the state variable to be evaluated by the `IF` operator. Note: The Date to State operator will be introduced in a separate PR. ![image](https://github.com/user-attachments/assets/5f4c4cc9-a6d9-4c9d-b689-a4a833200a64)
1 parent 30475a1 commit 911fcec

File tree

5 files changed

+96
-1
lines changed

5 files changed

+96
-1
lines changed
6.86 KB
Loading

core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import edu.uci.ics.amber.operator.huggingFace.{
2626
HuggingFaceSpamSMSDetectionOpDesc,
2727
HuggingFaceTextSummarizationOpDesc
2828
}
29+
import edu.uci.ics.amber.operator.ifStatement.IfOpDesc
2930
import edu.uci.ics.amber.operator.intersect.IntersectOpDesc
3031
import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc
3132
import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
@@ -116,6 +117,7 @@ trait StateTransferFunc
116117
)
117118
@JsonSubTypes(
118119
Array(
120+
new Type(value = classOf[IfOpDesc], name = "If"),
119121
new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
120122
new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
121123
new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package edu.uci.ics.amber.operator.ifStatement
2+
3+
import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
4+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
5+
import edu.uci.ics.amber.core.executor.OpExecWithClassName
6+
import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
7+
import edu.uci.ics.amber.core.workflow.{
8+
InputPort,
9+
OutputPort,
10+
PhysicalOp,
11+
PortIdentity,
12+
SchemaPropagationFunc
13+
}
14+
import edu.uci.ics.amber.operator.LogicalOp
15+
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
16+
import edu.uci.ics.amber.util.JSONUtils.objectMapper
17+
18+
class IfOpDesc extends LogicalOp {
19+
@JsonProperty(required = true)
20+
@JsonSchemaTitle("Condition State")
21+
@JsonPropertyDescription("name of the state variable to evaluate")
22+
var conditionName: String = _
23+
24+
override def getPhysicalOp(
25+
workflowId: WorkflowIdentity,
26+
executionId: ExecutionIdentity
27+
): PhysicalOp = {
28+
PhysicalOp
29+
.oneToOnePhysicalOp(
30+
workflowId,
31+
executionId,
32+
operatorIdentifier,
33+
OpExecWithClassName(
34+
"edu.uci.ics.amber.operator.ifStatement.IfOpExec",
35+
objectMapper.writeValueAsString(this)
36+
)
37+
)
38+
.withInputPorts(operatorInfo.inputPorts)
39+
.withOutputPorts(operatorInfo.outputPorts)
40+
.withParallelizable(false)
41+
.withPropagateSchema(
42+
SchemaPropagationFunc(inputSchemas =>
43+
operatorInfo.outputPorts
44+
.map(_.id)
45+
.map(id => id -> inputSchemas(operatorInfo.inputPorts.last.id))
46+
.toMap
47+
)
48+
)
49+
}
50+
51+
override def operatorInfo: OperatorInfo =
52+
OperatorInfo(
53+
"If",
54+
"If",
55+
OperatorGroupConstants.CONTROL_GROUP,
56+
inputPorts = List(
57+
InputPort(PortIdentity(), "Condition"),
58+
InputPort(PortIdentity(1), dependencies = List(PortIdentity()))
59+
),
60+
outputPorts = List(OutputPort(PortIdentity(), "False"), OutputPort(PortIdentity(1), "True"))
61+
)
62+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package edu.uci.ics.amber.operator.ifStatement
2+
3+
import edu.uci.ics.amber.core.executor.OperatorExecutor
4+
import edu.uci.ics.amber.core.marker.State
5+
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
6+
import edu.uci.ics.amber.core.workflow.PortIdentity
7+
import edu.uci.ics.amber.util.JSONUtils.objectMapper
8+
9+
class IfOpExec(descString: String) extends OperatorExecutor {
10+
private val desc: IfOpDesc = objectMapper.readValue(descString, classOf[IfOpDesc])
11+
private var outputPort: PortIdentity = PortIdentity(1) // by default, it should be the true port.
12+
13+
//This function can handle one or more states.
14+
//The state can have mutiple key-value pairs. Keys are not identified by conditionName will be ignored.
15+
//It can accept any value that can be converted to a boolean. For example, Int 1 will be converted to true.
16+
override def processState(state: State, port: Int): Option[State] = {
17+
outputPort =
18+
if (state.get(desc.conditionName).asInstanceOf[Boolean]) PortIdentity(1) else PortIdentity()
19+
Some(state)
20+
}
21+
22+
override def processTupleMultiPort(
23+
tuple: Tuple,
24+
port: Int
25+
): Iterator[(TupleLike, Option[PortIdentity])] =
26+
Iterator((tuple, Some(outputPort)))
27+
28+
override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = ???
29+
}

core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/metadata/OperatorGroupConstants.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ object OperatorGroupConstants {
2121
final val JAVA_GROUP = "Java"
2222
final val R_GROUP = "R"
2323
final val MACHINE_LEARNING_GENERAL_GROUP = "Machine Learning General"
24+
final val CONTROL_GROUP = "Control Block"
2425

2526
/**
2627
* The order of the groups to show up in the frontend operator panel.
@@ -46,6 +47,7 @@ object OperatorGroupConstants {
4647
GroupInfo(UTILITY_GROUP),
4748
GroupInfo(API_GROUP),
4849
GroupInfo(UDF_GROUP, List(GroupInfo(PYTHON_GROUP), GroupInfo(JAVA_GROUP), GroupInfo(R_GROUP))),
49-
GroupInfo(VISUALIZATION_GROUP)
50+
GroupInfo(VISUALIZATION_GROUP),
51+
GroupInfo(CONTROL_GROUP)
5052
)
5153
}

0 commit comments

Comments
 (0)