Improve Druid metadata fetching resilience (#1584)

This commit is contained in:
Maxime Beauchemin
2016-11-11 09:46:48 -08:00
committed by GitHub
parent d6bc354ff3
commit 96d32dd11f
2 changed files with 42 additions and 12 deletions

View File

@@ -112,14 +112,18 @@ def load_examples(load_test_data):
print("Loading [Unicode test data]")
data.load_unicode_test_data()
@manager.command
def refresh_druid():
"""Refresh all druid datasources"""
@manager.option(
'-d', '--datasource',
help=(
"Specify which datasource name to load, if omitted, all "
"datasources will be refreshed"))
def refresh_druid(datasource):
"""Refresh druid datasources"""
session = db.session()
from superset import models
for cluster in session.query(models.DruidCluster).all():
try:
cluster.refresh_datasources()
cluster.refresh_datasources(datasource_name=datasource)
except Exception as e:
print(
"Error while processing cluster '{}'\n{}".format(

View File

@@ -1522,11 +1522,16 @@ class DruidCluster(Model, AuditMixinNullable):
).format(obj=self)
return json.loads(requests.get(endpoint).text)['version']
def refresh_datasources(self):
def refresh_datasources(self, datasource_name=None):
"""Refresh metadata of all datasources in the cluster
If ``datasource_name`` is specified, only that datasource is updated
"""
self.druid_version = self.get_druid_version()
for datasource in self.get_datasources():
if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
DruidDatasource.sync_to_db(datasource, self)
if not datasource_name or datasource_name == datasource:
DruidDatasource.sync_to_db(datasource, self)
@property
def perm(self):
@@ -1670,15 +1675,35 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
# we need to set this interval to more than 1 day ago to exclude
# realtime segments, which trigged a bug (fixed in druid 0.8.2).
# https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
start = (0 if self.version_higher(self.cluster.druid_version, '0.8.2') else 1)
intervals = (max_time - timedelta(days=7)).isoformat() + '/'
intervals += (max_time - timedelta(days=start)).isoformat()
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=intervals)
lbound = (max_time - timedelta(days=7)).isoformat()
rbound = max_time.isoformat()
if not self.version_higher(self.cluster.druid_version, '0.8.2'):
rbound = (max_time - timedelta(1)).isoformat()
segment_metadata = None
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound)
except Exception as e:
logging.warning("Failed first attempt to get latest segment")
logging.exception(e)
if not segment_metadata:
# if no segments in the past 7 days, look at all segments
lbound = datetime(1901, 1, 1).isoformat()[:10]
rbound = datetime(2050, 1, 1).isoformat()[:10]
if not self.version_higher(self.cluster.druid_version, '0.8.2'):
rbound = datetime.now().isoformat()[:10]
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound)
except Exception as e:
logging.warning("Failed 2nd attempt to get latest segment")
logging.exception(e)
if segment_metadata:
return segment_metadata[-1]['columns']
def generate_metrics(self):
for col in self.columns:
col.generate_metrics()
@@ -1774,6 +1799,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
cols = datasource.latest_metadata()
if not cols:
logging.error("Failed at fetching the latest segment")
return
for col in cols:
col_obj = (