diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.java
new file mode 100644
index 0000000000000..1467c2b70ef5f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRule.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.planner.plan.utils.ExpandUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * This rule rewrites an aggregation query with grouping sets into an regular aggregation query with
+ * expand.
+ *
+ *
This rule duplicates the input data by two or more times (# number of groupSets + an optional
+ * non-distinct group). This will put quite a bit of memory pressure of the used aggregate and
+ * exchange operators.
+ *
+ *
This rule will be used for the plan with grouping sets or the plan with distinct aggregations
+ * after {@link FlinkAggregateExpandDistinctAggregatesRule} applied.
+ *
+ *
`FlinkAggregateExpandDistinctAggregatesRule` rewrites an aggregate query with distinct
+ * aggregations into an expanded double aggregation. The first aggregate has grouping sets in which
+ * the regular aggregation expressions and every distinct clause are aggregated in a separate group.
+ * The results are then combined in a second aggregate.
+ *
+ *
Examples:
+ *
+ * MyTable: a: INT, b: BIGINT, c: VARCHAR(32), d: VARCHAR(32)
+ *
+ * Original records:
+ * | a | b | c | d |
+ * |:-:|:-:|:--:|:--:|
+ * | 1 | 1 | c1 | d1 |
+ * | 1 | 2 | c1 | d2 |
+ * | 2 | 1 | c1 | d1 |
+ *
+ * Example1 (expand for DISTINCT aggregates):
+ *
+ * SQL: SELECT a, SUM(DISTINCT b) as t1, COUNT(DISTINCT c) as t2, COUNT(d) as t3 FROM MyTable GROUP
+ * BY a
+ *
+ * Logical plan:
+ * {@code
+ * LogicalAggregate(group=[{0}], t1=[SUM(DISTINCT $1)], t2=[COUNT(DISTINCT $2)], t3=[COUNT($3)])
+ * LogicalTableScan(table=[[builtin, default, MyTable]])
+ * }
+ *
+ * Logical plan after `FlinkAggregateExpandDistinctAggregatesRule` applied:
+ * {@code
+ * LogicalProject(a=[$0], t1=[$1], t2=[$2], t3=[CAST($3):BIGINT NOT NULL])
+ * LogicalProject(a=[$0], t1=[$1], t2=[$2], $f3=[CASE(IS NOT NULL($3), $3, 0)])
+ * LogicalAggregate(group=[{0}], t1=[SUM($1) FILTER $4], t2=[COUNT($2) FILTER $5],
+ * t3=[MIN($3) FILTER $6])
+ * LogicalProject(a=[$0], b=[$1], c=[$2], t3=[$3], $g_1=[=($4, 1)], $g_2=[=($4, 2)],
+ * $g_3=[=($4, 3)])
+ * LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 2}, {0}]], t3=[COUNT($3)],
+ * $g=[GROUPING($0, $1, $2)])
+ * LogicalTableScan(table=[[builtin, default, MyTable]])
+ * }
+ *
+ * Logical plan after this rule applied:
+ * {@code
+ * LogicalCalc(expr#0..3=[{inputs}], expr#4=[IS NOT NULL($t3)], ...)
+ * LogicalAggregate(group=[{0}], t1=[SUM($1) FILTER $4], t2=[COUNT($2) FILTER $5],
+ * t3=[MIN($3) FILTER $6])
+ * LogicalCalc(expr#0..4=[{inputs}], ... expr#10=[CASE($t6, $t5, $t8, $t7, $t9)],
+ * expr#11=[1], expr#12=[=($t10, $t11)], ... $g_1=[$t12], ...)
+ * LogicalAggregate(group=[{0, 1, 2, 4}], groups=[[]], t3=[COUNT($3)])
+ * LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], d=[$3], $e=[1]},
+ * {a=[$0], b=[null], c=[$2], d=[$3], $e=[2]}, {a=[$0], b=[null], c=[null], d=[$3], $e=[3]}])
+ * LogicalTableSourceScan(table=[[builtin, default, MyTable]], fields=[a, b, c, d])
+ * }
+ *
+ * '$e = 1' is equivalent to 'group by a, b' '$e = 2' is equivalent to 'group by a, c' '$e = 3' is
+ * equivalent to 'group by a'
+ *
+ * Expanded records: \+-----+-----+-----+-----+-----+ \| a | b | c | d | $e |
+ * \+-----+-----+-----+-----+-----+ ---+--- \| 1 | 1 | null| d1 | 1 | |
+ * \+-----+-----+-----+-----+-----+ | \| 1 | null| c1 | d1 | 2 | records expanded by record1
+ * \+-----+-----+-----+-----+-----+ | \| 1 | null| null| d1 | 3 | | \+-----+-----+-----+-----+-----+
+ * ---+--- \| 1 | 2 | null| d2 | 1 | | \+-----+-----+-----+-----+-----+ | \| 1 | null| c1 | d2 | 2 |
+ * records expanded by record2 \+-----+-----+-----+-----+-----+ | \| 1 | null| null| d2 | 3 | |
+ * \+-----+-----+-----+-----+-----+ ---+--- \| 2 | 1 | null| d1 | 1 | |
+ * \+-----+-----+-----+-----+-----+ | \| 2 | null| c1 | d1 | 2 | records expanded by record3
+ * \+-----+-----+-----+-----+-----+ | \| 2 | null| null| d1 | 3 | | \+-----+-----+-----+-----+-----+
+ * ---+---
+ *
+ * Example2 (Some fields are both in DISTINCT aggregates and non-DISTINCT aggregates):
+ *
+ * SQL: SELECT MAX(a) as t1, COUNT(DISTINCT a) as t2, count(DISTINCT d) as t3 FROM MyTable
+ *
+ * Field `a` is both in DISTINCT aggregate and `MAX` aggregate, so, `a` should be outputted as two
+ * individual fields, one is for `MAX` aggregate, another is for DISTINCT aggregate.
+ *
+ * Expanded records: \+-----+-----+-----+-----+ \| a | d | $e | a_0 | \+-----+-----+-----+-----+
+ * ---+--- \| 1 | null| 1 | 1 | | \+-----+-----+-----+-----+ | \| null| d1 | 2 | 1 | records
+ * expanded by record1 \+-----+-----+-----+-----+ | \| null| null| 3 | 1 | |
+ * \+-----+-----+-----+-----+ ---+--- \| 1 | null| 1 | 1 | | \+-----+-----+-----+-----+ | \| null|
+ * d2 | 2 | 1 | records expanded by record2 \+-----+-----+-----+-----+ | \| null| null| 3 | 1 | |
+ * \+-----+-----+-----+-----+ ---+--- \| 2 | null| 1 | 2 | | \+-----+-----+-----+-----+ | \| null|
+ * d1 | 2 | 2 | records expanded by record3 \+-----+-----+-----+-----+ | \| null| null| 3 | 2 | |
+ * \+-----+-----+-----+-----+ ---+---
+ *
+ * Example3 (expand for CUBE/ROLLUP/GROUPING SETS):
+ *
+ * SQL: SELECT a, c, SUM(b) as b FROM MyTable GROUP BY GROUPING SETS (a, c)
+ *
+ * Logical plan:
+ * {@code
+ * LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], b=[SUM($2)])
+ * LogicalProject(a=[$0], c=[$2], b=[$1])
+ * LogicalTableScan(table=[[builtin, default, MyTable]])
+ * }
+ *
+ * Logical plan after this rule applied:
+ * {@code
+ * LogicalCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}], b=[$t3])
+ * LogicalAggregate(group=[{0, 2, 3}], groups=[[]], b=[SUM($1)])
+ * LogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]},
+ * {a=[null], b=[$1], c=[$2], $e=[2]}])
+ * LogicalNativeTableScan(table=[[builtin, default, MyTable]])
+ * }
+ *
+ * '$e = 1' is equivalent to 'group by a' '$e = 2' is equivalent to 'group by c'
+ *
+ * Expanded records: \+-----+-----+-----+-----+ \| a | b | c | $e | \+-----+-----+-----+-----+
+ * ---+--- \| 1 | 1 | null| 1 | | \+-----+-----+-----+-----+ records expanded by record1 \| null| 1
+ * \| c1 | 2 | | \+-----+-----+-----+-----+ ---+--- \| 1 | 2 | null| 1 | |
+ * \+-----+-----+-----+-----+ records expanded by record2 \| null| 2 | c1 | 2 | |
+ * \+-----+-----+-----+-----+ ---+--- \| 2 | 1 | null| 1 | | \+-----+-----+-----+-----+ records
+ * expanded by record3 \| null| 1 | c1 | 2 | | \+-----+-----+-----+-----+ ---+---
+ *
+ */
+@Value.Enclosing
+public class DecomposeGroupingSetsRule
+ extends RelRule {
+ public static final DecomposeGroupingSetsRule INSTANCE =
+ DecomposeGroupingSetsRule.DecomposeGroupingSetsRuleConfig.DEFAULT.toRule();
+
+ protected DecomposeGroupingSetsRule(DecomposeGroupingSetsRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalAggregate agg = call.rel(0);
+ List