feat(db): add dynamic schema support for athena (#36003)

This commit is contained in:
Igor Shmulyan
2026-01-12 20:58:12 +02:00
committed by GitHub
parent 4fe2085596
commit fac5d2bcb6
2 changed files with 110 additions and 0 deletions

View File

@@ -21,6 +21,7 @@ from typing import Any, Optional
from flask_babel import gettext as __
from sqlalchemy import types
from sqlalchemy.engine.url import URL
from superset.constants import TimeGrain
from superset.db_engine_specs.base import BaseEngineSpec
@@ -38,6 +39,7 @@ class AthenaEngineSpec(BaseEngineSpec):
disable_ssh_tunneling = True
# Athena doesn't support IS true/false syntax, use = true/false instead
use_equality_for_boolean_filters = True
supports_dynamic_schema = True
_time_grain_expressions = {
None: "{col}",
@@ -92,3 +94,41 @@ class AthenaEngineSpec(BaseEngineSpec):
:return: Conditionally mutated label
"""
return label.lower()
@classmethod
def adjust_engine_params(
    cls,
    uri: URL,
    connect_args: dict[str, Any],
    catalog: str | None = None,
    schema: str | None = None,
) -> tuple[URL, dict[str, Any]]:
    """
    Point an Athena SQLAlchemy URI at the requested catalog and/or schema.

    An Athena URI has the following shape, with the schema in the database
    position and the catalog carried as a query parameter:

        awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir}

    :param uri: SQLAlchemy URL the adjusted URL is derived from
    :param connect_args: Connection arguments; handed back as received
    :param catalog: Catalog to place in the ``catalog_name`` query parameter;
        skipped when falsy
    :param schema: Schema to place in the database part of the URL; skipped
        when falsy
    :return: Tuple of the adjusted URL and ``connect_args``
    """
    adjusted = uri

    # The catalog travels in the query string of the URL.
    if catalog:
        adjusted = adjusted.update_query_dict({"catalog_name": catalog})

    # The schema takes the place of the database component.
    if schema:
        adjusted = adjusted.set(database=schema)

    return adjusted, connect_args
@classmethod
def get_schema_from_engine_params(
    cls,
    sqlalchemy_uri: URL,
    connect_args: dict[str, Any],
) -> str | None:
    """
    Read the schema configured in an Athena SQLAlchemy URI.

    An Athena URI has the following shape, with the schema in the database
    position:

        awsathena+rest://athena.{region_name}.amazonaws.com:443/{schema_name}?catalog_name={catalog_name}&s3_staging_dir={s3_staging_dir}

    :param sqlalchemy_uri: SQLAlchemy URL to inspect
    :param connect_args: Connection arguments; not consulted here
    :return: The ``database`` attribute of the URL
    """
    configured_schema = sqlalchemy_uri.database
    return configured_schema

View File

@@ -20,6 +20,7 @@ from datetime import datetime
from typing import Optional
import pytest
from sqlalchemy.engine.url import make_url
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from tests.unit_tests.db_engine_specs.utils import assert_convert_dttm
@@ -120,3 +121,72 @@ def test_handle_boolean_filter() -> None:
str(result_false.compile(compile_kwargs={"literal_binds": True}))
== "test_col = false"
)
def test_adjust_engine_params() -> None:
    """
    Check that ``adjust_engine_params`` rewrites the Athena URL as expected.

    Every case pairs the ``catalog``/``schema`` keyword arguments passed to
    the method with the string form of the URL it should hand back.
    """
    from superset.db_engine_specs.athena import AthenaEngineSpec

    base_url = make_url("awsathena+rest://athena.us-east-1.amazonaws.com:443/default")

    cases = [
        # Nothing requested: the URL comes back as it went in.
        (
            {},
            "awsathena+rest://athena.us-east-1.amazonaws.com:443/default",
        ),
        # Schema only: the database part of the URL is swapped.
        (
            {"schema": "new_schema"},
            "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema",
        ),
        # Catalog only: a ``catalog_name`` query parameter appears.
        (
            {"catalog": "new_catalog"},
            "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?catalog_name=new_catalog",
        ),
        # Catalog and schema together.
        (
            {"catalog": "new_catalog", "schema": "new_schema"},
            "awsathena+rest://athena.us-east-1.amazonaws.com:443/new_schema?catalog_name=new_catalog",
        ),
    ]

    for overrides, expected in cases:
        result = AthenaEngineSpec.adjust_engine_params(base_url, {}, **overrides)
        assert str(result[0]) == expected
def test_get_schema_from_engine_params() -> None:
    """
    Check that ``get_schema_from_engine_params`` reads the schema off the URL.
    """
    from superset.db_engine_specs.athena import AthenaEngineSpec

    # A URL that names a schema in its database position.
    url_with_schema = make_url(
        "awsathena+rest://athena.us-east-1.amazonaws.com:443/default?s3_staging_dir=s3%3A%2F%2Fathena-staging"
    )
    found = AthenaEngineSpec.get_schema_from_engine_params(url_with_schema, {})
    assert found == "default"

    # A URL with no database part at all.
    url_without_schema = make_url(
        "awsathena+rest://athena.us-east-1.amazonaws.com:443?s3_staging_dir=s3%3A%2F%2Fathena-staging"
    )
    missing = AthenaEngineSpec.get_schema_from_engine_params(url_without_schema, {})
    assert missing is None