mirror of
https://github.com/apache/superset.git
synced 2026-06-03 14:49:23 +00:00
fix(alerts): improve Slack API rate limiting for large workspaces (#35622)
This commit is contained in:
@@ -65,6 +65,22 @@ To send alerts and reports to Slack channels, you need to create a new Slack App
|
||||
|
||||
Note: when you configure an alert or a report, the Slack channel list takes channel names without the leading '#' e.g. use `alerts` instead of `#alerts`.
|
||||
|
||||
#### Large Slack Workspaces (10k+ channels)
|
||||
|
||||
For workspaces with many channels, fetching the complete channel list can take several minutes and may encounter Slack API rate limits. Add the following to your `superset_config.py`:
|
||||
|
||||
```python
|
||||
from datetime import timedelta
|
||||
|
||||
# Increase cache timeout to reduce API calls
|
||||
# Default: 1 day (86400 seconds)
|
||||
SLACK_CACHE_TIMEOUT = int(timedelta(days=2).total_seconds())
|
||||
|
||||
# Increase retry count for rate limit errors
|
||||
# Default: 2
|
||||
SLACK_API_RATE_LIMIT_RETRY_COUNT = 5
|
||||
```
|
||||
|
||||
### Kubernetes-specific
|
||||
|
||||
- You must have a `celery beat` pod running. If you're using the chart included in the GitHub repository under [helm/superset](https://github.com/apache/superset/tree/master/helm/superset), you need to put `supersetCeleryBeat.enabled = true` in your values override.
|
||||
|
||||
@@ -1744,6 +1744,11 @@ SLACK_API_TOKEN: Callable[[], str] | str | None = None
|
||||
SLACK_PROXY = None
|
||||
SLACK_CACHE_TIMEOUT = int(timedelta(days=1).total_seconds())
|
||||
|
||||
# Maximum number of retries when Slack API returns rate limit errors
|
||||
# Default: 2
|
||||
# For workspaces with 10k+ channels, consider increasing to 10
|
||||
SLACK_API_RATE_LIMIT_RETRY_COUNT = 2
|
||||
|
||||
# The webdriver to use for generating reports. Use one of the following
|
||||
# firefox
|
||||
# Requires: geckodriver and firefox installations
|
||||
|
||||
@@ -26,10 +26,23 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@celery_app.task(name="slack.cache_channels")
|
||||
def cache_channels() -> None:
|
||||
cache_timeout = current_app.config["SLACK_CACHE_TIMEOUT"]
|
||||
retry_count = current_app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
|
||||
|
||||
logger.info(
|
||||
"Starting Slack channels cache warm-up task "
|
||||
"(cache_timeout=%ds, retry_count=%d)",
|
||||
cache_timeout,
|
||||
retry_count,
|
||||
)
|
||||
|
||||
try:
|
||||
get_channels(
|
||||
force=True, cache_timeout=current_app.config["SLACK_CACHE_TIMEOUT"]
|
||||
)
|
||||
get_channels(force=True, cache_timeout=cache_timeout)
|
||||
except Exception as ex:
|
||||
logger.exception("An error occurred while caching Slack channels: %s", ex)
|
||||
logger.exception(
|
||||
"Failed to cache Slack channels: %s. "
|
||||
"If this is due to rate limiting, consider increasing "
|
||||
"SLACK_API_RATE_LIMIT_RETRY_COUNT.",
|
||||
str(ex),
|
||||
)
|
||||
raise
|
||||
|
||||
@@ -50,9 +50,12 @@ def get_slack_client() -> WebClient:
|
||||
token = token()
|
||||
client = WebClient(token=token, proxy=app.config["SLACK_PROXY"])
|
||||
|
||||
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=2)
|
||||
max_retry_count = app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
|
||||
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=max_retry_count)
|
||||
client.retry_handlers.append(rate_limit_handler)
|
||||
|
||||
logger.debug("Slack client configured with %d rate limit retries", max_retry_count)
|
||||
|
||||
return client
|
||||
|
||||
|
||||
@@ -73,19 +76,45 @@ def get_channels() -> list[SlackChannelSchema]:
|
||||
channels: list[SlackChannelSchema] = []
|
||||
extra_params = {"types": ",".join(SlackChannelTypes)}
|
||||
cursor = None
|
||||
page_count = 0
|
||||
|
||||
while True:
|
||||
response = client.conversations_list(
|
||||
limit=999, cursor=cursor, exclude_archived=True, **extra_params
|
||||
)
|
||||
channels.extend(
|
||||
channel_schema.load(channel) for channel in response.data["channels"]
|
||||
)
|
||||
cursor = response.data.get("response_metadata", {}).get("next_cursor")
|
||||
if not cursor:
|
||||
break
|
||||
logger.info("Starting Slack channels fetch")
|
||||
|
||||
return channels
|
||||
try:
|
||||
while True:
|
||||
page_count += 1
|
||||
|
||||
response = client.conversations_list(
|
||||
limit=999, cursor=cursor, exclude_archived=True, **extra_params
|
||||
)
|
||||
page_channels = response.data["channels"]
|
||||
channels.extend(channel_schema.load(channel) for channel in page_channels)
|
||||
|
||||
logger.debug(
|
||||
"Fetched page %d: %d channels (total: %d)",
|
||||
page_count,
|
||||
len(page_channels),
|
||||
len(channels),
|
||||
)
|
||||
|
||||
cursor = response.data.get("response_metadata", {}).get("next_cursor")
|
||||
if not cursor:
|
||||
break
|
||||
|
||||
logger.info(
|
||||
"Successfully fetched %d Slack channels in %d pages",
|
||||
len(channels),
|
||||
page_count,
|
||||
)
|
||||
return channels
|
||||
except SlackApiError as ex:
|
||||
logger.error(
|
||||
"Failed to fetch Slack channels after %d pages: %s",
|
||||
page_count,
|
||||
str(ex),
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
def get_channels_with_search(
|
||||
@@ -104,7 +133,17 @@ def get_channels_with_search(
|
||||
force=force,
|
||||
cache_timeout=app.config["SLACK_CACHE_TIMEOUT"],
|
||||
)
|
||||
except (SlackClientError, SlackApiError) as ex:
|
||||
except SlackApiError as ex:
|
||||
# Check if it's a rate limit error
|
||||
status_code = getattr(ex.response, "status_code", None)
|
||||
if status_code == 429:
|
||||
raise SupersetException(
|
||||
f"Slack API rate limit exceeded: {ex}. "
|
||||
"For large workspaces, consider increasing "
|
||||
"SLACK_API_RATE_LIMIT_RETRY_COUNT"
|
||||
) from ex
|
||||
raise SupersetException(f"Failed to list channels: {ex}") from ex
|
||||
except SlackClientError as ex:
|
||||
raise SupersetException(f"Failed to list channels: {ex}") from ex
|
||||
|
||||
if types and not len(types) == len(SlackChannelTypes):
|
||||
|
||||
Reference in New Issue
Block a user