Files
superset2/superset/utils/slack.py
Joe Li 909acdbae4 chore(reports): deprecate Slack v1, default ALERT_REPORT_SLACK_V2 to True, harden v2 tests
Flips the ALERT_REPORT_SLACK_V2 feature flag default to True so the v2
auto-upgrade path runs out of the box, and adds one-shot DeprecationWarning
+ logger.warning emissions when v1 still runs (flag explicitly off, or bot
missing the channels:read scope). Slack retired the legacy files.upload
endpoint in 2025, so v1 file uploads are already broken at the API level —
only text-only chat_postMessage sends still succeed via the legacy path.

The bulk of the change is bulletproof unit-test coverage for SlackV2Notification
ahead of v1 removal in the next major:

- files_upload_v2 invocation with PNG (single + multiple), CSV, and PDF,
  asserting channel, file, title, filename, and initial_comment kwargs
- multi-channel fan-out (3 channels x 2 files = 6 uploads) and text-only
  multi-channel chat_postMessage
- inline-file precedence (CSV beats screenshots beats PDF)
- parametrized exception mapping across 7 slack_sdk error types -> the
  4 NotificationException subclasses
- statsd .ok and .warning gauge emission via the @statsd_gauge decorator
- execution_id propagation from g.logs_context to the success log, plus
  the falsy g.logs_context fallback path
- end-to-end auto-upgrade round-trip: v1 SLACK recipient with channel
  names raises SlackV1NotificationError -> update_report_schedule_slack_v2
  rewrites the row to channel IDs -> SlackV2Notification fast-paths the
  next send with no further channel resolution
- should_use_v2_api() warning behavior: deprecation warning emitted exactly
  once across multiple calls in both the flag-off and scope-missing paths,
  with the scope-missing logger.warning continuing to fire each call so
  operators see the actionable scope hint in their report-execution logs

Also locks in current behavior of the @backoff.on_exception(SlackApiError, ...)
decorator on send(): because send() catches every SlackApiError internally
and re-raises as NotificationUnprocessableException, backoff never sees the
target exception type and no retries actually fire. Test asserts call_count
== 1 with a docstring marking this as a known design issue to address
separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 12:41:28 -07:00

