Skip to content

Commit 0622594

Browse files
committed
FLINK-38569 DeltaJoin:avoid generating ReadingMetadataSpec when no metadata keys
1 parent ded9980 commit 0622594

File tree

6 files changed

+74
-29
lines changed

6 files changed

+74
-29
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,9 +498,16 @@ public static void validateAndApplyMetadata(
498498
.collect(Collectors.toList());
499499
final DataType producedDataType =
500500
TypeConversions.fromLogicalToDataType(createProducedType(schema, source));
501-
sourceAbilities.add(
502-
new ReadingMetadataSpec(metadataKeys, (RowType) producedDataType.getLogicalType()));
501+
502+
// Apply metadata setting to source (FLINK-23911)
503503
metadataSource.applyReadableMetadata(metadataKeys, producedDataType);
504+
505+
// Only add ReadingMetadataSpec if non-empty
506+
if (!metadataKeys.isEmpty()) {
507+
sourceAbilities.add(
508+
new ReadingMetadataSpec(
509+
metadataKeys, (RowType) producedDataType.getLogicalType()));
510+
}
504511
}
505512

506513
private static void validateScanSource(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@
3535
import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule;
3636
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
3737
import org.apache.flink.table.types.logical.RowType;
38+
import org.apache.flink.table.types.utils.TypeConversions;
3839

3940
import org.apache.calcite.rel.RelNode;
4041
import org.apache.calcite.rex.RexBuilder;
4142
import org.apache.calcite.rex.RexProgramBuilder;
4243

4344
import java.util.ArrayList;
4445
import java.util.Arrays;
46+
import java.util.Collections;
4547
import java.util.Comparator;
4648
import java.util.HashMap;
4749
import java.util.HashSet;
@@ -212,6 +214,15 @@ public List<RelNode> reuseDuplicatedScan(List<RelNode> relNodes) {
212214
ScanTableSource newTableSource =
213215
tableSourceSpec.getScanTableSource(flinkContext, flinkTypeFactory);
214216

217+
// FLINK-23911: Ensure source is told "read zero metadata" even when
218+
// no ReadingMetadataSpec was added (FLINK-38569)
219+
if (newTableSource instanceof SupportsReadingMetadata && allMetaKeys.isEmpty()) {
220+
((SupportsReadingMetadata) newTableSource)
221+
.applyReadableMetadata(
222+
Collections.emptyList(),
223+
TypeConversions.fromLogicalToDataType(newSourceType));
224+
}
225+
215226
TableSourceTable newSourceTable =
216227
pickTable.replace(
217228
newTableSource,
@@ -278,7 +289,8 @@ private static RowType applyPhysicalAndMetadataPushDown(
278289
sourceAbilitySpecs.add(
279290
new ProjectPushDownSpec(projectedPhysicalFields, newProducedType));
280291
}
281-
if (supportsReadingMeta) {
292+
293+
if (supportsReadingMeta && !usedMetadataNames.isEmpty()) {
282294
sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType));
283295
}
284296
return newProducedType;

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.flink.table.planner.plan.utils.NestedSchema;
4141
import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
4242
import org.apache.flink.table.types.logical.RowType;
43+
import org.apache.flink.table.types.utils.TypeConversions;
4344

4445
import org.apache.calcite.plan.RelOptRule;
4546
import org.apache.calcite.plan.RelOptRuleCall;
@@ -86,6 +87,20 @@
8687
public class PushProjectIntoTableSourceScanRule
8788
extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
8889

90+
/**
91+
* Result of performPushDown containing the new type and metadata keys for
92+
* FLINK-23911/FLINK-38569.
93+
*/
94+
private static class PushDownResult {
95+
final RowType newProducedType;
96+
final List<String> projectedMetadataKeys;
97+
98+
PushDownResult(RowType newProducedType, List<String> projectedMetadataKeys) {
99+
this.newProducedType = newProducedType;
100+
this.projectedMetadataKeys = projectedMetadataKeys;
101+
}
102+
}
103+
89104
public static final PushProjectIntoTableSourceScanRule INSTANCE =
90105
new PushProjectIntoTableSourceScanRule(
91106
PushProjectIntoTableSourceScanRule.Config.DEFAULT);
@@ -168,13 +183,23 @@ public void onMatch(RelOptRuleCall call) {
168183
}
169184

170185
final List<SourceAbilitySpec> abilitySpecs = new ArrayList<>();
171-
final RowType newProducedType =
186+
final PushDownResult result =
172187
performPushDown(sourceTable, projectedSchema, producedType, abilitySpecs);
188+
final RowType newProducedType = result.newProducedType;
173189

174190
final DynamicTableSource newTableSource = sourceTable.tableSource().copy();
175191
final SourceAbilityContext context = SourceAbilityContext.from(scan);
176192
abilitySpecs.forEach(spec -> spec.apply(newTableSource, context));
177193

194+
// FLINK-23911: Ensure copied source is told "read zero metadata" even when
195+
// no ReadingMetadataSpec was added (FLINK-38569)
196+
if (supportsMetadata(sourceTable.tableSource()) && result.projectedMetadataKeys.isEmpty()) {
197+
((SupportsReadingMetadata) newTableSource)
198+
.applyReadableMetadata(
199+
Collections.emptyList(),
200+
TypeConversions.fromLogicalToDataType(newProducedType));
201+
}
202+
178203
final RelDataType newRowType = typeFactory.buildRelNodeRowType(newProducedType);
179204
final TableSourceTable newSource =
180205
sourceTable.copy(
@@ -260,7 +285,7 @@ private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
260285
.collect(Collectors.toList());
261286
}
262287

263-
private RowType performPushDown(
288+
private PushDownResult performPushDown(
264289
TableSourceTable source,
265290
NestedSchema projectedSchema,
266291
RowType producedType,
@@ -328,9 +353,10 @@ private RowType performPushDown(
328353
final RowType newProducedType =
329354
(RowType) Projection.of(projectedFields).project(producedType);
330355

356+
List<String> projectedMetadataKeys = Collections.emptyList();
331357
if (supportsMetadata(source.tableSource())) {
332358
// Use the projected column name to get the metadata key
333-
final List<String> projectedMetadataKeys =
359+
projectedMetadataKeys =
334360
projectedMetadataColumns.stream()
335361
.map(
336362
nestedColumn ->
@@ -348,10 +374,12 @@ private RowType performPushDown(
348374
.map(col -> col.getMetadataKey().orElse(col.getName()))
349375
.collect(Collectors.toList());
350376

351-
abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
377+
if (!projectedMetadataKeys.isEmpty()) {
378+
abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
379+
}
352380
}
353381

354-
return newProducedType;
382+
return new PushDownResult(newProducedType, projectedMetadataKeys);
355383
}
356384

357385
private List<RexNode> rewriteProjections(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,6 @@ public class DeltaJoinUtil {
113113
FilterPushDownSpec.class,
114114
ProjectPushDownSpec.class,
115115
PartitionPushDownSpec.class,
116-
// TODO FLINK-38569 ReadingMetadataSpec should not be generated when there are
117-
// no metadata keys to be read
118116
ReadingMetadataSpec.class);
119117

120118
private DeltaJoinUtil() {}

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedSum=[+($3..value, $3
6767
<Resource name="optimized rel plan">
6868
<![CDATA[
6969
LogicalProject(id=[$0], nestedName=[$1], nestedSum=[+($2, $3)])
70-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNestedWith._.value, deepNestedWith._nested_.value], metadata=[]]])
70+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNestedWith._.value, deepNestedWith._nested_.value]]])
7171
]]>
7272
</Resource>
7373
</TestCase>
@@ -207,7 +207,7 @@ LogicalProject(id=[$0], nestedName=[$1.nested1.name], nestedValue=[$2.value], ne
207207
<Resource name="optimized rel plan">
208208
<![CDATA[
209209
LogicalProject(id=[$0], nestedName=[$1], nestedValue=[$4], nestedFlag=[$2], nestedNum=[$3])
210-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNested_nested2_flag, deepNested_nested2_num, nested_value], metadata=[]]])
210+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, deepNested_nested1_name, deepNested_nested2_flag, deepNested_nested2_num, nested_value]]])
211211
]]>
212212
</Resource>
213213
</TestCase>
@@ -224,7 +224,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, $0).value], EXPR$1=[ITEM($2.Mid.dat
224224
<Resource name="optimized rel plan">
225225
<![CDATA[
226226
LogicalProject(EXPR$0=[ITEM($0, $2).value], EXPR$1=[ITEM($1, _UTF-16LE'item').value])
227-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr, Result_Mid_data_map, ID], metadata=[]]])
227+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr, Result_Mid_data_map, ID]]])
228228
]]>
229229
</Resource>
230230
</TestCase>
@@ -241,7 +241,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], EXPR$1=[ITEM($2.Mid.data
241241
<Resource name="optimized rel plan">
242242
<![CDATA[
243243
LogicalProject(EXPR$0=[ITEM($0.data_arr, 2).value], EXPR$1=[ITEM($0.data_arr, $1).value], EXPR$2=[ITEM($0.data_map, _UTF-16LE'item').value], Mid=[$0])
244-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid, ID], metadata=[]]])
244+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid, ID]]])
245245
]]>
246246
</Resource>
247247
</TestCase>
@@ -258,7 +258,7 @@ LogicalProject(EXPR$0=[ITEM($2.Mid.data_arr, 2).value], data_arr=[$2.Mid.data_ar
258258
<Resource name="optimized rel plan">
259259
<![CDATA[
260260
LogicalProject(EXPR$0=[ITEM($0, 2).value], data_arr=[$0])
261-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr], metadata=[]]])
261+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable, project=[Result_Mid_data_arr]]])
262262
]]>
263263
</Resource>
264264
</TestCase>
@@ -275,7 +275,7 @@ LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(Recor
275275
<Resource name="optimized rel plan">
276276
<![CDATA[
277277
LogicalProject(EXPR$0=[CAST(ITEM(CAST($0.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
278-
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]])
278+
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart]]])
279279
]]>
280280
</Resource>
281281
</TestCase>
@@ -292,7 +292,7 @@ LogicalProject(EXPR$0=[ITEM($2.data_arr, $0).value], EXPR$1=[ITEM($2.data_map, _
292292
<Resource name="optimized rel plan">
293293
<![CDATA[
294294
LogicalProject(EXPR$0=[ITEM($0.data_arr, $1).value], EXPR$1=[ITEM($0.data_map, _UTF-16LE'item').value], EXPR$2=[ITEM($2, 1)], EXPR$3=[ITEM($2, $1)], EXPR$4=[ITEM($3, _UTF-16LE'item')])
295-
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[Result, ID, outer_array, outer_map], metadata=[]]])
295+
+- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[Result, ID, outer_array, outer_map]]])
296296
]]>
297297
</Resource>
298298
</TestCase>
@@ -329,7 +329,7 @@ LogicalProject(id=[$0], EXPR$1=[ITEM($5, _UTF-16LE'e')])
329329
<Resource name="optimized rel plan">
330330
<![CDATA[
331331
LogicalProject(id=[$0], EXPR$1=[ITEM($1, _UTF-16LE'e')])
332-
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap], metadata=[]]])
332+
+- LogicalTableScan(table=[[default_catalog, default_database, NestedTable, project=[id, testMap]]])
333333
]]>
334334
</Resource>
335335
</TestCase>
@@ -346,7 +346,7 @@ LogicalProject(a=[$0], EXPR$1=[TRIM(FLAG(BOTH), _UTF-16LE' ', $2)])
346346
<Resource name="optimized rel plan">
347347
<![CDATA[
348348
LogicalProject(a=[$0], EXPR$1=[TRIM(FLAG(BOTH), _UTF-16LE' ', $1)])
349-
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]])
349+
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]])
350350
]]>
351351
</Resource>
352352
</TestCase>
@@ -433,7 +433,7 @@ LogicalProject(a=[$0], c=[$2])
433433
</Resource>
434434
<Resource name="optimized rel plan">
435435
<![CDATA[
436-
LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c], metadata=[]]])
436+
LogicalTableScan(table=[[default_catalog, default_database, MyTable, project=[a, c]]])
437437
]]>
438438
</Resource>
439439
</TestCase>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ LogicalSink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1])
168168
<Resource name="optimized rel plan">
169169
<![CDATA[
170170
Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1])
171-
+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1], metadata=[]]], fields=[a0, a1])
171+
+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1]]], fields=[a0, a1])
172172
]]>
173173
</Resource>
174174
</TestCase>
@@ -310,7 +310,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
310310
+- Calc(select=[a0, a1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, b1])
311311
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b1, b2, b0])
312312
:- Exchange(distribution=[hash[a1, a2]])
313-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1])
313+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[>(a0, 1)], project=[a0, a2, a1]]], fields=[a0, a2, a1])
314314
+- Exchange(distribution=[hash[b1, b2]])
315315
+- Calc(select=[b1, b2, b0], where=[<(b1, 10)])
316316
+- TableSourceScan(table=[[default_catalog, default_database, src2, filter=[<>(b0, 0)]]], fields=[b0, b2, b1])
@@ -838,7 +838,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
838838
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
839839
:- Exchange(distribution=[hash[a1, a2]])
840840
: +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))])
841-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
841+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2])
842842
+- Exchange(distribution=[hash[b1, b2]])
843843
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
844844
]]>
@@ -908,7 +908,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[4],[5
908908
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
909909
:- Exchange(distribution=[hash[a1, a2]])
910910
: +- Calc(select=[a0, a2, a1], where=[>(a0, RAND(10))])
911-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
911+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, filter=[], project=[a0, a1, a2]]], fields=[a0, a1, a2])
912912
+- Exchange(distribution=[hash[b1, b2]])
913913
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
914914
]]>
@@ -1046,7 +1046,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[4],[5]],
10461046
+- Calc(select=[a0, null:DOUBLE AS EXPR$1, null:VARCHAR(2147483647) AS EXPR$2, null:INTEGER AS EXPR$3, b0, b2, null:DOUBLE AS EXPR$6])
10471047
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a2, a1, b0, b2, b1])
10481048
:- Exchange(distribution=[hash[a1, a2]])
1049-
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1], metadata=[]]], fields=[a0, a2, a1])
1049+
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithPartition, partitions=[{pt=1}], project=[a0, a2, a1]]], fields=[a0, a2, a1])
10501050
+- Exchange(distribution=[hash[b1, b2]])
10511051
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
10521052
]]>
@@ -1097,7 +1097,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
10971097
+- Calc(select=[a0, a1, a2, null:INTEGER AS EXPR$3, b0, b2, b1])
10981098
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1])
10991099
:- Exchange(distribution=[hash[a1, a2]])
1100-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
1100+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2])
11011101
+- Exchange(distribution=[hash[b1, b2]])
11021102
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
11031103
]]>
@@ -1123,7 +1123,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[1],[2],[0],[4
11231123
+- Calc(select=[a0 AS a2, a1 AS a0, a2 AS a1, null:INTEGER AS EXPR$3, b0, b2, b1])
11241124
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a1, a2, a0, b0, b2, b1])
11251125
:- Exchange(distribution=[hash[a1, a2]])
1126-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0], metadata=[]]], fields=[a1, a2, a0])
1126+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a1, a2, a0]]], fields=[a1, a2, a0])
11271127
+- Exchange(distribution=[hash[b1, b2]])
11281128
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
11291129
]]>
@@ -1150,7 +1150,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
11501150
+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
11511151
:- Exchange(distribution=[hash[a1, a2]])
11521152
: +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2])
1153-
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
1153+
: +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a0, a1, a2]]], fields=[a0, a1, a2])
11541154
+- Exchange(distribution=[hash[b1, b2]])
11551155
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
11561156
]]>
@@ -1177,7 +1177,7 @@ Sink(table=[default_catalog.default_database.snk], targetColumns=[[0],[1],[2],[4
11771177
+- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, a1, a2, b0, b2, b1])
11781178
:- Exchange(distribution=[hash[a1, a2]])
11791179
: +- Calc(select=[a0, a1, SUBSTRING(a2, 2) AS a2])
1180-
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
1180+
: +- TableSourceScan(table=[[default_catalog, default_database, src1WithMultiIndexes, project=[a0, a1, a2]]], fields=[a0, a1, a2])
11811181
+- Exchange(distribution=[hash[b1, b2]])
11821182
+- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[b0, b2, b1])
11831183
]]>

0 commit comments

Comments
 (0)