mirror of
https://github.com/apache/superset.git
synced 2026-04-07 10:31:50 +00:00
829 lines
28 KiB
Python
829 lines
28 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
Fix missing schema references in the OpenAPI spec.
|
|
|
|
This script patches the openapi.json file to add any missing schemas
|
|
that are referenced but not defined.
|
|
"""
|
|
|
|
import json # noqa: TID251 - standalone docs script
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def add_missing_schemas(spec: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
|
|
"""Add missing schema definitions to the OpenAPI spec."""
|
|
schemas = spec.get("components", {}).get("schemas", {})
|
|
fixed = []
|
|
|
|
# DashboardScreenshotPostSchema - based on superset/dashboards/schemas.py
|
|
if "DashboardScreenshotPostSchema" not in schemas:
|
|
schemas["DashboardScreenshotPostSchema"] = {
|
|
"type": "object",
|
|
"properties": {
|
|
"dataMask": {
|
|
"type": "object",
|
|
"description": "An object representing the data mask.",
|
|
"additionalProperties": True,
|
|
},
|
|
"activeTabs": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "A list representing active tabs.",
|
|
},
|
|
"anchor": {
|
|
"type": "string",
|
|
"description": "A string representing the anchor.",
|
|
},
|
|
"urlParams": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"minItems": 2,
|
|
"maxItems": 2,
|
|
},
|
|
"description": "A list of tuples, each containing two strings.",
|
|
},
|
|
},
|
|
}
|
|
fixed.append("DashboardScreenshotPostSchema")
|
|
|
|
# DashboardNativeFiltersConfigUpdateSchema - based on superset/dashboards/schemas.py
|
|
if "DashboardNativeFiltersConfigUpdateSchema" not in schemas:
|
|
schemas["DashboardNativeFiltersConfigUpdateSchema"] = {
|
|
"type": "object",
|
|
"properties": {
|
|
"deleted": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "List of deleted filter IDs.",
|
|
},
|
|
"modified": {
|
|
"type": "array",
|
|
"items": {"type": "object"},
|
|
"description": "List of modified filter configurations.",
|
|
},
|
|
"reordered": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "List of filter IDs in new order.",
|
|
},
|
|
},
|
|
}
|
|
fixed.append("DashboardNativeFiltersConfigUpdateSchema")
|
|
|
|
# DashboardColorsConfigUpdateSchema - based on superset/dashboards/schemas.py
|
|
if "DashboardColorsConfigUpdateSchema" not in schemas:
|
|
schemas["DashboardColorsConfigUpdateSchema"] = {
|
|
"type": "object",
|
|
"properties": {
|
|
"color_namespace": {
|
|
"type": "string",
|
|
"nullable": True,
|
|
"description": "The color namespace.",
|
|
},
|
|
"color_scheme": {
|
|
"type": "string",
|
|
"nullable": True,
|
|
"description": "The color scheme name.",
|
|
},
|
|
"map_label_colors": {
|
|
"type": "object",
|
|
"additionalProperties": {"type": "string"},
|
|
"description": "Mapping of labels to colors.",
|
|
},
|
|
"shared_label_colors": {
|
|
"type": "object",
|
|
"additionalProperties": {"type": "string"},
|
|
"description": "Shared label colors across charts.",
|
|
},
|
|
"label_colors": {
|
|
"type": "object",
|
|
"additionalProperties": {"type": "string"},
|
|
"description": "Label to color mapping.",
|
|
},
|
|
"color_scheme_domain": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "Color scheme domain values.",
|
|
},
|
|
},
|
|
}
|
|
fixed.append("DashboardColorsConfigUpdateSchema")
|
|
|
|
# FormatQueryPayloadSchema - based on superset/sqllab/schemas.py
|
|
if "FormatQueryPayloadSchema" not in schemas:
|
|
schemas["FormatQueryPayloadSchema"] = {
|
|
"type": "object",
|
|
"required": ["sql"],
|
|
"properties": {
|
|
"sql": {
|
|
"type": "string",
|
|
"description": "The SQL query to format.",
|
|
},
|
|
"engine": {
|
|
"type": "string",
|
|
"nullable": True,
|
|
"description": "The database engine.",
|
|
},
|
|
"database_id": {
|
|
"type": "integer",
|
|
"nullable": True,
|
|
"description": "The database id.",
|
|
},
|
|
"template_params": {
|
|
"type": "string",
|
|
"nullable": True,
|
|
"description": "The SQL query template params as JSON string.",
|
|
},
|
|
},
|
|
}
|
|
fixed.append("FormatQueryPayloadSchema")
|
|
|
|
# get_slack_channels_schema - based on superset/reports/schemas.py
|
|
if "get_slack_channels_schema" not in schemas:
|
|
schemas["get_slack_channels_schema"] = {
|
|
"type": "object",
|
|
"properties": {
|
|
"search_string": {
|
|
"type": "string",
|
|
"description": "String to search for in channel names.",
|
|
},
|
|
"types": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "string",
|
|
"enum": ["public_channel", "private_channel"],
|
|
},
|
|
"description": "Types of channels to search.",
|
|
},
|
|
"exact_match": {
|
|
"type": "boolean",
|
|
"description": "Whether to match channel names exactly.",
|
|
},
|
|
},
|
|
}
|
|
fixed.append("get_slack_channels_schema")
|
|
|
|
if "components" not in spec:
|
|
spec["components"] = {}
|
|
spec["components"]["schemas"] = schemas
|
|
|
|
return spec, fixed
|
|
|
|
|
|
def path_to_operation_id(path: str, method: str) -> str:
|
|
"""Convert a path and method to an operationId."""
|
|
# Remove /api/v1/ prefix
|
|
clean_path = path.replace("/api/v1/", "").strip("/")
|
|
|
|
# Replace path parameters
|
|
clean_path = clean_path.replace("{", "by_").replace("}", "")
|
|
|
|
# Create operation name
|
|
method_prefix = {
|
|
"get": "get",
|
|
"post": "create",
|
|
"put": "update",
|
|
"delete": "delete",
|
|
"patch": "patch",
|
|
}.get(method.lower(), method.lower())
|
|
|
|
return f"{method_prefix}_{clean_path}".replace("/", "_").replace("-", "_")
|
|
|
|
|
|
def path_to_summary(path: str, method: str) -> str:
|
|
"""Generate a human-readable summary from path and method."""
|
|
# Remove /api/v1/ prefix
|
|
clean_path = path.replace("/api/v1/", "").strip("/")
|
|
|
|
# Handle path parameters
|
|
parts = []
|
|
for part in clean_path.split("/"):
|
|
if part.startswith("{") and part.endswith("}"):
|
|
param = part[1:-1]
|
|
parts.append(f"by {param}")
|
|
else:
|
|
parts.append(part.replace("_", " ").replace("-", " "))
|
|
|
|
resource = " ".join(parts)
|
|
|
|
method_verb = {
|
|
"get": "Get",
|
|
"post": "Create",
|
|
"put": "Update",
|
|
"delete": "Delete",
|
|
"patch": "Update",
|
|
}.get(method.lower(), method.capitalize())
|
|
|
|
return f"{method_verb} {resource}"
|
|
|
|
|
|
def add_missing_operation_ids(spec: dict[str, Any]) -> int:
|
|
"""Add operationId and summary to operations that are missing them."""
|
|
fixed_count = 0
|
|
|
|
for path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
|
|
if not isinstance(details, dict):
|
|
continue
|
|
|
|
summary = details.get("summary")
|
|
operation_id = details.get("operationId")
|
|
|
|
if not summary and not operation_id:
|
|
details["operationId"] = path_to_operation_id(path, method)
|
|
details["summary"] = path_to_summary(path, method)
|
|
fixed_count += 1
|
|
|
|
return fixed_count
|
|
|
|
|
|
TAG_DESCRIPTIONS = {
|
|
"Advanced Data Type": "Advanced data type operations and conversions.",
|
|
"Annotation Layers": "Manage annotation layers and annotations for charts.",
|
|
"AsyncEventsRestApi": "Real-time event streaming via Server-Sent Events (SSE).",
|
|
"Available Domains": "Get available domains for the Superset instance.",
|
|
"CSS Templates": "Manage CSS templates for custom dashboard styling.",
|
|
"CacheRestApi": "Cache management and invalidation operations.",
|
|
"Charts": "Create, read, update, and delete charts (slices).",
|
|
"Current User": "Get information about the authenticated user.",
|
|
"Dashboard Filter State": "Manage temporary filter state for dashboards.",
|
|
"Dashboard Permanent Link": "Permanent links to dashboard states.",
|
|
"Dashboards": "Create, read, update, and delete dashboards.",
|
|
"Database": "Manage database connections and metadata.",
|
|
"Datasets": "Manage datasets (tables) used for building charts.",
|
|
"Datasources": "Query datasource metadata and column values.",
|
|
"Embedded Dashboard": "Configure embedded dashboard settings.",
|
|
"Explore": "Chart exploration and data querying endpoints.",
|
|
"Explore Form Data": "Manage temporary form data for chart exploration.",
|
|
"Explore Permanent Link": "Permanent links to chart explore states.",
|
|
"Import/export": "Import and export Superset assets.",
|
|
"LogRestApi": "Access audit logs and activity history.",
|
|
"Menu": "Get the Superset menu structure.",
|
|
"OpenApi": "Access the OpenAPI specification.",
|
|
"Queries": "View and manage SQL Lab query history.",
|
|
"Report Schedules": "Configure scheduled reports and alerts.",
|
|
"Row Level Security": "Manage row-level security rules for data access.",
|
|
"SQL Lab": "Execute SQL queries and manage SQL Lab sessions.",
|
|
"SQL Lab Permanent Link": "Permanent links to SQL Lab states.",
|
|
"Security": "Authentication and token management.",
|
|
"Security Permissions": "View available permissions.",
|
|
"Security Permissions on Resources (View Menus)": "Permission-resource mappings.",
|
|
"Security Resources (View Menus)": "Manage security resources (view menus).",
|
|
"Security Roles": "Manage security roles and their permissions.",
|
|
"Security Users": "Manage user accounts.",
|
|
"Tags": "Organize assets with tags.",
|
|
"User": "User profile and preferences.",
|
|
}
|
|
|
|
|
|
def generate_code_sample(
|
|
method: str, path: str, has_body: bool = False
|
|
) -> list[dict[str, str]]:
|
|
"""Generate code samples for an endpoint in multiple languages."""
|
|
# Clean up path for display
|
|
example_path = path.replace("{pk}", "1").replace("{id_or_slug}", "1")
|
|
|
|
samples = []
|
|
|
|
# cURL sample
|
|
curl_cmd = f'curl -X {method.upper()} "http://localhost:8088{example_path}"'
|
|
curl_cmd += ' \\\n -H "Authorization: Bearer $ACCESS_TOKEN"'
|
|
if has_body:
|
|
curl_cmd += ' \\\n -H "Content-Type: application/json"'
|
|
curl_cmd += ' \\\n -d \'{"key": "value"}\''
|
|
|
|
samples.append(
|
|
{
|
|
"lang": "cURL",
|
|
"label": "cURL",
|
|
"source": curl_cmd,
|
|
}
|
|
)
|
|
|
|
# Python sample
|
|
if method.lower() == "get":
|
|
python_code = f"""import requests
|
|
|
|
response = requests.get(
|
|
"http://localhost:8088{example_path}",
|
|
headers={{"Authorization": "Bearer " + access_token}}
|
|
)
|
|
print(response.json())"""
|
|
elif method.lower() == "post":
|
|
python_code = f"""import requests
|
|
|
|
response = requests.post(
|
|
"http://localhost:8088{example_path}",
|
|
headers={{"Authorization": "Bearer " + access_token}},
|
|
json={{"key": "value"}}
|
|
)
|
|
print(response.json())"""
|
|
elif method.lower() == "put":
|
|
python_code = f"""import requests
|
|
|
|
response = requests.put(
|
|
"http://localhost:8088{example_path}",
|
|
headers={{"Authorization": "Bearer " + access_token}},
|
|
json={{"key": "value"}}
|
|
)
|
|
print(response.json())"""
|
|
elif method.lower() == "delete":
|
|
python_code = f"""import requests
|
|
|
|
response = requests.delete(
|
|
"http://localhost:8088{example_path}",
|
|
headers={{"Authorization": "Bearer " + access_token}}
|
|
)
|
|
print(response.status_code)"""
|
|
else:
|
|
python_code = f"""import requests
|
|
|
|
response = requests.{method.lower()}(
|
|
"http://localhost:8088{example_path}",
|
|
headers={{"Authorization": "Bearer " + access_token}}
|
|
)
|
|
print(response.json())"""
|
|
|
|
samples.append(
|
|
{
|
|
"lang": "Python",
|
|
"label": "Python",
|
|
"source": python_code,
|
|
}
|
|
)
|
|
|
|
# JavaScript sample
|
|
if method.lower() == "get":
|
|
js_code = f"""const response = await fetch(
|
|
"http://localhost:8088{example_path}",
|
|
{{
|
|
headers: {{
|
|
"Authorization": `Bearer ${{accessToken}}`
|
|
}}
|
|
}}
|
|
);
|
|
const data = await response.json();
|
|
console.log(data);"""
|
|
elif method.lower() in ["post", "put", "patch"]:
|
|
js_code = f"""const response = await fetch(
|
|
"http://localhost:8088{example_path}",
|
|
{{
|
|
method: "{method.upper()}",
|
|
headers: {{
|
|
"Authorization": `Bearer ${{accessToken}}`,
|
|
"Content-Type": "application/json"
|
|
}},
|
|
body: JSON.stringify({{ key: "value" }})
|
|
}}
|
|
);
|
|
const data = await response.json();
|
|
console.log(data);"""
|
|
else:
|
|
js_code = f"""const response = await fetch(
|
|
"http://localhost:8088{example_path}",
|
|
{{
|
|
method: "{method.upper()}",
|
|
headers: {{
|
|
"Authorization": `Bearer ${{accessToken}}`
|
|
}}
|
|
}}
|
|
);
|
|
console.log(response.status);"""
|
|
|
|
samples.append(
|
|
{
|
|
"lang": "JavaScript",
|
|
"label": "JavaScript",
|
|
"source": js_code,
|
|
}
|
|
)
|
|
|
|
return samples
|
|
|
|
|
|
def add_code_samples(spec: dict[str, Any]) -> int:
|
|
"""Add code samples to all endpoints."""
|
|
count = 0
|
|
|
|
for path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
|
|
# Skip if already has code samples
|
|
if "x-codeSamples" in details:
|
|
continue
|
|
|
|
# Check if endpoint has a request body
|
|
has_body = "requestBody" in details
|
|
|
|
details["x-codeSamples"] = generate_code_sample(method, path, has_body)
|
|
count += 1
|
|
|
|
return count
|
|
|
|
|
|
def configure_servers(spec: dict[str, Any]) -> bool:
|
|
"""Configure server URLs with variables for flexible API testing."""
|
|
new_servers = [
|
|
{
|
|
"url": "http://localhost:8088",
|
|
"description": "Local development server",
|
|
},
|
|
{
|
|
"url": "{protocol}://{host}:{port}",
|
|
"description": "Custom server",
|
|
"variables": {
|
|
"protocol": {
|
|
"default": "http",
|
|
"enum": ["http", "https"],
|
|
"description": "HTTP protocol",
|
|
},
|
|
"host": {
|
|
"default": "localhost",
|
|
"description": "Server hostname or IP",
|
|
},
|
|
"port": {
|
|
"default": "8088",
|
|
"description": "Server port",
|
|
},
|
|
},
|
|
},
|
|
]
|
|
|
|
# Check if already configured
|
|
existing = spec.get("servers", [])
|
|
if len(existing) >= 2 and any("variables" in s for s in existing):
|
|
return False
|
|
|
|
spec["servers"] = new_servers
|
|
return True
|
|
|
|
|
|
def add_tag_definitions(spec: dict[str, Any]) -> int:
|
|
"""Add tag definitions with descriptions to the OpenAPI spec."""
|
|
# Collect all unique tags used in operations
|
|
used_tags: set[str] = set()
|
|
for _path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
tags = details.get("tags", [])
|
|
used_tags.update(tags)
|
|
|
|
# Create tag definitions
|
|
tag_definitions = []
|
|
for tag in sorted(used_tags):
|
|
tag_def = {"name": tag}
|
|
if tag in TAG_DESCRIPTIONS:
|
|
tag_def["description"] = TAG_DESCRIPTIONS[tag]
|
|
else:
|
|
# Generate a generic description
|
|
tag_def["description"] = f"Endpoints related to {tag}."
|
|
tag_definitions.append(tag_def)
|
|
|
|
# Only update if we have new tags
|
|
existing_tags = {t.get("name") for t in spec.get("tags", [])}
|
|
new_tags = [t for t in tag_definitions if t["name"] not in existing_tags]
|
|
|
|
if new_tags or not spec.get("tags"):
|
|
spec["tags"] = tag_definitions
|
|
return len(tag_definitions)
|
|
|
|
return 0
|
|
|
|
|
|
def generate_example_from_schema( # noqa: C901
|
|
schema: dict[str, Any],
|
|
spec: dict[str, Any],
|
|
depth: int = 0,
|
|
max_depth: int = 5,
|
|
) -> dict[str, Any] | list[Any] | str | int | float | bool | None:
|
|
"""Generate an example value from an OpenAPI schema definition."""
|
|
if depth > max_depth:
|
|
return None
|
|
|
|
# Handle $ref
|
|
if "$ref" in schema:
|
|
ref_path = schema["$ref"]
|
|
if ref_path.startswith("#/components/schemas/"):
|
|
schema_name = ref_path.split("/")[-1]
|
|
ref_schema = (
|
|
spec.get("components", {}).get("schemas", {}).get(schema_name, {})
|
|
)
|
|
return generate_example_from_schema(ref_schema, spec, depth + 1, max_depth)
|
|
return None
|
|
|
|
# If schema already has an example, use it
|
|
if "example" in schema:
|
|
return schema["example"]
|
|
|
|
schema_type = schema.get("type", "object")
|
|
|
|
if schema_type == "object":
|
|
properties = schema.get("properties", {})
|
|
if not properties:
|
|
# Check for additionalProperties
|
|
if schema.get("additionalProperties"):
|
|
return {"key": "value"}
|
|
return {}
|
|
|
|
result = {}
|
|
for prop_name, prop_schema in properties.items():
|
|
# Limit object depth and skip large nested objects
|
|
if depth < max_depth:
|
|
example_val = generate_example_from_schema(
|
|
prop_schema, spec, depth + 1, max_depth
|
|
)
|
|
if example_val is not None:
|
|
result[prop_name] = example_val
|
|
return result
|
|
|
|
elif schema_type == "array":
|
|
items_schema = schema.get("items", {})
|
|
if items_schema:
|
|
item_example = generate_example_from_schema(
|
|
items_schema, spec, depth + 1, max_depth
|
|
)
|
|
if item_example is not None:
|
|
return [item_example]
|
|
return []
|
|
|
|
elif schema_type == "string":
|
|
# Check for enum
|
|
if "enum" in schema:
|
|
return schema["enum"][0]
|
|
# Check for format
|
|
fmt = schema.get("format", "")
|
|
if fmt == "date-time":
|
|
return "2024-01-15T10:30:00Z"
|
|
elif fmt == "date":
|
|
return "2024-01-15"
|
|
elif fmt == "email":
|
|
return "user@example.com"
|
|
elif fmt == "uri" or fmt == "url":
|
|
return "https://example.com"
|
|
elif fmt == "uuid":
|
|
return "550e8400-e29b-41d4-a716-446655440000"
|
|
# Use description hints or prop name
|
|
return "string"
|
|
|
|
elif schema_type == "integer":
|
|
if "minimum" in schema:
|
|
return schema["minimum"]
|
|
return 1
|
|
|
|
elif schema_type == "number":
|
|
if "minimum" in schema:
|
|
return schema["minimum"]
|
|
return 1.0
|
|
|
|
elif schema_type == "boolean":
|
|
return True
|
|
|
|
elif schema_type == "null":
|
|
return None
|
|
|
|
# Handle oneOf, anyOf
|
|
if "oneOf" in schema and schema["oneOf"]:
|
|
return generate_example_from_schema(
|
|
schema["oneOf"][0], spec, depth + 1, max_depth
|
|
)
|
|
if "anyOf" in schema and schema["anyOf"]:
|
|
return generate_example_from_schema(
|
|
schema["anyOf"][0], spec, depth + 1, max_depth
|
|
)
|
|
|
|
return None
|
|
|
|
|
|
def add_response_examples(spec: dict[str, Any]) -> int: # noqa: C901
|
|
"""Add example values to API responses for better documentation."""
|
|
count = 0
|
|
|
|
# First, add examples to standard error responses in components
|
|
standard_errors = {
|
|
"400": {"message": "Bad request: Invalid parameters provided"},
|
|
"401": {"message": "Unauthorized: Authentication required"},
|
|
"403": {
|
|
"message": "Forbidden: You don't have permission to access this resource"
|
|
},
|
|
"404": {"message": "Not found: The requested resource does not exist"},
|
|
"422": {"message": "Unprocessable entity: Validation error"},
|
|
"500": {"message": "Internal server error: An unexpected error occurred"},
|
|
}
|
|
|
|
responses = spec.get("components", {}).get("responses", {})
|
|
for code, example_value in standard_errors.items():
|
|
if code in responses:
|
|
response = responses[code]
|
|
content = response.get("content", {}).get("application/json", {})
|
|
if content and "example" not in content:
|
|
content["example"] = example_value
|
|
count += 1
|
|
|
|
# Now add examples to inline response schemas in operations
|
|
for _path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
|
|
responses_dict = details.get("responses", {})
|
|
for _status_code, response in responses_dict.items():
|
|
# Skip $ref responses (already handled above)
|
|
if "$ref" in response:
|
|
continue
|
|
|
|
content = response.get("content", {}).get("application/json", {})
|
|
if not content:
|
|
continue
|
|
|
|
# Skip if already has an example
|
|
if "example" in content:
|
|
continue
|
|
|
|
schema = content.get("schema", {})
|
|
if schema:
|
|
example = generate_example_from_schema(
|
|
schema, spec, depth=0, max_depth=3
|
|
)
|
|
if example is not None and example != {}:
|
|
content["example"] = example
|
|
count += 1
|
|
|
|
return count
|
|
|
|
|
|
def add_request_body_examples(spec: dict[str, Any]) -> int:
|
|
"""Add example values to API request bodies for better documentation."""
|
|
count = 0
|
|
|
|
for _path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["post", "put", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
|
|
request_body = details.get("requestBody", {})
|
|
if not request_body or "$ref" in request_body:
|
|
continue
|
|
|
|
content = request_body.get("content", {}).get("application/json", {})
|
|
if not content:
|
|
continue
|
|
|
|
# Skip if already has an example
|
|
if "example" in content:
|
|
continue
|
|
|
|
schema = content.get("schema", {})
|
|
if schema:
|
|
example = generate_example_from_schema(
|
|
schema, spec, depth=0, max_depth=4
|
|
)
|
|
if example is not None and example != {}:
|
|
content["example"] = example
|
|
count += 1
|
|
|
|
return count
|
|
|
|
|
|
def make_summaries_unique(spec: dict[str, Any]) -> int: # noqa: C901
|
|
"""Make duplicate summaries unique by adding context from the path."""
|
|
summary_info: dict[str, list[tuple[str, str]]] = {}
|
|
fixed_count = 0
|
|
|
|
# First pass: collect all summaries and their paths (regardless of method)
|
|
for path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
summary = details.get("summary")
|
|
if summary:
|
|
if summary not in summary_info:
|
|
summary_info[summary] = []
|
|
summary_info[summary].append((path, method))
|
|
|
|
# Second pass: make duplicate summaries unique
|
|
for path, methods in spec.get("paths", {}).items():
|
|
for method, details in methods.items():
|
|
if method not in ["get", "post", "put", "delete", "patch"]:
|
|
continue
|
|
if not isinstance(details, dict):
|
|
continue
|
|
summary = details.get("summary")
|
|
if summary and len(summary_info.get(summary, [])) > 1:
|
|
# Create a unique suffix from the full path
|
|
# e.g., /api/v1/chart/{pk}/cache_screenshot/ -> "chart-cache-screenshot"
|
|
clean_path = path.replace("/api/v1/", "").strip("/")
|
|
# Remove parameter placeholders and convert to slug
|
|
clean_path = clean_path.replace("{", "").replace("}", "")
|
|
path_slug = clean_path.replace("/", "-").replace("_", "-")
|
|
|
|
# Check if this suffix is already in the summary
|
|
if path_slug not in summary.lower():
|
|
new_summary = f"{summary} ({path_slug})"
|
|
details["summary"] = new_summary
|
|
fixed_count += 1
|
|
|
|
return fixed_count
|
|
|
|
|
|
def main() -> None: # noqa: C901
|
|
"""Main function to fix the OpenAPI spec."""
|
|
script_dir = Path(__file__).parent
|
|
spec_path = script_dir.parent / "static" / "resources" / "openapi.json"
|
|
|
|
if not spec_path.exists():
|
|
print(f"Error: OpenAPI spec not found at {spec_path}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
print(f"Reading OpenAPI spec from {spec_path}")
|
|
|
|
with open(spec_path, encoding="utf-8") as f:
|
|
spec = json.load(f)
|
|
|
|
spec, fixed_schemas = add_missing_schemas(spec)
|
|
fixed_ops = add_missing_operation_ids(spec)
|
|
fixed_tags = add_tag_definitions(spec)
|
|
fixed_servers = configure_servers(spec)
|
|
|
|
changes_made = False
|
|
|
|
if fixed_servers:
|
|
print("Configured server URLs with variables for flexible API testing")
|
|
changes_made = True
|
|
|
|
if fixed_samples := add_code_samples(spec):
|
|
print(f"Added code samples to {fixed_samples} endpoints")
|
|
changes_made = True
|
|
|
|
if fixed_examples := add_response_examples(spec):
|
|
print(f"Added example JSON responses to {fixed_examples} response schemas")
|
|
changes_made = True
|
|
|
|
if fixed_request_examples := add_request_body_examples(spec):
|
|
print(f"Added example JSON to {fixed_request_examples} request bodies")
|
|
changes_made = True
|
|
|
|
if fixed_schemas:
|
|
print(f"Added missing schemas: {', '.join(fixed_schemas)}")
|
|
changes_made = True
|
|
|
|
if fixed_ops:
|
|
print(f"Added operationId/summary to {fixed_ops} operations")
|
|
changes_made = True
|
|
|
|
if fixed_tags:
|
|
print(f"Added {fixed_tags} tag definitions with descriptions")
|
|
changes_made = True
|
|
|
|
if fixed_summaries := make_summaries_unique(spec):
|
|
print(f"Made {fixed_summaries} duplicate summaries unique")
|
|
changes_made = True
|
|
|
|
if changes_made:
|
|
with open(spec_path, "w", encoding="utf-8") as f:
|
|
json.dump(spec, f, indent=2)
|
|
f.write("\n") # Ensure trailing newline for pre-commit
|
|
|
|
print(f"Updated {spec_path}")
|
|
else:
|
|
print("No fixes needed")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|