3333
3434public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
3535 protected BinaryFrameFrameSPInstruction (Operator op , CPOperand in1 , CPOperand in2 , CPOperand out ,
36- String opcode , String istr ) {
36+ String opcode , String istr ) {
3737 super (SPType .Binary , op , in1 , in2 , out , opcode , istr );
3838 }
3939
@@ -45,7 +45,7 @@ public void processInstruction(ExecutionContext ec) {
4545 // Get input RDDs
4646 JavaPairRDD <Long , FrameBlock > in1 = sec .getFrameBinaryBlockRDDHandleForVariable (input1 .getName ());
4747 JavaPairRDD <Long , FrameBlock > out = null ;
48-
48+
4949 if (getOpcode ().equals (Opcodes .DROPINVALIDTYPE .toString ())) {
5050 // get schema frame-block
5151 Broadcast <FrameBlock > fb = sec .getSparkContext ().broadcast (sec .getFrameInput (input2 .getName ()));
@@ -60,18 +60,25 @@ else if(getOpcode().equals(Opcodes.VALUESWAP.toString())) {
6060 // Attach result frame with FrameBlock associated with output_name
6161 sec .releaseFrameInput (input2 .getName ());
6262 }
63+ else if (getOpcode ().equals (Opcodes .APPLYSCHEMA .toString ())){
64+ Broadcast <FrameBlock > fb = sec .getSparkContext ().broadcast (sec .getFrameInput (input2 .getName ()));
65+ out = in1 .mapValues (new applySchema (fb .getValue ()));
66+ sec .releaseFrameInput (input2 .getName ());
67+ }
6368 else {
6469 JavaPairRDD <Long , FrameBlock > in2 = sec .getFrameBinaryBlockRDDHandleForVariable (input2 .getName ());
6570 // create output frame
6671 BinaryOperator dop = (BinaryOperator ) _optr ;
6772 // check for binary operations
6873 out = in1 .join (in2 ).mapValues (new FrameComparison (dop ));
6974 }
70-
75+
7176 //set output RDD and maintain dependencies
7277 sec .setRDDHandleForVariable (output .getName (), out );
7378 sec .addLineageRDD (output .getName (), input1 .getName ());
74- if ( !getOpcode ().equals (Opcodes .DROPINVALIDTYPE .toString ()) && !getOpcode ().equals (Opcodes .VALUESWAP .toString ()))
79+ if (!getOpcode ().equals (Opcodes .DROPINVALIDTYPE .toString ()) && //
80+ !getOpcode ().equals (Opcodes .VALUESWAP .toString ()) && //
81+ !getOpcode ().equals (Opcodes .APPLYSCHEMA .toString ()))
7582 sec .addLineageRDD (output .getName (), input2 .getName ());
7683 }
7784
@@ -117,4 +124,20 @@ public FrameBlock call(FrameBlock arg0) throws Exception {
117124 return arg0 .valueSwap (schema_frame );
118125 }
119126 }
120- }
127+
128+
129+ private static class applySchema implements Function <FrameBlock , FrameBlock >{
130+ private static final long serialVersionUID = 58504021316402L ;
131+
132+ private FrameBlock schema ;
133+
134+ public applySchema (FrameBlock schema ) {
135+ this .schema = schema ;
136+ }
137+
138+ @ Override
139+ public FrameBlock call (FrameBlock arg0 ) throws Exception {
140+ return arg0 .applySchema (schema );
141+ }
142+ }
143+ }
0 commit comments