fix(tags): ensure tag creation is compatible with MySQL by avoiding Markup objects (#36075)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Yuvraj Singh Chauhan
2025-11-18 03:35:42 +05:30
committed by GitHub
parent 6fc7af5ba8
commit 28bdec2c79
4 changed files with 594 additions and 2 deletions

View File

@@ -23,6 +23,7 @@ from urllib import parse
import prison
import pytest
from freezegun import freeze_time
from markupsafe import Markup
from sqlalchemy import and_
from sqlalchemy.sql import func
@@ -784,3 +785,47 @@ class TestTagApi(InsertChartMixin, SupersetTestCase):
result = rv.json["result"]
assert len(result["objects_tagged"]) == 2
assert len(result["objects_skipped"]) == 1
def test_create_tag_mysql_compatibility(self) -> None:
"""
Test creating a tag via API to ensure MySQL compatibility.
This test verifies the fix for issue #32484 where tag creation
failed with MySQL due to Markup objects being used instead of strings.
"""
self.login(ADMIN_USERNAME)
tag_name = "mysql-fix-verification-20251111"
uri = "api/v1/tag/"
# Create a tag via the API (tags can only be created with objects_to_tag)
# So we'll create a simple tag and verify it in the database
data = {
"name": tag_name,
"description": "Test tag for MySQL compatibility verification",
"objects_to_tag": [], # Empty list is acceptable
}
rv = self.client.post(uri, json=data)
# Should succeed without SQL errors (201 for created or 200 for success)
assert rv.status_code in [200, 201], (
f"Tag creation should succeed, got {rv.status_code}"
)
# Query the database to verify the tag was created correctly
created_tag = db.session.query(Tag).filter_by(name=tag_name).first()
assert created_tag is not None, "Tag should exist in database"
# Critical check: ensure the tag name is a plain string, not Markup
assert isinstance(created_tag.name, str), "Tag name should be a plain string"
assert not isinstance(created_tag.name, Markup), (
"Tag name should NOT be a Markup object"
)
assert created_tag.name.__class__ is str, "Tag name should be exactly str type"
assert created_tag.name == tag_name, "Tag name should match the input"
# Cleanup
db.session.delete(created_tag)
db.session.commit()

View File

@@ -0,0 +1,285 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Integration tests for tag creation with MySQL compatibility."""
import pytest
from markupsafe import Markup
from sqlalchemy.exc import ProgrammingError
from superset import db
from superset.tags.models import get_tag, Tag, TagType
from tests.integration_tests.base_tests import SupersetTestCase
class TestTagCreationMySQLCompatibility(SupersetTestCase):
"""Test suite to verify the MySQL compatibility fix for tag creation."""
def setUp(self) -> None:
"""Set up test fixtures."""
super().setUp()
# Clean up any existing test tags
self.cleanup_test_tags()
def tearDown(self):
"""Clean up after tests."""
self.cleanup_test_tags()
super().tearDown()
def cleanup_test_tags(self):
"""Remove test tags from the database."""
test_tag_prefixes = [
"mysql-fix-verification",
"test-tag",
"integration-test",
]
for prefix in test_tag_prefixes:
tags = db.session.query(Tag).filter(Tag.name.like(f"{prefix}%")).all()
for tag in tags:
db.session.delete(tag)
db.session.commit()
def test_create_tag_returns_string_not_markup(self) -> None:
"""
Test that creating a tag results in a plain string name, not Markup.
This is the core test for issue #32484 - ensures tag names are
compatible with MySQL driver requirements.
"""
tag_name = "mysql-fix-verification-20251111"
# Create tag using get_tag function
tag = get_tag(tag_name, db.session, TagType.custom)
# Verify the tag was created
assert tag is not None, "Tag should be created"
assert tag.id is not None, "Tag should have an ID after creation"
# Critical checks for MySQL compatibility
assert isinstance(tag.name, str), "Tag name should be a plain string"
assert not isinstance(tag.name, Markup), (
"Tag name should NOT be a Markup object"
)
assert tag.name.__class__ is str, "Tag name should be exactly str type"
assert tag.name == tag_name, f"Tag name should be '{tag_name}'"
# Verify the tag persists correctly in the database
db.session.commit()
# Retrieve the tag from database to ensure it was stored correctly
retrieved_tag = db.session.query(Tag).filter_by(name=tag_name).first()
assert retrieved_tag is not None, "Tag should be retrievable from database"
assert isinstance(retrieved_tag.name, str), (
"Retrieved tag name should be a string"
)
assert not isinstance(retrieved_tag.name, Markup), (
"Retrieved tag name should NOT be Markup"
)
assert retrieved_tag.name == tag_name, (
"Retrieved tag name should match original"
)
def test_create_multiple_tags_no_sql_error(self) -> None:
"""
Test creating multiple tags in sequence without SQL errors.
This ensures the fix works consistently across multiple operations.
"""
tag_names = [
"integration-test-tag-1",
"integration-test-tag-2",
"integration-test-tag-3",
]
created_tags = []
try:
for tag_name in tag_names:
tag = get_tag(tag_name, db.session, TagType.custom)
created_tags.append(tag)
assert isinstance(tag.name, str), (
f"Tag '{tag_name}' name should be a string"
)
assert not isinstance(tag.name, Markup), (
f"Tag '{tag_name}' should NOT be Markup"
)
except ProgrammingError as e:
pytest.fail(
f"ProgrammingError should not occur when creating tags: {e}",
)
db.session.commit()
# Verify all tags were created successfully
assert len(created_tags) == len(tag_names), "All tags should be created"
for tag_name in tag_names:
tag = db.session.query(Tag).filter_by(name=tag_name).first()
assert tag is not None, f"Tag '{tag_name}' should exist in database"
assert isinstance(tag.name, str), (
f"Tag '{tag_name}' should have string name"
)
def test_create_tag_with_special_characters(self) -> None:
"""
Test creating tags with special characters that might trigger HTML escaping.
Ensures these characters don't cause the tag name to become a Markup object.
"""
tag_names = [
"test-tag-with-dashes",
"test_tag_with_underscores",
"test.tag.with.dots",
"test:tag:with:colons",
"test tag with spaces",
]
for tag_name in tag_names:
try:
tag = get_tag(tag_name, db.session, TagType.custom)
assert isinstance(tag.name, str), f"Tag '{tag_name}' should be a string"
assert not isinstance(tag.name, Markup), (
f"Tag '{tag_name}' should NOT be Markup"
)
assert tag.name == tag_name, (
f"Tag name should match input: '{tag_name}'"
)
db.session.commit()
# Verify database persistence
retrieved_tag = db.session.query(Tag).filter_by(name=tag_name).first()
assert retrieved_tag is not None, (
f"Tag '{tag_name}' should be in database"
)
assert retrieved_tag.name == tag_name, (
f"Retrieved tag name should match: '{tag_name}'"
)
except ProgrammingError as e:
pytest.fail(
f"ProgrammingError should not occur for tag '{tag_name}': {e}"
)
def test_get_existing_tag_returns_string(self) -> None:
"""Test that retrieving an existing tag also returns a string name."""
tag_name = "test-tag-for-retrieval"
# Create tag first
original_tag = get_tag(tag_name, db.session, TagType.custom)
db.session.commit()
# Retrieve the same tag
retrieved_tag = get_tag(tag_name, db.session, TagType.custom)
assert retrieved_tag.id == original_tag.id, "Should retrieve the same tag"
assert isinstance(retrieved_tag.name, str), (
"Retrieved tag name should be a string"
)
assert not isinstance(retrieved_tag.name, Markup), (
"Retrieved tag name should NOT be Markup"
)
def test_tag_with_whitespace_handling(self) -> None:
"""Test that tags with leading/trailing whitespace are handled correctly."""
input_name = " test-tag-with-whitespace "
expected_name = "test-tag-with-whitespace"
tag = get_tag(input_name, db.session, TagType.custom)
assert isinstance(tag.name, str), "Tag name should be a string"
assert not isinstance(tag.name, Markup), "Tag name should NOT be Markup"
assert tag.name == expected_name, "Tag name should be stripped of whitespace"
db.session.commit()
# Verify in database
retrieved_tag = db.session.query(Tag).filter_by(name=expected_name).first()
assert retrieved_tag is not None, "Tag should exist with stripped name"
def test_tag_type_variations(self) -> None:
"""Test creating tags with different TagType values."""
tag_types = [
(TagType.custom, "test-custom-tag"),
(TagType.type, "type:chart"),
(TagType.owner, "owner:123"),
(TagType.favorited_by, "favorited_by:456"),
]
for tag_type, tag_name in tag_types:
try:
tag = get_tag(tag_name, db.session, tag_type)
assert isinstance(tag.name, str), (
f"Tag name for {tag_type} should be a string"
)
assert not isinstance(tag.name, Markup), (
f"Tag name for {tag_type} should NOT be Markup"
)
assert tag.type == tag_type, f"Tag type should be {tag_type}"
db.session.commit()
except ProgrammingError as e:
pytest.fail(f"ProgrammingError for {tag_type} tag '{tag_name}': {e}")
def test_no_sql_syntax_error_on_commit(self) -> None:
"""
Test that committing a tag to the database doesn't cause SQL syntax errors.
This is the main error that occurred in issue #32484 with MySQL.
"""
tag_name = "mysql-compatibility-test"
try:
get_tag(tag_name, db.session, TagType.custom)
# This commit should not raise a ProgrammingError
db.session.commit()
# Verify the tag exists in the database
retrieved_tag = db.session.query(Tag).filter_by(name=tag_name).first()
assert retrieved_tag is not None, "Tag should exist after commit"
assert isinstance(retrieved_tag.name, str), (
"Retrieved tag should have string name"
)
except ProgrammingError as e:
pytest.fail(f"ProgrammingError should not occur during commit: {e}")
except Exception as e:
pytest.fail(f"Unexpected error during tag creation and commit: {e}")
def test_tag_name_is_pure_string_type(self) -> None:
"""
Test that tag name is exactly of type 'str', not a subclass like Markup.
This is critical for MySQL compatibility as the driver cannot handle
Markup objects in SQL queries.
"""
tag_name = "test-tag-type-check"
tag = get_tag(tag_name, db.session, TagType.custom)
# Check that it's exactly a str, not a subclass
assert tag.name.__class__ is str, (
f"Tag name type should be exactly 'str', got {type(tag.name)}"
)
assert not isinstance(tag.name, Markup), (
"Tag name should not be a Markup instance"
)
# Markup is a subclass of str, so this additional check is important
assert tag.name.__class__.__name__ == "str", "Tag name class should be 'str'"

View File

@@ -0,0 +1,263 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Unit tests for tag models, specifically testing the MySQL fix for tag creation."""
from unittest.mock import MagicMock
from markupsafe import Markup
from sqlalchemy.orm import Session
from superset.tags.models import get_tag, Tag, TagType
def test_get_tag_returns_plain_string_not_markup() -> None:
"""
Test that get_tag() returns a Tag with a plain string name, not a Markup object.
This verifies the fix for issue #32484 where escape() was wrapping tag names
in Markup objects, causing MySQL driver errors.
"""
# Setup
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
tag_name = "test-tag-name"
tag_type = TagType.custom
# Execute
result = get_tag(tag_name, mock_session, tag_type)
# Verify
assert isinstance(result.name, str), "Tag name should be a plain string"
assert not isinstance(result.name, Markup), "Tag name should NOT be a Markup object"
assert result.name == tag_name, f"Tag name should be '{tag_name}'"
assert result.type == tag_type, f"Tag type should be {tag_type}"
def test_get_tag_with_special_characters() -> None:
"""
Test that get_tag() correctly handles tag names with special characters
without converting them to Markup objects.
"""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
# Test with various special characters that might trigger HTML escaping
tag_names = [
"tag-with-dashes",
"tag_with_underscores",
"tag.with.dots",
"tag:with:colons",
"tag/with/slashes",
"tag with spaces",
"mysql-fix-verification-20251111",
]
for tag_name in tag_names:
result = get_tag(tag_name, mock_session, TagType.custom)
assert isinstance(result.name, str), (
f"Tag name '{tag_name}' should be a plain string"
)
assert not isinstance(result.name, Markup), (
f"Tag name '{tag_name}' should NOT be a Markup object"
)
assert result.name == tag_name, f"Tag name should match input: '{tag_name}'"
def test_get_tag_with_html_characters() -> None:
"""
Test that get_tag() handles HTML special characters correctly.
Even though these characters might have been escaped before, they should
now be stored as plain strings to avoid MySQL driver issues.
"""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
# Test with HTML special characters
tag_names = [
"tag<with>brackets",
"tag&with&ampersands",
'tag"with"quotes',
"tag'with'apostrophes",
]
for tag_name in tag_names:
result = get_tag(tag_name, mock_session, TagType.custom)
assert isinstance(result.name, str), (
f"Tag name '{tag_name}' should be a plain string"
)
assert not isinstance(result.name, Markup), (
f"Tag name '{tag_name}' should NOT be a Markup object"
)
# Name should remain unchanged (not HTML-escaped)
assert result.name == tag_name, f"Tag name should not be escaped: '{tag_name}'"
def test_get_tag_strips_whitespace() -> None:
"""Test that get_tag() strips leading and trailing whitespace from tag names."""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
tag_names_with_whitespace = [
(" tag-with-leading-spaces", "tag-with-leading-spaces"),
("tag-with-trailing-spaces ", "tag-with-trailing-spaces"),
(" tag-with-both ", "tag-with-both"),
("\ttag-with-tabs\t", "tag-with-tabs"),
("\ntag-with-newlines\n", "tag-with-newlines"),
]
for input_name, expected_name in tag_names_with_whitespace:
result = get_tag(input_name, mock_session, TagType.custom)
assert isinstance(result.name, str), "Tag name should be a plain string"
assert not isinstance(result.name, Markup), (
"Tag name should NOT be a Markup object"
)
assert result.name == expected_name, (
f"Tag name should be stripped: '{expected_name}'"
)
def test_get_tag_returns_existing_tag() -> None:
"""
Test that get_tag() returns existing tag from database.
Verifies it doesn't create a new one.
"""
mock_session = MagicMock(spec=Session)
# Create an existing tag
existing_tag = Tag(name="existing-tag", type=TagType.custom)
existing_tag.id = 42
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = existing_tag
# Execute
result = get_tag("existing-tag", mock_session, TagType.custom)
# Verify
assert result is existing_tag, "Should return the existing tag"
assert result.id == 42, "Should have the existing tag's ID"
mock_session.add.assert_not_called()
mock_session.commit.assert_not_called()
def test_get_tag_creates_new_tag() -> None:
"""Test that get_tag() creates and commits a new tag when it doesn't exist."""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
tag_name = "new-tag"
tag_type = TagType.custom
# Execute
get_tag(tag_name, mock_session, tag_type)
# Verify
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
# Check the tag object that was added
added_tag = mock_session.add.call_args[0][0]
assert isinstance(added_tag, Tag), "Should add a Tag object"
assert isinstance(added_tag.name, str), "Tag name should be a plain string"
assert not isinstance(added_tag.name, Markup), (
"Tag name should NOT be a Markup object"
)
assert added_tag.name == tag_name, "Tag name should match"
assert added_tag.type == tag_type, "Tag type should match"
def test_get_tag_with_different_tag_types() -> None:
"""Test that get_tag() works correctly with all TagType values."""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
tag_types = [
TagType.custom,
TagType.type,
TagType.owner,
TagType.favorited_by,
]
for tag_type in tag_types:
tag_name = f"tag-{tag_type.name}"
result = get_tag(tag_name, mock_session, tag_type)
assert isinstance(result.name, str), (
f"Tag name for {tag_type} should be a plain string"
)
assert not isinstance(result.name, Markup), (
f"Tag name for {tag_type} should NOT be a Markup object"
)
assert result.type == tag_type, f"Tag type should be {tag_type}"
def test_tag_name_type_after_database_operation() -> None:
"""
Simulate the complete flow to ensure tag name remains a string
throughout the database operation lifecycle.
"""
mock_session = MagicMock(spec=Session)
mock_query = MagicMock()
mock_session.query.return_value = mock_query
mock_query.filter_by.return_value.one_or_none.return_value = None
tag_name = "mysql-compatibility-test"
# Execute
result = get_tag(tag_name, mock_session, TagType.custom)
# Verify the tag object before database operations
assert isinstance(result.name, str), (
"Tag name should be a string before DB operations"
)
assert not isinstance(result.name, Markup), (
"Tag name should NOT be Markup before DB operations"
)
# Verify that session.add was called with the correct tag
mock_session.add.assert_called_once()
added_tag = mock_session.add.call_args[0][0]
# The critical check: ensure the name passed to the database is a plain string
assert isinstance(added_tag.name, str), (
"Tag name should be a plain string when added to session"
)
assert not isinstance(added_tag.name, Markup), (
"Tag name should NOT be Markup when added to session"
)
assert added_tag.name.__class__ is str, (
"Tag name should be exactly str type, not a subclass"
)