Skip to content

Commit b99cf55

Browse files
committed
[FLINK-38848] Supports to pass the required privilege when catalog manager loads the table
1 parent fec9336 commit b99cf55

File tree

5 files changed

+98
-7
lines changed

5 files changed

+98
-7
lines changed

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.flink.table.catalog.ResolvedCatalogTable;
4545
import org.apache.flink.table.catalog.ResolvedSchema;
4646
import org.apache.flink.table.catalog.SchemaTranslator;
47+
import org.apache.flink.table.catalog.TableWritePrivilege;
4748
import org.apache.flink.table.catalog.UnresolvedIdentifier;
4849
import org.apache.flink.table.expressions.ApiExpressionUtils;
4950
import org.apache.flink.table.expressions.Expression;
@@ -60,11 +61,14 @@
6061
import org.apache.flink.table.operations.utils.OperationExpressionsUtils.CategorizedExpressions;
6162
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
6263

64+
import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
65+
6366
import java.util.ArrayList;
6467
import java.util.Arrays;
6568
import java.util.Collections;
6669
import java.util.List;
6770
import java.util.Optional;
71+
import java.util.Set;
6872
import java.util.concurrent.atomic.AtomicInteger;
6973
import java.util.stream.Collectors;
7074
import java.util.stream.Stream;
@@ -428,8 +432,9 @@ public TablePipeline insertInto(String tablePath, boolean overwrite) {
428432
tableEnvironment.getParser().parseIdentifier(tablePath);
429433
ObjectIdentifier objectIdentifier =
430434
tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
435+
Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.INSERT);
431436
ContextResolvedTable contextResolvedTable =
432-
tableEnvironment.getCatalogManager().getTableOrError(objectIdentifier);
437+
tableEnvironment.getCatalogManager().getTableOrError(objectIdentifier, privileges);
433438
return insertInto(contextResolvedTable, overwrite);
434439
}
435440

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,19 @@ public Optional<ContextResolvedTable> getTable(ObjectIdentifier objectIdentifier
687687
resolveCatalogBaseTable(temporaryTable);
688688
return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
689689
} else {
690-
return getPermanentTable(objectIdentifier, null);
690+
return getPermanentTable(objectIdentifier, null, null);
691+
}
692+
}
693+
694+
public Optional<ContextResolvedTable> getTable(
695+
ObjectIdentifier objectIdentifier, Set<TableWritePrivilege> privileges) {
696+
CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
697+
if (temporaryTable != null) {
698+
final ResolvedCatalogBaseTable<?> resolvedTable =
699+
resolveCatalogBaseTable(temporaryTable);
700+
return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
701+
} else {
702+
return getPermanentTable(objectIdentifier, null, privileges);
691703
}
692704
}
693705

@@ -708,7 +720,7 @@ public Optional<ContextResolvedTable> getTable(
708720
resolveCatalogBaseTable(temporaryTable);
709721
return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
710722
} else {
711-
return getPermanentTable(objectIdentifier, timestamp);
723+
return getPermanentTable(objectIdentifier, timestamp, null);
712724
}
713725
}
714726

@@ -753,6 +765,17 @@ public ContextResolvedTable getTableOrError(ObjectIdentifier objectIdentifier) {
753765
objectIdentifier, listCatalogs())));
754766
}
755767

