Compare commits

..

5 Commits

Author SHA1 Message Date
Evan
63603ec369 fix(sqllab): keep RLS schema probe out of the session + always run the schema gate
Addresses review feedback on the cost-estimate security-parity path:

- The transient Query built to resolve the per-query schema joined the
  session via the `database` backref's `cascade="all, delete-orphan"`.
  With no `client_id` (nullable=False) it autoflushed into an IntegrityError
  the moment `apply_rls` issued its own session query, 500-ing every
  RLS-enabled estimate that references a table. Mirror the probe pattern in
  `SupersetSecurityManager.raise_for_access`: set `client_id`/`user_id` and
  `db.session.expunge` the probe.
- `get_default_schema_for_query` was skipped whenever the caller pinned a
  schema, bypassing the engine's per-query gate (e.g. Postgres rejecting
  `SET search_path`) that the executor runs unconditionally. Always resolve
  through it; an explicit schema still wins as the RLS predicate target.

Unit tests patch the session (so they can't see the autoflush); add an
integration test exercising a real session + real `apply_rls` that would
have caught the regression, and flip the explicit-schema test to assert the
gate now runs.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:59:51 -07:00
Evan
d6280dcc48 fix(sqllab): resolve estimate RLS schema via per-query engine gate
Resolve the default schema for cost-estimate RLS through
get_default_schema_for_query (mirroring sql_lab.execute_sql_statements)
instead of the static get_default_schema, so engine-specific per-query
security gates (e.g. the Postgres search_path rejection) also run on the
estimate path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:59:51 -07:00
Evan
51efa469d8 fix(sqllab): resolve default catalog/schema before applying RLS in cost estimate
The cost-estimate RLS injection passed the raw self._schema ("" when the
caller omitted a schema) and self._catalog (None) to apply_rls. The
execution path it mirrors resolves the database default first
(SQLExecutor: get_default_catalog() / get_default_schema(catalog);
sql_lab.execute_sql_statements: get_default_schema_for_query). Without
resolution, unqualified table references could not be matched against
datasets registered under the default schema, so the estimate would skip
RLS predicates the real query enforces — defeating the security-parity
goal of this PR.

Resolve catalog then schema (catalog first, like the executor) before
injection, and pin the corrected behavior with unit tests covering both
the default-resolution and explicit-override cases.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:59:50 -07:00
Claude Code
e1d15a44e0 test(sqllab): cover disallowed-function and RLS controls on cost estimation
Extends the cost-estimation security tests beyond DML/disallowed-table to the
remaining controls: a disallowed function is blocked, a benign statement passes
through unchanged, and RLS predicates are injected per statement when
RLS_IN_SQLLAB is enabled.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:59:50 -07:00
Claude Code
d45e58d94f fix(sqllab): apply SQL security controls to cost estimation [DRAFT]
The cost-estimation path (QueryEstimationCommand) only checked database-level
access, skipping the disallowed-function, disallowed-table, DML, and row-level
security controls that the execution path (sql_lab.execute_sql_statements)
applies. As a result EXPLAIN/cost estimation could be used to probe disallowed
functions/tables, bypass the DML guard, and — because RLS predicates were not
injected — confirm the existence/cardinality of rows hidden by RLS.

Add _apply_sql_security() mirroring the executor: reject disallowed
functions/tables and DML (when allow_dml is False), and inject RLS predicates
into each statement (when RLS_IN_SQLLAB is enabled) before estimating, so the
estimate reflects the same constrained query the user could actually run.

DRAFT: changes the SQL sent to the engine's estimate_query_cost across engine
specs; needs validation that EXPLAIN still works for RLS-injected SQL on the
supported analytics databases before merge.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-10 14:59:50 -07:00
7 changed files with 338 additions and 221 deletions

View File

@@ -103,19 +103,6 @@ class DatasourceTypeUpdateRequiredValidationError(ValidationError):
)
class ChartQueryContextDatasourceMismatchValidationError(ValidationError):
"""
Raised when a query-context-only update carries a datasource that does not
match the chart's own datasource.
"""
def __init__(self) -> None:
super().__init__(
_("The query context datasource does not match the chart datasource"),
field_name="query_context",
)
class ChartNotFoundError(CommandException):
message = "Chart not found."

View File

