fix(Explore): Apply RLS at column values (#30490)

Co-authored-by: Beto Dealmeida <roberto@dealmeida.net>
This commit is contained in:
Geido
2024-10-04 18:12:28 +03:00
committed by GitHub
parent 0b34197815
commit f314685a8e
4 changed files with 112 additions and 1 deletions

View File

@@ -18,6 +18,7 @@
from unittest.mock import ANY, patch
import pytest
from sqlalchemy.sql.elements import TextClause
from superset import db, security_manager
from superset.connectors.sqla.models import SqlaTable
@@ -176,3 +177,31 @@ class TestDatasourceApi(SupersetTestCase):
table.normalize_columns = False
self.client.get(f"api/v1/datasource/table/{table.id}/column/col2/values/") # noqa: F841
denormalize_name_mock.assert_called_with(ANY, "col2")
@pytest.mark.usefixtures("app_context", "virtual_dataset")
def test_get_column_values_with_rls(self):
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
with patch.object(
table, "get_sqla_row_level_filters", return_value=[TextClause("col2 = 'b'")]
):
rv = self.client.get(
f"api/v1/datasource/table/{table.id}/column/col2/values/"
)
self.assertEqual(rv.status_code, 200)
response = json.loads(rv.data.decode("utf-8"))
self.assertEqual(response["result"], ["b"])
@pytest.mark.usefixtures("app_context", "virtual_dataset")
def test_get_column_values_with_rls_no_values(self):
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
with patch.object(
table, "get_sqla_row_level_filters", return_value=[TextClause("col2 = 'q'")]
):
rv = self.client.get(
f"api/v1/datasource/table/{table.id}/column/col2/values/"
)
self.assertEqual(rv.status_code, 200)
response = json.loads(rv.data.decode("utf-8"))
self.assertEqual(response["result"], [])

View File

@@ -626,6 +626,32 @@ def test_values_for_column_on_text_column(text_column_table):
assert len(with_null) == 8
def test_values_for_column_on_text_column_with_rls(text_column_table):
with patch.object(
text_column_table,
"get_sqla_row_level_filters",
return_value=[
TextClause("foo = 'foo'"),
],
):
with_rls = text_column_table.values_for_column(column_name="foo", limit=10000)
assert with_rls == ["foo"]
assert len(with_rls) == 1
def test_values_for_column_on_text_column_with_rls_no_values(text_column_table):
with patch.object(
text_column_table,
"get_sqla_row_level_filters",
return_value=[
TextClause("foo = 'bar'"),
],
):
with_rls = text_column_table.values_for_column(column_name="foo", limit=10000)
assert with_rls == []
assert len(with_rls) == 0
def test_filter_on_text_column(text_column_table):
table = text_column_table
# null value should be replaced