|
17 | 17 |
|
18 | 18 | package org.apache.ignite.internal.processors.query.calcite.rule; |
19 | 19 |
|
| 20 | +import com.google.common.collect.ImmutableMap; |
20 | 21 | import org.apache.calcite.plan.RelOptCluster; |
21 | | -import org.apache.calcite.plan.RelOptPlanner; |
22 | 22 | import org.apache.calcite.plan.RelOptRule; |
| 23 | +import org.apache.calcite.plan.RelOptRuleCall; |
| 24 | +import org.apache.calcite.plan.RelRule; |
23 | 25 | import org.apache.calcite.plan.RelTraitSet; |
24 | | -import org.apache.calcite.rel.PhysicalNode; |
25 | 26 | import org.apache.calcite.rel.RelCollations; |
| 27 | +import org.apache.calcite.rel.RelDistribution; |
26 | 28 | import org.apache.calcite.rel.RelNode; |
27 | 29 | import org.apache.calcite.rel.logical.LogicalTableModify; |
28 | | -import org.apache.calcite.rel.metadata.RelMetadataQuery; |
| 30 | +import org.apache.calcite.rel.type.RelDataTypeField; |
| 31 | +import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| 32 | +import org.apache.calcite.tools.RelBuilder; |
29 | 33 | import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; |
30 | 34 | import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify; |
| 35 | +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; |
| 36 | +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; |
31 | 37 | import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; |
32 | 38 | import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait; |
| 39 | +import org.apache.ignite.internal.processors.query.calcite.util.Commons; |
| 40 | +import org.immutables.value.Value; |
33 | 41 |
|
34 | 42 | /** |
35 | | - * |
| 43 | + * Converts LogicalTableModify to physical relation operators. |
| 44 | + * There are two options: |
| 45 | + * - Perform table modify on initiator node. In this case IgniteTableModify with single distribution is inserted. |
| 46 | + * - Perform table modify on remote nodes. In this case IgniteTableModify with random distribution is inserted and |
| 47 | + * sum aggregate on top if this table modify (to aggregate and send to initiator node affected rows count) |
36 | 48 | */ |
37 | | -public class TableModifyConverterRule extends AbstractIgniteConverterRule<LogicalTableModify> { |
| 49 | +@Value.Enclosing |
| 50 | +public class TableModifyConverterRule extends RelRule<TableModifyConverterRule.Config> { |
38 | 51 | /** */ |
39 | | - public static final RelOptRule INSTANCE = new TableModifyConverterRule(); |
| 52 | + public static final RelOptRule INSTANCE = Config.DFLT.toRule(); |
| 53 | + |
| 54 | + /** Rule configuration. */ |
| 55 | + @Value.Immutable |
| 56 | + public interface Config extends RelRule.Config { |
| 57 | + /** Default config. */ |
| 58 | + TableModifyConverterRule.Config DFLT = ImmutableTableModifyConverterRule.Config.of() |
| 59 | + .withOperandSupplier(b -> |
| 60 | + b.operand(LogicalTableModify.class).anyInputs()); |
| 61 | + |
| 62 | + /** {@inheritDoc} */ |
| 63 | + @Override default TableModifyConverterRule toRule() { |
| 64 | + return new TableModifyConverterRule(this); |
| 65 | + } |
| 66 | + } |
40 | 67 |
|
41 | 68 | /** |
42 | | - * Creates a ConverterRule. |
| 69 | + * Creates a TableModifyRule. |
43 | 70 | */ |
44 | | - public TableModifyConverterRule() { |
45 | | - super(LogicalTableModify.class, "TableModifyConverterRule"); |
| 71 | + public TableModifyConverterRule(Config cfg) { |
| 72 | + super(cfg); |
46 | 73 | } |
47 | 74 |
|
48 | 75 | /** {@inheritDoc} */ |
49 | | - @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalTableModify rel) { |
| 76 | + @Override public void onMatch(RelOptRuleCall call) { |
| 77 | + LogicalTableModify rel = call.rel(0); |
| 78 | + |
| 79 | + RelBuilder relBuilder = relBuilderFactory.create(rel.getCluster(), null); |
| 80 | + |
| 81 | + RelNode singleNodeTableModify = convertTableModify(rel, IgniteDistributions.single(), IgniteDistributions.single()); |
| 82 | + |
| 83 | + if (Commons.queryTransactionVersion(call.getPlanner().getContext()) != null) { |
| 84 | + // If excplicit transaction is started, table modify can only be executed on initiator node. |
| 85 | + call.transformTo(singleNodeTableModify); |
| 86 | + |
| 87 | + return; |
| 88 | + } |
| 89 | + |
| 90 | + IgniteTable table = rel.getTable().unwrap(IgniteTable.class); |
| 91 | + IgniteDistribution inputDistribution = table.distribution(); |
| 92 | + |
| 93 | + switch (rel.getOperation()) { |
| 94 | + case MERGE: |
| 95 | + // Merge contains insert fields as well as _key field, it's impossible to generate input distribution. |
| 96 | + inputDistribution = null; |
| 97 | + |
| 98 | + break; |
| 99 | + |
| 100 | + case INSERT: |
| 101 | + case UPDATE: |
| 102 | + // Can only safely proceed, if modified values don't affect remote nodes data sources for the same query. |
| 103 | + if (inputDistribution.getType() != RelDistribution.Type.HASH_DISTRIBUTED) |
| 104 | + inputDistribution = null; |
| 105 | + |
| 106 | + break; |
| 107 | + |
| 108 | + case DELETE: |
| 109 | + inputDistribution = IgniteDistributions.random(); |
| 110 | + |
| 111 | + break; |
| 112 | + |
| 113 | + default: |
| 114 | + throw new IllegalStateException("Unknown operation type: " + rel.getOperation()); |
| 115 | + } |
| 116 | + |
| 117 | + if (inputDistribution == null) { |
| 118 | + call.transformTo(singleNodeTableModify); |
| 119 | + |
| 120 | + return; |
| 121 | + } |
| 122 | + |
| 123 | + RelDataTypeField outFld = rel.getRowType().getFieldList().get(0); |
| 124 | + |
| 125 | + relBuilder.push(convertTableModify(rel, IgniteDistributions.random(), inputDistribution)); |
| 126 | + relBuilder.aggregate(relBuilder.groupKey(), |
| 127 | + relBuilder.aggregateCall(SqlStdOperatorTable.SUM0, |
| 128 | + relBuilder.field(0)).as(outFld.getName())); |
| 129 | + relBuilder.project(relBuilder.cast(relBuilder.fields().get(0), outFld.getType().getSqlTypeName())); |
| 130 | + |
| 131 | + RelNode distributedTableModify = relBuilder.build(); |
| 132 | + |
| 133 | + call.transformTo(singleNodeTableModify, ImmutableMap.of(distributedTableModify, rel)); |
| 134 | + } |
| 135 | + |
| 136 | + /** */ |
| 137 | + private IgniteTableModify convertTableModify( |
| 138 | + LogicalTableModify rel, |
| 139 | + IgniteDistribution outputDistribution, |
| 140 | + IgniteDistribution inputDistribution |
| 141 | + ) { |
50 | 142 | RelOptCluster cluster = rel.getCluster(); |
| 143 | + |
51 | 144 | RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE) |
52 | | - .replace(IgniteDistributions.single()) |
| 145 | + .replace(outputDistribution) |
53 | 146 | .replace(RewindabilityTrait.ONE_WAY) |
54 | 147 | .replace(RelCollations.EMPTY); |
55 | | - RelNode input = convert(rel.getInput(), traits); |
| 148 | + |
| 149 | + RelTraitSet inputTraits = traits.replace(inputDistribution); |
| 150 | + |
| 151 | + RelNode input = convert(rel.getInput(), inputTraits); |
56 | 152 |
|
57 | 153 | return new IgniteTableModify(cluster, traits, rel.getTable(), input, |
58 | | - rel.getOperation(), rel.getUpdateColumnList(), rel.getSourceExpressionList(), rel.isFlattened()); |
| 154 | + rel.getOperation(), rel.getUpdateColumnList(), rel.getSourceExpressionList(), rel.isFlattened()); |
59 | 155 | } |
60 | 156 | } |
0 commit comments