@@ -29,7 +29,6 @@ from superset.commands.chart.exceptions import (
ChartForbiddenError,
ChartInvalidError,
ChartNotFoundError,
ChartQueryContextDatasourceMismatchValidationError,
ChartUpdateFailedError,
DashboardsForbiddenError,
DashboardsNotFoundValidationError,
@@ -42,7 +41,6 @@ from superset.exceptions import SupersetSecurityException
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.tags.models import ObjectType
from superset.utils import json
from superset.utils.decorators import on_error, transaction
logger = logging.getLogger(__name__)
@@ -103,51 +101,6 @@ class UpdateChartCommand(UpdateMixin, BaseCommand):
if not security_manager.is_owner(dash):
raise DashboardsForbiddenError()
def _validate_query_context_datasource(
self, exceptions: list[ValidationError]
) -> None:
"""
Ensure a query-context-only update keeps the chart's own datasource.
The submitted query context is only verified when it carries a parseable
``datasource`` object; a payload that references a different datasource than
the chart's persisted one is rejected. Payloads without a datasource fall
back to the chart's datasource at execution time and need no check.
"""
if not self._model:
return
raw_query_context = self._properties.get("query_context")
if not raw_query_context:
return
try:
query_context = json.loads(raw_query_context)
except (TypeError, ValueError):
# An unparseable payload cannot be verified or replayed; leave it for
# downstream handling rather than guessing at its intent.
return
datasource = (
query_context.get("datasource") if isinstance(query_context, dict) else None
)
if not isinstance(datasource, dict):
return
try:
ids_match = int(datasource["id"]) == self._model.datasource_id
except (KeyError, TypeError, ValueError):
ids_match = False
datasource_type = datasource.get("type")
types_match = (
datasource_type is None
or str(datasource_type) == self._model.datasource_type
)
if not ids_match or not types_match:
exceptions.append(ChartQueryContextDatasourceMismatchValidationError())
def validate(self) -> None: # noqa: C901
exceptions: list[ValidationError] = []
dashboard_ids = self._properties.get("dashboards")
@@ -181,12 +134,6 @@ class UpdateChartCommand(UpdateMixin, BaseCommand):
raise ChartForbiddenError() from ex
except ValidationError as ex:
exceptions.append(ex)
else:
# The query-context-only path skips the ownership check so report and
# alert workers can refresh a chart's cached payload. Keep that payload
# bound to the chart's own datasource so it cannot be repointed at an
# unrelated one.
self._validate_query_context_datasource(exceptions)
# validate tags
try:

View File

@@ -22,13 +22,22 @@ from typing import Any, TypedDict
from flask import current_app as app
from flask_babel import gettext as __
from superset import db, security_manager
from superset import db, is_feature_enabled, security_manager
from superset.commands.base import BaseCommand
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetErrorException, SupersetTimeoutException
from superset.exceptions import (
SupersetDisallowedSQLFunctionException,
SupersetDisallowedSQLTableException,
SupersetDMLNotAllowedException,
SupersetErrorException,
SupersetTimeoutException,
)
from superset.jinja_context import get_template_processor
from superset.models.core import Database
from superset.models.sql_lab import Query
from superset.sql.parse import SQLScript
from superset.utils import core as utils
from superset.utils.rls import apply_rls
logger = logging.getLogger(__name__)
@@ -69,6 +78,85 @@ class QueryEstimationCommand(BaseCommand):
)
security_manager.raise_for_access(database=self._database)
def _apply_sql_security(self, sql: str) -> str:
"""Run the disallowed-function/table, DML and RLS controls against the
SQL to be estimated, mirroring ``sql_lab.execute_sql_statements``.
Returns the SQL with RLS predicates injected (when ``RLS_IN_SQLLAB`` is
enabled), so the cost estimate reflects the same constrained query the
user would actually be allowed to run.
"""
db_engine_spec = self._database.db_engine_spec
parsed_script = SQLScript(sql, engine=db_engine_spec.engine)
disallowed_functions = app.config["DISALLOWED_SQL_FUNCTIONS"].get(
db_engine_spec.engine,
set(),
)
if disallowed_functions and parsed_script.check_functions_present(
disallowed_functions
):
raise SupersetDisallowedSQLFunctionException(disallowed_functions)
disallowed_tables = app.config["DISALLOWED_SQL_TABLES"].get(
db_engine_spec.engine,
set(),
)
if disallowed_tables and parsed_script.check_tables_present(disallowed_tables):
found_tables = set()
for statement in parsed_script.statements:
present = {table.table.lower() for table in statement.tables}
for table in disallowed_tables:
if table.lower() in present:
found_tables.add(table)
raise SupersetDisallowedSQLTableException(found_tables or disallowed_tables)
if parsed_script.has_mutation() and not self._database.allow_dml:
raise SupersetDMLNotAllowedException()
if is_feature_enabled("RLS_IN_SQLLAB"):
# Resolve the default catalog/schema the same way the execution path
# does (``sql_lab.execute_sql_statements``) before injecting RLS.
# Crucially this goes through ``get_default_schema_for_query`` rather
# than the plain ``get_default_schema``, so engine-specific per-query
# security gates run too — e.g. ``PostgresEngineSpec`` rejects a query
# that sets ``search_path``. Resolving against the static default
# schema instead would both skip that gate and let unqualified tables
# dodge the RLS predicates the real query enforces, defeating the
# security parity this command exists to provide.
catalog = self._catalog or self._database.get_default_catalog()
# Build a transient (unsaved) Query so the engine spec can resolve the
# effective per-query schema exactly as the executor does. Mirror the
# probe built in ``SupersetSecurityManager.raise_for_access``: set a
# ``client_id`` (the column is ``nullable=False``) and expunge it, so
# the ``database`` backref's ``cascade="all, delete-orphan"`` cannot
# autoflush this incomplete row into the session when ``apply_rls``
# issues its own ``db.session`` query below.
probe_query = Query(
database=self._database,
sql=self._sql,
schema=self._schema or None,
catalog=catalog,
client_id=utils.shortid()[:10],
user_id=utils.get_user_id(),
)
db.session.expunge(probe_query)
# Always resolve through ``get_default_schema_for_query`` — even when
# the caller pinned a schema — so the engine's per-query security gate
# runs (e.g. ``PostgresEngineSpec`` rejects a query that sets
# ``search_path``), exactly as the executor does unconditionally. Only
# the resulting value falls back to the resolved default; an explicit
# schema still wins for the RLS predicate target.
resolved_schema = self._database.get_default_schema_for_query(
probe_query, self._template_params
)
schema = self._schema or resolved_schema or ""
for statement in parsed_script.statements:
apply_rls(self._database, catalog, schema, statement)
return parsed_script.format()
return sql
def run(
self,
) -> list[dict[str, Any]]:
@@ -79,6 +167,12 @@ class QueryEstimationCommand(BaseCommand):
template_processor = get_template_processor(self._database)
sql = template_processor.process_template(sql, **self._template_params)
# Apply the same SQL security controls used by the execution path
# (sql_lab.execute_sql_statements) so cost estimation cannot be used to
# probe disallowed functions/tables, bypass the DML guard, or confirm
# the existence of rows hidden by row-level security.
sql = self._apply_sql_security(sql)
timeout = app.config["SQLLAB_QUERY_COST_ESTIMATE_TIMEOUT"]
timeout_msg = f"The estimation exceeded the {timeout} seconds timeout."
try:

