mirror of
https://github.com/apache/superset.git
synced 2026-04-19 08:04:53 +00:00
[druid] Allow custom druid postaggregators (#3146)
* [druid] Allow custom druid postaggregators Also, fix the postaggregation for approxHistogram quantiles so it adds the dependent field and that can show up in the graphs/tables. In general, postAggregators add significant power, we should probably support including custom postAggregators. Plywood has standard postAggregators here, and a customAggregator escape hatch that allows you to define custom postAggregators. This commit adds a similar capability for Superset and a additional field/fields/fieldName breakdown of the typical naming for dependent aggregations, which should make it significantly easier to develop approxHistogram and custom postAggregation-required dashboards. * [druid] Minor style cleanup in tests file. * [druid] Apply code review suggestions * break out CustomPostAggregator into separate class. This just cleans up the creation of the postaggregator a little bit. * minor style issues. * move the function around so the git diff is more readable
This commit is contained in:
committed by
Maxime Beauchemin
parent
ad5a4389a2
commit
1e325d9645
@@ -50,6 +50,13 @@ class JavascriptPostAggregator(Postaggregator):
|
||||
self.name = name
|
||||
|
||||
|
||||
class CustomPostAggregator(Postaggregator):
|
||||
"""A way to allow users to specify completely custom PostAggregators"""
|
||||
def __init__(self, name, post_aggregator):
|
||||
self.name = name
|
||||
self.post_aggregator = post_aggregator
|
||||
|
||||
|
||||
class DruidCluster(Model, AuditMixinNullable):
|
||||
|
||||
"""ORM object referencing the Druid clusters"""
|
||||
@@ -690,6 +697,75 @@ class DruidDatasource(Model, BaseDatasource):
|
||||
period_name).total_seconds() * 1000
|
||||
return granularity
|
||||
|
||||
@staticmethod
|
||||
def _metrics_and_post_aggs(metrics, metrics_dict):
|
||||
all_metrics = []
|
||||
post_aggs = {}
|
||||
|
||||
def recursive_get_fields(_conf):
|
||||
_type = _conf.get('type')
|
||||
_field = _conf.get('field')
|
||||
_fields = _conf.get('fields')
|
||||
|
||||
field_names = []
|
||||
if _type in ['fieldAccess', 'hyperUniqueCardinality',
|
||||
'quantile', 'quantiles']:
|
||||
field_names.append(_conf.get('fieldName', ''))
|
||||
|
||||
if _field:
|
||||
field_names += recursive_get_fields(_field)
|
||||
|
||||
if _fields:
|
||||
for _f in _fields:
|
||||
field_names += recursive_get_fields(_f)
|
||||
|
||||
return list(set(field_names))
|
||||
|
||||
for metric_name in metrics:
|
||||
metric = metrics_dict[metric_name]
|
||||
if metric.metric_type != 'postagg':
|
||||
all_metrics.append(metric_name)
|
||||
else:
|
||||
mconf = metric.json_obj
|
||||
all_metrics += recursive_get_fields(mconf)
|
||||
all_metrics += mconf.get('fieldNames', [])
|
||||
if mconf.get('type') == 'javascript':
|
||||
post_aggs[metric_name] = JavascriptPostAggregator(
|
||||
name=mconf.get('name', ''),
|
||||
field_names=mconf.get('fieldNames', []),
|
||||
function=mconf.get('function', ''))
|
||||
elif mconf.get('type') == 'quantile':
|
||||
post_aggs[metric_name] = Quantile(
|
||||
mconf.get('name', ''),
|
||||
mconf.get('probability', ''),
|
||||
)
|
||||
elif mconf.get('type') == 'quantiles':
|
||||
post_aggs[metric_name] = Quantiles(
|
||||
mconf.get('name', ''),
|
||||
mconf.get('probabilities', ''),
|
||||
)
|
||||
elif mconf.get('type') == 'fieldAccess':
|
||||
post_aggs[metric_name] = Field(mconf.get('name'))
|
||||
elif mconf.get('type') == 'constant':
|
||||
post_aggs[metric_name] = Const(
|
||||
mconf.get('value'),
|
||||
output_name=mconf.get('name', '')
|
||||
)
|
||||
elif mconf.get('type') == 'hyperUniqueCardinality':
|
||||
post_aggs[metric_name] = HyperUniqueCardinality(
|
||||
mconf.get('name')
|
||||
)
|
||||
elif mconf.get('type') == 'arithmetic':
|
||||
post_aggs[metric_name] = Postaggregator(
|
||||
mconf.get('fn', "/"),
|
||||
mconf.get('fields', []),
|
||||
mconf.get('name', ''))
|
||||
else:
|
||||
post_aggs[metric_name] = CustomPostAggregator(
|
||||
mconf.get('name', ''),
|
||||
mconf)
|
||||
return all_metrics, post_aggs
|
||||
|
||||
def values_for_column(self,
|
||||
column_name,
|
||||
limit=10000):
|
||||
@@ -749,61 +825,10 @@ class DruidDatasource(Model, BaseDatasource):
|
||||
|
||||
query_str = ""
|
||||
metrics_dict = {m.metric_name: m for m in self.metrics}
|
||||
all_metrics = []
|
||||
post_aggs = {}
|
||||
|
||||
columns_dict = {c.column_name: c for c in self.columns}
|
||||
|
||||
def recursive_get_fields(_conf):
|
||||
_fields = _conf.get('fields', [])
|
||||
field_names = []
|
||||
for _f in _fields:
|
||||
_type = _f.get('type')
|
||||
if _type in ['fieldAccess', 'hyperUniqueCardinality']:
|
||||
field_names.append(_f.get('fieldName'))
|
||||
elif _type == 'arithmetic':
|
||||
field_names += recursive_get_fields(_f)
|
||||
return list(set(field_names))
|
||||
|
||||
for metric_name in metrics:
|
||||
metric = metrics_dict[metric_name]
|
||||
if metric.metric_type != 'postagg':
|
||||
all_metrics.append(metric_name)
|
||||
else:
|
||||
mconf = metric.json_obj
|
||||
all_metrics += recursive_get_fields(mconf)
|
||||
all_metrics += mconf.get('fieldNames', [])
|
||||
if mconf.get('type') == 'javascript':
|
||||
post_aggs[metric_name] = JavascriptPostAggregator(
|
||||
name=mconf.get('name', ''),
|
||||
field_names=mconf.get('fieldNames', []),
|
||||
function=mconf.get('function', ''))
|
||||
elif mconf.get('type') == 'quantile':
|
||||
post_aggs[metric_name] = Quantile(
|
||||
mconf.get('name', ''),
|
||||
mconf.get('probability', ''),
|
||||
)
|
||||
elif mconf.get('type') == 'quantiles':
|
||||
post_aggs[metric_name] = Quantiles(
|
||||
mconf.get('name', ''),
|
||||
mconf.get('probabilities', ''),
|
||||
)
|
||||
elif mconf.get('type') == 'fieldAccess':
|
||||
post_aggs[metric_name] = Field(mconf.get('name'))
|
||||
elif mconf.get('type') == 'constant':
|
||||
post_aggs[metric_name] = Const(
|
||||
mconf.get('value'),
|
||||
output_name=mconf.get('name', '')
|
||||
)
|
||||
elif mconf.get('type') == 'hyperUniqueCardinality':
|
||||
post_aggs[metric_name] = HyperUniqueCardinality(
|
||||
mconf.get('name')
|
||||
)
|
||||
else:
|
||||
post_aggs[metric_name] = Postaggregator(
|
||||
mconf.get('fn', "/"),
|
||||
mconf.get('fields', []),
|
||||
mconf.get('name', ''))
|
||||
all_metrics, post_aggs = self._metrics_and_post_aggs(metrics, metrics_dict)
|
||||
|
||||
aggregations = OrderedDict()
|
||||
for m in self.metrics:
|
||||
|
||||
@@ -11,8 +11,8 @@ import unittest
|
||||
from mock import Mock, patch
|
||||
|
||||
from superset import db, sm, security
|
||||
from superset.connectors.druid.models import DruidCluster, DruidDatasource
|
||||
from superset.connectors.druid.models import PyDruid
|
||||
from superset.connectors.druid.models import DruidMetric, DruidCluster, DruidDatasource
|
||||
from superset.connectors.druid.models import PyDruid, Quantile, Postaggregator
|
||||
|
||||
from .base_tests import SupersetTestCase
|
||||
|
||||
@@ -38,7 +38,7 @@ SEGMENT_METADATA = [{
|
||||
"metric1": {
|
||||
"type": "longSum",
|
||||
"name": "metric1",
|
||||
"fieldName": "metric1"}
|
||||
"fieldName": "metric1"},
|
||||
},
|
||||
"size": 300000,
|
||||
"numRows": 5000000
|
||||
@@ -318,6 +318,77 @@ class DruidTests(SupersetTestCase):
|
||||
permission=permission, view_menu=view_menu).first()
|
||||
assert pv is not None
|
||||
|
||||
def test_metrics_and_post_aggs(self):
|
||||
"""
|
||||
Test generation of metrics and post-aggregations from an initial list
|
||||
of superset metrics (which may include the results of either). This
|
||||
primarily tests that specifying a post-aggregator metric will also
|
||||
require the raw aggregation of the associated druid metric column.
|
||||
"""
|
||||
metrics_dict = {
|
||||
'unused_count': DruidMetric(
|
||||
metric_name='unused_count',
|
||||
verbose_name='COUNT(*)',
|
||||
metric_type='count',
|
||||
json=json.dumps({'type': 'count', 'name': 'unused_count'})),
|
||||
'some_sum': DruidMetric(
|
||||
metric_name='some_sum',
|
||||
verbose_name='SUM(*)',
|
||||
metric_type='sum',
|
||||
json=json.dumps({'type': 'sum', 'name': 'sum'})),
|
||||
'a_histogram': DruidMetric(
|
||||
metric_name='a_histogram',
|
||||
verbose_name='APPROXIMATE_HISTOGRAM(*)',
|
||||
metric_type='approxHistogramFold',
|
||||
json=json.dumps({'type': 'approxHistogramFold', 'name': 'a_histogram'})),
|
||||
'aCustomMetric': DruidMetric(
|
||||
metric_name='aCustomMetric',
|
||||
verbose_name='MY_AWESOME_METRIC(*)',
|
||||
metric_type='aCustomType',
|
||||
json=json.dumps({'type': 'customMetric', 'name': 'aCustomMetric'})),
|
||||
'quantile_p95': DruidMetric(
|
||||
metric_name='quantile_p95',
|
||||
verbose_name='P95(*)',
|
||||
metric_type='postagg',
|
||||
json=json.dumps({
|
||||
'type': 'quantile',
|
||||
'probability': 0.95,
|
||||
'name': 'p95',
|
||||
'fieldName': 'a_histogram'})),
|
||||
'aCustomPostAgg': DruidMetric(
|
||||
metric_name='aCustomPostAgg',
|
||||
verbose_name='CUSTOM_POST_AGG(*)',
|
||||
metric_type='postagg',
|
||||
json=json.dumps({
|
||||
'type': 'customPostAgg',
|
||||
'name': 'aCustomPostAgg',
|
||||
'field': {
|
||||
'type': 'fieldAccess',
|
||||
'fieldName': 'aCustomMetric'}})),
|
||||
}
|
||||
|
||||
metrics = ['some_sum']
|
||||
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
|
||||
metrics, metrics_dict)
|
||||
|
||||
assert all_metrics == ['some_sum']
|
||||
assert post_aggs == {}
|
||||
|
||||
metrics = ['quantile_p95']
|
||||
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
|
||||
metrics, metrics_dict)
|
||||
|
||||
result_postaggs = set(['quantile_p95'])
|
||||
assert all_metrics == ['a_histogram']
|
||||
assert set(post_aggs.keys()) == result_postaggs
|
||||
|
||||
metrics = ['aCustomPostAgg']
|
||||
all_metrics, post_aggs = DruidDatasource._metrics_and_post_aggs(
|
||||
metrics, metrics_dict)
|
||||
|
||||
result_postaggs = set(['aCustomPostAgg'])
|
||||
assert all_metrics == ['aCustomMetric']
|
||||
assert set(post_aggs.keys()) == result_postaggs
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user