768+
public ContextResolvedTable getTableOrError(
769+
ObjectIdentifier objectIdentifier, Set<TableWritePrivilege> privileges) {
770+
return getTable(objectIdentifier, privileges)
771+
.orElseThrow(
772+
() ->
773+
new ValidationException(
774+
String.format(
775+
"Cannot find table '%s' in any of the catalogs %s, nor as a temporary table.",
776+
objectIdentifier, listCatalogs())));
777+
}
778+
756779
/**
757780
* Retrieves a partition with a fully qualified table path and partition spec. If the path is
758781
* not yet fully qualified use{@link #qualifyIdentifier(UnresolvedIdentifier)} first.
@@ -777,7 +800,9 @@ public Optional<CatalogPartition> getPartition(
777800
}
778801

779802
private Optional<ContextResolvedTable> getPermanentTable(
780-
ObjectIdentifier objectIdentifier, @Nullable Long timestamp) {
803+
ObjectIdentifier objectIdentifier,
804+
@Nullable Long timestamp,
805+
@Nullable Set<TableWritePrivilege> privileges) {
781806
Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
782807
ObjectPath objectPath = objectIdentifier.toObjectPath();
783808
if (catalogOptional.isPresent()) {
@@ -792,6 +817,8 @@ private Optional<ContextResolvedTable> getPermanentTable(
792817
"%s is a view, but time travel is not supported for view.",
793818
objectIdentifier.asSummaryString()));
794819
}
820+
} else if (privileges != null) {
821+
table = currentCatalog.getTable(objectPath, privileges);
795822
} else {
796823
table = currentCatalog.getTable(objectPath);
797824
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Collections;
5252
import java.util.List;
5353
import java.util.Optional;
54+
import java.util.Set;
5455

5556
import static org.apache.flink.util.Preconditions.checkNotNull;
5657

@@ -282,6 +283,23 @@ default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
282283
"getTable(ObjectPath, long) is not implemented for %s.", this.getClass()));
283284
}
284285

286+
/**
287+
* Returns a {@link CatalogTable} or {@link CatalogView} with specific write privileges
288+
* identified by the given {@link ObjectPath}. The framework will resolve the metadata objects
289+
* when necessary.
290+
*
291+
* @param tablePath Path of the table or view
292+
* @param writePrivileges specific write privileges for the table
293+
* @return The requested table or view
294+
* @throws TableNotExistException if the target does not exist
295+
* @throws CatalogException in case of any runtime exception
296+
*/
297+
default CatalogBaseTable getTable(
298+
ObjectPath tablePath, Set<TableWritePrivilege> writePrivileges)
299+
throws TableNotExistException, CatalogException {
300+
return getTable(tablePath);
301+
}
302+
285303
/**
286304
* Check if a table or view exists in this catalog.
287305
*
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.catalog;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/** The table writing privileges that will be provided when loading a table. */
24+
@Internal
25+
public enum TableWritePrivilege {
26+
/** The privilege of adding rows to the table. */
27+
INSERT,
28+
29+
/** The privilege for changing existing rows in the table. */
30+
UPDATE,
31+
32+
/** The privilege for deleting rows from the table. */
33+
DELETE
34+
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.flink.table.catalog.CatalogManager;
6464
import org.apache.flink.table.catalog.ContextResolvedTable;
6565
import org.apache.flink.table.catalog.ObjectIdentifier;
66+
import org.apache.flink.table.catalog.TableWritePrivilege;
6667
import org.apache.flink.table.catalog.UnresolvedIdentifier;
6768
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
6869
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -115,6 +116,8 @@
115116
import org.apache.flink.table.planner.utils.OperationConverterUtils;
116117
import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
117118

119+
import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
120+
118121
import org.apache.calcite.rel.RelRoot;
119122
import org.apache.calcite.rel.hint.HintStrategyTable;
120123
import org.apache.calcite.rel.hint.RelHint;
@@ -134,6 +137,7 @@
134137
import java.util.List;
135138
import java.util.Map;
136139
import java.util.Optional;
140+
import java.util.Set;
137141
import java.util.stream.Collectors;
138142

139143
/**
@@ -323,9 +327,10 @@ private Operation convertSqlInsert(RichSqlInsert insert) {
323327

324328
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath);
325329
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
330+
Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.INSERT);
326331
// If it is materialized table, convert it to catalog table for query optimize
327332
ContextResolvedTable contextResolvedTable =
328-
catalogManager.getTableOrError(identifier).toCatalogTable();
333+
catalogManager.getTableOrError(identifier, privileges).toCatalogTable();
329334

330335
PlannerQueryOperation query =
331336
(PlannerQueryOperation)
@@ -655,9 +660,10 @@ private Operation convertDelete(SqlDelete sqlDelete) {
655660
LogicalTableModify tableModify = (LogicalTableModify) deleteRelational.rel;
656661
UnresolvedIdentifier unresolvedTableIdentifier =
657662
UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
663+
Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.DELETE);
658664
ContextResolvedTable contextResolvedTable =
659665
catalogManager.getTableOrError(
660-
catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
666+
catalogManager.qualifyIdentifier(unresolvedTableIdentifier), privileges);
661667
// try push down delete
662668
Optional<DynamicTableSink> optionalDynamicTableSink =
663669
DeletePushDownUtils.getDynamicTableSink(contextResolvedTable, tableModify);
@@ -700,9 +706,10 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
700706
LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
701707
UnresolvedIdentifier unresolvedTableIdentifier =
702708
UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
709+
Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.UPDATE);
703710
ContextResolvedTable contextResolvedTable =
704711
catalogManager.getTableOrError(
705-
catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
712+
catalogManager.qualifyIdentifier(unresolvedTableIdentifier), privileges);
706713
// get query
707714
PlannerQueryOperation queryOperation =
708715
new PlannerQueryOperation(

0 commit comments

Comments
 (0)