View File

@@ -112,6 +112,35 @@ class TestQueryEstimationCommand(SupersetTestCase):
result = command.run()
assert result == payload
@patch("superset.commands.sql_lab.estimate.is_feature_enabled", return_value=True)
def test_apply_sql_security_rls_does_not_pollute_session(
self, mock_is_feature_enabled: Mock
) -> None:
"""Regression test for the RLS schema-resolution probe Query.
``_apply_sql_security`` builds a transient ``Query`` so the engine spec
can resolve the effective per-query schema. Because the ``database``
backref cascades ``all, delete-orphan``, that transient joins the
session; if it isn't expunged, the very next ``apply_rls`` call issues
its own ``db.session`` query, autoflush fires, and the probe — whose
``client_id`` column is ``nullable=False`` — raises ``IntegrityError``.
A mocked session (as in the unit tests) hides this entirely, so exercise
the real session and real ``apply_rls`` here with ``RLS_IN_SQLLAB`` on.
"""
database = get_example_database()
params = {"database_id": database.id, "sql": "SELECT * FROM some_table"}
schema = EstimateQueryCostSchema()
data: EstimateQueryCostSchema = schema.dump(params)
command = estimate.QueryEstimationCommand(data)
command._database = database
with override_user(self.get_user("admin")):
# Must not raise IntegrityError from an autoflushed probe Query.
command._apply_sql_security("SELECT * FROM some_table")
# And no transient probe Query may be left pending in the session.
assert not any(isinstance(obj, Query) for obj in db.session.new)
class TestSqlResultExportCommand(SupersetTestCase):
@pytest.fixture

