Files
superset2/tests/integration_tests/utils/encrypt_tests.py

163 lines
6.2 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Optional
import pytest
from sqlalchemy import String, TypeDecorator
from sqlalchemy_utils import EncryptedType
from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
from superset.extensions import encrypted_field_factory
from superset.utils.encrypt import (
AbstractEncryptedFieldAdapter,
SecretsMigrator,
SQLAlchemyUtilsAdapter,
)
from tests.integration_tests.base_tests import SupersetTestCase
class CustomEncFieldAdapter(AbstractEncryptedFieldAdapter):
def create(
self,
app_config: Optional[dict[str, Any]],
*args: list[Any],
**kwargs: Optional[dict[str, Any]],
) -> TypeDecorator:
if app_config:
return StringEncryptedType(*args, app_config["SECRET_KEY"], **kwargs)
else:
raise Exception("Missing app_config kwarg")
class EncryptedFieldTest(SupersetTestCase):
def setUp(self) -> None:
self.app.config["SQLALCHEMY_ENCRYPTED_FIELD_TYPE_ADAPTER"] = (
SQLAlchemyUtilsAdapter
)
encrypted_field_factory.init_app(self.app)
super().setUp()
def test_create_field(self):
field = encrypted_field_factory.create(String(1024))
assert isinstance(field, EncryptedType)
assert callable(field.key)
assert self.app.config["SECRET_KEY"] == field.key()
def test_custom_adapter(self):
self.app.config["SQLALCHEMY_ENCRYPTED_FIELD_TYPE_ADAPTER"] = (
CustomEncFieldAdapter
)
encrypted_field_factory.init_app(self.app)
field = encrypted_field_factory.create(String(1024))
assert isinstance(field, StringEncryptedType)
assert not isinstance(field, EncryptedType)
assert field.__created_by_enc_field_adapter__
assert self.app.config["SECRET_KEY"] == field.key
def test_ensure_encrypted_field_factory_is_used(self):
"""
Ensure that the EncryptedFieldFactory is used everywhere
that an encrypted field is needed.
:return:
"""
from superset.extensions import encrypted_field_factory
migrator = SecretsMigrator("")
encrypted_fields = migrator.discover_encrypted_fields()
for table_name, cols in encrypted_fields.items():
for col_name, field in cols.items():
if not encrypted_field_factory.created_by_enc_field_factory(field):
self.fail(
f"The encrypted column [{col_name}]"
f" in the table [{table_name}]"
" was not created using the"
" encrypted_field_factory"
)
def test_lazy_key_resolution(self):
"""
Verify that the encryption key is resolved lazily at runtime,
not captured statically at field creation time.
"""
original_key = self.app.config["SECRET_KEY"]
field = encrypted_field_factory.create(String(1024))
# Key should initially resolve to the current SECRET_KEY
assert callable(field.key)
assert field.key() == original_key
# Simulate a key change (e.g. config override, env var update)
new_key = "ROTATED_TEST_KEY_12345"
self.app.config["SECRET_KEY"] = new_key
# The field's key should now resolve to the new value
assert field.key() == new_key
# Restore original key
self.app.config["SECRET_KEY"] = original_key
assert field.key() == original_key
def test_secret_key_rotation(self):
"""
End-to-end test: encrypt data with KEY_A, rotate to KEY_B,
run re-encryption, and verify data is accessible under KEY_B.
"""
from sqlalchemy.engine import make_url
key_a = self.app.config["SECRET_KEY"]
key_b = "NEW_ROTATION_TEST_KEY_67890"
test_value = "super_secret_password_123"
field = encrypted_field_factory.create(String(1024))
dialect = make_url("sqlite://").get_dialect()
# Step 1: Encrypt with KEY_A
encrypted_a = field.process_bind_param(test_value, dialect)
assert encrypted_a is not None
assert encrypted_a != test_value
# Step 2: Verify decryption with KEY_A works
decrypted = field.process_result_value(encrypted_a, dialect)
assert decrypted == test_value
# Step 3: Rotate key to KEY_B
self.app.config["SECRET_KEY"] = key_b
# Step 4: Re-encrypt with KEY_B (simulating SecretsMigrator logic)
# Decrypt using previous key
previous_field = EncryptedType(type_in=field.underlying_type, key=key_a)
decrypted_with_prev = previous_field.process_result_value(encrypted_a, dialect)
assert decrypted_with_prev == test_value
# Re-encrypt using current key (KEY_B, resolved via lambda)
encrypted_b = field.process_bind_param(decrypted_with_prev, dialect)
assert encrypted_b is not None
assert encrypted_b != encrypted_a # Different ciphertext
# Step 5: Verify decryption with KEY_B works
decrypted_b = field.process_result_value(encrypted_b, dialect)
assert decrypted_b == test_value
# Step 6: Verify KEY_A can no longer decrypt the new ciphertext
self.app.config["SECRET_KEY"] = key_a
with pytest.raises(ValueError, match="Invalid decryption key"):
field.process_result_value(encrypted_b, dialect)
# Restore original key
self.app.config["SECRET_KEY"] = key_a