diff --git a/caravel/models.py b/caravel/models.py
index 779605e69c1..5396356aeba 100644
--- a/caravel/models.py
+++ b/caravel/models.py
@@ -955,7 +955,15 @@ class DruidCluster(Model, AuditMixinNullable):
 
         return json.loads(requests.get(endpoint).text)
 
+    def get_druid_version(self):
+        """Return the version string reported by the coordinator's /status."""
+        endpoint = (
+            "http://{obj.coordinator_host}:{obj.coordinator_port}/status"
+        ).format(obj=self)
+        return json.loads(requests.get(endpoint).text)['version']
+
     def refresh_datasources(self):
+        self.druid_version = self.get_druid_version()
         for datasource in self.get_datasources():
             if datasource not in config.get('DRUID_DATA_SOURCE_BLACKLIST'):
                 DruidDatasource.sync_to_db(datasource, self)
@@ -1028,6 +1036,27 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
             if m.metric_name == metric_name
         ][0]
 
+    def version_higher(self, v1, v2):
+        """Return True if version ``v1`` is strictly higher than ``v2``.
+
+        Only the first three dot-separated components are compared.
+        Missing components count as 0 and non-numeric suffixes such as
+        ``-rc1`` or ``-SNAPSHOT`` are ignored, so versions like ``0.9``,
+        ``0.9.1-rc1`` or an unset (``None``) version never raise.
+        """
+        def parse(version):
+            nums = []
+            for part in str(version or '').split('.')[:3]:
+                digits = ''
+                for char in part:
+                    if not char.isdigit():
+                        break
+                    digits += char
+                nums.append(int(digits) if digits else 0)
+            return nums + [0] * (3 - len(nums))
+
+        return parse(v1) > parse(v2)
+
     def latest_metadata(self):
         """Returns segment metadata from the latest segment"""
         client = self.cluster.get_pydruid_client()
@@ -1040,8 +1069,10 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
         # we need to set this interval to more than 1 day ago to exclude
         # realtime segments, which trigged a bug (fixed in druid 0.8.2).
         # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
+        newer = self.version_higher(self.cluster.druid_version, '0.8.2')
+        start = 0 if newer else 1
         intervals = (max_time - timedelta(days=7)).isoformat() + '/'
-        intervals += (max_time - timedelta(days=1)).isoformat()
+        intervals += (max_time - timedelta(days=start)).isoformat()
         segment_metadata = client.segment_metadata(
             datasource=self.datasource_name,
             intervals=intervals)
diff --git a/tests/core_tests.py b/tests/core_tests.py
index 598b7d6e068..6b60a61ade6 100644
--- a/tests/core_tests.py
+++ b/tests/core_tests.py
@@ -355,6 +355,7 @@ class DruidTests(CaravelTestCase):
 
         db.session.add(cluster)
         cluster.get_datasources = Mock(return_value=['test_datasource'])
+        cluster.get_druid_version = Mock(return_value='0.9.1')
         cluster.refresh_datasources()
         datasource_id = cluster.datasources[0].id
         db.session.commit()