diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index 67fcfcec46309..e71925f43c2b9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.CatalogStore; import org.apache.flink.table.expressions.SqlFactory; import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.secret.SecretStore; import javax.annotation.Nullable; @@ -66,16 +67,19 @@ public class EnvironmentSettings { private final @Nullable CatalogStore catalogStore; private final @Nullable SqlFactory sqlFactory; + private final @Nullable SecretStore secretStore; private EnvironmentSettings( Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore, - SqlFactory sqlFactory) { + SqlFactory sqlFactory, + SecretStore secretStore) { this.configuration = configuration; this.classLoader = classLoader; this.catalogStore = catalogStore; this.sqlFactory = sqlFactory; + this.secretStore = secretStore; } /** @@ -154,6 +158,12 @@ public Optional getSqlFactory() { return Optional.ofNullable(sqlFactory); } + @Internal + @Nullable + public SecretStore getSecretStore() { + return secretStore; + } + /** A builder for {@link EnvironmentSettings}. */ @PublicEvolving public static class Builder { @@ -163,6 +173,7 @@ public static class Builder { private @Nullable CatalogStore catalogStore; private @Nullable SqlFactory sqlFactory; + private @Nullable SecretStore secretStore; public Builder() {} @@ -251,12 +262,28 @@ public Builder withSqlFactory(SqlFactory sqlFactory) { return this; } + /** + * Specifies the {@link SecretStore} to be used for managing secrets in the {@link + * TableEnvironment}. + * + *

The secret store allows for secure storage and retrieval of sensitive configuration + * data such as credentials, tokens, and passwords. + * + * @param secretStore the secret store instance to use + * @return this builder + */ + public Builder withSecretStore(SecretStore secretStore) { + this.secretStore = secretStore; + return this; + } + /** Returns an immutable instance of {@link EnvironmentSettings}. */ public EnvironmentSettings build() { if (classLoader == null) { classLoader = Thread.currentThread().getContextClassLoader(); } - return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory); + return new EnvironmentSettings( + configuration, classLoader, catalogStore, sqlFactory, secretStore); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6a40fa9c2384d..c0ef068c5fb62 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -119,6 +119,8 @@ import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.resource.ResourceType; import org.apache.flink.table.resource.ResourceUri; +import org.apache.flink.table.secret.SecretStore; +import org.apache.flink.table.secret.SecretStoreFactory; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; @@ -263,6 +265,19 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { ? settings.getCatalogStore() : catalogStoreFactory.createCatalogStore(); + final SecretStoreFactory secretStoreFactory = + TableFactoryUtil.findAndCreateSecretStoreFactory( + settings.getConfiguration(), userClassLoader); + final SecretStoreFactory.Context secretStoreContext = + TableFactoryUtil.buildSecretStoreFactoryContext( + settings.getConfiguration(), userClassLoader); + secretStoreFactory.open(secretStoreContext); + // TODO: pass secret store to catalog manager for encryption/decryption + final SecretStore secretStore = + settings.getSecretStore() != null + ? settings.getSecretStore() + : secretStoreFactory.createSecretStore(); + // use configuration to init table config final TableConfig tableConfig = TableConfig.getDefault(); tableConfig.setRootConfiguration(executor.getConfiguration()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index ad681b5439441..985c20100bf47 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -39,6 +39,8 @@ import org.apache.flink.table.legacy.factories.TableSourceFactory; import org.apache.flink.table.legacy.sinks.TableSink; import org.apache.flink.table.legacy.sources.TableSource; +import org.apache.flink.table.secret.CommonSecretOptions; +import org.apache.flink.table.secret.SecretStoreFactory; import java.util.Collections; import java.util.List; @@ -220,4 +222,38 @@ public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( return context; } + + public static SecretStoreFactory findAndCreateSecretStoreFactory( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + + SecretStoreFactory secretStoreFactory = + FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier); + + return secretStoreFactory; + } + + /** + * Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.secret-store.kind: {identifier}
+     * table.secret-store.{identifier}.{param1}: xxx
+     * table.secret-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static SecretStoreFactory.Context buildSecretStoreFactoryContext( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + String secretStoreOptionPrefix = + CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + "."; + Map options = + new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap(); + SecretStoreFactory.Context context = + new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader); + + return context; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 691397c08d4bd..dc0c942cdf20e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -42,6 +42,7 @@ import org.apache.flink.table.legacy.factories.TableFactory; import org.apache.flink.table.ml.ModelProvider; import org.apache.flink.table.module.Module; +import org.apache.flink.table.secret.SecretStoreFactory; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.apache.flink.util.Preconditions; @@ -1411,6 +1412,39 @@ public ClassLoader getClassLoader() { } } + public static class DefaultSecretStoreContext implements SecretStoreFactory.Context { + + private Map options; + + private ReadableConfig configuration; + + private ClassLoader classLoader; + + public DefaultSecretStoreContext( + Map options, + ReadableConfig configuration, + ClassLoader classLoader) { + this.options = options; + this.configuration = configuration; + this.classLoader = classLoader; + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public ReadableConfig getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getClassLoader() { + return classLoader; + } + } + /** Default implementation of {@link ModuleFactory.Context}. */ @Internal public static class DefaultModuleContext implements ModuleFactory.Context { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java new file mode 100644 index 0000000000000..bbff5921c76aa --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** A collection of {@link ConfigOption} which are used for secret store configuration. */ +@Internal +public class CommonSecretOptions { + + public static final String DEFAULT_SECRET_STORE_KIND = "default_in_memory"; + public static final ConfigOption TABLE_SECRET_STORE_KIND = + ConfigOptions.key("table.secret-store.kind") + .stringType() + .defaultValue(DEFAULT_SECRET_STORE_KIND) + .withDescription( + "The kind of secret store to be used. Out of the box, 'default_in_memory' option is supported. " + + "Implementations can provide custom secret stores for different backends " + + "(e.g., cloud-specific secret managers)."); + + /** Used to filter the specific options for secret store. */ + public static final String TABLE_SECRET_STORE_OPTION_PREFIX = "table.secret-store."; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java new file mode 100644 index 0000000000000..70dd48e21f979 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Map; + +/** + * Interface for retrieving secrets from a secret store. + * + *

This interface enables read-only access to stored secrets, allowing applications to retrieve + * sensitive configuration data such as credentials, API tokens, and passwords. + * + *

Implementations of this interface should ensure secure retrieval and handling of secret data. + */ +@PublicEvolving +public interface ReadableSecretStore extends SecretStore { + + /** + * Retrieves a secret from the store by its identifier. + * + * @param secretId the unique identifier of the secret to retrieve + * @return a map containing the secret data as key-value pairs + * @throws SecretNotFoundException if the secret with the given identifier does not exist + */ + Map getSecret(String secretId) throws SecretNotFoundException; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java new file mode 100644 index 0000000000000..89af1d469e1b8 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base marker interface for secret store implementations. + * + *

This interface serves as the common base for both {@link ReadableSecretStore} and {@link + * WritableSecretStore}, allowing for flexible secret management implementations. + * + *

Secret stores are used to manage sensitive configuration data (credentials, tokens, passwords, + * etc.) in Flink SQL and Table API applications. + */ +@PublicEvolving +public interface SecretStore {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java new file mode 100644 index 0000000000000..ceb4f17c93bea --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.secret.exceptions.SecretException; + +import java.util.Map; + +/** Factory for creating SecretStore instances. */ +@PublicEvolving +public interface SecretStoreFactory extends Factory { + + /** Creates a SecretStore instance. */ + SecretStore createSecretStore(); + + /** Initialize secret store. */ + void open(Context context) throws SecretException; + + /** Close secret store. */ + void close() throws CatalogException; + + interface Context { + /** + * Returns the options with which the secret store is created. + * + *

An implementation should perform validation of these options. + */ + Map getOptions(); + + /** Gives read-only access to the configuration of the current session. */ + ReadableConfig getConfiguration(); + + /** + * Returns the class loader of the current session. + * + *

The class loader is in particular useful for discovering further (nested) factories. + */ + ClassLoader getClassLoader(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java new file mode 100644 index 0000000000000..db5037b7b2f53 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Map; + +/** + * Interface for storing, updating, and removing secrets in a secret store. + * + *

This interface provides write operations for managing secrets, including adding new secrets, + * updating existing ones, and removing secrets that are no longer needed. + * + *

Implementations should ensure that secret operations are performed securely and, where + * applicable, atomically. + */ +@PublicEvolving +public interface WritableSecretStore extends SecretStore { + + /** + * Stores a new secret in the secret store. + * + * @param secretData a map containing the secret data as key-value pairs to be stored + * @return a unique identifier for the stored secret + */ + String storeSecret(Map secretData); + + /** + * Removes a secret from the secret store. + * + * @param secretId the unique identifier of the secret to remove + */ + void removeSecret(String secretId); + + /** + * Atomically updates an existing secret with new data. + * + *

This operation replaces the entire secret data with the provided new data. + * + * @param secretId the unique identifier of the secret to update + * @param newSecretData a map containing the new secret data as key-value pairs + * @throws SecretNotFoundException if the secret with the given identifier does not exist + */ + void updateSecret(String secretId, Map newSecretData) + throws SecretNotFoundException; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java new file mode 100644 index 0000000000000..8304e41ac963c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret.exceptions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base exception for all secret-related errors in the secret store. + * + *

This exception serves as the parent class for all secret store related exceptions, providing a + * common type for handling errors that occur during secret management operations. + */ +@PublicEvolving +public class SecretException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** + * Constructs a new SecretException with the specified detail message. + * + * @param message the detail message explaining the reason for the exception + */ + public SecretException(String message) { + super(message); + } + + /** + * Constructs a new SecretException with the specified detail message and cause. + * + * @param message the detail message explaining the reason for the exception + * @param cause the cause of the exception + */ + public SecretException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new SecretException with the specified cause. + * + * @param cause the cause of the exception + */ + public SecretException(Throwable cause) { + super(cause); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java new file mode 100644 index 0000000000000..a773ea7e1e685 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret.exceptions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Exception thrown when a requested secret cannot be found in the secret store. + * + *

This exception is typically thrown by {@link ReadableSecretStore#getSecret(String)} or {@link + * WritableSecretStore#updateSecret(String, java.util.Map)} when attempting to access or modify a + * secret that does not exist. + */ +@PublicEvolving +public class SecretNotFoundException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Constructs a new SecretNotFoundException with the specified detail message. + * + * @param message the detail message explaining the reason for the exception + */ + public SecretNotFoundException(String message) { + super(message); + } + + /** + * Constructs a new SecretNotFoundException with the specified detail message and cause. + * + * @param message the detail message explaining the reason for the exception + * @param cause the cause of the exception + */ + public SecretNotFoundException(String message, Throwable cause) { + super(message, cause); + } +}