Add extraction function support for Druid queries (#4740)

* add extraction fn support for Druid queries

* bump pydruid version to get extraction fn commits

* update and add tests for druid for filters with extraction fns

* conform to flake8 rules

* fix flake8 issues

* bump pyruid version for extraction function features
This commit is contained in:
jasnovak
2018-05-08 22:00:06 -07:00
committed by Maxime Beauchemin
parent 75df3d0f8e
commit e29beba023
3 changed files with 245 additions and 38 deletions

View File

@@ -22,7 +22,7 @@ pandas==0.22.0
parsedatetime==2.0.0
pathlib2==2.3.0
polyline==1.3.2
pydruid==0.4.1
pydruid==0.4.2
pyhive==0.5.0
python-dateutil==2.6.1
python-geohash==0.8.5

View File

@@ -21,7 +21,8 @@ from flask_appbuilder.models.decorators import renders
from flask_babel import lazy_gettext as _
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
from pydruid.utils.filters import Bound, Dimension, Filter
from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.having import Aggregation
from pydruid.utils.postaggregator import (
Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
@@ -960,8 +961,25 @@ class DruidDatasource(Model, BaseDatasource):
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
f = Dimension(dim) == row[dim]
fields.append(f)
f = None
# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
if isinstance(dim, dict) and 'extractionFn' in dim:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
dim_val = dim['outputName']
f = Filter(
dimension=col,
value=row[dim_val],
extraction_function=extraction_fn,
)
elif isinstance(dim, dict):
dim_val = dim['outputName']
if dim_val:
f = Dimension(dim_val) == row[dim_val]
else:
f = Dimension(dim) == row[dim]
if f:
fields.append(f)
if len(fields) > 1:
term = Filter(type='and', fields=fields)
new_filters.append(term)
@@ -1065,7 +1083,9 @@ class DruidDatasource(Model, BaseDatasource):
values = []
for dimension in dimensions:
if isinstance(dimension, dict):
if 'dimension' in dimension:
if 'extractionFn' in dimension:
values.append(dimension)
elif 'dimension' in dimension:
values.append(dimension['dimension'])
else:
values.append(dimension)
@@ -1132,7 +1152,7 @@ class DruidDatasource(Model, BaseDatasource):
intervals=self.intervals_from_dttms(from_dttm, to_dttm),
)
filters = DruidDatasource.get_filters(filter, self.num_cols)
filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
if filters:
qry['filter'] = filters
@@ -1217,7 +1237,14 @@ class DruidDatasource(Model, BaseDatasource):
pre_qry = deepcopy(qry)
pre_qry_dims = self._dimensions_to_values(qry['dimensions'])
pre_qry['dimensions'] = list(set(pre_qry_dims))
# Can't use set on an array with dicts
# Use set with non-dict items only
non_dict_dims = list(
set([x for x in pre_qry_dims if not isinstance(x, dict)]),
)
dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
pre_qry['dimensions'] = non_dict_dims + dict_dims
order_by = metrics[0] if metrics else pre_qry_dims[0]
@@ -1341,8 +1368,31 @@ class DruidDatasource(Model, BaseDatasource):
query=query_str,
duration=datetime.now() - qry_start_dttm)
@staticmethod
def _create_extraction_fn(dim_spec):
extraction_fn = None
if dim_spec and 'extractionFn' in dim_spec:
col = dim_spec['dimension']
fn = dim_spec['extractionFn']
ext_type = fn.get('type')
if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
replace_missing_values = fn.get('replaceMissingValueWith')
retain_missing_values = fn.get('retainMissingValue', False)
injective = fn.get('isOneToOne', False)
extraction_fn = MapLookupExtraction(
fn['lookup']['map'],
replace_missing_values=replace_missing_values,
retain_missing_values=retain_missing_values,
injective=injective,
)
elif ext_type == 'regex':
extraction_fn = RegexExtraction(fn['expr'])
else:
raise Exception(_('Unsupported extraction function: ' + ext_type))
return (col, extraction_fn)
@classmethod
def get_filters(cls, raw_filters, num_cols): # noqa
def get_filters(cls, raw_filters, num_cols, columns_dict): # noqa
"""Given Superset filter data structure, returns pydruid Filter(s)"""
filters = None
for flt in raw_filters:
@@ -1354,21 +1404,42 @@ class DruidDatasource(Model, BaseDatasource):
not op or
(eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
continue
# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
column_def = columns_dict.get(col)
dim_spec = column_def.dimension_spec if column_def else None
extraction_fn = None
if dim_spec and 'extractionFn' in dim_spec:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)
cond = None
is_numeric_col = col in num_cols
is_list_target = op in ('in', 'not in')
eq = cls.filter_values_handler(
eq, is_list_target=is_list_target,
target_column_is_numeric=is_numeric_col)
# For these two ops, could have used Dimension,
# but it doesn't support extraction functions
if op == '==':
cond = Dimension(col) == eq
cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op == '!=':
cond = Dimension(col) != eq
cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op in ('in', 'not in'):
fields = []
# ignore the filter if it has no value
if not len(eq):
continue
# if it uses an extraction fn, use the "in" operator
# as Dimension isn't supported
elif extraction_fn is not None:
cond = Filter(
dimension=col,
values=eq,
type='in',
extraction_function=extraction_fn,
)
elif len(eq) == 1:
cond = Dimension(col) == eq[0]
else:
@@ -1378,20 +1449,58 @@ class DruidDatasource(Model, BaseDatasource):
if op == 'not in':
cond = ~cond
elif op == 'regex':
cond = Filter(type='regex', pattern=eq, dimension=col)
cond = Filter(
extraction_function=extraction_fn,
type='regex',
pattern=eq,
dimension=col,
)
# For the ops below, could have used pydruid's Bound,
# but it doesn't support extraction functions
elif op == '>=':
cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
cond = Filter(
type='bound',
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=eq,
upper=None,
alphaNumeric=is_numeric_col,
)
elif op == '<=':
cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
cond = Filter(
type='bound',
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=None,
upper=eq,
alphaNumeric=is_numeric_col,
)
elif op == '>':
cond = Bound(
col, eq, None,
lowerStrict=True, alphaNumeric=is_numeric_col,
cond = Filter(
type='bound',
extraction_function=extraction_fn,
lowerStrict=True,
upperStrict=False,
dimension=col,
lower=eq,
upper=None,
alphaNumeric=is_numeric_col,
)
elif op == '<':
cond = Bound(
col, None, eq,
upperStrict=True, alphaNumeric=is_numeric_col,
cond = Filter(
type='bound',
extraction_function=extraction_fn,
upperStrict=True,
lowerStrict=False,
dimension=col,
lower=None,
upper=eq,
alphaNumeric=is_numeric_col,
)
elif op == 'IS NULL':
cond = Dimension(col) == None # NOQA

View File

@@ -8,8 +8,10 @@ import json
import unittest
from mock import Mock
from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
import pydruid.utils.postaggregator as postaggs
import superset.connectors.druid.models as models
from superset.connectors.druid.models import (
DruidColumn, DruidDatasource, DruidMetric,
@@ -31,14 +33,84 @@ def emplace(metrics_dict, metric_name, is_postagg=False):
# Unit tests that can be run without initializing base tests
class DruidFuncTestCase(unittest.TestCase):
def test_get_filters_extraction_fn_map(self):
filters = [{'col': 'deviceName', 'val': ['iPhone X'], 'op': 'in'}]
dimension_spec = {
'type': 'extraction',
'dimension': 'device',
'outputName': 'deviceName',
'outputType': 'STRING',
'extractionFn': {
'type': 'lookup',
'dimension': 'dimensionName',
'outputName': 'dimensionOutputName',
'replaceMissingValueWith': 'missing_value',
'retainMissingValue': False,
'lookup': {
'type': 'map',
'map': {
'iPhone10,1': 'iPhone 8',
'iPhone10,4': 'iPhone 8',
'iPhone10,2': 'iPhone 8 Plus',
'iPhone10,5': 'iPhone 8 Plus',
'iPhone10,3': 'iPhone X',
'iPhone10,6': 'iPhone X',
},
'isOneToOne': False,
},
},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name='deviceName', dimension_spec_json=spec_json)
column_dict = {'deviceName': col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, MapLookupExtraction)
dim_ext_fn = dimension_spec['extractionFn']
f_ext_fn = f.extraction_function
self.assertEqual(dim_ext_fn['lookup']['map'], f_ext_fn._mapping)
self.assertEqual(dim_ext_fn['lookup']['isOneToOne'], f_ext_fn._injective)
self.assertEqual(
dim_ext_fn['replaceMissingValueWith'],
f_ext_fn._replace_missing_values,
)
self.assertEqual(
dim_ext_fn['retainMissingValue'],
f_ext_fn._retain_missing_values,
)
def test_get_filters_extraction_fn_regex(self):
filters = [{'col': 'buildPrefix', 'val': ['22B'], 'op': 'in'}]
dimension_spec = {
'type': 'extraction',
'dimension': 'build',
'outputName': 'buildPrefix',
'outputType': 'STRING',
'extractionFn': {
'type': 'regex',
'expr': '(^[0-9A-Za-z]{3})',
},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name='buildPrefix', dimension_spec_json=spec_json)
column_dict = {'buildPrefix': col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, RegexExtraction)
dim_ext_fn = dimension_spec['extractionFn']
f_ext_fn = f.extraction_function
self.assertEqual(dim_ext_fn['expr'], f_ext_fn._expr)
def test_get_filters_ignores_invalid_filter_objects(self):
filtr = {'col': 'col1', 'op': '=='}
filters = [filtr]
self.assertIsNone(DruidDatasource.get_filters(filters, []))
col = DruidColumn(column_name='col1')
column_dict = {'col1': col}
self.assertIsNone(DruidDatasource.get_filters(filters, [], column_dict))
def test_get_filters_constructs_filter_in(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['a', 'b', 'c']}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn('filter', res.filter)
self.assertIn('fields', res.filter['filter'])
self.assertEqual('or', res.filter['filter']['type'])
@@ -46,7 +118,9 @@ class DruidFuncTestCase(unittest.TestCase):
def test_get_filters_constructs_filter_not_in(self):
filtr = {'col': 'A', 'op': 'not in', 'val': ['a', 'b', 'c']}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn('filter', res.filter)
self.assertIn('type', res.filter['filter'])
self.assertEqual('not', res.filter['filter']['type'])
@@ -58,14 +132,18 @@ class DruidFuncTestCase(unittest.TestCase):
def test_get_filters_constructs_filter_equals(self):
filtr = {'col': 'A', 'op': '==', 'val': 'h'}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('selector', res.filter['filter']['type'])
self.assertEqual('A', res.filter['filter']['dimension'])
self.assertEqual('h', res.filter['filter']['value'])
def test_get_filters_constructs_filter_not_equals(self):
filtr = {'col': 'A', 'op': '!=', 'val': 'h'}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('not', res.filter['filter']['type'])
self.assertEqual(
'h',
@@ -74,25 +152,29 @@ class DruidFuncTestCase(unittest.TestCase):
def test_get_filters_constructs_bounds_filter(self):
filtr = {'col': 'A', 'op': '>=', 'val': 'h'}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter['filter']['lowerStrict'])
self.assertEqual('A', res.filter['filter']['dimension'])
self.assertEqual('h', res.filter['filter']['lower'])
self.assertFalse(res.filter['filter']['alphaNumeric'])
filtr['op'] = '>'
res = DruidDatasource.get_filters([filtr], [])
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter['filter']['lowerStrict'])
filtr['op'] = '<='
res = DruidDatasource.get_filters([filtr], [])
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter['filter']['upperStrict'])
self.assertEqual('h', res.filter['filter']['upper'])
filtr['op'] = '<'
res = DruidDatasource.get_filters([filtr], [])
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter['filter']['upperStrict'])
def test_get_filters_constructs_regex_filter(self):
filtr = {'col': 'A', 'op': 'regex', 'val': '[abc]'}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('regex', res.filter['filter']['type'])
self.assertEqual('[abc]', res.filter['filter']['pattern'])
self.assertEqual('A', res.filter['filter']['dimension'])
@@ -100,46 +182,62 @@ class DruidFuncTestCase(unittest.TestCase):
def test_get_filters_composes_multiple_filters(self):
filtr1 = {'col': 'A', 'op': '!=', 'val': 'y'}
filtr2 = {'col': 'B', 'op': 'in', 'val': ['a', 'b', 'c']}
res = DruidDatasource.get_filters([filtr1, filtr2], [])
cola = DruidColumn(column_name='A')
colb = DruidColumn(column_name='B')
column_dict = {'A': cola, 'B': colb}
res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertEqual('and', res.filter['filter']['type'])
self.assertEqual(2, len(res.filter['filter']['fields']))
def test_get_filters_ignores_in_not_in_with_empty_value(self):
filtr1 = {'col': 'A', 'op': 'in', 'val': []}
filtr2 = {'col': 'A', 'op': 'not in', 'val': []}
res = DruidDatasource.get_filters([filtr1, filtr2], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertIsNone(res)
def test_get_filters_constructs_equals_for_in_not_in_single_value(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['a']}
res = DruidDatasource.get_filters([filtr], [])
cola = DruidColumn(column_name='A')
colb = DruidColumn(column_name='B')
column_dict = {'A': cola, 'B': colb}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('selector', res.filter['filter']['type'])
def test_get_filters_handles_arrays_for_string_types(self):
filtr = {'col': 'A', 'op': '==', 'val': ['a', 'b']}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('a', res.filter['filter']['value'])
filtr = {'col': 'A', 'op': '==', 'val': []}
res = DruidDatasource.get_filters([filtr], [])
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res.filter['filter']['value'])
def test_get_filters_handles_none_for_string_types(self):
filtr = {'col': 'A', 'op': '==', 'val': None}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res)
def test_get_filters_extracts_values_in_quotes(self):
filtr = {'col': 'A', 'op': 'in', 'val': [' "a" ']}
res = DruidDatasource.get_filters([filtr], [])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('a', res.filter['filter']['value'])
def test_get_filters_converts_strings_to_num(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['6']}
res = DruidDatasource.get_filters([filtr], ['A'])
col = DruidColumn(column_name='A')
column_dict = {'A': col}
res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
self.assertEqual(6, res.filter['filter']['value'])
filtr = {'col': 'A', 'op': '==', 'val': '6'}
res = DruidDatasource.get_filters([filtr], ['A'])
res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
self.assertEqual(6, res.filter['filter']['value'])
def test_run_query_no_groupby(self):