fix: database permissions on update and delete (avoid orphaned perms) (#20081)

* fix: database permissions on update and delete (avoid orphaned perms)

* fix event transaction

* fix test

* fix lint

* update datasource access permissions

* add tests

* fix import

* fix tests

* update slice and dataset perms also

* fix lint

* fix tests

* fix lint

* fix lint

* add test for edge case, small refactor

* add test for edge case, small refactor

* improve code

* fix lint
This commit is contained in:
Daniel Vaz Gaspar
2022-08-02 18:28:46 +01:00
committed by GitHub
parent 34ad80c642
commit bfd2a3d79f
6 changed files with 430 additions and 5 deletions

View File

@@ -354,6 +354,186 @@ class TestRolePermission(SupersetTestCase):
session.delete(stored_db)
session.commit()
def test_after_update_database__perm_database_access(self):
session = db.session
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
session.add(database)
session.commit()
stored_db = (
session.query(Database).filter_by(database_name="tmp_database").one()
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", stored_db.perm
)
)
stored_db.database_name = "tmp_database2"
session.commit()
# Assert that the old permission was updated
self.assertIsNone(
security_manager.find_permission_view_menu(
"database_access", f"[tmp_database].(id:{stored_db.id})"
)
)
# Assert that the db permission was updated
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", f"[tmp_database2].(id:{stored_db.id})"
)
)
session.delete(stored_db)
session.commit()
def test_after_update_database__perm_database_access_exists(self):
session = db.session
# Add a bogus existing permission before the change
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
session.add(database)
session.commit()
stored_db = (
session.query(Database).filter_by(database_name="tmp_database").one()
)
security_manager.add_permission_view_menu(
"database_access", f"[tmp_database2].(id:{stored_db.id})"
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", stored_db.perm
)
)
stored_db.database_name = "tmp_database2"
session.commit()
# Assert that the old permission was updated
self.assertIsNone(
security_manager.find_permission_view_menu(
"database_access", f"[tmp_database].(id:{stored_db.id})"
)
)
# Assert that the db permission was updated
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", f"[tmp_database2].(id:{stored_db.id})"
)
)
session.delete(stored_db)
session.commit()
def test_after_update_database__perm_datasource_access(self):
session = db.session
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
session.add(database)
session.commit()
table1 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table1",
database=database,
)
session.add(table1)
table2 = SqlaTable(
schema="tmp_schema",
table_name="tmp_table2",
database=database,
)
session.add(table2)
session.commit()
slice1 = Slice(
datasource_id=table1.id,
datasource_type=DatasourceType.TABLE,
datasource_name="tmp_table1",
slice_name="tmp_slice1",
)
session.add(slice1)
session.commit()
slice1 = session.query(Slice).filter_by(slice_name="tmp_slice1").one()
table1 = session.query(SqlaTable).filter_by(table_name="tmp_table1").one()
table2 = session.query(SqlaTable).filter_by(table_name="tmp_table2").one()
# assert initial perms
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database].[tmp_table1](id:{table1.id})"
)
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database].[tmp_table2](id:{table2.id})"
)
)
self.assertEqual(slice1.perm, f"[tmp_database].[tmp_table1](id:{table1.id})")
self.assertEqual(table1.perm, f"[tmp_database].[tmp_table1](id:{table1.id})")
self.assertEqual(table2.perm, f"[tmp_database].[tmp_table2](id:{table2.id})")
stored_db = (
session.query(Database).filter_by(database_name="tmp_database").one()
)
stored_db.database_name = "tmp_database2"
session.commit()
# Assert that the old permissions were updated
self.assertIsNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database].[tmp_table1](id:{table1.id})"
)
)
self.assertIsNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database].[tmp_table2](id:{table2.id})"
)
)
# Assert that the db permission was updated
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database2].[tmp_table1](id:{table1.id})"
)
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"datasource_access", f"[tmp_database2].[tmp_table2](id:{table2.id})"
)
)
self.assertEqual(slice1.perm, f"[tmp_database2].[tmp_table1](id:{table1.id})")
self.assertEqual(table1.perm, f"[tmp_database2].[tmp_table1](id:{table1.id})")
self.assertEqual(table2.perm, f"[tmp_database2].[tmp_table2](id:{table2.id})")
session.delete(slice1)
session.delete(table1)
session.delete(table2)
session.delete(stored_db)
session.commit()
def test_after_delete_database__perm_database_access(self):
session = db.session
database = Database(database_name="tmp_database", sqlalchemy_uri="sqlite://")
session.add(database)
session.commit()
stored_db = (
session.query(Database).filter_by(database_name="tmp_database").one()
)
self.assertIsNotNone(
security_manager.find_permission_view_menu(
"database_access", stored_db.perm
)
)
session.delete(stored_db)
session.commit()
# Assert that the old permission was updated
self.assertIsNone(
security_manager.find_permission_view_menu(
"database_access", f"[tmp_database].(id:{stored_db.id})"
)
)
def test_hybrid_perm_database(self):
database = Database(database_name="tmp_database3", sqlalchemy_uri="sqlite://")