View File

@@ -17,11 +17,10 @@
import pytest
from pytest_mock import MockerFixture
from superset.commands.chart.exceptions import ChartForbiddenError, ChartInvalidError
from superset.commands.chart.exceptions import ChartForbiddenError
from superset.commands.chart.update import UpdateChartCommand
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetSecurityException
from superset.utils import json
def _ownership_exc() -> SupersetSecurityException:
@@ -92,73 +91,3 @@ def test_update_chart_owner_can_perform_regular_update(
find_by_id.assert_called_once_with(1)
raise_for_ownership.assert_called_once()
def _query_context_payload(datasource: object) -> dict[str, object]:
return {
"query_context": json.dumps({"datasource": datasource, "queries": []}),
"query_context_generation": True,
}
def test_update_chart_query_context_matching_datasource_is_allowed(
mocker: MockerFixture,
) -> None:
"""A query context that targets the chart's own datasource is accepted."""
find_by_id = mocker.patch("superset.commands.chart.update.ChartDAO.find_by_id")
find_by_id.return_value = mocker.MagicMock(
id=1, tags=[], dashboards=[], datasource_id=42, datasource_type="table"
)
mocker.patch("superset.commands.chart.update.security_manager.raise_for_ownership")
UpdateChartCommand(
1, _query_context_payload({"id": 42, "type": "table"})
).validate()
@pytest.mark.parametrize(
"datasource",
[
{"id": 99, "type": "table"}, # different id
{"id": 42, "type": "query"}, # different type
{"id": "99", "type": "table"}, # different id as string
],
)
def test_update_chart_query_context_mismatched_datasource_is_rejected(
mocker: MockerFixture,
datasource: dict[str, object],
) -> None:
"""A query context pointing at a different datasource is rejected with a 4xx."""
find_by_id = mocker.patch("superset.commands.chart.update.ChartDAO.find_by_id")
find_by_id.return_value = mocker.MagicMock(
id=1, tags=[], dashboards=[], datasource_id=42, datasource_type="table"
)
mocker.patch("superset.commands.chart.update.security_manager.raise_for_ownership")
with pytest.raises(ChartInvalidError):
UpdateChartCommand(1, _query_context_payload(datasource)).validate()
@pytest.mark.parametrize(
"query_context",
[
"{}", # no datasource key
'{"datasource": null}', # null datasource
"not-json", # unparseable payload
],
)
def test_update_chart_query_context_without_datasource_is_allowed(
mocker: MockerFixture,
query_context: str,
) -> None:
"""Payloads with no verifiable datasource fall back to the chart's own."""
find_by_id = mocker.patch("superset.commands.chart.update.ChartDAO.find_by_id")
find_by_id.return_value = mocker.MagicMock(
id=1, tags=[], dashboards=[], datasource_id=42, datasource_type="table"
)
mocker.patch("superset.commands.chart.update.security_manager.raise_for_ownership")
UpdateChartCommand(
1,
{"query_context": query_context, "query_context_generation": True},
).validate()

View File

@@ -16,6 +16,7 @@
# under the License.
"""Unit tests for resource-level authorization in QueryEstimationCommand."""
from typing import cast
from unittest.mock import MagicMock, patch
import pytest
@@ -143,3 +144,214 @@ def test_raise_for_access_called_with_correct_database(
call_kwargs = mock_security_manager.raise_for_access.call_args.kwargs
assert call_kwargs["database"] is mock_database
# ---------------------------------------------------------------------------
# SQL security controls applied on the estimate path (parity with executor)
# ---------------------------------------------------------------------------
def _make_command_with_db(
sql: str, *, allow_dml: bool = False, engine: str = "postgresql"
) -> QueryEstimationCommand:
command = QueryEstimationCommand(_make_params(sql=sql))
command._database = MagicMock()
command._database.db_engine_spec.engine = engine
command._database.allow_dml = allow_dml
command._catalog = None
command._schema = ""
return command
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_blocks_dml_when_not_allowed(mock_app: MagicMock) -> None:
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
from superset.exceptions import SupersetDMLNotAllowedException
command = _make_command_with_db("INSERT INTO t VALUES (1)", allow_dml=False)
with pytest.raises(SupersetDMLNotAllowedException):
command._apply_sql_security("INSERT INTO t VALUES (1)")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_allows_dml_when_enabled(mock_app: MagicMock) -> None:
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("INSERT INTO t VALUES (1)", allow_dml=True)
# No exception; SQL returned unchanged (RLS disabled by default).
assert command._apply_sql_security("INSERT INTO t VALUES (1)")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_blocks_disallowed_table(mock_app: MagicMock) -> None:
mock_app.config = {
"DISALLOWED_SQL_FUNCTIONS": {},
"DISALLOWED_SQL_TABLES": {"postgresql": {"secrets"}},
}
from superset.exceptions import SupersetDisallowedSQLTableException
command = _make_command_with_db("SELECT * FROM secrets", allow_dml=True)
with pytest.raises(SupersetDisallowedSQLTableException):
command._apply_sql_security("SELECT * FROM secrets")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_blocks_disallowed_function(mock_app: MagicMock) -> None:
"""A disallowed function cannot be probed via cost estimation either."""
mock_app.config = {
"DISALLOWED_SQL_FUNCTIONS": {"postgresql": {"PG_SLEEP"}},
"DISALLOWED_SQL_TABLES": {},
}
from superset.exceptions import SupersetDisallowedSQLFunctionException
command = _make_command_with_db("SELECT pg_sleep(1)", allow_dml=True)
with pytest.raises(SupersetDisallowedSQLFunctionException):
command._apply_sql_security("SELECT pg_sleep(1)")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_allows_benign_select(mock_app: MagicMock) -> None:
"""A benign statement passes through unchanged (no false positives)."""
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("SELECT 1", allow_dml=False)
# No disallowed content, no mutation, RLS disabled -> returned unchanged.
assert command._apply_sql_security("SELECT 1") == "SELECT 1"
@patch("superset.commands.sql_lab.estimate.apply_rls")
@patch("superset.commands.sql_lab.estimate.Query")
@patch("superset.commands.sql_lab.estimate.db")
@patch("superset.commands.sql_lab.estimate.is_feature_enabled", return_value=True)
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_injects_rls_when_enabled(
mock_app: MagicMock,
mock_is_feature_enabled: MagicMock,
mock_db: MagicMock,
mock_query: MagicMock,
mock_apply_rls: MagicMock,
) -> None:
"""With RLS_IN_SQLLAB enabled, RLS predicates are applied per statement so
the estimate reflects the constrained query the user could actually run."""
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("SELECT * FROM t", allow_dml=False)
result = command._apply_sql_security("SELECT * FROM t")
mock_is_feature_enabled.assert_called_with("RLS_IN_SQLLAB")
mock_apply_rls.assert_called_once()
# The transient probe Query is expunged so its (deliberately incomplete)
# row can't autoflush into the session when apply_rls queries below.
mock_db.session.expunge.assert_called_once_with(mock_query.return_value)
assert isinstance(result, str)
@patch("superset.commands.sql_lab.estimate.Query")
@patch("superset.commands.sql_lab.estimate.db")
@patch("superset.commands.sql_lab.estimate.apply_rls")
@patch("superset.commands.sql_lab.estimate.is_feature_enabled", return_value=True)
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_resolves_default_schema_for_rls(
mock_app: MagicMock,
mock_is_feature_enabled: MagicMock,
mock_apply_rls: MagicMock,
mock_db: MagicMock,
mock_query: MagicMock,
) -> None:
"""When no catalog/schema is supplied, RLS must be applied against the
database's *resolved* default catalog/schema — mirroring the execution path
(``SQLExecutor`` / ``sql_lab.execute_sql_statements``). Passing the raw
``""``/``None`` would let unqualified tables dodge RLS predicates that the
real query enforces, defeating the security parity goal of this command.
"""
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("SELECT * FROM t", allow_dml=False)
database = cast(MagicMock, command._database)
# Caller passed nothing: schema is "" and catalog is None.
command._schema = ""
command._catalog = None
database.get_default_catalog.return_value = "default_catalog"
database.get_default_schema_for_query.return_value = "public"
command._apply_sql_security("SELECT * FROM t")
# Default catalog/schema are resolved before injection, in the same order
# as the executor (catalog first, then schema derived per-query). The schema
# goes through ``get_default_schema_for_query`` so engine-specific per-query
# security gates (e.g. the Postgres ``search_path`` check) run as well.
database.get_default_catalog.assert_called_once_with()
database.get_default_schema_for_query.assert_called_once()
# RLS is applied with the *resolved* values, never the raw ""/None.
# apply_rls(database, catalog, schema, statement)
call_args = mock_apply_rls.call_args.args
assert call_args[1] == "default_catalog"
assert call_args[2] == "public"
@patch("superset.commands.sql_lab.estimate.Query")
@patch("superset.commands.sql_lab.estimate.db")
@patch("superset.commands.sql_lab.estimate.apply_rls")
@patch("superset.commands.sql_lab.estimate.is_feature_enabled", return_value=True)
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_respects_explicit_catalog_schema(
mock_app: MagicMock,
mock_is_feature_enabled: MagicMock,
mock_apply_rls: MagicMock,
mock_db: MagicMock,
mock_query: MagicMock,
) -> None:
"""An explicitly supplied catalog short-circuits default-catalog resolution,
and the explicit schema wins as the RLS target — but the schema resolver
``get_default_schema_for_query`` is still invoked so the engine's per-query
security gate runs even when a schema is pinned (parity with the executor,
which calls it unconditionally)."""
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("SELECT * FROM t", allow_dml=False)
database = cast(MagicMock, command._database)
command._catalog = "my_catalog"
command._schema = "my_schema"
command._apply_sql_security("SELECT * FROM t")
# Explicit catalog wins, so the default-catalog lookup is skipped...
database.get_default_catalog.assert_not_called()
# ...but the schema gate must run even when a schema is pinned, otherwise an
# explicit-schema estimate could smuggle a ``SET search_path`` past the gate
# the executor enforces.
database.get_default_schema_for_query.assert_called_once()
call_args = mock_apply_rls.call_args.args
assert call_args[1] == "my_catalog"
assert call_args[2] == "my_schema"
@patch("superset.commands.sql_lab.estimate.Query")
@patch("superset.commands.sql_lab.estimate.db")
@patch("superset.commands.sql_lab.estimate.apply_rls")
@patch("superset.commands.sql_lab.estimate.is_feature_enabled", return_value=True)
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_propagates_engine_schema_gate(
mock_app: MagicMock,
mock_is_feature_enabled: MagicMock,
mock_apply_rls: MagicMock,
mock_db: MagicMock,
mock_query: MagicMock,
) -> None:
"""Default-schema resolution goes through ``get_default_schema_for_query``,
so an engine-specific per-query security gate (e.g. the Postgres
``search_path`` check that rejects ``SET search_path = ...``) is enforced on
the estimate path too, rather than being silently bypassed.
"""
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db(
"SET search_path = secret; SELECT * FROM t", allow_dml=True
)
database = cast(MagicMock, command._database)
command._schema = ""
command._catalog = None
database.get_default_catalog.return_value = "default_catalog"
database.get_default_schema_for_query.side_effect = _security_exception()
with pytest.raises(SupersetSecurityException):
command._apply_sql_security("SET search_path = secret; SELECT * FROM t")
# RLS injection must not happen once the schema gate has rejected the query.
mock_apply_rls.assert_not_called()

View File

@@ -1,81 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from superset.utils.core import split
def test_split_empty_string():
assert list(split("")) == [""]
def test_split_leading_delimiter():
assert list(split(" a")) == [
"",
"a",
]
def test_split_trailing_delimiter():
assert list(split("a ")) == [
"a",
"",
]
def test_split_only_delimiter():
assert list(split(" ")) == [
"",
"",
]
def test_split_nested_parentheses():
assert list(
split(
"a,(b,(c,d))",
delimiter=",",
)
) == [
"a",
"(b,(c,d))",
]
def test_branch_separator_found():
assert list(split("a b")) == [
"a",
"b",
]
def test_branch_separator_not_found():
assert list(split("ab")) == [
"ab",
]
def test_branch_parentheses():
assert list(split("(a b)")) == [
"(a b)",
]
def test_branch_escaped_quote():
assert list(split(r'"a\"b c" d')) == [
r'"a\"b c"',
"d",
]