feat(db_engine_specs): big query cost estimation (#21325)

Co-authored-by: zamar roura <zamar.roura@cabify.es>
Co-authored-by: Zamar Roura <zamarfazal@gmail.com>
This commit is contained in:
Zamar
2023-01-09 07:56:19 +01:00
committed by GitHub
parent 9cfbc22cd2
commit 001100ddf0
22 changed files with 128 additions and 45 deletions

View File

@@ -31,6 +31,7 @@ from sqlalchemy.engine.base import Engine
from sqlalchemy.sql import sqltypes
from typing_extensions import TypedDict
from superset import sql_parse
from superset.constants import PASSWORD_MASK
from superset.databases.schemas import encrypted_field_properties, EncryptedString
from superset.databases.utils import make_url_safe
@@ -364,6 +365,97 @@ class BigQueryEngineSpec(BaseEngineSpec):
pandas_gbq.to_gbq(df, **to_gbq_kwargs)
@classmethod
def estimate_query_cost(
cls,
database: "Database",
schema: str,
sql: str,
source: Optional[utils.QuerySource] = None,
) -> List[Dict[str, Any]]:
"""
Estimate the cost of a multiple statement SQL query.
:param database: Database instance
:param schema: Database schema
:param sql: SQL query with possibly multiple statements
:param source: Source of the query (eg, "sql_lab")
"""
extra = database.get_extra() or {}
if not cls.get_allow_cost_estimate(extra):
raise Exception("Database does not support cost estimation")
parsed_query = sql_parse.ParsedQuery(sql)
statements = parsed_query.get_statements()
costs = []
for statement in statements:
processed_statement = cls.process_statement(statement, database)
costs.append(cls.estimate_statement_cost(processed_statement, database))
return costs
@classmethod
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
return True
@classmethod
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
try:
# pylint: disable=import-outside-toplevel
# It's the only way to perfom a dry-run estimate cost
from google.cloud import bigquery
from google.oauth2 import service_account
except ImportError as ex:
raise Exception(
"Could not import libraries `pygibquery` or `google.oauth2`, which are "
"required to be installed in your environment in order "
"to upload data to BigQuery"
) from ex
with cls.get_engine(cursor) as engine:
creds = engine.dialect.credentials_info
creds = service_account.Credentials.from_service_account_info(creds)
client = bigquery.Client(credentials=creds)
job_config = bigquery.QueryJobConfig(dry_run=True)
query_job = client.query(
statement,
job_config=job_config,
) # Make an API request.
# Format Bytes.
# TODO: Humanize in case more db engine specs need to be added,
# this should be made a function outside this scope.
byte_division = 1024
if hasattr(query_job, "total_bytes_processed"):
query_bytes_processed = query_job.total_bytes_processed
if query_bytes_processed // byte_division == 0:
byte_type = "B"
total_bytes_processed = query_bytes_processed
elif query_bytes_processed // (byte_division**2) == 0:
byte_type = "KB"
total_bytes_processed = round(query_bytes_processed / byte_division, 2)
elif query_bytes_processed // (byte_division**3) == 0:
byte_type = "MB"
total_bytes_processed = round(
query_bytes_processed / (byte_division**2), 2
)
else:
byte_type = "GB"
total_bytes_processed = round(
query_bytes_processed / (byte_division**3), 2
)
return {f"{byte_type} Processed": total_bytes_processed}
return {}
@classmethod
def query_cost_formatter(
cls, raw_cost: List[Dict[str, Any]]
) -> List[Dict[str, str]]:
return [{k: str(v) for k, v in row.items()} for row in raw_cost]
@classmethod
def build_sqlalchemy_uri(
cls,