refactor(example_data): replace the way the birth_names data is loaded to DB (#18060)

* refactor: replace the way the birth_names data is loaded to DB

* fix failed unit test

* fix failed unit test

* fix failed tests

* fix: the wrong "supports datetime type" flag was being passed

* remove unused fixture
This commit is contained in:
ofekisr
2022-01-18 23:21:04 +02:00
committed by GitHub
parent 88db2cc0ab
commit 4675ca31c5
29 changed files with 781 additions and 137 deletions

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from tests.common.example_data.data_loading.data_definitions.types import Table
class DataLoader(ABC):
    """Interface for objects that can load a ``Table`` and remove one by name.

    Both operations are abstract; concrete loaders must implement them.
    """

    @abstractmethod
    def load_table(self, table: Table) -> None:
        """Load ``table``. Implemented by concrete loaders."""

    @abstractmethod
    def remove_table(self, table_name: str) -> None:
        """Remove the table named ``table_name``. Implemented by concrete loaders."""

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from sqlalchemy import DateTime, Integer, String
from tests.consts.birth_names import (
DS,
GENDER,
NAME,
NUM,
NUM_BOYS,
NUM_GIRLS,
STATE,
TABLE_NAME,
)
from tests.example_data.data_loading.data_definitions.types import (
TableMetaData,
TableMetaDataFactory,
)
def _make_birth_names_columns(ds_type):
    """Return a fresh column-name -> SQLAlchemy type mapping for birth_names.

    Only the type of the ``DS`` column varies between the two variants, so it
    is the single argument; every call builds new type instances.
    """
    return {
        DS: ds_type,
        GENDER: String(16),
        NAME: String(255),
        NUM: Integer,
        STATE: String(10),
        NUM_BOYS: Integer,
        NUM_GIRLS: Integer,
    }


# Column types when the DS column is stored as a DateTime.
BIRTH_NAMES_COLUMNS = _make_birth_names_columns(DateTime)

# Column types when the DS column is stored as a string instead of a DateTime.
BIRTH_NAMES_COLUMNS_WITHOUT_DATETIME = _make_birth_names_columns(String(255))
class BirthNamesMetaDataFactory(TableMetaDataFactory):
    """Build the ``TableMetaData`` describing the birth_names table.

    The ``datetime_type_support`` flag selects which column-type mapping is
    used: ``BIRTH_NAMES_COLUMNS`` when true, otherwise
    ``BIRTH_NAMES_COLUMNS_WITHOUT_DATETIME``.
    """

    _datetime_type_support: bool

    def __init__(self, datetime_type_support: bool = True):
        self._datetime_type_support = datetime_type_support

    def make(self) -> TableMetaData:
        """Return metadata holding a copy of the selected column mapping."""
        columns = (
            BIRTH_NAMES_COLUMNS
            if self._datetime_type_support
            else BIRTH_NAMES_COLUMNS_WITHOUT_DATETIME
        )
        # Copy so callers cannot mutate the module-level mapping.
        return TableMetaData(TABLE_NAME, columns.copy())

View File

@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Optional
from sqlalchemy.types import TypeEngine
@dataclass
class TableMetaData:
    """Describes a table: its name and, optionally, its column types."""

    # Name of the table this metadata belongs to.
    table_name: str
    # Mapping of column name to SQLAlchemy type; may be None.
    types: Optional[Dict[str, TypeEngine]]
@dataclass
class Table:
    """A table's name, its metadata and the rows that make up its content."""

    # Name of the table.
    table_name: str
    # Metadata (name and column types) associated with the table.
    table_metadata: TableMetaData
    # The rows, each given as a dict.
    data: Iterable[Dict[Any, Any]]
class TableMetaDataFactory(ABC):
    """Base class for factories that produce ``TableMetaData`` and ``Table`` objects."""

    @abstractmethod
    def make(self) -> TableMetaData:
        """Return the metadata for the table. Implemented by subclasses."""

    def make_table(self, data: Iterable[Dict[Any, Any]]) -> Table:
        """Wrap ``data`` in a ``Table`` that uses the metadata from ``make()``."""
        table_metadata = self.make()
        return Table(
            table_name=table_metadata.table_name,
            table_metadata=table_metadata,
            data=data,
        )

View File

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Dict, Optional, TYPE_CHECKING
from pandas import DataFrame
from sqlalchemy.inspection import inspect
from tests.example_data.data_loading.base_data_loader import DataLoader
if TYPE_CHECKING:
from sqlalchemy.engine import Engine
from tests.example_data.data_loading.data_definitions.types import Table
from tests.example_data.data_loading.pandas.pands_data_loading_conf import (
PandasLoaderConfigurations,
)
class PandasDataLoader(DataLoader):
    """``DataLoader`` that writes tables to a database with ``DataFrame.to_sql``.

    A ``Table`` is first converted to a ``DataFrame`` by the injected
    ``TableToDfConvertor`` and then written through the SQLAlchemy engine,
    using the options carried by ``PandasLoaderConfigurations``.
    """

    _db_engine: Engine
    _configurations: PandasLoaderConfigurations
    _table_to_df_convertor: TableToDfConvertor

    def __init__(
        self,
        db_engine: Engine,
        config: PandasLoaderConfigurations,
        table_to_df_convertor: TableToDfConvertor,
    ) -> None:
        """Store the engine, the ``to_sql`` options and the table converter."""
        self._db_engine = db_engine
        self._configurations = config
        self._table_to_df_convertor = table_to_df_convertor

    def load_table(self, table: Table) -> None:
        """Convert ``table`` to a ``DataFrame`` and write it to the database.

        The table is written under ``table.table_name`` into the engine's
        default schema, with the column types taken from the table metadata
        (when present).
        """
        df = self._table_to_df_convertor.convert(table)
        config = self._configurations
        df.to_sql(
            table.table_name,
            self._db_engine,
            if_exists=config.if_exists,
            chunksize=config.chunksize,
            index=config.index,
            dtype=self._take_data_types(table),
            method=config.method,
            schema=self._detect_schema_name(),
        )

    def _detect_schema_name(self) -> Optional[str]:
        """Return the default schema name reported by the engine inspector."""
        return inspect(self._db_engine).default_schema_name

    def _take_data_types(self, table: Table) -> Optional[Dict[str, str]]:
        """Return the column types from the table metadata, or ``None``.

        ``None`` is returned when the metadata or its ``types`` mapping is
        missing or empty.
        """
        # NOTE(review): the annotation says the values are ``str``, but
        # ``TableMetaData.types`` holds SQLAlchemy type objects; fixing it
        # needs an extra typing import at file level.
        metadata = table.table_metadata
        if metadata and metadata.types:
            return metadata.types
        return None

    def remove_table(self, table_name: str) -> None:
        """Drop ``table_name`` if it exists.

        The name is passed through the dialect's identifier preparer so that
        names which need quoting (reserved words, mixed case, special
        characters) are dropped correctly; plain lowercase names are emitted
        unchanged.
        """
        quoted_name = self._db_engine.dialect.identifier_preparer.quote(table_name)
        self._db_engine.execute(f"DROP TABLE IF EXISTS {quoted_name}")
class TableToDfConvertor(ABC):
    """Interface for objects that turn a ``Table`` into a pandas ``DataFrame``."""

    @abstractmethod
    def convert(self, table: Table) -> DataFrame:
        """Return a ``DataFrame`` built from ``table``. Implemented by subclasses."""

View File

@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Any, Dict
# Baseline option values; ``PandasLoaderConfigurations.make_from_dict``
# overlays caller-supplied values on a copy of this mapping.
default_pandas_data_loader_config: Dict[str, Any] = {
    "if_exists": "replace",
    "chunksize": 500,
    "index": False,
    "method": "multi",
    "strftime": "%Y-%m-%d %H:%M:%S",
    "support_datetime_type": False,
}


class PandasLoaderConfigurations:
    """Options consumed by the pandas-based data loader.

    ``if_exists``, ``chunksize``, ``index`` and ``method`` are forwarded to
    ``DataFrame.to_sql``. ``strftime`` and ``support_datetime_type`` are only
    stored here; presumably they drive datetime handling elsewhere (confirm
    against the callers).
    """

    if_exists: str
    chunksize: int
    index: bool
    method: str
    strftime: str
    support_datetime_type: bool

    def __init__(
        self,
        *,
        if_exists: str,
        chunksize: int,
        index: bool,
        method: str,
        strftime: str,
        support_datetime_type: bool,
    ):
        """Store every option; all arguments are keyword-only and required."""
        self.if_exists = if_exists
        self.chunksize = chunksize
        self.index = index
        self.method = method
        self.strftime = strftime
        self.support_datetime_type = support_datetime_type

    @classmethod
    def make_from_dict(cls, _dict: Dict[str, Any]) -> PandasLoaderConfigurations:
        """Build a configuration from the defaults overridden by ``_dict``.

        The defaults mapping is copied first, so it is never mutated. The
        instance is created through ``cls`` so that subclasses get an instance
        of their own type.

        Raises:
            TypeError: if ``_dict`` contains a key that is not an option name.
        """
        options = default_pandas_data_loader_config.copy()
        options.update(_dict)
        return cls(**options)

    @classmethod
    def make_default(cls) -> PandasLoaderConfigurations:
        """Build a configuration that uses only the default values."""
        return cls.make_from_dict({})

View File

@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from pandas import DataFrame
from tests.example_data.data_loading.pandas.pandas_data_loader import TableToDfConvertor
if TYPE_CHECKING:
from tests.example_data.data_loading.data_definitions.types import Table
class TableToDfConvertorImpl(TableToDfConvertor):
    """Build a ``DataFrame`` from a ``Table``, optionally formatting ``ds`` as text.

    When the conversion flag is set and a time format is given, the ``ds``
    column is replaced by its ``strftime`` rendering.
    """

    convert_datetime_to_str: bool
    _time_format: Optional[str]

    def __init__(
        self, convert_ds_to_datetime: bool, time_format: Optional[str] = None
    ) -> None:
        # NOTE: despite its name, ``convert_ds_to_datetime`` is stored as the
        # flag that enables the datetime -> string conversion of ``ds``.
        self._time_format = time_format
        self.convert_datetime_to_str = convert_ds_to_datetime

    def convert(self, table: Table) -> DataFrame:
        """Return a ``DataFrame`` of ``table.data``, formatting ``ds`` if enabled."""
        frame = DataFrame(table.data)
        if not self._should_convert_datetime_to_str():
            return frame
        frame.ds = frame.ds.dt.strftime(self._time_format)
        return frame

    def _should_convert_datetime_to_str(self) -> bool:
        """Tell whether ``ds`` must be formatted: flag set and a format given."""
        if not self.convert_datetime_to_str:
            return self.convert_datetime_to_str
        return self._time_format is not None