diff --git a/superset/views/core.py b/superset/views/core.py index 2f736c3554e..766c42338bb 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -1380,10 +1380,17 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods def user_slices( # pylint: disable=no-self-use self, user_id: Optional[int] = None ) -> FlaskResponse: - """List of slices a user created, or faved""" + """List of slices a user owns, created, modified or faved""" if not user_id: user_id = g.user.id FavStar = models.FavStar + + owner_ids_query = ( + db.session.query(Slice.id) + .join(Slice.owners) + .filter(security_manager.user_model.id == user_id) + ) + qry = ( db.session.query(Slice, FavStar.dttm) .join( @@ -1397,6 +1404,7 @@ class Superset(BaseSupersetView): # pylint: disable=too-many-public-methods ) .filter( or_( + Slice.id.in_(owner_ids_query), Slice.created_by_fk == user_id, Slice.changed_by_fk == user_id, FavStar.user_id == user_id, diff --git a/tests/base_tests.py b/tests/base_tests.py index d41a2401f05..28895fe061d 100644 --- a/tests/base_tests.py +++ b/tests/base_tests.py @@ -149,9 +149,12 @@ class SupersetTestCase(TestCase): resp = self.get_resp("/login/", data=dict(username=username, password=password)) self.assertNotIn("User confirmation needed", resp) - def get_slice(self, slice_name: str, session: Session) -> Slice: + def get_slice( + self, slice_name: str, session: Session, expunge_from_session: bool = True + ) -> Slice: slc = session.query(Slice).filter_by(slice_name=slice_name).one() - session.expunge_all() + if expunge_from_session: + session.expunge_all() return slc @staticmethod diff --git a/tests/core_tests.py b/tests/core_tests.py index ac641cf68c7..4b4a9bf51e6 100644 --- a/tests/core_tests.py +++ b/tests/core_tests.py @@ -356,6 +356,42 @@ class CoreTests(SupersetTestCase): resp = self.client.get(url) self.assertEqual(resp.status_code, 200) + def test_get_user_slices_for_owners(self): + self.login(username="admin") + user = security_manager.find_user("admin") + slice_name = "Girls" + + # ensure user is not owner of any slices + url = f"/superset/user_slices/{user.id}/" + resp = self.client.get(url) + data = json.loads(resp.data) + self.assertEqual(data, []) + + # make user owner of slice and verify that endpoint returns said slice + slc = self.get_slice( + slice_name=slice_name, session=db.session, expunge_from_session=False + ) + slc.owners = [user] + db.session.merge(slc) + db.session.commit() + url = f"/superset/user_slices/{user.id}/" + resp = self.client.get(url) + data = json.loads(resp.data) + self.assertEqual(len(data), 1) + self.assertEqual(data[0]["title"], slice_name) + + # remove ownership and ensure user no longer gets slice + slc = self.get_slice( + slice_name=slice_name, session=db.session, expunge_from_session=False + ) + slc.owners = [] + db.session.merge(slc) + db.session.commit() + url = f"/superset/user_slices/{user.id}/" + resp = self.client.get(url) + data = json.loads(resp.data) + self.assertEqual(data, []) + def test_get_user_slices(self): self.login(username="admin") userid = security_manager.find_user("admin").id @@ -736,31 +772,31 @@ class CoreTests(SupersetTestCase): slc = self.get_slice("Girls", db.session) # Setting some faves - url = "/superset/favstar/Slice/{}/select/".format(slc.id) + url = f"/superset/favstar/Slice/{slc.id}/select/" resp = self.get_json_resp(url) self.assertEqual(resp["count"], 1) dash = db.session.query(Dashboard).filter_by(slug="births").first() - url = "/superset/favstar/Dashboard/{}/select/".format(dash.id) + url = f"/superset/favstar/Dashboard/{dash.id}/select/" resp = self.get_json_resp(url) self.assertEqual(resp["count"], 1) userid = security_manager.find_user("admin").id - resp = self.get_resp("/superset/profile/admin/") + resp = self.get_resp(f"/superset/profile/{username}/") self.assertIn('"app"', resp) - data = self.get_json_resp("/superset/recent_activity/{}/".format(userid)) + data = self.get_json_resp(f"/superset/recent_activity/{userid}/") self.assertNotIn("message", data) - data = self.get_json_resp("/superset/created_slices/{}/".format(userid)) + data = self.get_json_resp(f"/superset/created_slices/{userid}/") self.assertNotIn("message", data) - data = self.get_json_resp("/superset/created_dashboards/{}/".format(userid)) + data = self.get_json_resp(f"/superset/created_dashboards/{userid}/") self.assertNotIn("message", data) - data = self.get_json_resp("/superset/fave_slices/{}/".format(userid)) + data = self.get_json_resp(f"/superset/fave_slices/{userid}/") self.assertNotIn("message", data) - data = self.get_json_resp("/superset/fave_dashboards/{}/".format(userid)) + data = self.get_json_resp(f"/superset/fave_dashboards/{userid}/") self.assertNotIn("message", data) - data = self.get_json_resp( - "/superset/fave_dashboards_by_username/{}/".format(username) - ) + data = self.get_json_resp(f"/superset/user_slices/{userid}/") + self.assertNotIn("message", data) + data = self.get_json_resp(f"/superset/fave_dashboards_by_username/{username}/") self.assertNotIn("message", data) def test_slice_id_is_always_logged_correctly_on_web_request(self):