diff --git a/superset/bin/superset b/superset/bin/superset index 3b5b52906ff..3d711fc826e 100755 --- a/superset/bin/superset +++ b/superset/bin/superset @@ -112,14 +112,18 @@ def load_examples(load_test_data): print("Loading [Unicode test data]") data.load_unicode_test_data() -@manager.command -def refresh_druid(): - """Refresh all druid datasources""" +@manager.option( + '-d', '--datasource', + help=( + "Specify which datasource name to load, if omitted, all " + "datasources will be refreshed")) +def refresh_druid(datasource): + """Refresh druid datasources""" session = db.session() from superset import models for cluster in session.query(models.DruidCluster).all(): try: - cluster.refresh_datasources() + cluster.refresh_datasources(datasource_name=datasource) except Exception as e: print( "Error while processing cluster '{}'\n{}".format( diff --git a/superset/models.py b/superset/models.py index c91bfff3224..4557317851c 100644 --- a/superset/models.py +++ b/superset/models.py @@ -1522,11 +1522,16 @@ class DruidCluster(Model, AuditMixinNullable): ).format(obj=self) return json.loads(requests.get(endpoint).text)['version'] - def refresh_datasources(self): + def refresh_datasources(self, datasource_name=None): + """Refresh metadata of all datasources in the cluster + + If ``datasource_name`` is specified, only that datasource is updated + """ self.druid_version = self.get_druid_version() for datasource in self.get_datasources(): if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'): - DruidDatasource.sync_to_db(datasource, self) + if not datasource_name or datasource_name == datasource: + DruidDatasource.sync_to_db(datasource, self) @property def perm(self): @@ -1670,15 +1675,35 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable): # we need to set this interval to more than 1 day ago to exclude # realtime segments, which trigged a bug (fixed in druid 0.8.2). # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ - start = (0 if self.version_higher(self.cluster.druid_version, '0.8.2') else 1) - intervals = (max_time - timedelta(days=7)).isoformat() + '/' - intervals += (max_time - timedelta(days=start)).isoformat() - segment_metadata = client.segment_metadata( - datasource=self.datasource_name, - intervals=intervals) + lbound = (max_time - timedelta(days=7)).isoformat() + rbound = max_time.isoformat() + if not self.version_higher(self.cluster.druid_version, '0.8.2'): + rbound = (max_time - timedelta(1)).isoformat() + segment_metadata = None + try: + segment_metadata = client.segment_metadata( + datasource=self.datasource_name, + intervals=lbound + '/' + rbound) + except Exception as e: + logging.warning("Failed first attempt to get latest segment") + logging.exception(e) + if not segment_metadata: + # if no segments in the past 7 days, look at all segments + lbound = datetime(1901, 1, 1).isoformat()[:10] + rbound = datetime(2050, 1, 1).isoformat()[:10] + if not self.version_higher(self.cluster.druid_version, '0.8.2'): + rbound = datetime.now().isoformat()[:10] + try: + segment_metadata = client.segment_metadata( + datasource=self.datasource_name, + intervals=lbound + '/' + rbound) + except Exception as e: + logging.warning("Failed 2nd attempt to get latest segment") + logging.exception(e) if segment_metadata: return segment_metadata[-1]['columns'] + def generate_metrics(self): for col in self.columns: col.generate_metrics() @@ -1774,6 +1799,7 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable): cols = datasource.latest_metadata() if not cols: + logging.error("Failed at fetching the latest segment") return for col in cols: col_obj = (