247 lines
8.4 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import warnings
from typing import Callable, Optional
from flask import current_app as app
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler
from superset import feature_flag_manager
from superset.exceptions import SupersetException
from superset.extensions import cache_manager
from superset.reports.schemas import SlackChannelSchema
from superset.utils import cache as cache_util
from superset.utils.backports import StrEnum
from superset.utils.core import recipients_string_to_list
logger = logging.getLogger(__name__)
_SLACK_V1_DEPRECATION_MESSAGE = (
"Slack v1 (the legacy `Slack` recipient type and `files.upload` API) is "
"deprecated and will be removed in the next major release. Slack retired "
"the `files.upload` endpoint in 2025, so v1 file uploads no longer work; "
"only text-only `chat_postMessage` sends still succeed. Grant your Slack "
"bot the `channels:read` scope so existing v1 recipients can be "
"auto-upgraded to SlackV2 on their next send."
)
_v1_flag_off_warning_emitted = False
_v1_scope_missing_warning_emitted = False
class SlackChannelTypes(StrEnum):
PUBLIC = "public_channel"
PRIVATE = "private_channel"
class SlackClientError(Exception):
pass
def get_slack_client() -> WebClient:
token: str = app.config["SLACK_API_TOKEN"]
if callable(token):
token = token()
client = WebClient(token=token, proxy=app.config["SLACK_PROXY"])
max_retry_count = app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=max_retry_count)
client.retry_handlers.append(rate_limit_handler)
logger.debug("Slack client configured with %d rate limit retries", max_retry_count)
return client
@cache_util.memoized_func(
key="slack_conversations_list",
cache=cache_manager.cache,
)
def get_channels() -> list[SlackChannelSchema]:
"""
Retrieves a list of all conversations accessible by the bot
from the Slack API, and caches results (to avoid rate limits).
The Slack API does not provide search so to apply a search use
get_channels_with_search instead.
"""
client = get_slack_client()
channel_schema = SlackChannelSchema()
channels: list[SlackChannelSchema] = []
extra_params = {"types": ",".join(SlackChannelTypes)}
cursor = None
page_count = 0
logger.info("Starting Slack channels fetch")
try:
while True:
page_count += 1
response = client.conversations_list(
limit=999, cursor=cursor, exclude_archived=True, **extra_params
)
page_channels = response.data["channels"]
channels.extend(channel_schema.load(channel) for channel in page_channels)
logger.debug(
"Fetched page %d: %d channels (total: %d)",
page_count,
len(page_channels),
len(channels),
)
cursor = response.data.get("response_metadata", {}).get("next_cursor")
if not cursor:
break
logger.info(
"Successfully fetched %d Slack channels in %d pages",
len(channels),
page_count,
)
return channels
except SlackApiError as ex:
logger.error(
"Failed to fetch Slack channels after %d pages: %s",
page_count,
str(ex),
exc_info=True,
)
raise
def get_channels_with_search(
search_string: str = "",
types: Optional[list[SlackChannelTypes]] = None,
exact_match: bool = False,
force: bool = False,
) -> list[SlackChannelSchema]:
"""
The slack api is paginated but does not include search, so we need to fetch
all channels and filter them ourselves
This will search by slack name or id
"""
try:
channels = get_channels(
force=force,
cache_timeout=app.config["SLACK_CACHE_TIMEOUT"],
)
except SlackApiError as ex:
# Check if it's a rate limit error
status_code = getattr(ex.response, "status_code", None)
if status_code == 429:
raise SupersetException(
f"Slack API rate limit exceeded: {ex}. "
"For large workspaces, consider increasing "
"SLACK_API_RATE_LIMIT_RETRY_COUNT"
) from ex
raise SupersetException(f"Failed to list channels: {ex}") from ex
except SlackClientError as ex:
raise SupersetException(f"Failed to list channels: {ex}") from ex
if types and not len(types) == len(SlackChannelTypes):
conditions: list[Callable[[SlackChannelSchema], bool]] = []
if SlackChannelTypes.PUBLIC in types:
conditions.append(lambda channel: not channel["is_private"])
if SlackChannelTypes.PRIVATE in types:
conditions.append(lambda channel: channel["is_private"])
channels = [
channel for channel in channels if any(cond(channel) for cond in conditions)
]
# The search string can be multiple channels separated by commas
if search_string:
search_array = recipients_string_to_list(search_string)
channels = [
channel
for channel in channels
if any(
(
search.lower() == channel["name"].lower()
or search.lower() == channel["id"].lower()
if exact_match
else (
search.lower() in channel["name"].lower()
or search.lower() in channel["id"].lower()
)
)
for search in search_array
)
]
return channels
def should_use_v2_api() -> bool:
global _v1_flag_off_warning_emitted, _v1_scope_missing_warning_emitted # noqa: PLW0603
if not feature_flag_manager.is_feature_enabled("ALERT_REPORT_SLACK_V2"):
if not _v1_flag_off_warning_emitted:
_v1_flag_off_warning_emitted = True
warnings.warn(
_SLACK_V1_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2
)
logger.warning(
"ALERT_REPORT_SLACK_V2 is disabled; %s",
_SLACK_V1_DEPRECATION_MESSAGE,
)
return False
try:
client = get_slack_client()
client.conversations_list()
logger.info("Slack API v2 is available")
return True
except SlackApiError:
if not _v1_scope_missing_warning_emitted:
_v1_scope_missing_warning_emitted = True
warnings.warn(
_SLACK_V1_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2
)
logger.warning(
"Slack bot is missing the `channels:read` scope; falling back to "
"the deprecated v1 API. %s",
_SLACK_V1_DEPRECATION_MESSAGE,
)
return False
def get_user_avatar(email: str, client: WebClient = None) -> str:
client = client or get_slack_client()
try:
response = client.users_lookupByEmail(email=email)
except Exception as ex:
raise SlackClientError(f"Failed to lookup user by email: {email}") from ex
user = response.data.get("user")
if user is None:
raise SlackClientError("No user found with that email.")
profile = user.get("profile")
if profile is None:
raise SlackClientError("User found but no profile available.")
avatar_url = profile.get("image_192")
if avatar_url is None:
raise SlackClientError("Profile image is not available.")
return avatar_url