Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Code
28215d09fe fix(sqllab): apply SQL security controls to cost estimation [DRAFT]
The cost-estimation path (QueryEstimationCommand) only checked database-level
access, skipping the disallowed-function, disallowed-table, DML, and row-level
security controls that the execution path (sql_lab.execute_sql_statements)
applies. As a result EXPLAIN/cost estimation could be used to probe disallowed
functions/tables, bypass the DML guard, and — because RLS predicates were not
injected — confirm the existence/cardinality of rows hidden by RLS.

Add _apply_sql_security() mirroring the executor: reject disallowed
functions/tables and DML (when allow_dml is False), and inject RLS predicates
into each statement (when RLS_IN_SQLLAB is enabled) before estimating, so the
estimate reflects the same constrained query the user could actually run.

DRAFT: changes the SQL sent to the engine's estimate_query_cost across engine
specs; needs validation that EXPLAIN still works for RLS-injected SQL on the
supported analytics databases before merge.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-01 18:57:15 -07:00
2 changed files with 107 additions and 2 deletions

View File

@@ -22,13 +22,21 @@ from typing import Any, TypedDict
from flask import current_app as app
from flask_babel import gettext as __
from superset import db, security_manager
from superset import db, is_feature_enabled, security_manager
from superset.commands.base import BaseCommand
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import SupersetErrorException, SupersetTimeoutException
from superset.exceptions import (
SupersetDisallowedSQLFunctionException,
SupersetDisallowedSQLTableException,
SupersetDMLNotAllowedException,
SupersetErrorException,
SupersetTimeoutException,
)
from superset.jinja_context import get_template_processor
from superset.models.core import Database
from superset.sql.parse import SQLScript
from superset.utils import core as utils
from superset.utils.rls import apply_rls
logger = logging.getLogger(__name__)
@@ -69,6 +77,49 @@ class QueryEstimationCommand(BaseCommand):
)
security_manager.raise_for_access(database=self._database)
def _apply_sql_security(self, sql: str) -> str:
"""Run the disallowed-function/table, DML and RLS controls against the
SQL to be estimated, mirroring ``sql_lab.execute_sql_statements``.
Returns the SQL with RLS predicates injected (when ``RLS_IN_SQLLAB`` is
enabled), so the cost estimate reflects the same constrained query the
user would actually be allowed to run.
"""
db_engine_spec = self._database.db_engine_spec
parsed_script = SQLScript(sql, engine=db_engine_spec.engine)
disallowed_functions = app.config["DISALLOWED_SQL_FUNCTIONS"].get(
db_engine_spec.engine,
set(),
)
if disallowed_functions and parsed_script.check_functions_present(
disallowed_functions
):
raise SupersetDisallowedSQLFunctionException(disallowed_functions)
disallowed_tables = app.config["DISALLOWED_SQL_TABLES"].get(
db_engine_spec.engine,
set(),
)
if disallowed_tables and parsed_script.check_tables_present(disallowed_tables):
found_tables = set()
for statement in parsed_script.statements:
present = {table.table.lower() for table in statement.tables}
for table in disallowed_tables:
if table.lower() in present:
found_tables.add(table)
raise SupersetDisallowedSQLTableException(found_tables or disallowed_tables)
if parsed_script.has_mutation() and not self._database.allow_dml:
raise SupersetDMLNotAllowedException()
if is_feature_enabled("RLS_IN_SQLLAB"):
for statement in parsed_script.statements:
apply_rls(self._database, self._catalog, self._schema, statement)
return parsed_script.format()
return sql
def run(
self,
) -> list[dict[str, Any]]:
@@ -79,6 +130,12 @@ class QueryEstimationCommand(BaseCommand):
template_processor = get_template_processor(self._database)
sql = template_processor.process_template(sql, **self._template_params)
# Apply the same SQL security controls used by the execution path
# (sql_lab.execute_sql_statements) so cost estimation cannot be used to
# probe disallowed functions/tables, bypass the DML guard, or confirm
# the existence of rows hidden by row-level security.
sql = self._apply_sql_security(sql)
timeout = app.config["SQLLAB_QUERY_COST_ESTIMATE_TIMEOUT"]
timeout_msg = f"The estimation exceeded the {timeout} seconds timeout."
try:

View File

@@ -143,3 +143,51 @@ def test_raise_for_access_called_with_correct_database(
call_kwargs = mock_security_manager.raise_for_access.call_args.kwargs
assert call_kwargs["database"] is mock_database
# ---------------------------------------------------------------------------
# SQL security controls applied on the estimate path (parity with executor)
# ---------------------------------------------------------------------------
def _make_command_with_db(
sql: str, *, allow_dml: bool = False, engine: str = "postgresql"
) -> QueryEstimationCommand:
command = QueryEstimationCommand(_make_params(sql=sql))
command._database = MagicMock()
command._database.db_engine_spec.engine = engine
command._database.allow_dml = allow_dml
command._catalog = None
command._schema = ""
return command
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_blocks_dml_when_not_allowed(mock_app: MagicMock) -> None:
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
from superset.exceptions import SupersetDMLNotAllowedException
command = _make_command_with_db("INSERT INTO t VALUES (1)", allow_dml=False)
with pytest.raises(SupersetDMLNotAllowedException):
command._apply_sql_security("INSERT INTO t VALUES (1)")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_allows_dml_when_enabled(mock_app: MagicMock) -> None:
mock_app.config = {"DISALLOWED_SQL_FUNCTIONS": {}, "DISALLOWED_SQL_TABLES": {}}
command = _make_command_with_db("INSERT INTO t VALUES (1)", allow_dml=True)
# No exception; SQL returned unchanged (RLS disabled by default).
assert command._apply_sql_security("INSERT INTO t VALUES (1)")
@patch("superset.commands.sql_lab.estimate.app")
def test_apply_sql_security_blocks_disallowed_table(mock_app: MagicMock) -> None:
mock_app.config = {
"DISALLOWED_SQL_FUNCTIONS": {},
"DISALLOWED_SQL_TABLES": {"postgresql": {"secrets"}},
}
from superset.exceptions import SupersetDisallowedSQLTableException
command = _make_command_with_db("SELECT * FROM secrets", allow_dml=True)
with pytest.raises(SupersetDisallowedSQLTableException):
command._apply_sql_security("SELECT * FROM secrets")