Compare commits

...

4 Commits

Author SHA1 Message Date
Beto Dealmeida
d6afe5b627 Update docs 2026-01-26 14:32:45 -05:00
Beto Dealmeida
af16bd9a19 Provisioned Redshift clusters 2026-01-26 10:41:15 -05:00
Beto Dealmeida
64656baf35 Phase 2 2026-01-22 19:34:43 -05:00
Beto Dealmeida
1c0429a0de feat(AWS IAM): phase 1 2026-01-22 19:06:46 -05:00
12 changed files with 2948 additions and 1 deletion

View File

@@ -27,6 +27,7 @@ repos:
args: [--check-untyped-defs]
exclude: ^superset-extensions-cli/
additional_dependencies: [
types-cachetools,
types-simplejson,
types-python-dateutil,
types-requests,

View File

@@ -108,6 +108,8 @@ services:
extra_hosts:
- "host.docker.internal:host-gateway"
user: *superset-user
ports:
- "${SUPERSET_PORT:-8088}:8088"
depends_on:
superset-init-light:
condition: service_completed_successfully

View File

@@ -40,6 +40,8 @@ are compatible with Superset.
| [AWS Athena](/docs/configuration/databases#aws-athena) | `pip install pyathena[pandas]` , `pip install PyAthenaJDBC` | `awsathena+rest://{access_key_id}:{access_key}@athena.{region}.amazonaws.com/{schema}?s3_staging_dir={s3_staging_dir}&...` |
| [AWS DynamoDB](/docs/configuration/databases#aws-dynamodb) | `pip install pydynamodb` | `dynamodb://{access_key_id}:{secret_access_key}@dynamodb.{region_name}.amazonaws.com?connector=superset` |
| [AWS Redshift](/docs/configuration/databases#aws-redshift) | `pip install sqlalchemy-redshift` | `redshift+psycopg2://<userName>:<DBPassword>@<AWS End Point>:5439/<Database Name>` |
| [AWS Aurora PostgreSQL](/docs/configuration/databases#aws-aurora-postgresql-and-mysql) | `pip install psycopg2` | `postgresql://<userName>:<DBPassword>@<cluster-endpoint>:5432/<Database Name>` |
| [AWS Aurora MySQL](/docs/configuration/databases#aws-aurora-postgresql-and-mysql) | `pip install mysqlclient` | `mysql://<userName>:<DBPassword>@<cluster-endpoint>:3306/<Database Name>` |
| [Apache Doris](/docs/configuration/databases#apache-doris) | `pip install pydoris` | `doris://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>` |
| [Apache Drill](/docs/configuration/databases#apache-drill) | `pip install sqlalchemy-drill` | `drill+sadrill://<username>:<password>@<host>:<port>/<storage_plugin>`, often useful: `?use_ssl=True/False` |
| [Apache Druid](/docs/configuration/databases#apache-druid) | `pip install pydruid` | `druid://<User>:<password>@<Host>:<Port-default-9088>/druid/v2/sql` |
@@ -333,6 +335,224 @@ You have to define the following arguments in Superset's redshift database conne
{"connect_args":{"iam":true,"is_serverless":true,"serverless_acct_id":"<aws account number>","serverless_work_group":"<redshift work group>","database":"<database>","user":"IAMR:<superset iam role name>"}}
```
##### Cross-Account IAM Authentication
Superset also supports cross-account IAM authentication for both Redshift Serverless and provisioned clusters. This approach uses STS AssumeRole to obtain temporary credentials and is ideal for multi-account AWS architectures.
**Prerequisites:**
- Redshift cluster or Serverless workgroup with IAM authentication enabled
- IAM role in the data account with permissions to get Redshift credentials
- Superset running on AWS infrastructure with an IAM role that can assume the data account role
**Configuration for Redshift Serverless:**
1. Set the SQLAlchemy URI:
```
redshift+psycopg2://{username}@{workgroup}.{account_id}.{region}.redshift-serverless.amazonaws.com:5439/{database}
```
2. Add IAM configuration in **Secure Extra** (ADVANCED → Security → Secure Extra):
```json
{
  "aws_iam": {
    "enabled": true,
    "role_arn": "arn:aws:iam::DATA_ACCOUNT_ID:role/SupersetRedshiftAccess",
    "external_id": "your-unique-external-id",
    "region": "us-east-1",
    "workgroup_name": "my-workgroup",
    "db_name": "dev"
  }
}
```
**Configuration for Provisioned Redshift Clusters:**
1. Set the SQLAlchemy URI:
```
redshift+psycopg2://{username}@{cluster-identifier}.{unique-id}.{region}.redshift.amazonaws.com:5439/{database}
```
2. Add IAM configuration in **Secure Extra** (ADVANCED → Security → Secure Extra):
```json
{
  "aws_iam": {
    "enabled": true,
    "role_arn": "arn:aws:iam::DATA_ACCOUNT_ID:role/SupersetRedshiftAccess",
    "external_id": "your-unique-external-id",
    "region": "us-east-1",
    "cluster_identifier": "my-redshift-cluster",
    "db_username": "superset_user",
    "db_name": "analytics"
  }
}
```
**Configuration Fields:**
| Field | Serverless | Provisioned | Description |
|-------|------------|-------------|-------------|
| `enabled` | Required | Required | Set to `true` to enable IAM authentication |
| `role_arn` | Required | Required | ARN of the IAM role to assume |
| `region` | Required | Required | AWS region |
| `db_name` | Required | Required | Database name |
| `workgroup_name` | Required | - | Redshift Serverless workgroup name |
| `cluster_identifier` | - | Required | Provisioned cluster identifier |
| `db_username` | - | Required | Database username for provisioned clusters |
| `external_id` | Optional | Optional | External ID for cross-account security |
| `session_duration` | Optional | Optional | STS session duration in seconds (default: 3600) |
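The required-field rules in the table above can be sketched as a small validation helper. This is an illustrative stand-in for the checks Superset performs, not Superset's actual API; the function and config names are hypothetical:

```python
def validate_aws_iam_config(cfg: dict) -> list[str]:
    """Return missing/invalid fields for an aws_iam config (illustrative only)."""
    # Fields required for both Serverless and provisioned clusters
    missing = [f for f in ("role_arn", "region", "db_name") if not cfg.get(f)]
    serverless = bool(cfg.get("workgroup_name"))
    provisioned = bool(cfg.get("cluster_identifier"))
    # Exactly one deployment type must be specified
    if serverless == provisioned:  # both set, or neither
        missing.append("workgroup_name or cluster_identifier (exactly one)")
    # Provisioned clusters additionally need a database username
    if provisioned and not cfg.get("db_username"):
        missing.append("db_username")
    return missing

# A valid Serverless config produces no errors:
serverless_cfg = {
    "enabled": True,
    "role_arn": "arn:aws:iam::111111111111:role/SupersetRedshiftAccess",
    "region": "us-east-1",
    "db_name": "dev",
    "workgroup_name": "my-workgroup",
}
print(validate_aws_iam_config(serverless_cfg))  # → []
```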
**Required IAM Permissions:**
For Redshift Serverless:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "redshift-serverless:GetCredentials",
        "redshift-serverless:GetWorkgroup"
      ],
      "Resource": "arn:aws:redshift-serverless:REGION:ACCOUNT_ID:workgroup/WORKGROUP_ID"
    }
  ]
}
```
For provisioned Redshift clusters:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "redshift:GetClusterCredentials",
      "Resource": [
        "arn:aws:redshift:REGION:ACCOUNT_ID:dbuser:CLUSTER_NAME/superset_user",
        "arn:aws:redshift:REGION:ACCOUNT_ID:dbname:CLUSTER_NAME/analytics"
      ]
    }
  ]
}
```
:::resources
- [AWS Docs: IAM Authentication for Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/generating-user-credentials.html)
- [AWS Docs: Redshift Serverless IAM](https://docs.aws.amazon.com/redshift/latest/mgmt/serverless-iam.html)
:::
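The endpoint formats in the URIs above differ between Serverless and provisioned deployments. A small helper to assemble them, following the hostname patterns shown earlier in this section (all identifiers are placeholders):

```python
def redshift_sqlalchemy_uri(
    username: str,
    database: str,
    region: str,
    *,
    account_id: str = "",
    workgroup: str = "",
    cluster_host: str = "",
    port: int = 5439,
) -> str:
    """Build a redshift+psycopg2 URI for Serverless or provisioned Redshift.

    Illustrative only: hostname patterns follow the examples in this doc.
    """
    if workgroup and account_id:
        # Serverless: {workgroup}.{account_id}.{region}.redshift-serverless.amazonaws.com
        host = f"{workgroup}.{account_id}.{region}.redshift-serverless.amazonaws.com"
    elif cluster_host:
        # Provisioned: use the cluster endpoint hostname directly
        host = cluster_host
    else:
        raise ValueError("need workgroup+account_id or cluster_host")
    return f"redshift+psycopg2://{username}@{host}:{port}/{database}"

print(redshift_sqlalchemy_uri("superset", "dev", "us-east-1",
                              account_id="111111111111", workgroup="my-workgroup"))
# → redshift+psycopg2://superset@my-workgroup.111111111111.us-east-1.redshift-serverless.amazonaws.com:5439/dev
```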
#### AWS Aurora (PostgreSQL and MySQL)
Amazon Aurora is a MySQL and PostgreSQL-compatible relational database built for the cloud. Superset supports connecting to Aurora using standard PostgreSQL or MySQL drivers, with optional IAM authentication for enhanced security.
##### Standard Connection (Username/Password)
For Aurora PostgreSQL, use the PostgreSQL driver:
```
postgresql://{username}:{password}@{cluster-endpoint}:{port}/{database}
```
For Aurora MySQL, use the MySQL driver:
```
mysql://{username}:{password}@{cluster-endpoint}:{port}/{database}
```
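Passwords containing URI-reserved characters (`@`, `:`, `/`) must be percent-encoded in these URIs. A minimal sketch of building the connection string safely (the hostname is a placeholder):

```python
from urllib.parse import quote_plus

def aurora_uri(driver: str, username: str, password: str,
               host: str, port: int, database: str) -> str:
    """Build a SQLAlchemy URI, percent-encoding the password."""
    return f"{driver}://{username}:{quote_plus(password)}@{host}:{port}/{database}"

print(aurora_uri("postgresql", "app", "p@ss/word",
                 "my-cluster.cluster-abc.us-east-1.rds.amazonaws.com", 5432, "analytics"))
# → postgresql://app:p%40ss%2Fword@my-cluster.cluster-abc.us-east-1.rds.amazonaws.com:5432/analytics
```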
##### Cross-Account IAM Authentication
Superset supports AWS cross-account IAM authentication for Aurora databases. This eliminates the need to store database passwords: connections authenticate with short-lived IAM tokens that are generated on demand and rotated automatically.
**Prerequisites:**
- Aurora cluster with [IAM database authentication enabled](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.Enabling.html)
- Database user configured for IAM authentication
- IAM role with `rds-db:connect` permission
- Superset running on AWS infrastructure (EC2, ECS, or EKS) with an IAM role that can assume the database access role
**Configuration:**
1. Set the SQLAlchemy URI with the cluster endpoint (the password will be replaced by an IAM token):
For Aurora PostgreSQL:
```
postgresql://{db_username}@{cluster-endpoint}:{port}/{database}
```
For Aurora MySQL:
```
mysql://{db_username}@{cluster-endpoint}:{port}/{database}
```
2. Add the IAM configuration in the **Secure Extra** field (ADVANCED → Security → Secure Extra):
```json
{
  "aws_iam": {
    "enabled": true,
    "role_arn": "arn:aws:iam::DATA_ACCOUNT_ID:role/SupersetDatabaseAccess",
    "external_id": "your-unique-external-id",
    "region": "us-east-1",
    "db_username": "superset_iam_user"
  }
}
```
**Configuration Fields:**
| Field | Required | Description |
|-------|----------|-------------|
| `enabled` | Yes | Set to `true` to enable IAM authentication |
| `role_arn` | Yes | ARN of the IAM role to assume for database access |
| `region` | Yes | AWS region where the Aurora cluster is located |
| `db_username` | Yes | Database username configured for IAM authentication |
| `external_id` | No | External ID for cross-account role assumption (recommended for security) |
| `session_duration` | No | STS session duration in seconds (default: 3600) |
**Required IAM Permissions:**
The IAM role specified in `role_arn` needs the following permission:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "rds-db:connect",
      "Resource": "arn:aws:rds-db:REGION:ACCOUNT_ID:dbuser:CLUSTER_RESOURCE_ID/superset_iam_user"
    }
  ]
}
```
For cross-account access, the role also needs a trust policy allowing Superset's IAM role to assume it:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::SUPERSET_ACCOUNT_ID:role/SupersetApplicationRole"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "your-unique-external-id"
        }
      }
    }
  ]
}
```
:::resources
- [AWS Docs: IAM Database Authentication for Aurora](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html)
- [AWS Docs: Cross-Account IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html)
:::
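Outside Superset, the same two-step flow (AssumeRole, then an RDS auth token) can be sketched with boto3. The role ARN, endpoint, and usernames are placeholders; note that `ExternalId` is only included when configured, matching the trust policy above:

```python
from __future__ import annotations
from typing import Any

def build_assume_role_kwargs(role_arn: str, external_id: str | None = None,
                             duration: int = 3600) -> dict[str, Any]:
    """Arguments for sts.assume_role; ExternalId is added only when set."""
    kwargs: dict[str, Any] = {
        "RoleArn": role_arn,
        "RoleSessionName": "superset-iam-session",
        "DurationSeconds": duration,
    }
    if external_id:
        kwargs["ExternalId"] = external_id
    return kwargs

def get_aurora_token(role_arn: str, external_id: str | None, region: str,
                     host: str, port: int, db_username: str) -> str:
    """Assume the cross-account role, then mint a short-lived RDS auth token."""
    import boto3  # lazy import; requires `pip install boto3` and AWS credentials

    creds = boto3.client("sts", region_name=region).assume_role(
        **build_assume_role_kwargs(role_arn, external_id)
    )["Credentials"]
    rds = boto3.client(
        "rds",
        region_name=region,
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    # The token is used as the database password; RDS tokens expire after 15 minutes.
    return rds.generate_db_auth_token(DBHostname=host, Port=port, DBUsername=db_username)
```

Passing the returned token as the password to a standard PostgreSQL or MySQL client (over SSL) is equivalent to what Superset does internally when `aws_iam.enabled` is set.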
#### ClickHouse
To use ClickHouse with Superset, you will need to install the `clickhouse-connect` Python library:

View File

@@ -42,3 +42,29 @@ class AuroraPostgresDataAPI(PostgresEngineSpec):
"secret_arn={secret_arn}&"
"region_name={region_name}"
)
class AuroraMySQLEngineSpec(MySQLEngineSpec):
"""
Aurora MySQL engine spec.
IAM authentication is handled by the parent MySQLEngineSpec via
the aws_iam config in encrypted_extra.
"""
engine = "mysql"
engine_name = "Aurora MySQL"
default_driver = "mysqldb"
class AuroraPostgresEngineSpec(PostgresEngineSpec):
"""
Aurora PostgreSQL engine spec.
IAM authentication is handled by the parent PostgresEngineSpec via
the aws_iam config in encrypted_extra.
"""
engine = "postgresql"
engine_name = "Aurora PostgreSQL"
default_driver = "psycopg2"

View File

@@ -0,0 +1,634 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
AWS IAM Authentication Mixin for database engine specs.
This mixin provides cross-account IAM authentication support for AWS databases
(Aurora PostgreSQL, Aurora MySQL, Redshift). It handles:
- Assuming IAM roles via STS AssumeRole
- Generating RDS IAM auth tokens
- Generating Redshift Serverless credentials
- Configuring SSL (required for IAM auth)
- Caching STS credentials to reduce API calls
"""
from __future__ import annotations
import logging
import threading
from typing import Any, TYPE_CHECKING, TypedDict
from cachetools import TTLCache
from superset.databases.utils import make_url_safe
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
if TYPE_CHECKING:
from superset.models.core import Database
logger = logging.getLogger(__name__)
# Default session duration for STS AssumeRole (1 hour)
DEFAULT_SESSION_DURATION = 3600
# Default ports
DEFAULT_POSTGRES_PORT = 5432
DEFAULT_MYSQL_PORT = 3306
DEFAULT_REDSHIFT_PORT = 5439
# Cache STS credentials: key = (role_arn, region, external_id), TTL = 50 min
_credentials_cache: TTLCache[tuple[str, str, str | None], dict[str, Any]] = TTLCache(
maxsize=100, ttl=3000
)
_credentials_lock = threading.RLock()
class AWSIAMConfig(TypedDict, total=False):
"""Configuration for AWS IAM authentication."""
enabled: bool
role_arn: str
external_id: str
region: str
db_username: str
session_duration: int
# Redshift Serverless fields
workgroup_name: str
db_name: str
# Redshift provisioned cluster fields
cluster_identifier: str
class AWSIAMAuthMixin:
"""
Mixin that provides AWS IAM authentication for database connections.
This mixin can be used with database engine specs that support IAM
authentication (Aurora PostgreSQL, Aurora MySQL, Redshift).
Configuration is provided via the database's encrypted_extra JSON:
{
"aws_iam": {
"enabled": true,
"role_arn": "arn:aws:iam::222222222222:role/SupersetDatabaseAccess",
"external_id": "superset-prod-12345", # optional
"region": "us-east-1",
"db_username": "superset_iam_user",
"session_duration": 3600 # optional, defaults to 3600
}
}
"""
supports_iam_authentication = True
# AWS error patterns for actionable error messages
aws_iam_custom_errors: dict[str, tuple[SupersetErrorType, str]] = {
"AccessDenied": (
SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
"Unable to assume IAM role. Verify the role ARN and trust policy "
"allow access from Superset's IAM role.",
),
"InvalidIdentityToken": (
SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
"Invalid IAM credentials. Ensure Superset has a valid IAM role "
"with permissions to assume the target role.",
),
"MalformedPolicyDocument": (
SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
"Invalid IAM role ARN format. Please verify the role ARN.",
),
"ExpiredTokenException": (
SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
"AWS credentials have expired. Please refresh the connection.",
),
}
@classmethod
def get_iam_credentials(
cls,
role_arn: str,
region: str,
external_id: str | None = None,
session_duration: int = DEFAULT_SESSION_DURATION,
) -> dict[str, Any]:
"""
Assume cross-account IAM role via STS AssumeRole with credential caching.
Credentials are cached by (role_arn, region, external_id) with a 50-minute
TTL to reduce STS API calls while ensuring tokens are refreshed before the
default 1-hour expiration.
:param role_arn: The ARN of the IAM role to assume
:param region: AWS region for the STS client
:param external_id: External ID for the role assumption (optional)
:param session_duration: Duration of the session in seconds
:returns: Dictionary with AccessKeyId, SecretAccessKey, SessionToken
:raises SupersetSecurityException: If role assumption fails
"""
cache_key = (role_arn, region, external_id)
with _credentials_lock:
cached = _credentials_cache.get(cache_key)
if cached is not None:
return cached
try:
# Lazy import to avoid errors when boto3 is not installed
import boto3
from botocore.exceptions import ClientError
except ImportError as ex:
raise SupersetSecurityException(
SupersetError(
message="boto3 is required for AWS IAM authentication. "
"Install it with: pip install boto3",
error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
try:
sts_client = boto3.client("sts", region_name=region)
assume_role_kwargs: dict[str, Any] = {
"RoleArn": role_arn,
"RoleSessionName": "superset-iam-session",
"DurationSeconds": session_duration,
}
if external_id:
assume_role_kwargs["ExternalId"] = external_id
response = sts_client.assume_role(**assume_role_kwargs)
credentials = response["Credentials"]
with _credentials_lock:
_credentials_cache[cache_key] = credentials
return credentials
except ClientError as ex:
error_code = ex.response.get("Error", {}).get("Code", "")
error_message = ex.response.get("Error", {}).get("Message", "")
# Handle ExternalId mismatch (shows as AccessDenied with specific message)
# Check this first before generic AccessDenied handling
if "external id" in error_message.lower():
raise SupersetSecurityException(
SupersetError(
message="External ID mismatch. Verify the external_id "
"configuration matches the trust policy.",
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
if error_code in cls.aws_iam_custom_errors:
error_type, message = cls.aws_iam_custom_errors[error_code]
raise SupersetSecurityException(
SupersetError(
message=message,
error_type=error_type,
level=ErrorLevel.ERROR,
)
) from ex
raise SupersetSecurityException(
SupersetError(
message=f"Failed to assume IAM role: {ex}",
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
@classmethod
def generate_rds_auth_token(
cls,
credentials: dict[str, Any],
hostname: str,
port: int,
username: str,
region: str,
) -> str:
"""
Generate RDS IAM auth token using temporary credentials.
:param credentials: STS credentials from assume_role
:param hostname: RDS/Aurora endpoint hostname
:param port: Database port
:param username: Database username configured for IAM auth
:param region: AWS region
:returns: IAM auth token to use as database password
:raises SupersetSecurityException: If token generation fails
"""
try:
import boto3
from botocore.exceptions import ClientError
except ImportError as ex:
raise SupersetSecurityException(
SupersetError(
message="boto3 is required for AWS IAM authentication.",
error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
try:
rds_client = boto3.client(
"rds",
region_name=region,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)
token = rds_client.generate_db_auth_token(
DBHostname=hostname,
Port=port,
DBUsername=username,
)
return token
except ClientError as ex:
raise SupersetSecurityException(
SupersetError(
message=f"Failed to generate RDS auth token: {ex}",
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
@classmethod
def generate_redshift_credentials(
cls,
credentials: dict[str, Any],
workgroup_name: str,
db_name: str,
region: str,
) -> tuple[str, str]:
"""
Generate Redshift Serverless credentials using temporary STS credentials.
:param credentials: STS credentials from assume_role
:param workgroup_name: Redshift Serverless workgroup name
:param db_name: Redshift database name
:param region: AWS region
:returns: Tuple of (username, password) for Redshift connection
:raises SupersetSecurityException: If credential generation fails
"""
try:
import boto3
from botocore.exceptions import ClientError
except ImportError as ex:
raise SupersetSecurityException(
SupersetError(
message="boto3 is required for AWS IAM authentication.",
error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
try:
client = boto3.client(
"redshift-serverless",
region_name=region,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)
response = client.get_credentials(
workgroupName=workgroup_name,
dbName=db_name,
)
return response["dbUser"], response["dbPassword"]
except ClientError as ex:
raise SupersetSecurityException(
SupersetError(
message=f"Failed to get Redshift Serverless credentials: {ex}",
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
@classmethod
def generate_redshift_cluster_credentials(
cls,
credentials: dict[str, Any],
cluster_identifier: str,
db_user: str,
db_name: str,
region: str,
auto_create: bool = False,
) -> tuple[str, str]:
"""
Generate credentials for a provisioned Redshift cluster using temporary
STS credentials.
:param credentials: STS credentials from assume_role
:param cluster_identifier: Redshift cluster identifier
:param db_user: Database username to get credentials for
:param db_name: Redshift database name
:param region: AWS region
:param auto_create: Whether to auto-create the database user if it doesn't exist
:returns: Tuple of (username, password) for Redshift connection
:raises SupersetSecurityException: If credential generation fails
"""
try:
import boto3
from botocore.exceptions import ClientError
except ImportError as ex:
raise SupersetSecurityException(
SupersetError(
message="boto3 is required for AWS IAM authentication.",
error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
try:
client = boto3.client(
"redshift",
region_name=region,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
)
response = client.get_cluster_credentials(
ClusterIdentifier=cluster_identifier,
DbUser=db_user,
DbName=db_name,
AutoCreate=auto_create,
)
return response["DbUser"], response["DbPassword"]
except ClientError as ex:
raise SupersetSecurityException(
SupersetError(
message=f"Failed to get Redshift cluster credentials: {ex}",
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
level=ErrorLevel.ERROR,
)
) from ex
@classmethod
def _apply_iam_authentication(
cls,
database: Database,
params: dict[str, Any],
iam_config: AWSIAMConfig,
ssl_args: dict[str, str] | None = None,
default_port: int = DEFAULT_POSTGRES_PORT,
) -> None:
"""
Apply IAM authentication to the connection parameters.
Full flow: assume role -> generate token -> update connect_args -> enable SSL.
:param database: Database model instance
:param params: Engine parameters dict to modify
:param iam_config: IAM configuration from encrypted_extra
:param ssl_args: SSL args to apply (defaults to sslmode=require)
:param default_port: Default port if not specified in URI
:raises SupersetSecurityException: If any step fails
"""
if ssl_args is None:
ssl_args = {"sslmode": "require"}
# Extract configuration
role_arn = iam_config.get("role_arn")
region = iam_config.get("region")
db_username = iam_config.get("db_username")
external_id = iam_config.get("external_id")
session_duration = iam_config.get("session_duration", DEFAULT_SESSION_DURATION)
# Validate required fields
missing_fields = []
if not role_arn:
missing_fields.append("role_arn")
if not region:
missing_fields.append("region")
if not db_username:
missing_fields.append("db_username")
if missing_fields:
raise SupersetSecurityException(
SupersetError(
message="AWS IAM configuration missing required fields: "
f"{', '.join(missing_fields)}",
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
level=ErrorLevel.ERROR,
)
)
# Type assertions after validation (mypy doesn't narrow types from list check)
assert role_arn is not None
assert region is not None
assert db_username is not None
# Get hostname and port from the database URI
uri = make_url_safe(database.sqlalchemy_uri_decrypted)
hostname = uri.host
port = uri.port or default_port
if not hostname:
raise SupersetSecurityException(
SupersetError(
message=(
"Database URI must include a hostname for IAM authentication"
),
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
level=ErrorLevel.ERROR,
)
)
logger.debug(
"Applying IAM authentication for %s:%d as user %s",
hostname,
port,
db_username,
)
# Step 1: Assume the IAM role
credentials = cls.get_iam_credentials(
role_arn=role_arn,
region=region,
external_id=external_id,
session_duration=session_duration,
)
# Step 2: Generate the RDS auth token
token = cls.generate_rds_auth_token(
credentials=credentials,
hostname=hostname,
port=port,
username=db_username,
region=region,
)
# Step 3: Update connection parameters
connect_args = params.setdefault("connect_args", {})
# Set the IAM token as the password
connect_args["password"] = token
# Override username if different from URI
connect_args["user"] = db_username
# Step 4: Enable SSL (required for IAM authentication)
connect_args.update(ssl_args)
logger.debug("IAM authentication configured successfully")
@classmethod
def _apply_redshift_iam_authentication(
cls,
database: Database,
params: dict[str, Any],
iam_config: AWSIAMConfig,
) -> None:
"""
Apply Redshift IAM authentication to connection parameters.
Supports both Redshift Serverless (workgroup_name) and provisioned
clusters (cluster_identifier). The method auto-detects which type
based on the configuration provided.
Flow: assume role -> get Redshift credentials -> update connect_args -> SSL.
:param database: Database model instance
:param params: Engine parameters dict to modify
:param iam_config: IAM configuration from encrypted_extra
:raises SupersetSecurityException: If any step fails
"""
# Extract configuration
role_arn = iam_config.get("role_arn")
region = iam_config.get("region")
external_id = iam_config.get("external_id")
session_duration = iam_config.get("session_duration", DEFAULT_SESSION_DURATION)
# Serverless fields
workgroup_name = iam_config.get("workgroup_name")
# Provisioned cluster fields
cluster_identifier = iam_config.get("cluster_identifier")
db_username = iam_config.get("db_username")
# Common field
db_name = iam_config.get("db_name")
# Determine deployment type
is_serverless = bool(workgroup_name)
is_provisioned = bool(cluster_identifier)
if is_serverless and is_provisioned:
raise SupersetSecurityException(
SupersetError(
message="AWS IAM configuration cannot have both workgroup_name "
"(Serverless) and cluster_identifier (provisioned). "
"Please specify only one.",
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
level=ErrorLevel.ERROR,
)
)
if not is_serverless and not is_provisioned:
raise SupersetSecurityException(
SupersetError(
message="AWS IAM configuration must include either workgroup_name "
"(for Redshift Serverless) or cluster_identifier "
"(for provisioned Redshift clusters).",
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
level=ErrorLevel.ERROR,
)
)
# Validate common required fields
missing_fields = []
if not role_arn:
missing_fields.append("role_arn")
if not region:
missing_fields.append("region")
if not db_name:
missing_fields.append("db_name")
# Validate provisioned cluster specific fields
if is_provisioned and not db_username:
missing_fields.append("db_username")
if missing_fields:
raise SupersetSecurityException(
SupersetError(
message="AWS IAM configuration missing required fields: "
f"{', '.join(missing_fields)}",
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
level=ErrorLevel.ERROR,
)
)
# Type assertions after validation
assert role_arn is not None
assert region is not None
assert db_name is not None
# Step 1: Assume the IAM role
credentials = cls.get_iam_credentials(
role_arn=role_arn,
region=region,
external_id=external_id,
session_duration=session_duration,
)
# Step 2: Get Redshift credentials based on deployment type
if is_serverless:
assert workgroup_name is not None
logger.debug(
"Applying Redshift Serverless IAM authentication for workgroup %s",
workgroup_name,
)
db_user, db_password = cls.generate_redshift_credentials(
credentials=credentials,
workgroup_name=workgroup_name,
db_name=db_name,
region=region,
)
else:
assert cluster_identifier is not None
assert db_username is not None
logger.debug(
"Applying Redshift provisioned cluster IAM authentication for %s",
cluster_identifier,
)
db_user, db_password = cls.generate_redshift_cluster_credentials(
credentials=credentials,
cluster_identifier=cluster_identifier,
db_user=db_username,
db_name=db_name,
region=region,
)
# Step 3: Update connection parameters
connect_args = params.setdefault("connect_args", {})
connect_args["password"] = db_password
connect_args["user"] = db_user
# Step 4: Enable SSL (required for Redshift IAM authentication)
connect_args["sslmode"] = "verify-ca"
logger.debug("Redshift IAM authentication configured successfully")

View File

@@ -14,12 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import logging
import re
from datetime import datetime
from decimal import Decimal
from re import Pattern
from typing import Any, Callable, Optional
from typing import Any, Callable, Optional, TYPE_CHECKING
from urllib import parse
from flask_babel import gettext as __
@@ -42,8 +45,14 @@ from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
from superset.errors import SupersetErrorType
from superset.models.sql_lab import Query
from superset.utils import json
from superset.utils.core import GenericDataType
if TYPE_CHECKING:
from superset.models.core import Database
logger = logging.getLogger(__name__)
# Regular expressions to catch custom errors
CONNECTION_ACCESS_DENIED_REGEX = re.compile(
"Access denied for user '(?P<username>.*?)'@'(?P<hostname>.*?)'"
@@ -192,6 +201,49 @@ class MySQLEngineSpec(BasicParametersMixin, BaseEngineSpec):
"mysqlconnector": {"allow_local_infile": 0},
}
# Sensitive fields that should be masked in encrypted_extra
encrypted_extra_sensitive_fields = {
"$.aws_iam.external_id",
"$.aws_iam.role_arn",
}
@staticmethod
def update_params_from_encrypted_extra(
database: Database,
params: dict[str, Any],
) -> None:
"""
Extract sensitive parameters from encrypted_extra.
Handles AWS IAM authentication if configured, then merges any
remaining encrypted_extra keys into params.
"""
if not database.encrypted_extra:
return
try:
encrypted_extra = json.loads(database.encrypted_extra)
except json.JSONDecodeError as ex:
logger.error(ex, exc_info=True)
raise
# Handle AWS IAM auth: pop the key so it doesn't reach create_engine()
iam_config = encrypted_extra.pop("aws_iam", None)
if iam_config and iam_config.get("enabled"):
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
AWSIAMAuthMixin._apply_iam_authentication(
database,
params,
iam_config,
ssl_args={"ssl_mode": "REQUIRED"},
default_port=3306,
)
# Standard behavior: merge remaining keys into params
if encrypted_extra:
params.update(encrypted_extra)
@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[dict[str, Any]] = None

View File

@@ -218,6 +218,12 @@ class PostgresEngineSpec(BasicParametersMixin, PostgresBaseEngineSpec):
max_column_name_length = 63
try_remove_schema_from_table_name = False # pylint: disable=invalid-name
# Sensitive fields that should be masked in encrypted_extra
encrypted_extra_sensitive_fields = {
"$.aws_iam.external_id",
"$.aws_iam.role_arn",
}
column_type_mappings = (
(
re.compile(r"^double precision", re.IGNORECASE),
@@ -320,6 +326,43 @@ class PostgresEngineSpec(BasicParametersMixin, PostgresBaseEngineSpec):
return uri, connect_args
@staticmethod
def update_params_from_encrypted_extra(
database: Database,
params: dict[str, Any],
) -> None:
"""
Extract sensitive parameters from encrypted_extra.
Handles AWS IAM authentication if configured, then merges any
remaining encrypted_extra keys into params (standard behavior).
"""
if not database.encrypted_extra:
return
try:
encrypted_extra = json.loads(database.encrypted_extra)
except json.JSONDecodeError as ex:
logger.error(ex, exc_info=True)
raise
# Handle AWS IAM auth: pop the key so it doesn't reach create_engine()
iam_config = encrypted_extra.pop("aws_iam", None)
if iam_config and iam_config.get("enabled"):
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
AWSIAMAuthMixin._apply_iam_authentication(
database,
params,
iam_config,
ssl_args={"sslmode": "require"},
default_port=5432,
)
# Standard behavior: merge remaining keys into params
if encrypted_extra:
params.update(encrypted_extra)
@classmethod
def get_default_catalog(cls, database: Database) -> str:
"""

View File

@@ -31,6 +31,7 @@ from superset.errors import SupersetErrorType
from superset.models.core import Database
from superset.models.sql_lab import Query
from superset.sql.parse import Table
from superset.utils import json
logger = logging.getLogger()
@@ -103,6 +104,45 @@ class RedshiftEngineSpec(BasicParametersMixin, PostgresBaseEngineSpec):
),
}
# Sensitive fields that should be masked in encrypted_extra
encrypted_extra_sensitive_fields = {
"$.aws_iam.external_id",
"$.aws_iam.role_arn",
}
@staticmethod
def update_params_from_encrypted_extra(
database: Database,
params: dict[str, Any],
) -> None:
"""
Extract sensitive parameters from encrypted_extra.
        Handles AWS IAM authentication for Redshift (Serverless workgroups or
        provisioned clusters) if configured, then merges any remaining
        encrypted_extra keys into params.
"""
if not database.encrypted_extra:
return
try:
encrypted_extra = json.loads(database.encrypted_extra)
except json.JSONDecodeError as ex:
logger.error(ex, exc_info=True)
raise
# Handle AWS IAM auth: pop the key so it doesn't reach create_engine()
iam_config = encrypted_extra.pop("aws_iam", None)
if iam_config and iam_config.get("enabled"):
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
AWSIAMAuthMixin._apply_redshift_iam_authentication(
database, params, iam_config
)
# Standard behavior: merge remaining keys into params
if encrypted_extra:
params.update(encrypted_extra)
@classmethod
def df_to_sql(
cls,

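Based on the fields consumed above (and exercised in the tests below), an `encrypted_extra` payload enabling IAM auth against a Redshift Serverless workgroup would look roughly like this; all values are illustrative, and `external_id`/`session_duration` are optional:

```json
{
  "aws_iam": {
    "enabled": true,
    "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
    "region": "us-east-1",
    "workgroup_name": "my-workgroup",
    "db_name": "dev",
    "external_id": "optional-cross-account-id",
    "session_duration": 1800
  }
}
```

Only `role_arn` and `external_id` are masked in API responses, per `encrypted_extra_sensitive_fields`.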

@@ -0,0 +1,313 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from superset.utils import json
def test_aurora_postgres_engine_spec_properties() -> None:
from superset.db_engine_specs.aurora import AuroraPostgresEngineSpec
assert AuroraPostgresEngineSpec.engine == "postgresql"
assert AuroraPostgresEngineSpec.engine_name == "Aurora PostgreSQL"
assert AuroraPostgresEngineSpec.default_driver == "psycopg2"
def test_update_params_from_encrypted_extra_without_iam() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps({})
database.sqlalchemy_uri_decrypted = (
"postgresql://user:password@mydb.us-east-1.rds.amazonaws.com:5432/mydb"
)
params: dict[str, Any] = {}
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
# No modifications should be made
assert params == {}
def test_update_params_from_encrypted_extra_iam_disabled() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": False,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_user",
}
}
)
database.sqlalchemy_uri_decrypted = (
"postgresql://user:password@mydb.us-east-1.rds.amazonaws.com:5432/mydb"
)
params: dict[str, Any] = {}
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
# No modifications should be made when IAM is disabled
assert params == {}
def test_update_params_from_encrypted_extra_with_iam() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_iam_user",
}
}
)
database.sqlalchemy_uri_decrypted = (
"postgresql://user@mydb.cluster-xyz.us-east-1.rds.amazonaws.com:5432/mydb"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_rds_auth_token",
return_value="iam-auth-token",
),
):
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
assert "connect_args" in params
assert params["connect_args"]["password"] == "iam-auth-token" # noqa: S105
assert params["connect_args"]["user"] == "superset_iam_user"
assert params["connect_args"]["sslmode"] == "require"
def test_update_params_merges_remaining_encrypted_extra() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {"enabled": False},
"pool_size": 10,
}
)
database.sqlalchemy_uri_decrypted = (
"postgresql://user:password@mydb.us-east-1.rds.amazonaws.com:5432/mydb"
)
params: dict[str, Any] = {}
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
# aws_iam should be consumed, pool_size should be merged
assert "aws_iam" not in params
assert params["pool_size"] == 10
def test_update_params_from_encrypted_extra_no_encrypted_extra() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = None
params: dict[str, Any] = {}
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
# No modifications should be made
assert params == {}
def test_update_params_from_encrypted_extra_invalid_json() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
database = MagicMock()
database.encrypted_extra = "not-valid-json"
params: dict[str, Any] = {}
with pytest.raises(json.JSONDecodeError):
PostgresEngineSpec.update_params_from_encrypted_extra(database, params)
def test_encrypted_extra_sensitive_fields() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
# Verify sensitive fields are properly defined
assert (
"$.aws_iam.external_id" in PostgresEngineSpec.encrypted_extra_sensitive_fields
)
assert "$.aws_iam.role_arn" in PostgresEngineSpec.encrypted_extra_sensitive_fields
def test_mask_encrypted_extra() -> None:
from superset.db_engine_specs.postgres import PostgresEngineSpec
encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/SecretRole",
"external_id": "secret-external-id-12345",
"region": "us-east-1",
"db_username": "superset_user",
}
}
)
masked = PostgresEngineSpec.mask_encrypted_extra(encrypted_extra)
assert masked is not None
masked_config = json.loads(masked)
# role_arn and external_id should be masked
assert (
masked_config["aws_iam"]["role_arn"]
!= "arn:aws:iam::123456789012:role/SecretRole"
)
assert masked_config["aws_iam"]["external_id"] != "secret-external-id-12345"
# Non-sensitive fields should remain unchanged
assert masked_config["aws_iam"]["enabled"] is True
assert masked_config["aws_iam"]["region"] == "us-east-1"
assert masked_config["aws_iam"]["db_username"] == "superset_user"
def test_aurora_postgres_inherits_from_postgres() -> None:
from superset.db_engine_specs.aurora import AuroraPostgresEngineSpec
from superset.db_engine_specs.postgres import PostgresEngineSpec
# Verify inheritance
assert issubclass(AuroraPostgresEngineSpec, PostgresEngineSpec)
# Verify it inherits PostgreSQL capabilities
assert AuroraPostgresEngineSpec.supports_dynamic_schema is True
assert AuroraPostgresEngineSpec.supports_catalog is True
def test_aurora_mysql_engine_spec_properties() -> None:
from superset.db_engine_specs.aurora import AuroraMySQLEngineSpec
assert AuroraMySQLEngineSpec.engine == "mysql"
assert AuroraMySQLEngineSpec.engine_name == "Aurora MySQL"
assert AuroraMySQLEngineSpec.default_driver == "mysqldb"
def test_aurora_mysql_inherits_from_mysql() -> None:
from superset.db_engine_specs.aurora import AuroraMySQLEngineSpec
from superset.db_engine_specs.mysql import MySQLEngineSpec
assert issubclass(AuroraMySQLEngineSpec, MySQLEngineSpec)
assert AuroraMySQLEngineSpec.supports_dynamic_schema is True
def test_aurora_mysql_has_iam_support() -> None:
from superset.db_engine_specs.aurora import AuroraMySQLEngineSpec
# Verify it inherits encrypted_extra_sensitive_fields
assert (
"$.aws_iam.external_id"
in AuroraMySQLEngineSpec.encrypted_extra_sensitive_fields
)
assert (
"$.aws_iam.role_arn" in AuroraMySQLEngineSpec.encrypted_extra_sensitive_fields
)
def test_aurora_mysql_update_params_from_encrypted_extra_with_iam() -> None:
from superset.db_engine_specs.aurora import AuroraMySQLEngineSpec
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_iam_user",
}
}
)
database.sqlalchemy_uri_decrypted = (
"mysql://user@mydb.cluster-xyz.us-east-1.rds.amazonaws.com:3306/mydb"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_rds_auth_token",
return_value="iam-auth-token",
),
):
AuroraMySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert "connect_args" in params
assert params["connect_args"]["password"] == "iam-auth-token" # noqa: S105
assert params["connect_args"]["user"] == "superset_iam_user"
assert params["connect_args"]["ssl_mode"] == "REQUIRED"
def test_aurora_data_api_classes_unchanged() -> None:
from superset.db_engine_specs.aurora import (
AuroraMySQLDataAPI,
AuroraPostgresDataAPI,
)
# Verify Data API classes are still available and unchanged
assert AuroraMySQLDataAPI.engine == "mysql"
assert AuroraMySQLDataAPI.default_driver == "auroradataapi"
assert AuroraMySQLDataAPI.engine_name == "Aurora MySQL (Data API)"
assert AuroraPostgresDataAPI.engine == "postgresql"
assert AuroraPostgresDataAPI.default_driver == "auroradataapi"
assert AuroraPostgresDataAPI.engine_name == "Aurora PostgreSQL (Data API)"

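The masking assertions above rely on `encrypted_extra_sensitive_fields` holding JSONPath-style keys like `$.aws_iam.role_arn`. A simplified re-implementation of that masking idea (not Superset's actual `mask_encrypted_extra`, which lives on the base spec) shows how such a path resolves into a nested dict:

```python
import json

PASSWORD_MASK = "XXXXXXXXXX"
SENSITIVE_FIELDS = {"$.aws_iam.external_id", "$.aws_iam.role_arn"}


def mask_sensitive(encrypted_extra: str) -> str:
    """Replace the value at each '$.a.b'-style path with a mask."""
    config = json.loads(encrypted_extra)
    for path in SENSITIVE_FIELDS:
        keys = path.lstrip("$.").split(".")  # "$.aws_iam.role_arn" -> ["aws_iam", "role_arn"]
        node = config
        for key in keys[:-1]:
            node = node.get(key, {})  # walk intermediate objects; missing -> no-op
        if keys[-1] in node:
            node[keys[-1]] = PASSWORD_MASK
    return json.dumps(config)
```

Non-sensitive siblings (`enabled`, `region`, `db_username`) pass through untouched, which is exactly what the `test_mask_encrypted_extra` tests assert.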
File diff suppressed because it is too large.


@@ -0,0 +1,232 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from superset.utils import json
def test_mysql_encrypted_extra_sensitive_fields() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
assert "$.aws_iam.external_id" in MySQLEngineSpec.encrypted_extra_sensitive_fields
assert "$.aws_iam.role_arn" in MySQLEngineSpec.encrypted_extra_sensitive_fields
def test_mysql_update_params_no_encrypted_extra() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = None
params: dict[str, Any] = {}
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_mysql_update_params_empty_encrypted_extra() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps({})
params: dict[str, Any] = {}
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_mysql_update_params_iam_disabled() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": False,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_user",
}
}
)
params: dict[str, Any] = {}
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_mysql_update_params_with_iam() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_iam_user",
}
}
)
database.sqlalchemy_uri_decrypted = (
"mysql://user@mydb.cluster-xyz.us-east-1.rds.amazonaws.com:3306/mydb"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_rds_auth_token",
return_value="iam-auth-token",
),
):
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert "connect_args" in params
assert params["connect_args"]["password"] == "iam-auth-token" # noqa: S105
assert params["connect_args"]["user"] == "superset_iam_user"
assert params["connect_args"]["ssl_mode"] == "REQUIRED"
def test_mysql_update_params_iam_uses_mysql_port() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"db_username": "superset_iam_user",
}
}
)
# URI without explicit port
database.sqlalchemy_uri_decrypted = (
"mysql://user@mydb.cluster-xyz.us-east-1.rds.amazonaws.com/mydb"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_rds_auth_token",
return_value="iam-auth-token",
) as mock_gen_token,
):
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
# Should use default MySQL port 3306
token_call_kwargs = mock_gen_token.call_args[1]
assert token_call_kwargs["port"] == 3306
def test_mysql_update_params_merges_remaining_encrypted_extra() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {"enabled": False},
"pool_size": 10,
}
)
params: dict[str, Any] = {}
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
assert "aws_iam" not in params
assert params["pool_size"] == 10
def test_mysql_update_params_invalid_json() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
database = MagicMock()
database.encrypted_extra = "not-valid-json"
params: dict[str, Any] = {}
with pytest.raises(json.JSONDecodeError):
MySQLEngineSpec.update_params_from_encrypted_extra(database, params)
def test_mysql_mask_encrypted_extra() -> None:
from superset.db_engine_specs.mysql import MySQLEngineSpec
encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/SecretRole",
"external_id": "secret-external-id-12345",
"region": "us-east-1",
"db_username": "superset_user",
}
}
)
masked = MySQLEngineSpec.mask_encrypted_extra(encrypted_extra)
assert masked is not None
masked_config = json.loads(masked)
# role_arn and external_id should be masked
assert (
masked_config["aws_iam"]["role_arn"]
!= "arn:aws:iam::123456789012:role/SecretRole"
)
assert masked_config["aws_iam"]["external_id"] != "secret-external-id-12345"
# Non-sensitive fields should remain unchanged
assert masked_config["aws_iam"]["enabled"] is True
assert masked_config["aws_iam"]["region"] == "us-east-1"
assert masked_config["aws_iam"]["db_username"] == "superset_user"


@@ -0,0 +1,382 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from superset.utils import json
def test_redshift_encrypted_extra_sensitive_fields() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
assert (
"$.aws_iam.external_id" in RedshiftEngineSpec.encrypted_extra_sensitive_fields
)
assert "$.aws_iam.role_arn" in RedshiftEngineSpec.encrypted_extra_sensitive_fields
def test_redshift_update_params_no_encrypted_extra() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = None
params: dict[str, Any] = {}
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_redshift_update_params_empty_encrypted_extra() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps({})
params: dict[str, Any] = {}
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_redshift_update_params_iam_disabled() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": False,
"role_arn": "arn:aws:iam::123456789012:role/TestRole",
"region": "us-east-1",
"workgroup_name": "my-workgroup",
"db_name": "dev",
}
}
)
params: dict[str, Any] = {}
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert params == {}
def test_redshift_update_params_with_iam() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
"region": "us-east-1",
"workgroup_name": "my-workgroup",
"db_name": "dev",
}
}
)
database.sqlalchemy_uri_decrypted = (
"redshift+psycopg2://user@my-workgroup.123456789012.us-east-1"
".redshift-serverless.amazonaws.com:5439/dev"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_redshift_credentials",
return_value=("IAM:admin", "redshift-temp-password"),
),
):
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert "connect_args" in params
assert params["connect_args"]["password"] == "redshift-temp-password" # noqa: S105
assert params["connect_args"]["user"] == "IAM:admin"
assert params["connect_args"]["sslmode"] == "verify-ca"
def test_redshift_update_params_with_external_id() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::222222222222:role/CrossAccountRedshift",
"external_id": "superset-prod-12345",
"region": "us-west-2",
"workgroup_name": "prod-workgroup",
"db_name": "analytics",
"session_duration": 1800,
}
}
)
database.sqlalchemy_uri_decrypted = (
"redshift+psycopg2://user@prod-workgroup.222222222222.us-west-2"
".redshift-serverless.amazonaws.com:5439/analytics"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
) as mock_get_creds,
patch.object(
AWSIAMAuthMixin,
"generate_redshift_credentials",
return_value=("IAM:admin", "redshift-temp-password"),
),
):
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
mock_get_creds.assert_called_once_with(
role_arn="arn:aws:iam::222222222222:role/CrossAccountRedshift",
region="us-west-2",
external_id="superset-prod-12345",
session_duration=1800,
)
def test_redshift_update_params_merges_remaining_encrypted_extra() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {"enabled": False},
"pool_size": 5,
}
)
params: dict[str, Any] = {}
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert "aws_iam" not in params
assert params["pool_size"] == 5
def test_redshift_update_params_invalid_json() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = "not-valid-json"
params: dict[str, Any] = {}
with pytest.raises(json.JSONDecodeError):
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
def test_redshift_mask_encrypted_extra() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/SecretRole",
"external_id": "secret-external-id-12345",
"region": "us-east-1",
"workgroup_name": "my-workgroup",
"db_name": "dev",
}
}
)
masked = RedshiftEngineSpec.mask_encrypted_extra(encrypted_extra)
assert masked is not None
masked_config = json.loads(masked)
# role_arn and external_id should be masked
assert (
masked_config["aws_iam"]["role_arn"]
!= "arn:aws:iam::123456789012:role/SecretRole"
)
assert masked_config["aws_iam"]["external_id"] != "secret-external-id-12345"
# Non-sensitive fields should remain unchanged
assert masked_config["aws_iam"]["enabled"] is True
assert masked_config["aws_iam"]["region"] == "us-east-1"
assert masked_config["aws_iam"]["workgroup_name"] == "my-workgroup"
assert masked_config["aws_iam"]["db_name"] == "dev"
def test_redshift_update_params_with_iam_provisioned_cluster() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
"region": "us-east-1",
"cluster_identifier": "my-redshift-cluster",
"db_username": "superset_user",
"db_name": "analytics",
}
}
)
database.sqlalchemy_uri_decrypted = (
"redshift+psycopg2://user@my-redshift-cluster.abc123.us-east-1"
".redshift.amazonaws.com:5439/analytics"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
),
patch.object(
AWSIAMAuthMixin,
"generate_redshift_cluster_credentials",
return_value=("IAM:superset_user", "cluster-temp-password"),
),
):
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
assert "connect_args" in params
assert params["connect_args"]["password"] == "cluster-temp-password" # noqa: S105
assert params["connect_args"]["user"] == "IAM:superset_user"
assert params["connect_args"]["sslmode"] == "verify-ca"
def test_redshift_update_params_provisioned_cluster_with_external_id() -> None:
from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
from superset.db_engine_specs.redshift import RedshiftEngineSpec
database = MagicMock()
database.encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::222222222222:role/CrossAccountRedshift",
"external_id": "superset-prod-12345",
"region": "us-west-2",
"cluster_identifier": "prod-cluster",
"db_username": "analytics_user",
"db_name": "prod_db",
"session_duration": 1800,
}
}
)
database.sqlalchemy_uri_decrypted = (
"redshift+psycopg2://user@prod-cluster.xyz789.us-west-2"
".redshift.amazonaws.com:5439/prod_db"
)
params: dict[str, Any] = {}
with (
patch.object(
AWSIAMAuthMixin,
"get_iam_credentials",
return_value={
"AccessKeyId": "ASIA...",
"SecretAccessKey": "secret...",
"SessionToken": "token...",
},
) as mock_get_creds,
patch.object(
AWSIAMAuthMixin,
"generate_redshift_cluster_credentials",
return_value=("IAM:analytics_user", "cluster-temp-password"),
),
):
RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
mock_get_creds.assert_called_once_with(
role_arn="arn:aws:iam::222222222222:role/CrossAccountRedshift",
region="us-west-2",
external_id="superset-prod-12345",
session_duration=1800,
)
def test_redshift_mask_encrypted_extra_provisioned_cluster() -> None:
from superset.db_engine_specs.redshift import RedshiftEngineSpec
encrypted_extra = json.dumps(
{
"aws_iam": {
"enabled": True,
"role_arn": "arn:aws:iam::123456789012:role/SecretRole",
"external_id": "secret-external-id-12345",
"region": "us-east-1",
"cluster_identifier": "my-cluster",
"db_username": "superset_user",
"db_name": "analytics",
}
}
)
masked = RedshiftEngineSpec.mask_encrypted_extra(encrypted_extra)
assert masked is not None
masked_config = json.loads(masked)
# role_arn and external_id should be masked
assert (
masked_config["aws_iam"]["role_arn"]
!= "arn:aws:iam::123456789012:role/SecretRole"
)
assert masked_config["aws_iam"]["external_id"] != "secret-external-id-12345"
# Non-sensitive fields should remain unchanged
assert masked_config["aws_iam"]["enabled"] is True
assert masked_config["aws_iam"]["region"] == "us-east-1"
assert masked_config["aws_iam"]["cluster_identifier"] == "my-cluster"
assert masked_config["aws_iam"]["db_username"] == "superset_user"
assert masked_config["aws_iam"]["db_name"] == "analytics"
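The provisioned-cluster tests above exercise a slightly different payload from the Serverless one: `cluster_identifier` and `db_username` take the place of `workgroup_name`, and credentials come from `generate_redshift_cluster_credentials` rather than the Serverless path. An illustrative `encrypted_extra` matching those tests:

```json
{
  "aws_iam": {
    "enabled": true,
    "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
    "region": "us-east-1",
    "cluster_identifier": "my-redshift-cluster",
    "db_username": "superset_user",
    "db_name": "analytics"
  }
}
```

In both modes the temporary credentials land in `connect_args` with an `IAM:`-prefixed user and `sslmode` set to `verify-ca`.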