diff --git a/superset/utils/encrypt.py b/superset/utils/encrypt.py index ab05e455a38..b833fc79de8 100644 --- a/superset/utils/encrypt.py +++ b/superset/utils/encrypt.py @@ -54,7 +54,7 @@ class SQLAlchemyUtilsAdapter( # pylint: disable=too-few-public-methods **kwargs: Optional[dict[str, Any]], ) -> TypeDecorator: if app_config: - return EncryptedType(*args, app_config["SECRET_KEY"], **kwargs) + return EncryptedType(*args, lambda: app_config["SECRET_KEY"], **kwargs) raise Exception( # pylint: disable=broad-exception-raised "Missing app_config kwarg" diff --git a/tests/integration_tests/utils/encrypt_tests.py b/tests/integration_tests/utils/encrypt_tests.py index 4f48f669132..95279ee607e 100644 --- a/tests/integration_tests/utils/encrypt_tests.py +++ b/tests/integration_tests/utils/encrypt_tests.py @@ -16,6 +16,7 @@ # under the License. from typing import Any, Optional +import pytest from sqlalchemy import String, TypeDecorator from sqlalchemy_utils import EncryptedType from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType @@ -54,7 +55,8 @@ class EncryptedFieldTest(SupersetTestCase): def test_create_field(self): field = encrypted_field_factory.create(String(1024)) assert isinstance(field, EncryptedType) - assert self.app.config["SECRET_KEY"] == field.key + assert callable(field.key) + assert self.app.config["SECRET_KEY"] == field.key() def test_custom_adapter(self): self.app.config["SQLALCHEMY_ENCRYPTED_FIELD_TYPE_ADAPTER"] = ( @@ -86,3 +88,75 @@ class EncryptedFieldTest(SupersetTestCase): " was not created using the" " encrypted_field_factory" ) + + def test_lazy_key_resolution(self): + """ + Verify that the encryption key is resolved lazily at runtime, + not captured statically at field creation time. + """ + original_key = self.app.config["SECRET_KEY"] + field = encrypted_field_factory.create(String(1024)) + + # Key should initially resolve to the current SECRET_KEY + assert callable(field.key) + assert field.key() == original_key + + # Simulate a key change (e.g. config override, env var update) + new_key = "ROTATED_TEST_KEY_12345" + self.app.config["SECRET_KEY"] = new_key + + # The field's key should now resolve to the new value + assert field.key() == new_key + + # Restore original key + self.app.config["SECRET_KEY"] = original_key + assert field.key() == original_key + + def test_secret_key_rotation(self): + """ + End-to-end test: encrypt data with KEY_A, rotate to KEY_B, + run re-encryption, and verify data is accessible under KEY_B. + """ + from sqlalchemy.engine import make_url + + key_a = self.app.config["SECRET_KEY"] + key_b = "NEW_ROTATION_TEST_KEY_67890" + test_value = "super_secret_password_123" + + field = encrypted_field_factory.create(String(1024)) + dialect = make_url("sqlite://").get_dialect() + + # Step 1: Encrypt with KEY_A + encrypted_a = field.process_bind_param(test_value, dialect) + assert encrypted_a is not None + assert encrypted_a != test_value + + # Step 2: Verify decryption with KEY_A works + decrypted = field.process_result_value(encrypted_a, dialect) + assert decrypted == test_value + + # Step 3: Rotate key to KEY_B + self.app.config["SECRET_KEY"] = key_b + + # Step 4: Re-encrypt with KEY_B (simulating SecretsMigrator logic) + # Decrypt using previous key + previous_field = EncryptedType(type_in=field.underlying_type, key=key_a) + decrypted_with_prev = previous_field.process_result_value(encrypted_a, dialect) + assert decrypted_with_prev == test_value + + # Re-encrypt using current key (KEY_B, resolved via lambda) + encrypted_b = field.process_bind_param(decrypted_with_prev, dialect) + assert encrypted_b is not None + assert encrypted_b != encrypted_a # Different ciphertext + + # Step 5: Verify decryption with KEY_B works + decrypted_b = field.process_result_value(encrypted_b, dialect) + assert decrypted_b == test_value + + # Step 6: Verify KEY_A can no longer decrypt the new ciphertext + self.app.config["SECRET_KEY"] = key_a + with pytest.raises(ValueError, match="Invalid decryption key"): + field.process_result_value(encrypted_b, dialect) + + # Restore original key + self.app.config["SECRET_KEY"] = key_a