[druid] adding support for dimensionspecs (#1545)

More about it here:
http://druid.io/docs/latest/querying/dimensionspecs.html

fixes https://github.com/airbnb/caravel/issues/1086
This commit is contained in:
Maxime Beauchemin
2016-11-07 17:05:41 -08:00
committed by GitHub
parent 4014a48f7d
commit e4bd1884d3
6 changed files with 71 additions and 19 deletions

View File

@@ -0,0 +1,22 @@
"""dim_spec
Revision ID: c611f2b591b8
Revises: ad4d656d92bc
Create Date: 2016-11-02 17:36:04.970448
"""
# revision identifiers, used by Alembic.
revision = 'c611f2b591b8'
down_revision = 'ad4d656d92bc'
from alembic import op
import sqlalchemy as sa
def upgrade():
    """Apply the migration: add ``dimension_spec_json`` to ``columns``.

    The new column is a nullable ``Text`` column, so existing rows need no
    backfill.
    """
    dimension_spec_column = sa.Column(
        'dimension_spec_json', sa.Text(), nullable=True)
    op.add_column('columns', dimension_spec_column)
def downgrade():
    """Revert the migration: drop ``dimension_spec_json`` from ``columns``."""
    table, column = 'columns', 'dimension_spec_json'
    op.drop_column(table, column)

View File

@@ -1884,6 +1884,8 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
all_metrics = []
post_aggs = {}
columns_dict = {c.column_name: c for c in self.columns}
def recursive_get_fields(_conf):
_fields = _conf.get('fields', [])
field_names = []
@@ -1931,9 +1933,19 @@ class DruidDatasource(Model, AuditMixinNullable, Queryable):
"Access to the metrics denied: " + ', '.join(rejected_metrics)
)
# the dimensions list with dimensionSpecs expanded
dimensions = []
groupby = [gb for gb in groupby if gb in columns_dict]
for column_name in groupby:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec
if dim_spec:
dimensions.append(dim_spec)
else:
dimensions.append(column_name)
qry = dict(
datasource=self.datasource_name,
dimensions=groupby,
dimensions=dimensions,
aggregations=aggregations,
granularity=DruidDatasource.granularity(
granularity,
@@ -2242,6 +2254,7 @@ class DruidColumn(Model, AuditMixinNullable):
min = Column(Boolean, default=False)
filterable = Column(Boolean, default=False)
description = Column(Text)
dimension_spec_json = Column(Text)
def __repr__(self):
return self.column_name
@@ -2250,6 +2263,11 @@ class DruidColumn(Model, AuditMixinNullable):
def isnum(self):
return self.type in ('LONG', 'DOUBLE', 'FLOAT', 'INT')
@property
def dimension_spec(self):
    """Return the dimensionSpec decoded from ``dimension_spec_json``.

    Returns ``None`` when ``dimension_spec_json`` is falsy (unset or an
    empty string); otherwise returns whatever ``json.loads`` produces for
    the stored text. Errors raised by ``json.loads`` on malformed text are
    not caught here.
    """
    raw_spec = self.dimension_spec_json
    if not raw_spec:
        return None
    return json.loads(raw_spec)
def generate_metrics(self):
"""Generate metrics based on the column metadata"""
M = DruidMetric # noqa

View File

@@ -356,8 +356,9 @@ appbuilder.add_view_no_menu(TableColumnInlineView)
class DruidColumnInlineView(CompactCRUDMixin, CaravelModelView): # noqa
datamodel = SQLAInterface(models.DruidColumn)
edit_columns = [
'column_name', 'description', 'datasource', 'groupby',
'count_distinct', 'sum', 'min', 'max']
'column_name', 'description', 'dimension_spec_json', 'datasource',
'groupby', 'count_distinct', 'sum', 'min', 'max']
add_columns = edit_columns
list_columns = [
'column_name', 'type', 'groupby', 'filterable', 'count_distinct',
'sum', 'min', 'max']
@@ -374,9 +375,23 @@ class DruidColumnInlineView(CompactCRUDMixin, CaravelModelView): # noqa
'min': _("Min"),
'max': _("Max"),
}
description_columns = {
'dimension_spec_json': utils.markdown(
"this field can be used to specify "
"a `dimensionSpec` as documented [here]"
"(http://druid.io/docs/latest/querying/dimensionspecs.html). "
"Make sure to input valid JSON and that the "
"`outputName` matches the `column_name` defined "
"above.",
True),
}
def post_update(self, col):
    """Hook run for an updated column: regenerate metrics, then check JSON.

    Calls ``col.generate_metrics()`` first and afterwards passes the
    column's ``dimension_spec_json`` text to ``utils.validate_json``.
    """
    col.generate_metrics()
    # NOTE(review): the hook name suggests this runs after the row has
    # already been saved, so a failure here may not stop invalid JSON from
    # being stored -- confirm against Flask-AppBuilder's hook ordering and
    # consider validating in a pre-save hook instead.
    spec_text = col.dimension_spec_json
    utils.validate_json(spec_text)
