feat: run extra query on QueryObject and add compare operator for post_processing (#15279)

* rebase master and resolve conflicts

* pylint to makefile

* fix crash when pivot operator

* fix comments

* add precision argument

* query test

* wip

* fix ut

* rename

* set time_offsets to cache key

wip

* refactor get_df_payload

wip

* extra query cache

* cache ut

* normalize df

* fix timeoffset

* fix ut

* make cache key logging make sense

* resolve conflicts

* backend follow up iteration 1

wip

* rolling window type

* rebase master

* py lint and minor follow ups

* pylintrc
This commit is contained in:
Yongjie Zhao
2021-07-28 15:34:39 +01:00
committed by GitHub
parent bdfc2dc9d5
commit 32d2aa0c40
17 changed files with 744 additions and 149 deletions

View File

@@ -130,6 +130,15 @@ timeseries_df = DataFrame(
data={"label": ["x", "y", "z", "q"], "y": [1.0, 2.0, 3.0, 4.0]},
)
timeseries_df2 = DataFrame(
index=to_datetime(["2019-01-01", "2019-01-02", "2019-01-05", "2019-01-07"]),
data={
"label": ["x", "y", "z", "q"],
"y": [2.0, 2.0, 2.0, 2.0],
"z": [2.0, 4.0, 10.0, 8.0],
},
)
lonlat_df = DataFrame(
{
"city": ["New York City", "Sydney"],

View File

@@ -195,7 +195,7 @@ POSTPROCESSING_OPERATIONS = {
def get_query_object(
query_name: str, add_postprocessing_operations: bool
query_name: str, add_postprocessing_operations: bool, add_time_offsets: bool,
) -> Dict[str, Any]:
if query_name not in QUERY_OBJECTS:
raise Exception(f"QueryObject fixture not defined for datasource: {query_name}")
@@ -212,6 +212,9 @@ def get_query_object(
query_object = copy.deepcopy(obj)
if add_postprocessing_operations:
query_object["post_processing"] = _get_postprocessing_operation(query_name)
if add_time_offsets:
query_object["time_offsets"] = ["1 year ago"]
return query_object
@@ -224,7 +227,9 @@ def _get_postprocessing_operation(query_name: str) -> List[Dict[str, Any]]:
def get_query_context(
query_name: str, add_postprocessing_operations: bool = False,
query_name: str,
add_postprocessing_operations: bool = False,
add_time_offsets: bool = False,
) -> Dict[str, Any]:
"""
Create a request payload for retrieving a QueryContext object via the
@@ -236,11 +241,16 @@ def get_query_context(
:param datasource_id: id of datasource to query.
:param datasource_type: type of datasource to query.
:param add_postprocessing_operations: Add post-processing operations to QueryObject
:param add_time_offsets: Add time offsets to QueryObject(advanced analytics)
:return: Request payload
"""
table_name = query_name.split(":")[0]
table = get_table_by_name(table_name)
return {
"datasource": {"id": table.id, "type": table.type},
"queries": [get_query_object(query_name, add_postprocessing_operations)],
"queries": [
get_query_object(
query_name, add_postprocessing_operations, add_time_offsets,
)
],
}

View File

@@ -38,6 +38,7 @@ from .fixtures.dataframes import (
names_df,
timeseries_df,
prophet_df,
timeseries_df2,
)
AGGREGATES_SINGLE = {"idx_nulls": {"operator": "sum"}}
@@ -422,6 +423,64 @@ class TestPostProcessing(SupersetTestCase):
columns={"abc": "abc"},
)
# diff by columns
post_df = proc.diff(df=timeseries_df2, columns={"y": "y", "z": "z"}, axis=1)
self.assertListEqual(post_df.columns.tolist(), ["label", "y", "z"])
self.assertListEqual(series_to_list(post_df["z"]), [0.0, 2.0, 8.0, 6.0])
def test_compare(self):
# `absolute` comparison
post_df = proc.compare(
df=timeseries_df2,
source_columns=["y"],
compare_columns=["z"],
compare_type="absolute",
)
self.assertListEqual(
post_df.columns.tolist(), ["label", "y", "z", "absolute__y__z",]
)
self.assertListEqual(
series_to_list(post_df["absolute__y__z"]), [0.0, -2.0, -8.0, -6.0],
)
# drop original columns
post_df = proc.compare(
df=timeseries_df2,
source_columns=["y"],
compare_columns=["z"],
compare_type="absolute",
drop_original_columns=True,
)
self.assertListEqual(post_df.columns.tolist(), ["label", "absolute__y__z",])
# `percentage` comparison
post_df = proc.compare(
df=timeseries_df2,
source_columns=["y"],
compare_columns=["z"],
compare_type="percentage",
)
self.assertListEqual(
post_df.columns.tolist(), ["label", "y", "z", "percentage__y__z",]
)
self.assertListEqual(
series_to_list(post_df["percentage__y__z"]), [0.0, -1.0, -4.0, -3],
)
# `ratio` comparison
post_df = proc.compare(
df=timeseries_df2,
source_columns=["y"],
compare_columns=["z"],
compare_type="ratio",
)
self.assertListEqual(
post_df.columns.tolist(), ["label", "y", "z", "ratio__y__z",]
)
self.assertListEqual(
series_to_list(post_df["ratio__y__z"]), [1.0, 0.5, 0.2, 0.25],
)
def test_cum(self):
# create new column (cumsum)
post_df = proc.cum(df=timeseries_df, columns={"y": "y2"}, operator="sum",)

View File

@@ -222,6 +222,20 @@ class TestQueryContext(SupersetTestCase):
cache_key = query_context.query_cache_key(query_object)
self.assertNotEqual(cache_key_original, cache_key)
def test_query_cache_key_changes_when_time_offsets_is_updated(self):
self.login(username="admin")
payload = get_query_context("birth_names", add_time_offsets=True)
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
cache_key_original = query_context.query_cache_key(query_object)
payload["queries"][0]["time_offsets"].pop()
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
cache_key = query_context.query_cache_key(query_object)
self.assertNotEqual(cache_key_original, cache_key)
def test_query_context_time_range_endpoints(self):
"""
Ensure that time_range_endpoints are populated automatically when missing
@@ -476,3 +490,92 @@ class TestQueryContext(SupersetTestCase):
responses = query_context.get_payload()
new_cache_key = responses["queries"][0]["cache_key"]
self.assertEqual(orig_cache_key, new_cache_key)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_time_offsets_in_query_object(self):
"""
Ensure that time_offsets can generate the correct query
"""
self.login(username="admin")
payload = get_query_context("birth_names")
payload["queries"][0]["metrics"] = ["sum__num"]
payload["queries"][0]["groupby"] = ["name"]
payload["queries"][0]["is_timeseries"] = True
payload["queries"][0]["timeseries_limit"] = 5
payload["queries"][0]["time_offsets"] = ["1 year ago", "1 year later"]
payload["queries"][0]["time_range"] = "1990 : 1991"
query_context = ChartDataQueryContextSchema().load(payload)
responses = query_context.get_payload()
self.assertEqual(
responses["queries"][0]["colnames"],
[
"__timestamp",
"name",
"sum__num",
"sum__num__1 year ago",
"sum__num__1 year later",
],
)
sqls = [
sql for sql in responses["queries"][0]["query"].split(";") if sql.strip()
]
self.assertEqual(len(sqls), 3)
# 1 year ago
assert re.search(r"1989-01-01.+1990-01-01", sqls[1], re.S)
assert re.search(r"1990-01-01.+1991-01-01", sqls[1], re.S)
# 1 year later
assert re.search(r"1991-01-01.+1992-01-01", sqls[2], re.S)
assert re.search(r"1990-01-01.+1991-01-01", sqls[2], re.S)
@pytest.mark.usefixtures("load_birth_names_dashboard_with_slices")
def test_processing_time_offsets_cache(self):
"""
Ensure that time_offsets can generate the correct query
"""
self.login(username="admin")
payload = get_query_context("birth_names")
payload["queries"][0]["metrics"] = ["sum__num"]
payload["queries"][0]["groupby"] = ["name"]
payload["queries"][0]["is_timeseries"] = True
payload["queries"][0]["timeseries_limit"] = 5
payload["queries"][0]["time_offsets"] = []
payload["queries"][0]["time_range"] = "1990 : 1991"
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
query_result = query_context.get_query_result(query_object)
# get main query dataframe
df = query_result.df
payload["queries"][0]["time_offsets"] = ["1 year ago", "1 year later"]
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
# query without cache
query_context.processing_time_offsets(df, query_object)
# query with cache
rv = query_context.processing_time_offsets(df, query_object)
cache_keys = rv["cache_keys"]
cache_keys__1_year_ago = cache_keys[0]
cache_keys__1_year_later = cache_keys[1]
self.assertIsNotNone(cache_keys__1_year_ago)
self.assertIsNotNone(cache_keys__1_year_later)
self.assertNotEqual(cache_keys__1_year_ago, cache_keys__1_year_later)
# swap offsets
payload["queries"][0]["time_offsets"] = ["1 year later", "1 year ago"]
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
rv = query_context.processing_time_offsets(df, query_object)
cache_keys = rv["cache_keys"]
self.assertEqual(cache_keys__1_year_ago, cache_keys[1])
self.assertEqual(cache_keys__1_year_later, cache_keys[0])
# remove all offsets
payload["queries"][0]["time_offsets"] = []
query_context = ChartDataQueryContextSchema().load(payload)
query_object = query_context.queries[0]
rv = query_context.processing_time_offsets(df, query_object,)
self.assertIs(rv["df"], df)
self.assertEqual(rv["queries"], [])
self.assertEqual(rv["cache_keys"], [])

View File

@@ -14,16 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from unittest.mock import patch
from dateutil.relativedelta import relativedelta
from superset.charts.commands.exceptions import (
TimeRangeAmbiguousError,
TimeRangeParseFailError,
TimeRangeUnclearError,
)
from superset.utils.date_parser import (
DateRangeMigration,
datetime_eval,
get_past_or_future,
get_since_until,
parse_human_datetime,
parse_human_timedelta,
@@ -288,16 +291,48 @@ class TestDateParser(SupersetTestCase):
self.assertEqual(parse_past_timedelta("52 weeks"), timedelta(364))
self.assertEqual(parse_past_timedelta("1 month"), timedelta(31))
def test_get_past_or_future(self):
# 2020 is a leap year
dttm = datetime(2020, 2, 29)
self.assertEqual(get_past_or_future("1 year", dttm), datetime(2021, 2, 28))
self.assertEqual(get_past_or_future("-1 year", dttm), datetime(2019, 2, 28))
self.assertEqual(get_past_or_future("1 month", dttm), datetime(2020, 3, 29))
self.assertEqual(get_past_or_future("3 month", dttm), datetime(2020, 5, 29))
def test_parse_human_datetime(self):
with self.assertRaises(TimeRangeUnclearError):
with self.assertRaises(TimeRangeAmbiguousError):
parse_human_datetime(" 2 days ")
with self.assertRaises(TimeRangeUnclearError):
with self.assertRaises(TimeRangeAmbiguousError):
parse_human_datetime("2 day")
with self.assertRaises(TimeRangeParseFailError):
parse_human_datetime("xxxxxxx")
self.assertEqual(parse_human_datetime("2015-04-03"), datetime(2015, 4, 3, 0, 0))
self.assertEqual(
parse_human_datetime("2/3/1969"), datetime(1969, 2, 3, 0, 0),
)
self.assertLessEqual(parse_human_datetime("now"), datetime.now())
self.assertLess(parse_human_datetime("yesterday"), datetime.now())
self.assertEqual(
date.today() - timedelta(1), parse_human_datetime("yesterday").date()
)
self.assertEqual(
parse_human_datetime("one year ago").date(),
(datetime.now() - relativedelta(years=1)).date(),
)
self.assertEqual(
parse_human_datetime("2 years after").date(),
(datetime.now() + relativedelta(years=2)).date(),
)
def test_DateRangeMigration(self):
params = '{"time_range": " 8 days : 2020-03-10T00:00:00"}'
self.assertRegex(params, DateRangeMigration.x_dateunit_in_since)