diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java new file mode 100644 index 0000000000000..6354f04de0748 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.FlinkRelNode; +import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin; +import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.immutables.value.Value; + +import java.util.Collection; +import java.util.function.Function; +import java.util.stream.Collectors; + +import scala.Option; + +/** + * Rule that converts non-SEMI/ANTI {@link FlinkLogicalJoin} with window bounds in join condition to + * {@link StreamPhysicalIntervalJoin}. + */ +@Value.Enclosing +public class StreamPhysicalIntervalJoinRule + extends StreamPhysicalJoinRuleBase< + StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig> { + public static final RelOptRule INSTANCE = StreamPhysicalIntervalJoinRuleConfig.DEFAULT.toRule(); + + public StreamPhysicalIntervalJoinRule(StreamPhysicalIntervalJoinRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + + if (!IntervalJoinUtil.satisfyIntervalJoin(join)) { + return false; + } + + // validate the join + IntervalJoinSpec.WindowBounds windowBounds = extractWindowBounds(join).f0.get(); + + if (windowBounds.isEventTime()) { + RelDataType leftTimeAttributeType = + join.getLeft() + .getRowType() + .getFieldList() + .get(windowBounds.getLeftTimeIdx()) + .getType(); + RelDataType rightTimeAttributeType = + join.getRight() + .getRowType() + .getFieldList() + .get(windowBounds.getRightTimeIdx()) + .getType(); + if (leftTimeAttributeType.getSqlTypeName() != rightTimeAttributeType.getSqlTypeName()) { + throw new ValidationException( + String.format( + "Interval join with rowtime attribute requires same rowtime types," + + " but the types are %s and %s.", + leftTimeAttributeType, rightTimeAttributeType)); + } + } else { + // Check that no event-time attributes are in the input because the processing time + // window + // join does not correctly hold back watermarks. + // We rely on projection pushdown to remove unused attributes before the join. + RelDataType joinRowType = join.getRowType(); + boolean containsRowTime = + joinRowType.getFieldList().stream() + .anyMatch(f -> FlinkTypeFactory.isRowtimeIndicatorType(f.getType())); + if (containsRowTime) { + throw new TableException( + "Interval join with proctime attribute requires no event-time attributes are in the " + + "join inputs."); + } + } + return true; + } + + @Override + public Collection computeJoinLeftKeys(FlinkLogicalJoin join) { + Tuple2, Option> tuple2 = + extractWindowBounds(join); + return join.analyzeCondition().leftKeys.stream() + .filter(k -> tuple2.f0.get().getLeftTimeIdx() != k) + .collect(Collectors.toList()); + } + + @Override + public Collection computeJoinRightKeys(FlinkLogicalJoin join) { + Tuple2, Option> tuple2 = + extractWindowBounds(join); + return join.analyzeCondition().rightKeys.stream() + .filter(k -> tuple2.f0.get().getRightTimeIdx() != k) + .collect(Collectors.toList()); + } + + @Override + public FlinkRelNode transform( + FlinkLogicalJoin join, + FlinkRelNode leftInput, + Function leftConversion, + FlinkRelNode rightInput, + Function rightConversion, + RelTraitSet providedTraitSet) { + Tuple2, Option> tuple2 = + extractWindowBounds(join); + return new StreamPhysicalIntervalJoin( + join.getCluster(), + providedTraitSet, + leftConversion.apply(leftInput), + rightConversion.apply(rightInput), + join.getJoinType(), + join.getCondition(), + tuple2.f1.getOrElse(() -> join.getCluster().getRexBuilder().makeLiteral(true)), + tuple2.f0.get()); + } + + /** Configuration for {@link StreamPhysicalIntervalJoinRule}. */ + @Value.Immutable + public interface StreamPhysicalIntervalJoinRuleConfig + extends StreamPhysicalJoinRuleBaseRuleConfig { + StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig DEFAULT = + ImmutableStreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig + .builder() + .build() + .withOperandSupplier(StreamPhysicalJoinRuleBaseRuleConfig.OPERAND_TRANSFORM) + .withDescription("StreamPhysicalJoinRuleBase") + .as( + StreamPhysicalIntervalJoinRule.StreamPhysicalIntervalJoinRuleConfig + .class); + + @Override + default StreamPhysicalIntervalJoinRule toRule() { + return new StreamPhysicalIntervalJoinRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java new file mode 100644 index 0000000000000..66be2a2409669 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.FlinkRelNode; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin; +import org.apache.flink.table.planner.plan.utils.JoinUtil; +import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.immutables.value.Value; + +import java.util.function.Function; + +/** + * Rule that converts {@link FlinkLogicalJoin} without window bounds in join condition to {@link + * StreamPhysicalJoin}. + */ +@Value.Enclosing +public class StreamPhysicalJoinRule + extends StreamPhysicalJoinRuleBase { + public static final StreamPhysicalJoinRule INSTANCE = + StreamPhysicalJoinRuleConfig.DEFAULT.toRule(); + + public StreamPhysicalJoinRule(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final FlinkLogicalJoin join = call.rel(0); + final FlinkLogicalRel left = call.rel(1); + final FlinkLogicalRel right = call.rel(2); + + if (!JoinUtil.satisfyRegularJoin(join, left, right)) { + return false; + } + + // validate the join + if (left instanceof FlinkLogicalSnapshot) { + throw new TableException( + "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table."); + } + + // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in case which + // fallback + // to regular join + Preconditions.checkState( + !TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition())); + + // Time attributes must not be in the output type of a regular join + boolean timeAttrInOutput = + join.getRowType().getFieldList().stream() + .anyMatch(f -> FlinkTypeFactory.isTimeIndicatorType(f.getType())); + Preconditions.checkState(!timeAttrInOutput); + + // Join condition must not access time attributes + boolean remainingPredsAccessTime = + JoinUtil.accessesTimeAttribute( + join.getCondition(), JoinUtil.combineJoinInputsRowType(join)); + Preconditions.checkState(!remainingPredsAccessTime); + return true; + } + + @Override + public FlinkRelNode transform( + FlinkLogicalJoin join, + FlinkRelNode leftInput, + Function leftConversion, + FlinkRelNode rightInput, + Function rightConversion, + RelTraitSet providedTraitSet) { + return new StreamPhysicalJoin( + join.getCluster(), + providedTraitSet, + leftConversion.apply(leftInput), + rightConversion.apply(rightInput), + join.getCondition(), + join.getJoinType(), + join.getHints()); + } + + /** Configuration for {@link StreamPhysicalIntervalJoinRule}. */ + @Value.Immutable + public interface StreamPhysicalJoinRuleConfig + extends StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig { + StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig DEFAULT = + ImmutableStreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.builder() + .build() + .withOperandSupplier(OPERAND_TRANSFORM) + .withDescription("StreamPhysicalJoinRule") + .as(StreamPhysicalJoinRule.StreamPhysicalJoinRuleConfig.class); + + @Override + default StreamPhysicalJoinRule toRule() { + return new StreamPhysicalJoinRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java new file mode 100644 index 0000000000000..339ad280d0412 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.FlinkRelNode; +import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; +import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.immutables.value.Value; + +import java.util.Collection; +import java.util.function.Function; + +import scala.Option; + +/** + * Base implementation for rules match stream-stream join, including regular stream join, interval + * join and temporal join. + */ +@Value.Enclosing +public abstract class StreamPhysicalJoinRuleBase< + T extends StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig> + extends RelRule { + + protected StreamPhysicalJoinRuleBase(T config) { + super(config); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + FlinkLogicalRel left = call.rel(1); + FlinkLogicalRel right = call.rel(2); + FlinkRelNode newJoin = + transform( + join, + left, + leftInput -> convertInput(leftInput, computeJoinLeftKeys(join)), + right, + rightInput -> convertInput(rightInput, computeJoinRightKeys(join)), + join.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL())); + call.transformTo(newJoin); + } + + protected Tuple2, Option> extractWindowBounds( + FlinkLogicalJoin join) { + TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join); + return JavaScalaConversionUtil.toJava( + IntervalJoinUtil.extractWindowBoundsFromPredicate( + join.getCondition(), + join.getLeft().getRowType().getFieldCount(), + join.getRowType(), + join.getCluster().getRexBuilder(), + tableConfig, + ShortcutUtils.unwrapClassLoader(join))); + } + + private RelNode convertInput(RelNode input, Collection columns) { + RelTraitSet requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet()); + return RelOptRule.convert(input, requiredTraitSet); + } + + private RelTraitSet toHashTraitByColumns( + Collection columns, RelTraitSet inputTraitSet) { + FlinkRelDistribution distribution = + columns.isEmpty() + ? FlinkRelDistribution.SINGLETON() + : FlinkRelDistribution.hash( + columns.stream().mapToInt(Integer::intValue).toArray(), true); + return inputTraitSet.replace(FlinkConventions.STREAM_PHYSICAL()).replace(distribution); + } + + public Collection computeJoinLeftKeys(FlinkLogicalJoin join) { + return join.analyzeCondition().leftKeys; + } + + public Collection computeJoinRightKeys(FlinkLogicalJoin join) { + return join.analyzeCondition().rightKeys; + } + + public abstract FlinkRelNode transform( + FlinkLogicalJoin join, + FlinkRelNode leftInput, + Function leftConversion, + FlinkRelNode rightInput, + Function rightConversion, + RelTraitSet providedTraitSet); + + /** Configuration for {@link StreamPhysicalConstantTableFunctionScanRule}. */ + @Value.Immutable + public interface StreamPhysicalJoinRuleBaseRuleConfig extends RelRule.Config { + RelRule.OperandTransform OPERAND_TRANSFORM = + b0 -> + b0.operand(FlinkLogicalJoin.class) + .inputs( + b1 -> b1.operand(FlinkLogicalRel.class).anyInputs(), + b2 -> b2.operand(FlinkLogicalRel.class).anyInputs()); + StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig DEFAULT = + ImmutableStreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.builder() + .build() + .withOperandSupplier(OPERAND_TRANSFORM) + .withDescription("StreamPhysicalJoinRuleBase") + .as(StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig.class); + + @Override + default StreamPhysicalJoinRuleBase toRule() { + throw new RuntimeException(); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java new file mode 100644 index 0000000000000..4c7877a0c9431 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream; + +import org.apache.flink.table.planner.plan.nodes.FlinkRelNode; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin; +import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.immutables.value.Value; + +import java.util.function.Function; + +/** + * Rule that matches a temporal join node and converts it to {@link StreamPhysicalTemporalJoin}, the + * temporal join node is a {@link FlinkLogicalJoin} which contains {@link TEMPORAL_JOIN_CONDITION}. + */ +@Value.Enclosing +public class StreamPhysicalTemporalJoinRule + extends StreamPhysicalJoinRuleBase< + StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig> { + public static final StreamPhysicalTemporalJoinRule INSTANCE = + StreamPhysicalTemporalJoinRuleConfig.DEFAULT.toRule(); + + public StreamPhysicalTemporalJoinRule(StreamPhysicalTemporalJoinRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalJoin join = call.rel(0); + if (!TemporalJoinUtil.satisfyTemporalJoin(join)) { + return false; + } + + // validate the join + // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase. + Preconditions.checkState( + !TemporalJoinUtil.containsInitialTemporalJoinCondition(join.getCondition())); + return true; + } + + @Override + public FlinkRelNode transform( + FlinkLogicalJoin join, + FlinkRelNode leftInput, + Function leftConversion, + FlinkRelNode rightInput, + Function rightConversion, + RelTraitSet providedTraitSet) { + final RelNode newRight; + if (rightInput instanceof FlinkLogicalSnapshot) { + newRight = ((FlinkLogicalSnapshot) rightInput).getInput(); + } else { + newRight = rightInput; + } + + return new StreamPhysicalTemporalJoin( + join.getCluster(), + providedTraitSet, + leftConversion.apply(leftInput), + rightConversion.apply(newRight), + join.getCondition(), + join.getJoinType()); + } + + /** Configuration for {@link StreamPhysicalIntervalJoinRule}. */ + @Value.Immutable + public interface StreamPhysicalTemporalJoinRuleConfig + extends StreamPhysicalJoinRuleBase.StreamPhysicalJoinRuleBaseRuleConfig { + StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig DEFAULT = + ImmutableStreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig + .builder() + .build() + .withOperandSupplier(OPERAND_TRANSFORM) + .withDescription("StreamPhysicalTemporalJoinRule") + .as( + StreamPhysicalTemporalJoinRule.StreamPhysicalTemporalJoinRuleConfig + .class); + + @Override + default StreamPhysicalTemporalJoinRule toRule() { + return new StreamPhysicalTemporalJoinRule(this); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala deleted file mode 100644 index aad4610020343..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalIntervalJoinRule.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.planner.calcite.FlinkTypeFactory.isRowtimeIndicatorType -import org.apache.flink.table.planner.plan.nodes.FlinkRelNode -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntervalJoin -import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil.satisfyIntervalJoin - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode - -import java.util - -import scala.collection.JavaConversions._ - -/** - * Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition to - * [[StreamPhysicalIntervalJoin]]. - */ -class StreamPhysicalIntervalJoinRule - extends StreamPhysicalJoinRuleBase("StreamPhysicalIntervalJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: FlinkLogicalJoin = call.rel(0) - - if (!satisfyIntervalJoin(join)) { - return false - } - - // validate the join - val windowBounds = extractWindowBounds(join)._1.get - - if (windowBounds.isEventTime) { - val leftTimeAttributeType = join.getLeft.getRowType.getFieldList - .get(windowBounds.getLeftTimeIdx) - .getType - val rightTimeAttributeType = join.getRight.getRowType.getFieldList - .get(windowBounds.getRightTimeIdx) - .getType - if (leftTimeAttributeType.getSqlTypeName != rightTimeAttributeType.getSqlTypeName) { - throw new ValidationException( - String.format( - "Interval join with rowtime attribute requires same rowtime types," + - " but the types are %s and %s.", - leftTimeAttributeType.toString, - rightTimeAttributeType.toString - )) - } - } else { - // Check that no event-time attributes are in the input because the processing time window - // join does not correctly hold back watermarks. - // We rely on projection pushdown to remove unused attributes before the join. - val joinRowType = join.getRowType - val containsRowTime = joinRowType.getFieldList.exists(f => isRowtimeIndicatorType(f.getType)) - if (containsRowTime) { - throw new TableException( - "Interval join with proctime attribute requires no event-time attributes are in the " + - "join inputs.") - } - } - true - } - - override protected def computeJoinLeftKeys(join: FlinkLogicalJoin): util.Collection[Integer] = { - val (windowBounds, _) = extractWindowBounds(join) - join - .analyzeCondition() - .leftKeys - .filter(k => windowBounds.get.getLeftTimeIdx != k) - .toList - } - - override protected def computeJoinRightKeys(join: FlinkLogicalJoin): util.Collection[Integer] = { - val (windowBounds, _) = extractWindowBounds(join) - join - .analyzeCondition() - .rightKeys - .filter(k => windowBounds.get.getRightTimeIdx != k) - .toList - } - - override protected def transform( - join: FlinkLogicalJoin, - leftInput: FlinkRelNode, - leftConversion: RelNode => RelNode, - rightInput: FlinkRelNode, - rightConversion: RelNode => RelNode, - providedTraitSet: RelTraitSet): FlinkRelNode = { - val (windowBounds, remainCondition) = extractWindowBounds(join) - new StreamPhysicalIntervalJoin( - join.getCluster, - providedTraitSet, - leftConversion(leftInput), - rightConversion(rightInput), - join.getJoinType, - join.getCondition, - remainCondition.getOrElse(join.getCluster.getRexBuilder.makeLiteral(true)), - windowBounds.get) - } -} - -object StreamPhysicalIntervalJoinRule { - val INSTANCE: RelOptRule = new StreamPhysicalIntervalJoinRule -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala deleted file mode 100644 index c59183246f85d..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.api.TableException -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.plan.nodes.FlinkRelNode -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, FlinkLogicalRel, FlinkLogicalSnapshot} -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin -import org.apache.flink.table.planner.plan.utils.JoinUtil.{accessesTimeAttribute, combineJoinInputsRowType, satisfyRegularJoin} -import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.containsInitialTemporalJoinCondition -import org.apache.flink.util.Preconditions.checkState - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode - -import scala.collection.JavaConversions._ - -/** - * Rule that converts [[FlinkLogicalJoin]] without window bounds in join condition to - * [[StreamPhysicalJoin]]. - */ -class StreamPhysicalJoinRule extends StreamPhysicalJoinRuleBase("StreamPhysicalJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join: FlinkLogicalJoin = call.rel(0) - val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel] - val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel] - - if (!satisfyRegularJoin(join, left, right)) { - return false - } - - // validate the join - if (left.isInstanceOf[FlinkLogicalSnapshot]) { - throw new TableException( - "Temporal table join only support apply FOR SYSTEM_TIME AS OF on the right table.") - } - - // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase in case which fallback - // to regular join - checkState(!containsInitialTemporalJoinCondition(join.getCondition)) - - // Time attributes must not be in the output type of a regular join - val timeAttrInOutput = join.getRowType.getFieldList - .exists(f => FlinkTypeFactory.isTimeIndicatorType(f.getType)) - checkState(!timeAttrInOutput) - - // Join condition must not access time attributes - val remainingPredsAccessTime = - accessesTimeAttribute(join.getCondition, combineJoinInputsRowType(join)) - checkState(!remainingPredsAccessTime) - true - } - - override protected def transform( - join: FlinkLogicalJoin, - leftInput: FlinkRelNode, - leftConversion: RelNode => RelNode, - rightInput: FlinkRelNode, - rightConversion: RelNode => RelNode, - providedTraitSet: RelTraitSet): FlinkRelNode = { - new StreamPhysicalJoin( - join.getCluster, - providedTraitSet, - leftConversion(leftInput), - rightConversion(rightInput), - join.getCondition, - join.getJoinType, - join.getHints) - } -} - -object StreamPhysicalJoinRule { - val INSTANCE: RelOptRule = new StreamPhysicalJoinRule -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala deleted file mode 100644 index cd8d199b7abb2..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRuleBase.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution -import org.apache.flink.table.planner.plan.nodes.{FlinkConventions, FlinkRelNode} -import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec.WindowBounds -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalJoin, FlinkLogicalRel} -import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil -import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapClassLoader, unwrapTableConfig} - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.plan.RelOptRule.{any, operand} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rex.RexNode - -import java.util - -/** - * Base implementation for rules match stream-stream join, including regular stream join, interval - * join and temporal join. - */ -abstract class StreamPhysicalJoinRuleBase(description: String) - extends RelOptRule( - operand( - classOf[FlinkLogicalJoin], - operand(classOf[FlinkLogicalRel], any()), - operand(classOf[FlinkLogicalRel], any())), - description) { - - protected def extractWindowBounds( - join: FlinkLogicalJoin): (Option[WindowBounds], Option[RexNode]) = { - val tableConfig = unwrapTableConfig(join) - IntervalJoinUtil.extractWindowBoundsFromPredicate( - join.getCondition, - join.getLeft.getRowType.getFieldCount, - join.getRowType, - join.getCluster.getRexBuilder, - tableConfig, - unwrapClassLoader(join)) - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val join = call.rel[FlinkLogicalJoin](0) - val left = call.rel[FlinkLogicalRel](1) - val right = call.rel[FlinkLogicalRel](2) - - def toHashTraitByColumns( - columns: util.Collection[_ <: Number], - inputTraitSet: RelTraitSet): RelTraitSet = { - val distribution = if (columns.size() == 0) { - FlinkRelDistribution.SINGLETON - } else { - FlinkRelDistribution.hash(columns) - } - inputTraitSet - .replace(FlinkConventions.STREAM_PHYSICAL) - .replace(distribution) - } - - def convertInput(input: RelNode, columns: util.Collection[_ <: Number]): RelNode = { - val requiredTraitSet = toHashTraitByColumns(columns, input.getTraitSet) - RelOptRule.convert(input, requiredTraitSet) - } - - val newJoin = transform( - join, - left, - leftInput => { - convertInput(leftInput, computeJoinLeftKeys(join)) - }, - right, - rightInput => { - convertInput(rightInput, computeJoinRightKeys(join)) - }, - join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) - ) - call.transformTo(newJoin) - } - - protected def computeJoinLeftKeys(join: FlinkLogicalJoin): util.Collection[Integer] = - join.analyzeCondition().leftKeys - - protected def computeJoinRightKeys(join: FlinkLogicalJoin): util.Collection[Integer] = - join.analyzeCondition().rightKeys - - protected def transform( - join: FlinkLogicalJoin, - leftInput: FlinkRelNode, - leftConversion: RelNode => RelNode, - rightInput: FlinkRelNode, - rightConversion: RelNode => RelNode, - providedTraitSet: RelTraitSet): FlinkRelNode -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala deleted file mode 100644 index 0e3a98462823f..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.planner.plan.nodes.FlinkRelNode -import org.apache.flink.table.planner.plan.nodes.logical._ -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin -import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{containsInitialTemporalJoinCondition, satisfyTemporalJoin} -import org.apache.flink.util.Preconditions.checkState - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} -import org.apache.calcite.rel.RelNode - -/** - * Rule that matches a temporal join node and converts it to [[StreamPhysicalTemporalJoin]], the - * temporal join node is a [[FlinkLogicalJoin]] which contains [[TEMPORAL_JOIN_CONDITION]]. - */ -class StreamPhysicalTemporalJoinRule - extends StreamPhysicalJoinRuleBase("StreamPhysicalTemporalJoinRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val join = call.rel[FlinkLogicalJoin](0) - if (!satisfyTemporalJoin(join)) { - return false - } - - // validate the join - // INITIAL_TEMPORAL_JOIN_CONDITION should not appear in physical phase. - checkState(!containsInitialTemporalJoinCondition(join.getCondition)) - true - } - - override protected def transform( - join: FlinkLogicalJoin, - leftInput: FlinkRelNode, - leftConversion: RelNode => RelNode, - rightInput: FlinkRelNode, - rightConversion: RelNode => RelNode, - providedTraitSet: RelTraitSet): FlinkRelNode = { - val newRight = rightInput match { - case snapshot: FlinkLogicalSnapshot => - snapshot.getInput - case rel: FlinkLogicalRel => rel - } - new StreamPhysicalTemporalJoin( - join.getCluster, - providedTraitSet, - leftConversion(leftInput), - rightConversion(newRight), - join.getCondition, - join.getJoinType) - } -} - -object StreamPhysicalTemporalJoinRule { - val INSTANCE: RelOptRule = new StreamPhysicalTemporalJoinRule -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala index 58b2a2d55260f..4d06a1607aea6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala @@ -70,7 +70,7 @@ object IntervalJoinUtil { * @return * A Tuple2 of extracted window bounds and remaining predicates. */ - private[flink] def extractWindowBoundsFromPredicate( + def extractWindowBoundsFromPredicate( predicate: RexNode, leftLogicalFieldCnt: Int, joinRowType: RelDataType,