diff --git a/superset/assets/spec/javascripts/sqllab/fixtures.js b/superset/assets/spec/javascripts/sqllab/fixtures.js index 6bd090ecaf8..99e740c3382 100644 --- a/superset/assets/spec/javascripts/sqllab/fixtures.js +++ b/superset/assets/spec/javascripts/sqllab/fixtures.js @@ -223,6 +223,20 @@ export const queries = [ type: 'STRING', }, ], + selected_columns: [ + { + is_date: true, + is_dim: false, + name: 'ds', + type: 'STRING', + }, + { + is_date: false, + is_dim: true, + name: 'gender', + type: 'STRING', + }, + ], data: [{ col1: 0, col2: 1 }, { col1: 2, col2: 3 }], }, }, @@ -264,7 +278,7 @@ export const queryWithBadColumns = { ...queries[0], results: { data: queries[0].results.data, - columns: [ + selected_columns: [ { is_date: true, is_dim: false, diff --git a/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx b/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx index 2394c6713a9..5ac1e2570df 100644 --- a/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx +++ b/superset/assets/src/SqlLab/components/ExploreResultsButton.jsx @@ -85,8 +85,8 @@ class ExploreResultsButton extends React.PureComponent { } getColumns() { const props = this.props; - if (props.query && props.query.results && props.query.results.columns) { - return props.query.results.columns; + if (props.query && props.query.results && props.query.results.selected_columns) { + return props.query.results.selected_columns; } return []; } @@ -97,7 +97,7 @@ class ExploreResultsButton extends React.PureComponent { const re1 = /^[A-Za-z_]\w*$/; // starts with char or _, then only alphanum const re2 = /__\d+$/; // does not finish with __ and then a number which screams dup col name - return this.props.query.results.columns.map(col => col.name) + return this.props.query.results.selected_columns.map(col => col.name) .filter(col => !re1.test(col) || re2.test(col)); } datasourceName() { diff --git a/superset/assets/src/SqlLab/main.less b/superset/assets/src/SqlLab/main.less index 822e7d84ce4..c7be8fbc20c 100644 --- a/superset/assets/src/SqlLab/main.less +++ b/superset/assets/src/SqlLab/main.less @@ -238,6 +238,7 @@ div.Workspace { .schemaPane { flex: 0 0 300px; + max-width: 300px; transition: all .3s ease-in-out; } diff --git a/superset/db_engine_specs.py b/superset/db_engine_specs.py index b6103c34776..00a9310b2ac 100644 --- a/superset/db_engine_specs.py +++ b/superset/db_engine_specs.py @@ -28,7 +28,7 @@ at all. The classes here will use a common interface to specify all this. The general idea is to use static classes and an inheritance scheme. """ -from collections import namedtuple +from collections import namedtuple, OrderedDict import hashlib import inspect import logging @@ -36,7 +36,7 @@ import os import re import textwrap import time -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Set, Tuple from urllib import parse from flask import g @@ -194,6 +194,12 @@ class BaseEngineSpec(object): return cursor.fetchmany(limit) return cursor.fetchall() + @classmethod + def expand_data(cls, + columns: List[dict], + data: List[dict]) -> Tuple[List[dict], List[dict], List[dict]]: + return columns, data, [] + @classmethod def alter_new_orm_column(cls, orm_col): """Allow altering default column attributes when first detected/added @@ -891,20 +897,16 @@ class PrestoEngineSpec(BaseEngineSpec): return [] @classmethod - def _create_column_info(cls, column: RowProxy, name: str, data_type: str) -> dict: + def _create_column_info(cls, name: str, data_type: str) -> dict: """ Create column info object - :param column: column object :param name: column name :param data_type: column data type :return: column info object """ return { 'name': name, - 'type': data_type, - # newer Presto no longer includes this column - 'nullable': getattr(column, 'Null', True), - 'default': None, + 'type': f'{data_type}', } @classmethod @@ -943,13 +945,11 @@ class PrestoEngineSpec(BaseEngineSpec): r'{}(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)'.format(delimiter), data_type) @classmethod - def _parse_structural_column(cls, column: RowProxy, result: List[dict]) -> None: + def _parse_structural_column(cls, full_data_type: str, result: List[dict]) -> None: """ Parse a row or array column - :param column: column :param result: list tracking the results """ - full_data_type = '{} {}'.format(column.Column, column.Type) # split on open parenthesis ( to get the structural # data type and its component types data_types = cls._split_data_type(full_data_type, r'\(') @@ -964,8 +964,9 @@ class PrestoEngineSpec(BaseEngineSpec): stack.pop() elif cls._has_nested_data_types(inner_type): # split on comma , to get individual data types - single_fields = cls._split_data_type(inner_type, ', ') + single_fields = cls._split_data_type(inner_type, ',') for single_field in single_fields: + single_field = single_field.strip() # If component type starts with a comma, the first single field # will be an empty string. Disregard this empty string. if not single_field: @@ -978,13 +979,13 @@ class PrestoEngineSpec(BaseEngineSpec): stack.append((field_info[0], field_info[1])) full_parent_path = cls._get_full_name(stack) result.append(cls._create_column_info( - column, full_parent_path, + full_parent_path, presto_type_map[field_info[1]]())) else: # otherwise this field is a basic data type full_parent_path = cls._get_full_name(stack) column_name = '{}.{}'.format(full_parent_path, field_info[0]) result.append(cls._create_column_info( - column, column_name, presto_type_map[field_info[1]]())) + column_name, presto_type_map[field_info[1]]())) # If the component type ends with a structural data type, do not pop # the stack. We have run across a structural data type within the # overall structural data type. Otherwise, we have completely parsed @@ -1036,7 +1037,12 @@ class PrestoEngineSpec(BaseEngineSpec): try: # parse column if it is a row or array if 'array' in column.Type or 'row' in column.Type: - cls._parse_structural_column(column, result) + full_data_type = '{} {}'.format(column.Column, column.Type) + structural_column_index = len(result) + cls._parse_structural_column(full_data_type, result) + result[structural_column_index]['nullable'] = getattr( + column, 'Null', True) + result[structural_column_index]['default'] = None continue else: # otherwise column is a basic data type column_type = presto_type_map[column.Type]() @@ -1044,7 +1050,10 @@ class PrestoEngineSpec(BaseEngineSpec): logging.info('Did not recognize type {} of column {}'.format( column.Type, column.Column)) column_type = types.NullType - result.append(cls._create_column_info(column, column.Column, column_type)) + column_info = cls._create_column_info(column.Column, column_type) + column_info['nullable'] = getattr(column, 'Null', True) + column_info['default'] = None + result.append(column_info) return result @classmethod @@ -1089,18 +1098,12 @@ class PrestoEngineSpec(BaseEngineSpec): return column_clauses @classmethod - def _filter_presto_cols(cls, cols: List[dict]) -> List[dict]: + def _filter_out_array_nested_cols( + cls, cols: List[dict]) -> Tuple[List[dict], List[dict]]: """ - We want to filter out columns that correspond to array content because expanding - arrays would require us to use unnest and join. This can lead to a large, - complicated, and slow query. - - Example: select array_content - from TABLE - cross join UNNEST(array_column) as t(array_content); - - We know which columns to skip because cols is a list provided to us in a specific - order where a structural column is positioned right before its content. + Filter out columns that correspond to array content. We know which columns to + skip because cols is a list provided to us in a specific order where a structural + column is positioned right before its content. Example: Column Name: ColA, Column Data Type: array(row(nest_obj int)) cols = [ ..., ColA, ColA.nest_obj, ... ] @@ -1108,23 +1111,26 @@ class PrestoEngineSpec(BaseEngineSpec): When we run across an array, check if subsequent column names start with the array name and skip them. :param cols: columns - :return: filtered list of columns + :return: filtered list of columns and list of array columns and its nested fields """ filtered_cols = [] - curr_array_col_name = '' + array_cols = [] + curr_array_col_name = None for col in cols: # col corresponds to an array's content and should be skipped if curr_array_col_name and col['name'].startswith(curr_array_col_name): + array_cols.append(col) continue # col is an array so we need to check if subsequent # columns correspond to the array's contents elif str(col['type']) == 'ARRAY': curr_array_col_name = col['name'] + array_cols.append(col) filtered_cols.append(col) else: - curr_array_col_name = '' + curr_array_col_name = None filtered_cols.append(col) - return filtered_cols + return filtered_cols, array_cols @classmethod def select_star(cls, my_db, table_name: str, engine: Engine, schema: str = None, @@ -1137,7 +1143,9 @@ class PrestoEngineSpec(BaseEngineSpec): """ presto_cols = cols if show_cols: - presto_cols = cls._filter_presto_cols(cols) + dot_regex = r'\.(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)' + presto_cols = [ + col for col in presto_cols if not re.search(dot_regex, col['name'])] return super(PrestoEngineSpec, cls).select_star( my_db, table_name, engine, schema, limit, show_cols, indent, latest_partition, presto_cols, @@ -1185,6 +1193,370 @@ class PrestoEngineSpec(BaseEngineSpec): schema=row['table_schema'], table=row['table_name'])) return datasource_names + @classmethod + def _build_column_hierarchy(cls, + columns: List[dict], + parent_column_types: List[str], + column_hierarchy: dict) -> None: + """ + Build a graph where the root node represents a column whose data type is in + parent_column_types. A node's children represent that column's nested fields + :param columns: list of columns + :param parent_column_types: list of data types that decide what columns can + be root nodes + :param column_hierarchy: dictionary representing the graph + """ + if len(columns) == 0: + return + root = columns.pop(0) + root_info = {'type': root['type'], 'children': []} + column_hierarchy[root['name']] = root_info + while columns: + column = columns[0] + # If the column name does not start with the root's name, + # then this column is not a nested field + if not column['name'].startswith(f"{root['name']}."): + break + # If the column's data type is one of the parent types, + # then this column may have nested fields + if str(column['type']) in parent_column_types: + cls._build_column_hierarchy(columns, parent_column_types, + column_hierarchy) + root_info['children'].append(column['name']) + continue + else: # The column is a nested field + root_info['children'].append(column['name']) + columns.pop(0) + + @classmethod + def _create_row_and_array_hierarchy( + cls, selected_columns: List[dict]) -> Tuple[dict, dict, List[dict]]: + """ + Build graphs where the root node represents a row or array and its children + are that column's nested fields + :param selected_columns: columns selected in a query + :return: graph representing a row, graph representing an array, and a list + of all the nested fields + """ + row_column_hierarchy: OrderedDict = OrderedDict() + array_column_hierarchy: OrderedDict = OrderedDict() + expanded_columns: List[dict] = [] + for column in selected_columns: + if column['type'].startswith('ROW'): + parsed_row_columns: List[dict] = [] + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_row_columns) + expanded_columns = expanded_columns + parsed_row_columns[1:] + filtered_row_columns, array_columns = cls._filter_out_array_nested_cols( + parsed_row_columns) + cls._build_column_hierarchy(filtered_row_columns, + ['ROW'], + row_column_hierarchy) + cls._build_column_hierarchy(array_columns, + ['ROW', 'ARRAY'], + array_column_hierarchy) + elif column['type'].startswith('ARRAY'): + parsed_array_columns: List[dict] = [] + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, parsed_array_columns) + expanded_columns = expanded_columns + parsed_array_columns[1:] + cls._build_column_hierarchy(parsed_array_columns, + ['ROW', 'ARRAY'], + array_column_hierarchy) + return row_column_hierarchy, array_column_hierarchy, expanded_columns + + @classmethod + def _create_empty_row_of_data(cls, columns: List[dict]) -> dict: + """ + Create an empty row of data + :param columns: list of columns + :return: dictionary representing an empty row of data + """ + return {column['name']: '' for column in columns} + + @classmethod + def _expand_row_data(cls, datum: dict, column: str, column_hierarchy: dict) -> None: + """ + Separate out nested fields and its value in a row of data + :param datum: row of data + :param column: row column name + :param column_hierarchy: dictionary tracking structural columns and its + nested fields + """ + if column in datum: + row_data = datum[column] + row_children = column_hierarchy[column]['children'] + if row_data and len(row_data) != len(row_children): + raise Exception('The number of data values and number of nested' + 'fields are not equal') + elif row_data: + for index, data_value in enumerate(row_data): + datum[row_children[index]] = data_value + else: + for row_child in row_children: + datum[row_child] = '' + + @classmethod + def _split_array_columns_by_process_state( + cls, array_columns: List[str], + array_column_hierarchy: dict, + datum: dict) -> Tuple[List[str], Set[str]]: + """ + Take a list of array columns and split them according to whether or not we are + ready to process them from a data set + :param array_columns: list of array columns + :param array_column_hierarchy: graph representing array columns + :param datum: row of data + :return: list of array columns ready to be processed and set of array columns + not ready to be processed + """ + array_columns_to_process = [] + unprocessed_array_columns = set() + child_array = None + for array_column in array_columns: + if array_column in datum: + array_columns_to_process.append(array_column) + elif str(array_column_hierarchy[array_column]['type']) == 'ARRAY': + child_array = array_column + unprocessed_array_columns.add(child_array) + elif child_array and array_column.startswith(child_array): + unprocessed_array_columns.add(array_column) + return array_columns_to_process, unprocessed_array_columns + + @classmethod + def _convert_data_list_to_array_data_dict( + cls, data: List[dict], array_columns_to_process: List[str]) -> dict: + """ + Pull out array data from rows of data into a dictionary where the key represents + the index in the data list and the value is the array data values + Example: + data = [ + {'ColumnA': [1, 2], 'ColumnB': 3}, + {'ColumnA': [11, 22], 'ColumnB': 3} + ] + data dictionary = { + 0: [{'ColumnA': [1, 2]], + 1: [{'ColumnA': [11, 22]] + } + :param data: rows of data + :param array_columns_to_process: array columns we want to pull out + :return: data dictionary + """ + array_data_dict = {} + for data_index, datum in enumerate(data): + all_array_datum = {} + for array_column in array_columns_to_process: + all_array_datum[array_column] = datum[array_column] + array_data_dict[data_index] = [all_array_datum] + return array_data_dict + + @classmethod + def _process_array_data(cls, + data: List[dict], + all_columns: List[dict], + array_column_hierarchy: dict) -> dict: + """ + Pull out array data that is ready to be processed into a dictionary. + The key refers to the index in the original data set. The value is + a list of data values. Initially this list will contain just one value, + the row of data that corresponds to the index in the original data set. + As we process arrays, we will pull out array values into separate rows + and append them to the list of data values. + Example: + Original data set = [ + {'ColumnA': [1, 2], 'ColumnB': [3]}, + {'ColumnA': [11, 22], 'ColumnB': [33]} + ] + all_array_data (intially) = { + 0: [{'ColumnA': [1, 2], 'ColumnB': [3}], + 1: [{'ColumnA': [11, 22], 'ColumnB': [33]}] + } + all_array_data (after processing) = { + 0: [ + {'ColumnA': 1, 'ColumnB': 3}, + {'ColumnA': 2, 'ColumnB': ''}, + ], + 1: [ + {'ColumnA': 11, 'ColumnB': 33}, + {'ColumnA': 22, 'ColumnB': ''}, + ], + } + :param data: rows of data + :param all_columns: list of columns + :param array_column_hierarchy: graph representing array columns + :return: dictionary representing processed array data + """ + array_columns = list(array_column_hierarchy.keys()) + # Determine what columns are ready to be processed. This is necessary for + # array columns that contain rows with nested arrays. We first process + # the outer arrays before processing inner arrays. + array_columns_to_process, \ + unprocessed_array_columns = cls._split_array_columns_by_process_state( + array_columns, array_column_hierarchy, data[0]) + + # Pull out array data that is ready to be processed into a dictionary. + all_array_data = cls._convert_data_list_to_array_data_dict( + data, array_columns_to_process) + + for original_data_index, expanded_array_data in all_array_data.items(): + for array_column in array_columns: + if array_column in unprocessed_array_columns: + continue + # Expand array values that are rows + if str(array_column_hierarchy[array_column]['type']) == 'ROW': + for array_value in expanded_array_data: + cls._expand_row_data(array_value, + array_column, + array_column_hierarchy) + continue + array_data = expanded_array_data[0][array_column] + array_children = array_column_hierarchy[array_column] + # This is an empty array of primitive data type + if not array_data and not array_children['children']: + continue + # Pull out complex array values into its own row of data + elif array_data and array_children['children']: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + empty_data = cls._create_empty_row_of_data(all_columns) + expanded_array_data.append(empty_data) + for index, datum_value in enumerate(data_value): + array_child = array_children['children'][index] + expanded_array_data[array_index][array_child] = datum_value + # Pull out primitive array values into its own row of data + elif array_data: + for array_index, data_value in enumerate(array_data): + if array_index >= len(expanded_array_data): + empty_data = cls._create_empty_row_of_data(all_columns) + expanded_array_data.append(empty_data) + expanded_array_data[array_index][array_column] = data_value + # This is an empty array with nested fields + else: + for index, array_child in enumerate(array_children['children']): + for array_value in expanded_array_data: + array_value[array_child] = '' + return all_array_data + + @classmethod + def _consolidate_array_data_into_data(cls, + data: List[dict], + array_data: dict) -> None: + """ + Consolidate data given a list representing rows of data and a dictionary + representing expanded array data + Example: + Original data set = [ + {'ColumnA': [1, 2], 'ColumnB': [3]}, + {'ColumnA': [11, 22], 'ColumnB': [33]} + ] + array_data = { + 0: [ + {'ColumnA': 1, 'ColumnB': 3}, + {'ColumnA': 2, 'ColumnB': ''}, + ], + 1: [ + {'ColumnA': 11, 'ColumnB': 33}, + {'ColumnA': 22, 'ColumnB': ''}, + ], + } + Final data set = [ + {'ColumnA': 1, 'ColumnB': 3}, + {'ColumnA': 2, 'ColumnB': ''}, + {'ColumnA': 11, 'ColumnB': 33}, + {'ColumnA': 22, 'ColumnB': ''}, + ] + :param data: list representing rows of data + :param array_data: dictionary representing expanded array data + :return: list where data and array_data are combined + """ + data_index = 0 + original_data_index = 0 + while data_index < len(data): + data[data_index].update(array_data[original_data_index][0]) + array_data[original_data_index].pop(0) + data[data_index + 1:data_index + 1] = array_data[original_data_index] + data_index = data_index + len(array_data[original_data_index]) + 1 + original_data_index = original_data_index + 1 + + @classmethod + def _remove_processed_array_columns(cls, + unprocessed_array_columns: Set[str], + array_column_hierarchy: dict) -> None: + """ + Remove keys representing array columns that have already been processed + :param unprocessed_array_columns: list of unprocessed array columns + :param array_column_hierarchy: graph representing array columns + """ + array_columns = list(array_column_hierarchy.keys()) + for array_column in array_columns: + if array_column in unprocessed_array_columns: + continue + else: + del array_column_hierarchy[array_column] + + @classmethod + def expand_data(cls, + columns: List[dict], + data: List[dict]) -> Tuple[List[dict], List[dict], List[dict]]: + """ + We do not immediately display rows and arrays clearly in the data grid. This + method separates out nested fields and data values to help clearly display + structural columns. + + Example: ColumnA is a row(nested_obj varchar) and ColumnB is an array(int) + Original data set = [ + {'ColumnA': ['a1'], 'ColumnB': [1, 2]}, + {'ColumnA': ['a2'], 'ColumnB': [3, 4]}, + ] + Expanded data set = [ + {'ColumnA': ['a1'], 'ColumnA.nested_obj': 'a1', 'ColumnB': 1}, + {'ColumnA': '', 'ColumnA.nested_obj': '', 'ColumnB': 2}, + {'ColumnA': ['a2'], 'ColumnA.nested_obj': 'a2', 'ColumnB': 3}, + {'ColumnA': '', 'ColumnA.nested_obj': '', 'ColumnB': 4}, + ] + :param columns: columns selected in the query + :param data: original data set + :return: list of all columns(selected columns and their nested fields), + expanded data set, listed of nested fields + """ + all_columns: List[dict] = [] + # Get the list of all columns (selected fields and their nested fields) + for column in columns: + if column['type'].startswith('ARRAY') or column['type'].startswith('ROW'): + full_data_type = '{} {}'.format(column['name'], column['type'].lower()) + cls._parse_structural_column(full_data_type, all_columns) + else: + all_columns.append(column) + + # Build graphs where the root node is a row or array and its children are that + # column's nested fields + row_column_hierarchy,\ + array_column_hierarchy,\ + expanded_columns = cls._create_row_and_array_hierarchy(columns) + + # Pull out a row's nested fields and their values into separate columns + ordered_row_columns = row_column_hierarchy.keys() + for datum in data: + for row_column in ordered_row_columns: + cls._expand_row_data(datum, row_column, row_column_hierarchy) + + while array_column_hierarchy: + array_columns = list(array_column_hierarchy.keys()) + # Determine what columns are ready to be processed. + array_columns_to_process,\ + unprocessed_array_columns = cls._split_array_columns_by_process_state( + array_columns, array_column_hierarchy, data[0]) + all_array_data = cls._process_array_data(data, + all_columns, + array_column_hierarchy) + # Consolidate the original data set and the expanded array data + cls._consolidate_array_data_into_data(data, all_array_data) + # Remove processed array columns from the graph + cls._remove_processed_array_columns(unprocessed_array_columns, + array_column_hierarchy) + + return all_columns, data, expanded_columns + @classmethod def extra_table_metadata(cls, database, table_name, schema_name): indexes = database.get_indexes(table_name, schema_name) diff --git a/superset/sql_lab.py b/superset/sql_lab.py index 6a5ee8ebc53..86e171ba44d 100644 --- a/superset/sql_lab.py +++ b/superset/sql_lab.py @@ -279,10 +279,17 @@ def execute_sql_statements( latest_partition=False) query.end_time = now_as_float() + selected_columns = cdf.columns or [] + data = cdf.data or [] + all_columns, data, expanded_columns = db_engine_spec.expand_data( + selected_columns, data) + payload.update({ 'status': query.status, - 'data': cdf.data if cdf.data else [], - 'columns': cdf.columns if cdf.columns else [], + 'data': data, + 'columns': all_columns, + 'selected_columns': selected_columns, + 'expanded_columns': expanded_columns, 'query': query.to_dict(), }) diff --git a/tests/db_engine_specs_test.py b/tests/db_engine_specs_test.py index 43f89c14353..02dbbae8bcf 100644 --- a/tests/db_engine_specs_test.py +++ b/tests/db_engine_specs_test.py @@ -399,13 +399,286 @@ class DbEngineSpecsTestCase(SupersetTestCase): self.assertEqual(actual_result.element.name, expected_result['name']) self.assertEqual(actual_result.name, expected_result['label']) - def test_presto_filter_presto_cols(self): + def test_presto_filter_out_array_nested_cols(self): cols = [ {'name': 'column', 'type': 'ARRAY'}, {'name': 'column.nested_obj', 'type': 'FLOAT'}] - actual_results = PrestoEngineSpec._filter_presto_cols(cols) - expected_results = [cols[0]] - self.assertEqual(actual_results, expected_results) + actual_filtered_cols,\ + actual_array_cols = PrestoEngineSpec._filter_out_array_nested_cols(cols) + expected_filtered_cols = [{'name': 'column', 'type': 'ARRAY'}] + self.assertEqual(actual_filtered_cols, expected_filtered_cols) + self.assertEqual(actual_array_cols, cols) + + def test_presto_create_row_and_array_hierarchy(self): + cols = [ + {'name': 'row_column', + 'type': 'ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 VARCHAR)'}, + {'name': 'array_column', + 'type': 'ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ VARCHAR))))'}] + actual_row_col_hierarchy,\ + actual_array_col_hierarchy,\ + actual_expanded_cols = PrestoEngineSpec._create_row_and_array_hierarchy(cols) + expected_row_col_hierarchy = { + 'row_column': { + 'type': 'ROW', + 'children': ['row_column.nested_obj1', 'row_column.nested_row'], + }, + 'row_column.nested_row': { + 'type': 'ROW', + 'children': ['row_column.nested_row.nested_obj2']}, + } + expected_array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + expected_expanded_cols = [ + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}, + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + self.assertEqual(actual_row_col_hierarchy, expected_row_col_hierarchy) + self.assertEqual(actual_array_col_hierarchy, expected_array_col_hierarchy) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_row_data(self): + datum = {'row_col': [1, 'a']} + row_column = 'row_col' + row_col_hierarchy = { + 'row_col': { + 'type': 'ROW', + 'children': ['row_col.nested_int', 'row_col.nested_str'], + }, + } + PrestoEngineSpec._expand_row_data(datum, row_column, row_col_hierarchy) + expected_datum = { + 'row_col': [1, 'a'], 'row_col.nested_int': 1, 'row_col.nested_str': 'a', + } + self.assertEqual(datum, expected_datum) + + def test_split_array_columns_by_process_state(self): + array_cols = ['array_column', 'array_column.nested_array'] + array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + datum = {'array_column': [[[1], [2]]]} + actual_array_cols_to_process, actual_unprocessed_array_cols = \ + PrestoEngineSpec._split_array_columns_by_process_state( + array_cols, array_col_hierarchy, datum) + expected_array_cols_to_process = ['array_column'] + expected_unprocessed_array_cols = {'array_column.nested_array'} + self.assertEqual(actual_array_cols_to_process, expected_array_cols_to_process) + self.assertEqual(actual_unprocessed_array_cols, expected_unprocessed_array_cols) + + def test_presto_convert_data_list_to_array_data_dict(self): + data = [ + {'array_column': [1, 2], 'int_column': 3}, + {'array_column': [11, 22], 'int_column': 33}, + ] + array_columns_to_process = ['array_column'] + actual_array_data_dict = PrestoEngineSpec._convert_data_list_to_array_data_dict( + data, array_columns_to_process) + expected_array_data_dict = { + 0: [{'array_column': [1, 2]}], + 1: [{'array_column': [11, 22]}]} + self.assertEqual(actual_array_data_dict, expected_array_data_dict) + + def test_presto_process_array_data(self): + data = [ + {'array_column': [[1], [2]], 'int_column': 3}, + {'array_column': [[11], [22]], 'int_column': 33}, + ] + all_columns = [ + {'name': 'array_column', 'type': 'ARRAY'}, + {'name': 'array_column.nested_row', 'type': 'BIGINT'}, + {'name': 'int_column', 'type': 'BIGINT'}, + ] + array_column_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_row'], + }, + } + actual_array_data = PrestoEngineSpec._process_array_data( + data, all_columns, array_column_hierarchy) + expected_array_data = { + 0: [ + {'array_column': [[1], [2]], 'array_column.nested_row': 1}, + {'array_column': '', 'array_column.nested_row': 2, 'int_column': ''}, + ], + 1: [ + {'array_column': [[11], [22]], 'array_column.nested_row': 11}, + {'array_column': '', 'array_column.nested_row': 22, 'int_column': ''}, + ], + } + self.assertEqual(actual_array_data, expected_array_data) + + def test_presto_consolidate_array_data_into_data(self): + data = [ + {'arr_col': [[1], [2]], 'int_col': 3}, + {'arr_col': [[11], [22]], 'int_col': 33}, + ] + array_data = { + 0: [ + {'arr_col': [[1], [2]], 'arr_col.nested_row': 1}, + {'arr_col': '', 'arr_col.nested_row': 2, 'int_col': ''}, + ], + 1: [ + {'arr_col': [[11], [22]], 'arr_col.nested_row': 11}, + {'arr_col': '', 'arr_col.nested_row': 22, 'int_col': ''}, + ], + } + PrestoEngineSpec._consolidate_array_data_into_data(data, array_data) + expected_data = [ + {'arr_col': [[1], [2]], 'arr_col.nested_row': 1, 'int_col': 3}, + {'arr_col': '', 'arr_col.nested_row': 2, 'int_col': ''}, + {'arr_col': [[11], [22]], 'arr_col.nested_row': 11, 'int_col': 33}, + {'arr_col': '', 'arr_col.nested_row': 22, 'int_col': ''}, + ] + self.assertEqual(data, expected_data) + + def test_presto_remove_processed_array_columns(self): + array_col_hierarchy = { + 'array_column': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array'], + }, + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + unprocessed_array_cols = {'array_column.nested_array'} + PrestoEngineSpec._remove_processed_array_columns( + unprocessed_array_cols, array_col_hierarchy) + expected_array_col_hierarchy = { + 'array_column.nested_array': { + 'type': 'ARRAY', + 'children': ['array_column.nested_array.nested_obj']}, + } + self.assertEqual(array_col_hierarchy, expected_array_col_hierarchy) + + def test_presto_expand_data_with_simple_structural_columns(self): + cols = [ + {'name': 'row_column', 'type': 'ROW(NESTED_OBJ VARCHAR)'}, + {'name': 'array_column', 'type': 'ARRAY(BIGINT)'}] + data = [ + {'row_column': ['a'], 'array_column': [1, 2, 3]}, + {'row_column': ['b'], 'array_column': [4, 5, 6]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'row_column', 'type': 'ROW'}, + {'name': 'row_column.nested_obj', 'type': 'VARCHAR'}, + {'name': 'array_column', 'type': 'ARRAY'}] + expected_data = [ + {'row_column': ['a'], 'row_column.nested_obj': 'a', 'array_column': 1}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 2}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 3}, + {'row_column': ['b'], 'row_column.nested_obj': 'b', 'array_column': 4}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 5}, + {'row_column': '', 'row_column.nested_obj': '', 'array_column': 6}] + expected_expanded_cols = [ + {'name': 'row_column.nested_obj', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_data_with_complex_row_columns(self): + cols = [ + {'name': 'row_column', + 'type': 'ROW(NESTED_OBJ1 VARCHAR, NESTED_ROW ROW(NESTED_OBJ2 VARCHAR)'}] + data = [ + {'row_column': ['a1', ['a2']]}, + {'row_column': ['b1', ['b2']]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'row_column', 'type': 'ROW'}, + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}] + expected_data = [ + {'row_column': ['a1', ['a2']], + 'row_column.nested_obj1': 'a1', + 'row_column.nested_row': ['a2'], + 'row_column.nested_row.nested_obj2': 'a2'}, + {'row_column': ['b1', ['b2']], + 'row_column.nested_obj1': 'b1', + 'row_column.nested_row': ['b2'], + 'row_column.nested_row.nested_obj2': 'b2'}] + expected_expanded_cols = [ + {'name': 'row_column.nested_obj1', 'type': 'VARCHAR'}, + {'name': 'row_column.nested_row', 'type': 'ROW'}, + {'name': 'row_column.nested_row.nested_obj2', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) + + def test_presto_expand_data_with_complex_array_columns(self): + cols = [ + {'name': 'int_column', 'type': 'BIGINT'}, + {'name': 'array_column', + 'type': 'ARRAY(ROW(NESTED_ARRAY ARRAY(ROW(NESTED_OBJ VARCHAR))))'}] + data = [ + {'int_column': 1, 'array_column': [[[['a'], ['b']]], [[['c'], ['d']]]]}, + {'int_column': 2, 'array_column': [[[['e'], ['f']]], [[['g'], ['h']]]]}] + actual_cols, actual_data, actual_expanded_cols = PrestoEngineSpec.expand_data( + cols, data) + expected_cols = [ + {'name': 'int_column', 'type': 'BIGINT'}, + {'name': 'array_column', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + expected_data = [ + {'int_column': 1, + 'array_column': [[[['a'], ['b']]], [[['c'], ['d']]]], + 'array_column.nested_array': [['a'], ['b']], + 'array_column.nested_array.nested_obj': 'a'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'b'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': [['c'], ['d']], + 'array_column.nested_array.nested_obj': 'c'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'd'}, + {'int_column': 2, + 'array_column': [[[['e'], ['f']]], [[['g'], ['h']]]], + 'array_column.nested_array': [['e'], ['f']], + 'array_column.nested_array.nested_obj': 'e'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'f'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': [['g'], ['h']], + 'array_column.nested_array.nested_obj': 'g'}, + {'int_column': '', + 'array_column': '', + 'array_column.nested_array': '', + 'array_column.nested_array.nested_obj': 'h'}] + expected_expanded_cols = [ + {'name': 'array_column.nested_array', 'type': 'ARRAY'}, + {'name': 'array_column.nested_array.nested_obj', 'type': 'VARCHAR'}] + self.assertEqual(actual_cols, expected_cols) + self.assertEqual(actual_data, expected_data) + self.assertEqual(actual_expanded_cols, expected_expanded_cols) def test_hive_get_view_names_return_empty_list(self): self.assertEquals([], HiveEngineSpec.get_view_names(mock.ANY, mock.ANY))