mirror of
https://github.com/apache/superset.git
synced 2026-05-06 16:34:32 +00:00
Filter hidden columns
This commit is contained in:
@@ -540,3 +540,229 @@ def test_get_all_group_keys_filtered_by_table(app_context: None):
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
keys = get_all_group_keys(database_name="db1", table=table)
|
||||
assert keys == {"key1"}
|
||||
|
||||
|
||||
# Tests for get_hidden_columns_for_table
|
||||
def test_get_hidden_columns_for_table_no_hidden(app_context: None):
|
||||
"""Test getting hidden columns when no columns are hidden."""
|
||||
from superset.data_access_rules.utils import get_hidden_columns_for_table
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [
|
||||
{
|
||||
"database": "mydb",
|
||||
"schema": "public",
|
||||
"table": "users",
|
||||
"cls": {"email": "mask", "phone": "hash"}, # No "hide" actions
|
||||
}
|
||||
],
|
||||
"denied": [],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
hidden = get_hidden_columns_for_table(table, database, rules=[rule])
|
||||
assert hidden == set()
|
||||
|
||||
|
||||
def test_get_hidden_columns_for_table_with_hidden(app_context: None):
|
||||
"""Test getting hidden columns when some columns are hidden."""
|
||||
from superset.data_access_rules.utils import get_hidden_columns_for_table
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [
|
||||
{
|
||||
"database": "mydb",
|
||||
"schema": "public",
|
||||
"table": "users",
|
||||
"cls": {"email": "mask", "ssn": "hide", "password": "hide"},
|
||||
}
|
||||
],
|
||||
"denied": [],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
hidden = get_hidden_columns_for_table(table, database, rules=[rule])
|
||||
assert hidden == {"ssn", "password"}
|
||||
|
||||
|
||||
def test_get_hidden_columns_for_table_denied_access(app_context: None):
|
||||
"""Test that denied access returns no hidden columns."""
|
||||
from superset.data_access_rules.utils import get_hidden_columns_for_table
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [],
|
||||
"denied": [
|
||||
{
|
||||
"database": "mydb",
|
||||
"schema": "public",
|
||||
"table": "users",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
hidden = get_hidden_columns_for_table(table, database, rules=[rule])
|
||||
# Denied access means no CLS rules are returned
|
||||
assert hidden == set()
|
||||
|
||||
|
||||
# Tests for filter_columns_by_cls
|
||||
def test_filter_columns_by_cls_no_hidden(app_context: None):
|
||||
"""Test filtering columns when no columns are hidden."""
|
||||
from superset.data_access_rules.utils import filter_columns_by_cls
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
columns = [
|
||||
{"column_name": "id", "type": "INTEGER"},
|
||||
{"column_name": "name", "type": "VARCHAR"},
|
||||
{"column_name": "email", "type": "VARCHAR"},
|
||||
]
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [
|
||||
{"database": "mydb", "schema": "public", "table": "users"}
|
||||
],
|
||||
"denied": [],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.is_feature_enabled",
|
||||
return_value=True,
|
||||
):
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.get_user_rules",
|
||||
return_value=[rule],
|
||||
):
|
||||
filtered = filter_columns_by_cls(columns, table, database)
|
||||
assert len(filtered) == 3
|
||||
assert filtered == columns
|
||||
|
||||
|
||||
def test_filter_columns_by_cls_with_hidden(app_context: None):
|
||||
"""Test filtering columns when some columns are hidden."""
|
||||
from superset.data_access_rules.utils import filter_columns_by_cls
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
columns = [
|
||||
{"column_name": "id", "type": "INTEGER"},
|
||||
{"column_name": "name", "type": "VARCHAR"},
|
||||
{"column_name": "email", "type": "VARCHAR"},
|
||||
{"column_name": "ssn", "type": "VARCHAR"},
|
||||
]
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [
|
||||
{
|
||||
"database": "mydb",
|
||||
"schema": "public",
|
||||
"table": "users",
|
||||
"cls": {"ssn": "hide"},
|
||||
}
|
||||
],
|
||||
"denied": [],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.is_feature_enabled",
|
||||
return_value=True,
|
||||
):
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.get_user_rules",
|
||||
return_value=[rule],
|
||||
):
|
||||
filtered = filter_columns_by_cls(columns, table, database)
|
||||
assert len(filtered) == 3
|
||||
column_names = [c["column_name"] for c in filtered]
|
||||
assert "ssn" not in column_names
|
||||
assert "id" in column_names
|
||||
assert "name" in column_names
|
||||
assert "email" in column_names
|
||||
|
||||
|
||||
def test_filter_columns_by_cls_feature_disabled(app_context: None):
|
||||
"""Test that filtering is skipped when feature flag is disabled."""
|
||||
from superset.data_access_rules.utils import filter_columns_by_cls
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
columns = [
|
||||
{"column_name": "id", "type": "INTEGER"},
|
||||
{"column_name": "ssn", "type": "VARCHAR"},
|
||||
]
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.is_feature_enabled",
|
||||
return_value=False,
|
||||
):
|
||||
# Even if there would be hidden columns, they are not filtered
|
||||
filtered = filter_columns_by_cls(columns, table, database)
|
||||
assert len(filtered) == 2
|
||||
assert filtered == columns
|
||||
|
||||
|
||||
def test_filter_columns_by_cls_custom_key(app_context: None):
|
||||
"""Test filtering columns with custom column name key."""
|
||||
from superset.data_access_rules.utils import filter_columns_by_cls
|
||||
|
||||
database = MagicMock()
|
||||
database.database_name = "mydb"
|
||||
|
||||
# Columns with different key structure (like from SQL Lab table metadata)
|
||||
columns = [
|
||||
{"name": "id", "type": "INTEGER"},
|
||||
{"name": "ssn", "type": "VARCHAR"},
|
||||
]
|
||||
|
||||
rule = MagicMock(spec=DataAccessRule)
|
||||
rule.rule_dict = {
|
||||
"allowed": [
|
||||
{
|
||||
"database": "mydb",
|
||||
"schema": "public",
|
||||
"table": "users",
|
||||
"cls": {"ssn": "hide"},
|
||||
}
|
||||
],
|
||||
"denied": [],
|
||||
}
|
||||
|
||||
table = Table(table="users", schema="public", catalog=None)
|
||||
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.is_feature_enabled",
|
||||
return_value=True,
|
||||
):
|
||||
with patch(
|
||||
"superset.data_access_rules.utils.get_user_rules",
|
||||
return_value=[rule],
|
||||
):
|
||||
filtered = filter_columns_by_cls(
|
||||
columns, table, database, column_name_key="name"
|
||||
)
|
||||
assert len(filtered) == 1
|
||||
assert filtered[0]["name"] == "id"
|
||||
|
||||
Reference in New Issue
Block a user