chore: improve mask/unmask encrypted_extra (#29943)

This commit is contained in:
Beto Dealmeida
2024-08-22 16:45:32 -04:00
committed by GitHub
parent bf94370d38
commit 4b59e42d3f
13 changed files with 490 additions and 151 deletions

View File

@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
import decimal
import logging
import uuid
@@ -24,8 +25,10 @@ import numpy as np
import pandas as pd
import simplejson
from flask_babel.speaklater import LazyString
from jsonpath_ng import parse
from simplejson import JSONDecodeError
from superset.constants import PASSWORD_MASK
from superset.utils.dates import datetime_to_epoch, EPOCH
logging.getLogger("MARKDOWN").setLevel(logging.INFO)
@@ -243,3 +246,56 @@ def loads(
except JSONDecodeError as ex:
logger.error("JSON is not valid %s", str(ex), exc_info=True)
raise
def redact_sensitive(
payload: dict[str, Any],
sensitive_fields: set[str],
) -> dict[str, Any]:
"""
Redacts sensitive fields from a payload.
:param payload: The payload to redact
:param sensitive_fields: The set of fields to redact, as JSONPath expressions
:returns: The redacted payload
"""
redacted_payload = copy.deepcopy(payload)
for json_path in sensitive_fields:
jsonpath_expr = parse(json_path)
for match in jsonpath_expr.find(redacted_payload):
match.context.value[match.path.fields[0]] = PASSWORD_MASK
return redacted_payload
def reveal_sensitive(
old_payload: dict[str, Any],
new_payload: dict[str, Any],
sensitive_fields: set[str],
) -> dict[str, Any]:
"""
Reveals sensitive fields from a payload when not modified.
This allows users to perform deep edits on a payload without having to provide
sensitive information. The old payload is sent to the user with any sensitive fields
masked, and when the user sends back a modified payload, any fields that were masked
are replaced with the original values from the old payload.
For now this is only used to edit `encrypted_extra` fields in the database.
:param old_payload: The old payload to reveal
:param new_payload: The new payload to reveal
:param sensitive_fields: The set of fields to reveal, as JSONPath expressions
:returns: The revealed payload
"""
revealed_payload = copy.deepcopy(new_payload)
for json_path in sensitive_fields:
jsonpath_expr = parse(json_path)
for match in jsonpath_expr.find(revealed_payload):
if match.value == PASSWORD_MASK:
old_value = match.full_path.find(old_payload)
match.context.value[match.path.fields[0]] = old_value[0].value
return revealed_payload