feat(filter-set): Add filterset resource (#14015)

* Add filterset resource

* fix: fix pre-commit

* add tests

* add tests and fixes based of failures

* Fix pre-commit errors

* chore init filterset resource under ff constraint

* Fix migration conflicts

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* Fix pylint and migrations issues

* add tests and fixes based of failures

* Fix missing license

* fix down revision

* update down_revision

* fix: update down_revision

* chore: add description to migration

* fix: type

* refactor: is_user_admin

* fix: use get_public_role

* fix: move import to the relevant location

* chore: add openSpec api schema

* chore: cover all openspec API

* fix: pre-commit and lint

* fix: put and post schemas

* fix: undo superset_test_config.py

* fix: limit filterSetsApi to include_route_methods = {"get_list", "put", "post", "delete"}

* renaming some params

* chore: add debug in test config

* fix: rename database to different name

* fix: try to make conftest.py harmless

* fix: pre-commit

* fix: new down_revision ref

* fix: bad ref

* fix: bad ref 2

* fix: bad ref 3

* fix: add api in initiatior

* fix: open spec

* fix: convert name to str to include int usecases

* fix: pylint

* fix: pylint

* Update superset/common/request_contexed_based.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* chore: resolve PR comments

* chore: resolve PR comments

* chore: resolve PR comments

* fix failed tests

* fix pylint

* Update conftest.py

* chore remove BaseCommand to remove abstraction

* chore remove BaseCommand to remove abstraction

* chore remove BaseCommand to remove abstraction

* chore remove BaseCommand to remove abstraction

* chore fix migration

Co-authored-by: Ofeknielsen <ofek.israel@nieslen.com>
Co-authored-by: amitmiran137 <amit.miran@nielsen.com>
Co-authored-by: Amit Miran <47772523+amitmiran137@users.noreply.github.com>
Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>
This commit is contained in:
ofekisr
2021-09-23 11:27:59 +03:00
committed by GitHub
parent 997320ac1a
commit 84f7614e97
31 changed files with 3317 additions and 23 deletions

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,321 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
from typing import Any, Dict, Generator, List, TYPE_CHECKING
import pytest
from superset import security_manager as sm
from superset.dashboards.filter_sets.consts import (
DESCRIPTION_FIELD,
JSON_METADATA_FIELD,
NAME_FIELD,
OWNER_ID_FIELD,
OWNER_TYPE_FIELD,
USER_OWNER_TYPE,
)
from superset.models.dashboard import Dashboard
from superset.models.filter_set import FilterSet
from tests.integration_tests.dashboards.filter_sets.consts import (
ADMIN_USERNAME_FOR_TEST,
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
REGULAR_USER,
)
from tests.integration_tests.dashboards.superset_factory_util import (
create_dashboard,
create_database,
create_datasource_table,
create_slice,
)
from tests.integration_tests.test_app import app
if TYPE_CHECKING:
from flask.ctx import AppContext
from flask.testing import FlaskClient
from flask_appbuilder.security.sqla.models import (
Role,
User,
ViewMenu,
PermissionView,
)
from flask_appbuilder.security.manager import BaseSecurityManager
from sqlalchemy.orm import Session
from superset.models.slice import Slice
from superset.connectors.sqla.models import SqlaTable
from superset.models.core import Database
security_manager: BaseSecurityManager = sm
# @pytest.fixture(autouse=True, scope="session")
# def setup_sample_data() -> Any:
# pass
@pytest.fixture(autouse=True)
def expire_on_commit_true() -> Generator[None, None, None]:
ctx: AppContext
with app.app_context() as ctx:
ctx.app.appbuilder.get_session.configure(expire_on_commit=False)
yield
ctx.app.appbuilder.get_session.configure(expire_on_commit=True)
@pytest.fixture(autouse=True, scope="module")
def test_users() -> Generator[Dict[str, int], None, None]:
usernames = [
ADMIN_USERNAME_FOR_TEST,
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
REGULAR_USER,
]
with app.app_context():
filter_set_role = build_filter_set_role()
admin_role: Role = security_manager.find_role("Admin")
usernames_to_ids = create_test_users(admin_role, filter_set_role, usernames)
yield usernames_to_ids
ctx: AppContext
delete_users(usernames_to_ids)
def delete_users(usernames_to_ids: Dict[str, int]) -> None:
with app.app_context() as ctx:
session: Session = ctx.app.appbuilder.get_session
for username in usernames_to_ids.keys():
session.delete(security_manager.find_user(username))
session.commit()
def create_test_users(
admin_role: Role, filter_set_role: Role, usernames: List[str]
) -> Dict[str, int]:
users: List[User] = []
for username in usernames:
user = build_user(username, filter_set_role, admin_role)
users.append(user)
return {user.username: user.id for user in users}
def build_user(username: str, filter_set_role: Role, admin_role: Role) -> User:
roles_to_add = (
[admin_role] if username == ADMIN_USERNAME_FOR_TEST else [filter_set_role]
)
user: User = security_manager.add_user(
username, "test", "test", username, roles_to_add, password="general"
)
if not user:
user = security_manager.find_user(username)
if user is None:
raise Exception("Failed to build the user {}".format(username))
return user
def build_filter_set_role() -> Role:
filter_set_role: Role = security_manager.add_role("filter_set_role")
filterset_view_name: ViewMenu = security_manager.find_view_menu("FilterSets")
all_datasource_view_name: ViewMenu = security_manager.find_view_menu(
"all_datasource_access"
)
pvms: List[PermissionView] = security_manager.find_permissions_view_menu(
filterset_view_name
) + security_manager.find_permissions_view_menu(all_datasource_view_name)
for pvm in pvms:
security_manager.add_permission_role(filter_set_role, pvm)
return filter_set_role
@pytest.fixture
def client() -> Generator[FlaskClient[Any], None, None]:
with app.test_client() as client:
yield client
@pytest.fixture
def dashboard() -> Generator[Dashboard, None, None]:
dashboard: Dashboard
slice_: Slice
datasource: SqlaTable
database: Database
session: Session
try:
with app.app_context() as ctx:
dashboard_owner_user = security_manager.find_user(DASHBOARD_OWNER_USERNAME)
database = create_database("test_database_filter_sets")
datasource = create_datasource_table(
name="test_datasource", database=database, owners=[dashboard_owner_user]
)
slice_ = create_slice(
datasource=datasource, name="test_slice", owners=[dashboard_owner_user]
)
dashboard = create_dashboard(
dashboard_title="test_dashboard",
published=True,
slices=[slice_],
owners=[dashboard_owner_user],
)
session = ctx.app.appbuilder.get_session
session.add(dashboard)
session.commit()
yield dashboard
except Exception as ex:
print(str(ex))
finally:
with app.app_context() as ctx:
session = ctx.app.appbuilder.get_session
try:
dashboard.owners = []
slice_.owners = []
datasource.owners = []
session.merge(dashboard)
session.merge(slice_)
session.merge(datasource)
session.commit()
session.delete(dashboard)
session.delete(slice_)
session.delete(datasource)
session.delete(database)
session.commit()
except Exception as ex:
print(str(ex))
@pytest.fixture
def dashboard_id(dashboard) -> int:
return dashboard.id
@pytest.fixture
def filtersets(
dashboard_id: int, test_users: Dict[str, int], dumped_valid_json_metadata: str
) -> Generator[Dict[str, List[FilterSet]], None, None]:
try:
with app.app_context() as ctx:
session: Session = ctx.app.appbuilder.get_session
first_filter_set = FilterSet(
name="filter_set_1_of_" + str(dashboard_id),
dashboard_id=dashboard_id,
json_metadata=dumped_valid_json_metadata,
owner_id=dashboard_id,
owner_type="Dashboard",
)
second_filter_set = FilterSet(
name="filter_set_2_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=dashboard_id,
owner_type="Dashboard",
)
third_filter_set = FilterSet(
name="filter_set_3_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
forth_filter_set = FilterSet(
name="filter_set_4_of_" + str(dashboard_id),
json_metadata=dumped_valid_json_metadata,
dashboard_id=dashboard_id,
owner_id=test_users[FILTER_SET_OWNER_USERNAME],
owner_type="User",
)
session.add(first_filter_set)
session.add(second_filter_set)
session.add(third_filter_set)
session.add(forth_filter_set)
session.commit()
yv = {
"Dashboard": [first_filter_set, second_filter_set],
FILTER_SET_OWNER_USERNAME: [third_filter_set, forth_filter_set],
}
yield yv
except Exception as ex:
print(str(ex))
@pytest.fixture
def filterset_id(filtersets: Dict[str, List[FilterSet]]) -> int:
return filtersets["Dashboard"][0].id
@pytest.fixture
def valid_json_metadata() -> Dict[str, Any]:
return {"nativeFilters": {}}
@pytest.fixture
def dumped_valid_json_metadata(valid_json_metadata: Dict[str, Any]) -> str:
return json.dumps(valid_json_metadata)
@pytest.fixture
def exists_user_id() -> int:
return 1
@pytest.fixture
def valid_filter_set_data_for_create(
dashboard_id: int, dumped_valid_json_metadata: str, exists_user_id: int
) -> Dict[str, Any]:
name = "test_filter_set_of_dashboard_" + str(dashboard_id)
return {
NAME_FIELD: name,
DESCRIPTION_FIELD: "description of " + name,
JSON_METADATA_FIELD: dumped_valid_json_metadata,
OWNER_TYPE_FIELD: USER_OWNER_TYPE,
OWNER_ID_FIELD: exists_user_id,
}
@pytest.fixture
def valid_filter_set_data_for_update(
dashboard_id: int, dumped_valid_json_metadata: str, exists_user_id: int
) -> Dict[str, Any]:
name = "name_changed_test_filter_set_of_dashboard_" + str(dashboard_id)
return {
NAME_FIELD: name,
DESCRIPTION_FIELD: "changed description of " + name,
JSON_METADATA_FIELD: dumped_valid_json_metadata,
}
@pytest.fixture
def not_exists_dashboard(dashboard_id: int) -> int:
return dashboard_id + 1
@pytest.fixture
def not_exists_user_id() -> int:
return 99999
@pytest.fixture()
def dashboard_based_filter_set_dict(
filtersets: Dict[str, List[FilterSet]]
) -> Dict[str, Any]:
return filtersets["Dashboard"][0].to_dict()
@pytest.fixture()
def user_based_filter_set_dict(
filtersets: Dict[str, List[FilterSet]]
) -> Dict[str, Any]:
return filtersets[FILTER_SET_OWNER_USERNAME][0].to_dict()

View File

@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
FILTER_SET_URI = "api/v1/dashboard/{dashboard_id}/filtersets"
ADMIN_USERNAME_FOR_TEST = "admin@filterset.com"
DASHBOARD_OWNER_USERNAME = "dash_owner_user@filterset.com"
FILTER_SET_OWNER_USERNAME = "fs_owner_user@filterset.com"
REGULAR_USER = "regular_user@filterset.com"

View File

@@ -0,0 +1,630 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Dict, TYPE_CHECKING
from superset.dashboards.filter_sets.consts import (
DASHBOARD_OWNER_TYPE,
DESCRIPTION_FIELD,
JSON_METADATA_FIELD,
NAME_FIELD,
OWNER_ID_FIELD,
OWNER_TYPE_FIELD,
USER_OWNER_TYPE,
)
from tests.integration_tests.base_tests import login
from tests.integration_tests.dashboards.filter_sets.consts import (
ADMIN_USERNAME_FOR_TEST,
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
)
from tests.integration_tests.dashboards.filter_sets.utils import (
call_create_filter_set,
get_filter_set_by_dashboard_id,
get_filter_set_by_name,
)
if TYPE_CHECKING:
from flask.testing import FlaskClient
def assert_filterset_was_not_created(filter_set_data: Dict[str, Any]) -> None:
assert get_filter_set_by_name(str(filter_set_data["name"])) is None
def assert_filterset_was_created(filter_set_data: Dict[str, Any]) -> None:
assert get_filter_set_by_name(filter_set_data["name"]) is not None
class TestCreateFilterSetsApi:
def test_with_extra_field__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create["extra"] = "val"
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert response.json["message"]["extra"][0] == "Unknown field."
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_with_id_field__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create["id"] = 1
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert response.json["message"]["id"][0] == "Unknown field."
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_with_dashboard_not_exists__404(
self,
not_exists_dashboard: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# act
login(client, "admin")
response = call_create_filter_set(
client, not_exists_dashboard, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 404
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_name__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create.pop(NAME_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert get_filter_set_by_dashboard_id(dashboard_id) == []
def test_with_none_name__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[NAME_FIELD] = None
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_with_int_as_name__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[NAME_FIELD] = 4
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_description__201(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create.pop(DESCRIPTION_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_with_none_description__201(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[DESCRIPTION_FIELD] = None
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_with_int_as_description__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[DESCRIPTION_FIELD] = 1
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_json_metadata__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create.pop(JSON_METADATA_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_with_invalid_json_metadata__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[DESCRIPTION_FIELD] = {}
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_owner_type__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create.pop(OWNER_TYPE_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_with_invalid_owner_type__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = "OTHER_TYPE"
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_owner_id_when_owner_type_is_user__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create.pop(OWNER_ID_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_without_owner_id_when_owner_type_is_dashboard__201(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = DASHBOARD_OWNER_TYPE
valid_filter_set_data_for_create.pop(OWNER_ID_FIELD, None)
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_with_not_exists_owner__400(
self,
dashboard_id: int,
valid_filter_set_data_for_create: Dict[str, Any],
not_exists_user_id: int,
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = not_exists_user_id
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 400
assert_filterset_was_not_created(valid_filter_set_data_for_create)
def test_when_caller_is_admin_and_owner_is_admin__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
ADMIN_USERNAME_FOR_TEST
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_admin_and_owner_is_dashboard_owner__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
DASHBOARD_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_admin_and_owner_is_regular_user__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
FILTER_SET_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_admin_and_owner_type_is_dashboard__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = DASHBOARD_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = dashboard_id
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_dashboard_owner_and_owner_is_admin__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
ADMIN_USERNAME_FOR_TEST
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_dashboard_owner_and_owner_is_dashboard_owner__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
DASHBOARD_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_dashboard_owner_and_owner_is_regular_user__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
FILTER_SET_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_dashboard_owner_and_owner_type_is_dashboard__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = DASHBOARD_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = dashboard_id
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_regular_user_and_owner_is_admin__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
ADMIN_USERNAME_FOR_TEST
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_regular_user_and_owner_is_dashboard_owner__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
DASHBOARD_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_regular_user_and_owner_is_regular_user__201(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = USER_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = test_users[
FILTER_SET_OWNER_USERNAME
]
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 201
assert_filterset_was_created(valid_filter_set_data_for_create)
def test_when_caller_is_regular_user_and_owner_type_is_dashboard__403(
self,
dashboard_id: int,
test_users: Dict[str, int],
valid_filter_set_data_for_create: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
valid_filter_set_data_for_create[OWNER_TYPE_FIELD] = DASHBOARD_OWNER_TYPE
valid_filter_set_data_for_create[OWNER_ID_FIELD] = dashboard_id
# act
response = call_create_filter_set(
client, dashboard_id, valid_filter_set_data_for_create
)
# assert
assert response.status_code == 403
assert_filterset_was_not_created(valid_filter_set_data_for_create)

View File

@@ -0,0 +1,209 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Dict, List, TYPE_CHECKING
from tests.integration_tests.base_tests import login
from tests.integration_tests.dashboards.filter_sets.consts import (
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
REGULAR_USER,
)
from tests.integration_tests.dashboards.filter_sets.utils import (
call_delete_filter_set,
collect_all_ids,
get_filter_set_by_name,
)
if TYPE_CHECKING:
from flask.testing import FlaskClient
from superset.models.filter_set import FilterSet
def assert_filterset_was_not_deleted(filter_set_dict: Dict[str, Any]) -> None:
assert get_filter_set_by_name(filter_set_dict["name"]) is not None
def assert_filterset_deleted(filter_set_dict: Dict[str, Any]) -> None:
assert get_filter_set_by_name(filter_set_dict["name"]) is None
class TestDeleteFilterSet:
def test_with_dashboard_exists_filterset_not_exists__200(
self,
dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_delete_filter_set(client, {"id": filter_set_id}, dashboard_id)
# assert
assert response.status_code == 200
def test_with_dashboard_not_exists_filterset_not_exists__404(
self,
not_exists_dashboard: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_delete_filter_set(
client, {"id": filter_set_id}, not_exists_dashboard
)
# assert
assert response.status_code == 404
def test_with_dashboard_not_exists_filterset_exists__404(
self,
not_exists_dashboard: int,
dashboard_based_filter_set_dict: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_delete_filter_set(
client, dashboard_based_filter_set_dict, not_exists_dashboard
)
# assert
assert response.status_code == 404
assert_filterset_was_not_deleted(dashboard_based_filter_set_dict)
def test_when_caller_is_admin_and_owner_type_is_user__200(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_delete_filter_set(client, user_based_filter_set_dict)
# assert
assert response.status_code == 200
assert_filterset_deleted(user_based_filter_set_dict)
def test_when_caller_is_admin_and_owner_type_is_dashboard__200(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_delete_filter_set(client, dashboard_based_filter_set_dict)
# assert
assert response.status_code == 200
assert_filterset_deleted(dashboard_based_filter_set_dict)
def test_when_caller_is_dashboard_owner_and_owner_is_other_user_403(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
# act
response = call_delete_filter_set(client, user_based_filter_set_dict)
# assert
assert response.status_code == 403
assert_filterset_was_not_deleted(user_based_filter_set_dict)
def test_when_caller_is_dashboard_owner_and_owner_type_is_dashboard__200(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
# act
response = call_delete_filter_set(client, dashboard_based_filter_set_dict)
# assert
assert response.status_code == 200
assert_filterset_deleted(dashboard_based_filter_set_dict)
def test_when_caller_is_filterset_owner__200(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
# act
response = call_delete_filter_set(client, user_based_filter_set_dict)
# assert
assert response.status_code == 200
assert_filterset_deleted(user_based_filter_set_dict)
def test_when_caller_is_regular_user_and_owner_type_is_user__403(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, REGULAR_USER)
# act
response = call_delete_filter_set(client, user_based_filter_set_dict)
# assert
assert response.status_code == 403
assert_filterset_was_not_deleted(user_based_filter_set_dict)
def test_when_caller_is_regular_user_and_owner_type_is_dashboard__403(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, REGULAR_USER)
# act
response = call_delete_filter_set(client, dashboard_based_filter_set_dict)
# assert
assert response.status_code == 403
assert_filterset_was_not_deleted(dashboard_based_filter_set_dict)

View File

@@ -0,0 +1,129 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Dict, List, Set, TYPE_CHECKING
from tests.integration_tests.base_tests import login
from tests.integration_tests.dashboards.filter_sets.consts import (
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
REGULAR_USER,
)
from tests.integration_tests.dashboards.filter_sets.utils import (
call_get_filter_sets,
collect_all_ids,
)
if TYPE_CHECKING:
from flask.testing import FlaskClient
from superset.models.filter_set import FilterSet
class TestGetFilterSetsApi:
def test_with_dashboard_not_exists__404(
self, not_exists_dashboard: int, client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_get_filter_sets(client, not_exists_dashboard)
# assert
assert response.status_code == 404
def test_dashboards_without_filtersets__200(
self, dashboard_id: int, client: FlaskClient[Any]
):
# arrange
login(client, "admin")
# act
response = call_get_filter_sets(client, dashboard_id)
# assert
assert response.status_code == 200
assert response.is_json and response.json["count"] == 0
def test_when_caller_admin__200(
self,
dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
expected_ids: Set[int] = collect_all_ids(filtersets)
# act
response = call_get_filter_sets(client, dashboard_id)
# assert
assert response.status_code == 200
assert response.is_json and set(response.json["ids"]) == expected_ids
def test_when_caller_dashboard_owner__200(
self,
dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
expected_ids = collect_all_ids(filtersets["Dashboard"])
# act
response = call_get_filter_sets(client, dashboard_id)
# assert
assert response.status_code == 200
assert response.is_json and set(response.json["ids"]) == expected_ids
def test_when_caller_filterset_owner__200(
self,
dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
expected_ids = collect_all_ids(filtersets[FILTER_SET_OWNER_USERNAME])
# act
response = call_get_filter_sets(client, dashboard_id)
# assert
assert response.status_code == 200
assert response.is_json and set(response.json["ids"]) == expected_ids
def test_when_caller_regular_user__200(
self,
dashboard_id: int,
filtersets: Dict[str, List[int]],
client: FlaskClient[Any],
):
# arrange
login(client, REGULAR_USER)
expected_ids: Set[int] = set()
# act
response = call_get_filter_sets(client, dashboard_id)
# assert
assert response.status_code == 200
assert response.is_json and set(response.json["ids"]) == expected_ids

View File

@@ -0,0 +1,519 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import json
from typing import Any, Dict, List, TYPE_CHECKING
from superset.dashboards.filter_sets.consts import (
DESCRIPTION_FIELD,
JSON_METADATA_FIELD,
NAME_FIELD,
OWNER_TYPE_FIELD,
PARAMS_PROPERTY,
)
from tests.integration_tests.base_tests import login
from tests.integration_tests.dashboards.filter_sets.consts import (
DASHBOARD_OWNER_USERNAME,
FILTER_SET_OWNER_USERNAME,
REGULAR_USER,
)
from tests.integration_tests.dashboards.filter_sets.utils import (
call_update_filter_set,
collect_all_ids,
get_filter_set_by_name,
)
if TYPE_CHECKING:
from flask.testing import FlaskClient
from superset.models.filter_set import FilterSet
def merge_two_filter_set_dict(
first: Dict[Any, Any], second: Dict[Any, Any]
) -> Dict[Any, Any]:
for d in [first, second]:
if JSON_METADATA_FIELD in d:
if PARAMS_PROPERTY not in d:
d.setdefault(PARAMS_PROPERTY, json.loads(d[JSON_METADATA_FIELD]))
d.pop(JSON_METADATA_FIELD)
return {**first, **second}
def assert_filterset_was_not_updated(filter_set_dict: Dict[str, Any]) -> None:
assert filter_set_dict == get_filter_set_by_name(filter_set_dict["name"]).to_dict()
def assert_filterset_updated(
filter_set_dict_before: Dict[str, Any], data_updated: Dict[str, Any]
) -> None:
expected_data = merge_two_filter_set_dict(filter_set_dict_before, data_updated)
assert expected_data == get_filter_set_by_name(expected_data["name"]).to_dict()
class TestUpdateFilterSet:
def test_with_dashboard_exists_filterset_not_exists__404(
self,
dashboard_id: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_update_filter_set(
client, {"id": filter_set_id}, {}, dashboard_id
)
# assert
assert response.status_code == 404
def test_with_dashboard_not_exists_filterset_not_exists__404(
self,
not_exists_dashboard: int,
filtersets: Dict[str, List[FilterSet]],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
filter_set_id = max(collect_all_ids(filtersets)) + 1
response = call_update_filter_set(
client, {"id": filter_set_id}, {}, not_exists_dashboard
)
# assert
assert response.status_code == 404
def test_with_dashboard_not_exists_filterset_exists__404(
self,
not_exists_dashboard: int,
dashboard_based_filter_set_dict: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, {}, not_exists_dashboard
)
# assert
assert response.status_code == 404
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_extra_field__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update["extra"] = "val"
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert response.json["message"]["extra"][0] == "Unknown field."
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_id_field__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update["id"] = 1
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert response.json["message"]["id"][0] == "Unknown field."
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_none_name__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[NAME_FIELD] = None
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_int_as_name__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[NAME_FIELD] = 4
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_without_name__200(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update.pop(NAME_FIELD, None)
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_with_none_description__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[DESCRIPTION_FIELD] = None
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_int_as_description__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[DESCRIPTION_FIELD] = 1
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_without_description__200(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update.pop(DESCRIPTION_FIELD, None)
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_with_invalid_json_metadata__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[DESCRIPTION_FIELD] = {}
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_json_metadata__200(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
valid_json_metadata: Dict[Any, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_json_metadata["nativeFilters"] = {"changed": "changed"}
valid_filter_set_data_for_update[JSON_METADATA_FIELD] = json.dumps(
valid_json_metadata
)
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_with_invalid_owner_type__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[OWNER_TYPE_FIELD] = "OTHER_TYPE"
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_user_owner_type__400(
self,
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[OWNER_TYPE_FIELD] = "User"
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 400
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)
def test_with_dashboard_owner_type__200(
self,
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
valid_filter_set_data_for_update[OWNER_TYPE_FIELD] = "Dashboard"
# act
response = call_update_filter_set(
client, user_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
user_based_filter_set_dict["owner_id"] = user_based_filter_set_dict[
"dashboard_id"
]
assert_filterset_updated(
user_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_when_caller_is_admin_and_owner_type_is_user__200(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_update_filter_set(
client, user_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
user_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_when_caller_is_admin_and_owner_type_is_dashboard__200(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, "admin")
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_when_caller_is_dashboard_owner_and_owner_is_other_user_403(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
# act
response = call_update_filter_set(
client, user_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 403
assert_filterset_was_not_updated(user_based_filter_set_dict)
def test_when_caller_is_dashboard_owner_and_owner_type_is_dashboard__200(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, DASHBOARD_OWNER_USERNAME)
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_when_caller_is_filterset_owner__200(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, FILTER_SET_OWNER_USERNAME)
# act
response = call_update_filter_set(
client, user_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 200
assert_filterset_updated(
user_based_filter_set_dict, valid_filter_set_data_for_update
)
def test_when_caller_is_regular_user_and_owner_type_is_user__403(
self,
test_users: Dict[str, int],
user_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, REGULAR_USER)
# act
response = call_update_filter_set(
client, user_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 403
assert_filterset_was_not_updated(user_based_filter_set_dict)
def test_when_caller_is_regular_user_and_owner_type_is_dashboard__403(
self,
test_users: Dict[str, int],
dashboard_based_filter_set_dict: Dict[str, Any],
valid_filter_set_data_for_update: Dict[str, Any],
client: FlaskClient[Any],
):
# arrange
login(client, REGULAR_USER)
# act
response = call_update_filter_set(
client, dashboard_based_filter_set_dict, valid_filter_set_data_for_update
)
# assert
assert response.status_code == 403
assert_filterset_was_not_updated(dashboard_based_filter_set_dict)

View File

@@ -0,0 +1,102 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Union
from superset.models.filter_set import FilterSet
from tests.integration_tests.dashboards.filter_sets.consts import FILTER_SET_URI
from tests.integration_tests.test_app import app
if TYPE_CHECKING:
from flask import Response
from flask.testing import FlaskClient
def call_create_filter_set(
client: FlaskClient[Any], dashboard_id: int, data: Dict[str, Any]
) -> Response:
uri = FILTER_SET_URI.format(dashboard_id=dashboard_id)
return client.post(uri, json=data)
def call_get_filter_sets(client: FlaskClient[Any], dashboard_id: int) -> Response:
uri = FILTER_SET_URI.format(dashboard_id=dashboard_id)
return client.get(uri)
def call_delete_filter_set(
client: FlaskClient[Any],
filter_set_dict_to_update: Dict[str, Any],
dashboard_id: Optional[int] = None,
) -> Response:
dashboard_id = (
dashboard_id
if dashboard_id is not None
else filter_set_dict_to_update["dashboard_id"]
)
uri = "{}/{}".format(
FILTER_SET_URI.format(dashboard_id=dashboard_id),
filter_set_dict_to_update["id"],
)
return client.delete(uri)
def call_update_filter_set(
client: FlaskClient[Any],
filter_set_dict_to_update: Dict[str, Any],
data: Dict[str, Any],
dashboard_id: Optional[int] = None,
) -> Response:
dashboard_id = (
dashboard_id
if dashboard_id is not None
else filter_set_dict_to_update["dashboard_id"]
)
uri = "{}/{}".format(
FILTER_SET_URI.format(dashboard_id=dashboard_id),
filter_set_dict_to_update["id"],
)
return client.put(uri, json=data)
def get_filter_set_by_name(name: str) -> FilterSet:
with app.app_context():
return FilterSet.get_by_name(name)
def get_filter_set_by_id(id_: int) -> FilterSet:
with app.app_context():
return FilterSet.get(id_)
def get_filter_set_by_dashboard_id(dashboard_id: int) -> FilterSet:
with app.app_context():
return FilterSet.get_by_dashboard_id(dashboard_id)
def collect_all_ids(
filtersets: Union[Dict[str, List[FilterSet]], List[FilterSet]]
) -> Set[int]:
if isinstance(filtersets, dict):
filtersets_lists: List[List[FilterSet]] = list(filtersets.values())
ids: Set[int] = set()
lst: List[FilterSet]
for lst in filtersets_lists:
ids.update(set(map(lambda fs: fs.id, lst)))
return ids
return set(map(lambda fs: fs.id, filtersets))

View File

@@ -82,10 +82,10 @@ def create_dashboard(
json_metadata: str = "",
position_json: str = "",
) -> Dashboard:
dashboard_title = dashboard_title or random_title()
slug = slug or random_slug()
owners = owners or []
slices = slices or []
dashboard_title = dashboard_title if dashboard_title is not None else random_title()
slug = slug if slug is not None else random_slug()
owners = owners if owners is not None else []
slices = slices if slices is not None else []
return Dashboard(
dashboard_title=dashboard_title,
slug=slug,
@@ -109,25 +109,40 @@ def create_slice_to_db(
datasource_id: Optional[int] = None,
owners: Optional[List[User]] = None,
) -> Slice:
slice_ = create_slice(datasource_id, name, owners)
slice_ = create_slice(datasource_id, name=name, owners=owners)
insert_model(slice_)
inserted_slices_ids.append(slice_.id)
return slice_
def create_slice(
datasource_id: Optional[int], name: Optional[str], owners: Optional[List[User]]
datasource_id: Optional[int] = None,
datasource: Optional[SqlaTable] = None,
name: Optional[str] = None,
owners: Optional[List[User]] = None,
) -> Slice:
name = name or random_str()
owners = owners or []
name = name if name is not None else random_str()
owners = owners if owners is not None else []
datasource_type = "table"
if datasource:
return Slice(
slice_name=name,
table=datasource,
owners=owners,
datasource_type=datasource_type,
)
datasource_id = (
datasource_id or create_datasource_table_to_db(name=name + "_table").id
datasource_id
if datasource_id is not None
else create_datasource_table_to_db(name=name + "_table").id
)
return Slice(
slice_name=name,
datasource_id=datasource_id,
owners=owners,
datasource_type="table",
datasource_type=datasource_type,
)
@@ -136,7 +151,7 @@ def create_datasource_table_to_db(
db_id: Optional[int] = None,
owners: Optional[List[User]] = None,
) -> SqlaTable:
sqltable = create_datasource_table(name, db_id, owners)
sqltable = create_datasource_table(name, db_id, owners=owners)
insert_model(sqltable)
inserted_sqltables_ids.append(sqltable.id)
return sqltable
@@ -145,11 +160,14 @@ def create_datasource_table_to_db(
def create_datasource_table(
name: Optional[str] = None,
db_id: Optional[int] = None,
database: Optional[Database] = None,
owners: Optional[List[User]] = None,
) -> SqlaTable:
name = name or random_str()
owners = owners or []
db_id = db_id or create_database_to_db(name=name + "_db").id
name = name if name is not None else random_str()
owners = owners if owners is not None else []
if database:
return SqlaTable(table_name=name, database=database, owners=owners)
db_id = db_id if db_id is not None else create_database_to_db(name=name + "_db").id
return SqlaTable(table_name=name, database_id=db_id, owners=owners)
@@ -161,7 +179,7 @@ def create_database_to_db(name: Optional[str] = None) -> Database:
def create_database(name: Optional[str] = None) -> Database:
name = name or random_str()
name = name if name is not None else random_str()
return Database(database_name=name, sqlalchemy_uri="sqlite:///:memory:")