Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.secret.SecretStore;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -66,16 +67,19 @@ public class EnvironmentSettings {

private final @Nullable CatalogStore catalogStore;
private final @Nullable SqlFactory sqlFactory;
private final @Nullable SecretStore secretStore;

private EnvironmentSettings(
Configuration configuration,
ClassLoader classLoader,
CatalogStore catalogStore,
SqlFactory sqlFactory) {
SqlFactory sqlFactory,
SecretStore secretStore) {
this.configuration = configuration;
this.classLoader = classLoader;
this.catalogStore = catalogStore;
this.sqlFactory = sqlFactory;
this.secretStore = secretStore;
}

/**
Expand Down Expand Up @@ -154,6 +158,12 @@ public Optional<SqlFactory> getSqlFactory() {
return Optional.ofNullable(sqlFactory);
}

@Internal
@Nullable
public SecretStore getSecretStore() {
return secretStore;
}

/** A builder for {@link EnvironmentSettings}. */
@PublicEvolving
public static class Builder {
Expand All @@ -163,6 +173,7 @@ public static class Builder {

private @Nullable CatalogStore catalogStore;
private @Nullable SqlFactory sqlFactory;
private @Nullable SecretStore secretStore;

public Builder() {}

Expand Down Expand Up @@ -251,12 +262,28 @@ public Builder withSqlFactory(SqlFactory sqlFactory) {
return this;
}

/**
* Specifies the {@link SecretStore} to be used for managing secrets in the {@link
* TableEnvironment}.
*
* <p>The secret store allows for secure storage and retrieval of sensitive configuration
* data such as credentials, tokens, and passwords.
*
* @param secretStore the secret store instance to use
* @return this builder
*/
public Builder withSecretStore(SecretStore secretStore) {
this.secretStore = secretStore;
return this;
}

/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
}
return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory);
return new EnvironmentSettings(
configuration, classLoader, catalogStore, sqlFactory, secretStore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.secret.SecretStore;
import org.apache.flink.table.secret.SecretStoreFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
Expand Down Expand Up @@ -263,6 +265,19 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
? settings.getCatalogStore()
: catalogStoreFactory.createCatalogStore();

final SecretStoreFactory secretStoreFactory =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding a test implementation of these interfaces to drive the usage of the interfaces from tests, including writeable readable stores with config options.

TableFactoryUtil.findAndCreateSecretStoreFactory(
settings.getConfiguration(), userClassLoader);
final SecretStoreFactory.Context secretStoreContext =
TableFactoryUtil.buildSecretStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
secretStoreFactory.open(secretStoreContext);
// TODO: pass secret store to catalog manager for encryption/decryption
final SecretStore secretStore =
settings.getSecretStore() != null
? settings.getSecretStore()
: secretStoreFactory.createSecretStore();

// use configuration to init table config
final TableConfig tableConfig = TableConfig.getDefault();
tableConfig.setRootConfiguration(executor.getConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.table.legacy.factories.TableSourceFactory;
import org.apache.flink.table.legacy.sinks.TableSink;
import org.apache.flink.table.legacy.sources.TableSource;
import org.apache.flink.table.secret.CommonSecretOptions;
import org.apache.flink.table.secret.SecretStoreFactory;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -220,4 +222,38 @@ public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(

return context;
}

public static SecretStoreFactory findAndCreateSecretStoreFactory(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);

SecretStoreFactory secretStoreFactory =
FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier);

return secretStoreFactory;
}

/**
* Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.secret-store.kind: {identifier}
* table.secret-store.{identifier}.{param1}: xxx
* table.secret-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static SecretStoreFactory.Context buildSecretStoreFactoryContext(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
String secretStoreOptionPrefix =
CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + ".";
Map<String, String> options =
new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap();
SecretStoreFactory.Context context =
new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader);

return context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.table.legacy.factories.TableFactory;
import org.apache.flink.table.ml.ModelProvider;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.secret.SecretStoreFactory;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.table.watermark.WatermarkEmitStrategy;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -1411,6 +1412,39 @@ public ClassLoader getClassLoader() {
}
}

public static class DefaultSecretStoreContext implements SecretStoreFactory.Context {

private Map<String, String> options;

private ReadableConfig configuration;

private ClassLoader classLoader;

public DefaultSecretStoreContext(
Map<String, String> options,
ReadableConfig configuration,
ClassLoader classLoader) {
this.options = options;
this.configuration = configuration;
this.classLoader = classLoader;
}

@Override
public Map<String, String> getOptions() {
return options;
}

@Override
public ReadableConfig getConfiguration() {
return configuration;
}

@Override
public ClassLoader getClassLoader() {
return classLoader;
}
}

/** Default implementation of {@link ModuleFactory.Context}. */
@Internal
public static class DefaultModuleContext implements ModuleFactory.Context {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.secret;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** A collection of {@link ConfigOption} which are used for secret store configuration. */
@Internal
public class CommonSecretOptions {

public static final String DEFAULT_SECRET_STORE_KIND = "default_in_memory";
public static final ConfigOption<String> TABLE_SECRET_STORE_KIND =
ConfigOptions.key("table.secret-store.kind")
.stringType()
.defaultValue(DEFAULT_SECRET_STORE_KIND)
.withDescription(
"The kind of secret store to be used. Out of the box, 'default_in_memory' option is supported. "
+ "Implementations can provide custom secret stores for different backends "
+ "(e.g., cloud-specific secret managers).");

/** Used to filter the specific options for secret store. */
public static final String TABLE_SECRET_STORE_OPTION_PREFIX = "table.secret-store.";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.secret;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.secret.exceptions.SecretNotFoundException;

import java.util.Map;

/**
* Interface for retrieving secrets from a secret store.
*
* <p>This interface enables read-only access to stored secrets, allowing applications to retrieve
* sensitive configuration data such as credentials, API tokens, and passwords.
*
* <p>Implementations of this interface should ensure secure retrieval and handling of secret data.
*/
@PublicEvolving
public interface ReadableSecretStore extends SecretStore {

/**
* Retrieves a secret from the store by its identifier.
*
* @param secretId the unique identifier of the secret to retrieve
* @return a map containing the secret data as key-value pairs
* @throws SecretNotFoundException if the secret with the given identifier does not exist
*/
Map<String, String> getSecret(String secretId) throws SecretNotFoundException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.secret;

import org.apache.flink.annotation.PublicEvolving;

/**
* Base marker interface for secret store implementations.
*
* <p>This interface serves as the common base for both {@link ReadableSecretStore} and {@link
* WritableSecretStore}, allowing for flexible secret management implementations.
*
* <p>Secret stores are used to manage sensitive configuration data (credentials, tokens, passwords,
* etc.) in Flink SQL and Table API applications.
*/
@PublicEvolving
public interface SecretStore {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.secret;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.secret.exceptions.SecretException;

import java.util.Map;

/** Factory for creating SecretStore instances. */
@PublicEvolving
public interface SecretStoreFactory extends Factory {

/** Creates a SecretStore instance. */
SecretStore createSecretStore();

/** Initialize secret store. */
void open(Context context) throws SecretException;

/** Close secret store. */
void close() throws CatalogException;

interface Context {
/**
* Returns the options with which the secret store is created.
*
* <p>An implementation should perform validation of these options.
*/
Map<String, String> getOptions();

/** Gives read-only access to the configuration of the current session. */
ReadableConfig getConfiguration();

/**
* Returns the class loader of the current session.
*
* <p>The class loader is in particular useful for discovering further (nested) factories.
*/
ClassLoader getClassLoader();
}
}
Loading