def post_add(self, col):
    """Hook run for a newly added column; reuses the update-time handling.

    Delegates to ``self.post_update(col)`` so added and edited columns go
    through the same steps. The delegate's return value is discarded.
    """
    self.post_update(col)
appbuilder.add_view_no_menu(DruidColumnInlineView)
@@ -707,11 +722,11 @@ class DruidClusterModelView(CaravelModelView, DeleteMixin): # noqa
'broker_endpoint': _("Broker Endpoint"),
}
def pre_add(self, db):
utils.merge_perm(sm, 'database_access', db.perm)
def pre_add(self, cluster):
utils.merge_perm(sm, 'database_access', cluster.perm)
def pre_update(self, db):
self.pre_add(db)
def pre_update(self, cluster):
self.pre_add(cluster)
if config['DRUID_IS_ACTIVE']:

View File

@@ -1034,7 +1034,6 @@ class NVD3TimeSeriesViz(NVD3Viz):
def get_df(self, query_obj=None):
form_data = self.form_data
df = super(NVD3TimeSeriesViz, self).get_df(query_obj)
df = df.fillna(0)
if form_data.get("granularity") == "all":
raise Exception("Pick a time granularity for your time series")

View File

@@ -10,10 +10,8 @@ import json
import io
import random
import unittest
from datetime import datetime
from flask import escape
from flask_appbuilder.security.sqla import models as ab_models
from caravel import db, models, utils, appbuilder, sm, jinja_context
from caravel.views import DatabaseView

View File

@@ -48,16 +48,16 @@ GB_RESULT_SET = [
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {
"name": 'Canada',
"sum__num": 12345678,
"dim1": 'Canada',
"metric1": 12345678,
}
},
{
"version": "v1",
"timestamp": "2012-01-01T00:00:00.000Z",
"event": {
"name": 'USA',
"sum__num": 12345678 / 2,
"dim1": 'USA',
"metric1": 12345678 / 2,
}
},
]
@@ -121,26 +121,26 @@ class DruidTests(CaravelTestCase):
url = (
'/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
'include_search=false&metrics=count&groupby=name&flt_col_0=dim1&'
'include_search=false&metrics=count&groupby=dim1&flt_col_0=dim1&'
'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
'action=&datasource_name=test_datasource&datasource_id={}&'
'datasource_type=druid&previous_viz_type=table&'
'force=true'.format(datasource_id, datasource_id))
resp = self.get_json_resp(url)
self.assertEqual("Canada", resp['data']['records'][0]['name'])
self.assertEqual("Canada", resp['data']['records'][0]['dim1'])
# two groupby
url = (
'/caravel/explore_json/druid/{}/?viz_type=table&granularity=one+day&'
'druid_time_origin=&since=7+days+ago&until=now&row_limit=5000&'
'include_search=false&metrics=count&groupby=name&'
'flt_col_0=dim1&groupby=second&'
'include_search=false&metrics=count&groupby=dim1&'
'flt_col_0=dim1&groupby=dim2d&'
'flt_op_0=in&flt_eq_0=&slice_id=&slice_name=&collapsed_fieldsets=&'
'action=&datasource_name=test_datasource&datasource_id={}&'
'datasource_type=druid&previous_viz_type=table&'
'force=true'.format(datasource_id, datasource_id))
resp = self.get_json_resp(url)
self.assertEqual("Canada", resp['data']['records'][0]['name'])
self.assertEqual("Canada", resp['data']['records'][0]['dim1'])
def test_druid_sync_from_config(self):
CLUSTER_NAME = 'new_druid'