fix: adhoc metrics (#30202)

This commit is contained in:
Beto Dealmeida
2024-10-10 16:46:17 -04:00
committed by GitHub
parent ef0ede7c13
commit 0db59b45b8
7 changed files with 80 additions and 45 deletions

View File

@@ -1286,46 +1286,66 @@ def test_sqlparse_issue_652():
@pytest.mark.parametrize(
"sql,expected",
("engine", "sql", "expected"),
[
("SELECT * FROM table", True),
("SELECT a FROM (SELECT 1 AS a) JOIN (SELECT * FROM table)", True),
("(SELECT COUNT(DISTINCT name) AS foo FROM birth_names)", True),
("COUNT(*)", False),
("SELECT a FROM (SELECT 1 AS a)", False),
("SELECT a FROM (SELECT 1 AS a) JOIN table", True),
("SELECT * FROM (SELECT 1 AS foo, 2 AS bar) ORDER BY foo ASC, bar", False),
("SELECT * FROM other_table", True),
("extract(HOUR from from_unixtime(hour_ts)", False),
("(SELECT * FROM table)", True),
("(SELECT COUNT(DISTINCT name) from birth_names)", True),
("postgresql", "extract(HOUR from from_unixtime(hour_ts))", False),
("postgresql", "SELECT * FROM table", True),
("postgresql", "(SELECT * FROM table)", True),
(
"postgresql",
"SELECT a FROM (SELECT 1 AS a) JOIN (SELECT * FROM table)",
True,
),
(
"postgresql",
"(SELECT COUNT(DISTINCT name) AS foo FROM birth_names)",
True,
),
("postgresql", "COUNT(*)", False),
("postgresql", "SELECT a FROM (SELECT 1 AS a)", False),
("postgresql", "SELECT a FROM (SELECT 1 AS a) JOIN table", True),
(
"postgresql",
"SELECT * FROM (SELECT 1 AS foo, 2 AS bar) ORDER BY foo ASC, bar",
False,
),
("postgresql", "SELECT * FROM other_table", True),
("postgresql", "(SELECT COUNT(DISTINCT name) from birth_names)", True),
(
"postgresql",
"(SELECT table_name FROM information_schema.tables WHERE table_name LIKE '%user%' LIMIT 1)",
True,
),
(
"postgresql",
"(SELECT table_name FROM /**/ information_schema.tables WHERE table_name LIKE '%user%' LIMIT 1)",
True,
),
(
"postgresql",
"SELECT FROM (SELECT FROM forbidden_table) AS forbidden_table;",
True,
),
(
"postgresql",
"SELECT * FROM (SELECT * FROM forbidden_table) forbidden_table",
True,
),
(
"postgresql",
"((select users.id from (select 'majorie' as a) b, users where b.a = users.name and users.name in ('majorie') limit 1) like 'U%')",
True,
),
],
)
def test_has_table_query(sql: str, expected: bool) -> None:
def test_has_table_query(engine: str, sql: str, expected: bool) -> None:
"""
Test if a given statement queries a table.
This is used to prevent ad-hoc metrics from querying unauthorized tables, bypassing
row-level security.
"""
statement = sqlparse.parse(sql)[0]
assert has_table_query(statement) == expected
assert has_table_query(sql, engine) == expected
@pytest.mark.parametrize(