mirror of
https://github.com/apache/superset.git
synced 2026-04-18 23:55:00 +00:00
fix(encryption): resolve SECRET_KEY lazily to fix silent re-encrypt-secrets failures (#37982)
This commit is contained in:
@@ -54,7 +54,7 @@ class SQLAlchemyUtilsAdapter( # pylint: disable=too-few-public-methods
|
||||
**kwargs: Optional[dict[str, Any]],
|
||||
) -> TypeDecorator:
|
||||
if app_config:
|
||||
return EncryptedType(*args, app_config["SECRET_KEY"], **kwargs)
|
||||
return EncryptedType(*args, lambda: app_config["SECRET_KEY"], **kwargs)
|
||||
|
||||
raise Exception( # pylint: disable=broad-exception-raised
|
||||
"Missing app_config kwarg"
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
# under the License.
|
||||
from typing import Any, Optional
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import String, TypeDecorator
|
||||
from sqlalchemy_utils import EncryptedType
|
||||
from sqlalchemy_utils.types.encrypted.encrypted_type import StringEncryptedType
|
||||
@@ -54,7 +55,8 @@ class EncryptedFieldTest(SupersetTestCase):
|
||||
def test_create_field(self):
|
||||
field = encrypted_field_factory.create(String(1024))
|
||||
assert isinstance(field, EncryptedType)
|
||||
assert self.app.config["SECRET_KEY"] == field.key
|
||||
assert callable(field.key)
|
||||
assert self.app.config["SECRET_KEY"] == field.key()
|
||||
|
||||
def test_custom_adapter(self):
|
||||
self.app.config["SQLALCHEMY_ENCRYPTED_FIELD_TYPE_ADAPTER"] = (
|
||||
@@ -86,3 +88,75 @@ class EncryptedFieldTest(SupersetTestCase):
|
||||
" was not created using the"
|
||||
" encrypted_field_factory"
|
||||
)
|
||||
|
||||
def test_lazy_key_resolution(self):
|
||||
"""
|
||||
Verify that the encryption key is resolved lazily at runtime,
|
||||
not captured statically at field creation time.
|
||||
"""
|
||||
original_key = self.app.config["SECRET_KEY"]
|
||||
field = encrypted_field_factory.create(String(1024))
|
||||
|
||||
# Key should initially resolve to the current SECRET_KEY
|
||||
assert callable(field.key)
|
||||
assert field.key() == original_key
|
||||
|
||||
# Simulate a key change (e.g. config override, env var update)
|
||||
new_key = "ROTATED_TEST_KEY_12345"
|
||||
self.app.config["SECRET_KEY"] = new_key
|
||||
|
||||
# The field's key should now resolve to the new value
|
||||
assert field.key() == new_key
|
||||
|
||||
# Restore original key
|
||||
self.app.config["SECRET_KEY"] = original_key
|
||||
assert field.key() == original_key
|
||||
|
||||
def test_secret_key_rotation(self):
|
||||
"""
|
||||
End-to-end test: encrypt data with KEY_A, rotate to KEY_B,
|
||||
run re-encryption, and verify data is accessible under KEY_B.
|
||||
"""
|
||||
from sqlalchemy.engine import make_url
|
||||
|
||||
key_a = self.app.config["SECRET_KEY"]
|
||||
key_b = "NEW_ROTATION_TEST_KEY_67890"
|
||||
test_value = "super_secret_password_123"
|
||||
|
||||
field = encrypted_field_factory.create(String(1024))
|
||||
dialect = make_url("sqlite://").get_dialect()
|
||||
|
||||
# Step 1: Encrypt with KEY_A
|
||||
encrypted_a = field.process_bind_param(test_value, dialect)
|
||||
assert encrypted_a is not None
|
||||
assert encrypted_a != test_value
|
||||
|
||||
# Step 2: Verify decryption with KEY_A works
|
||||
decrypted = field.process_result_value(encrypted_a, dialect)
|
||||
assert decrypted == test_value
|
||||
|
||||
# Step 3: Rotate key to KEY_B
|
||||
self.app.config["SECRET_KEY"] = key_b
|
||||
|
||||
# Step 4: Re-encrypt with KEY_B (simulating SecretsMigrator logic)
|
||||
# Decrypt using previous key
|
||||
previous_field = EncryptedType(type_in=field.underlying_type, key=key_a)
|
||||
decrypted_with_prev = previous_field.process_result_value(encrypted_a, dialect)
|
||||
assert decrypted_with_prev == test_value
|
||||
|
||||
# Re-encrypt using current key (KEY_B, resolved via lambda)
|
||||
encrypted_b = field.process_bind_param(decrypted_with_prev, dialect)
|
||||
assert encrypted_b is not None
|
||||
assert encrypted_b != encrypted_a # Different ciphertext
|
||||
|
||||
# Step 5: Verify decryption with KEY_B works
|
||||
decrypted_b = field.process_result_value(encrypted_b, dialect)
|
||||
assert decrypted_b == test_value
|
||||
|
||||
# Step 6: Verify KEY_A can no longer decrypt the new ciphertext
|
||||
self.app.config["SECRET_KEY"] = key_a
|
||||
with pytest.raises(ValueError, match="Invalid decryption key"):
|
||||
field.process_result_value(encrypted_b, dialect)
|
||||
|
||||
# Restore original key
|
||||
self.app.config["SECRET_KEY"] = key_a
|
||||
|
||||
Reference in New Issue
Block a user