# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ Fix missing schema references in the OpenAPI spec. This script patches the openapi.json file to add any missing schemas that are referenced but not defined. """ import json # noqa: TID251 - standalone docs script import sys from pathlib import Path from typing import Any def add_missing_schemas(spec: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: """Add missing schema definitions to the OpenAPI spec.""" schemas = spec.get("components", {}).get("schemas", {}) fixed = [] # DashboardScreenshotPostSchema - based on superset/dashboards/schemas.py if "DashboardScreenshotPostSchema" not in schemas: schemas["DashboardScreenshotPostSchema"] = { "type": "object", "properties": { "dataMask": { "type": "object", "description": "An object representing the data mask.", "additionalProperties": True, }, "activeTabs": { "type": "array", "items": {"type": "string"}, "description": "A list representing active tabs.", }, "anchor": { "type": "string", "description": "A string representing the anchor.", }, "urlParams": { "type": "array", "items": { "type": "array", "items": {"type": "string"}, "minItems": 2, "maxItems": 2, }, "description": "A list of tuples, each containing two strings.", }, }, } fixed.append("DashboardScreenshotPostSchema") # DashboardNativeFiltersConfigUpdateSchema - based on superset/dashboards/schemas.py if "DashboardNativeFiltersConfigUpdateSchema" not in schemas: schemas["DashboardNativeFiltersConfigUpdateSchema"] = { "type": "object", "properties": { "deleted": { "type": "array", "items": {"type": "string"}, "description": "List of deleted filter IDs.", }, "modified": { "type": "array", "items": {"type": "object"}, "description": "List of modified filter configurations.", }, "reordered": { "type": "array", "items": {"type": "string"}, "description": "List of filter IDs in new order.", }, }, } fixed.append("DashboardNativeFiltersConfigUpdateSchema") # DashboardColorsConfigUpdateSchema - based on superset/dashboards/schemas.py if "DashboardColorsConfigUpdateSchema" not in schemas: schemas["DashboardColorsConfigUpdateSchema"] = { "type": "object", "properties": { "color_namespace": { "type": "string", "nullable": True, "description": "The color namespace.", }, "color_scheme": { "type": "string", "nullable": True, "description": "The color scheme name.", }, "map_label_colors": { "type": "object", "additionalProperties": {"type": "string"}, "description": "Mapping of labels to colors.", }, "shared_label_colors": { "type": "object", "additionalProperties": {"type": "string"}, "description": "Shared label colors across charts.", }, "label_colors": { "type": "object", "additionalProperties": {"type": "string"}, "description": "Label to color mapping.", }, "color_scheme_domain": { "type": "array", "items": {"type": "string"}, "description": "Color scheme domain values.", }, }, } fixed.append("DashboardColorsConfigUpdateSchema") # FormatQueryPayloadSchema - based on superset/sqllab/schemas.py if "FormatQueryPayloadSchema" not in schemas: schemas["FormatQueryPayloadSchema"] = { "type": "object", "required": ["sql"], "properties": { "sql": { "type": "string", "description": "The SQL query to format.", }, "engine": { "type": "string", "nullable": True, "description": "The database engine.", }, "database_id": { "type": "integer", "nullable": True, "description": "The database id.", }, "template_params": { "type": "string", "nullable": True, "description": "The SQL query template params as JSON string.", }, }, } fixed.append("FormatQueryPayloadSchema") # get_slack_channels_schema - based on superset/reports/schemas.py if "get_slack_channels_schema" not in schemas: schemas["get_slack_channels_schema"] = { "type": "object", "properties": { "search_string": { "type": "string", "description": "String to search for in channel names.", }, "types": { "type": "array", "items": { "type": "string", "enum": ["public_channel", "private_channel"], }, "description": "Types of channels to search.", }, "exact_match": { "type": "boolean", "description": "Whether to match channel names exactly.", }, }, } fixed.append("get_slack_channels_schema") if "components" not in spec: spec["components"] = {} spec["components"]["schemas"] = schemas return spec, fixed def path_to_operation_id(path: str, method: str) -> str: """Convert a path and method to an operationId.""" # Remove /api/v1/ prefix clean_path = path.replace("/api/v1/", "").strip("/") # Replace path parameters clean_path = clean_path.replace("{", "by_").replace("}", "") # Create operation name method_prefix = { "get": "get", "post": "create", "put": "update", "delete": "delete", "patch": "patch", }.get(method.lower(), method.lower()) return f"{method_prefix}_{clean_path}".replace("/", "_").replace("-", "_") def path_to_summary(path: str, method: str) -> str: """Generate a human-readable summary from path and method.""" # Remove /api/v1/ prefix clean_path = path.replace("/api/v1/", "").strip("/") # Handle path parameters parts = [] for part in clean_path.split("/"): if part.startswith("{") and part.endswith("}"): param = part[1:-1] parts.append(f"by {param}") else: parts.append(part.replace("_", " ").replace("-", " ")) resource = " ".join(parts) method_verb = { "get": "Get", "post": "Create", "put": "Update", "delete": "Delete", "patch": "Update", }.get(method.lower(), method.capitalize()) return f"{method_verb} {resource}" def add_missing_operation_ids(spec: dict[str, Any]) -> int: """Add operationId and summary to operations that are missing them.""" fixed_count = 0 for path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue summary = details.get("summary") operation_id = details.get("operationId") if not summary and not operation_id: details["operationId"] = path_to_operation_id(path, method) details["summary"] = path_to_summary(path, method) fixed_count += 1 return fixed_count TAG_DESCRIPTIONS = { "Advanced Data Type": "Advanced data type operations and conversions.", "Annotation Layers": "Manage annotation layers and annotations for charts.", "AsyncEventsRestApi": "Real-time event streaming via Server-Sent Events (SSE).", "Available Domains": "Get available domains for the Superset instance.", "CSS Templates": "Manage CSS templates for custom dashboard styling.", "CacheRestApi": "Cache management and invalidation operations.", "Charts": "Create, read, update, and delete charts (slices).", "Current User": "Get information about the authenticated user.", "Dashboard Filter State": "Manage temporary filter state for dashboards.", "Dashboard Permanent Link": "Permanent links to dashboard states.", "Dashboards": "Create, read, update, and delete dashboards.", "Database": "Manage database connections and metadata.", "Datasets": "Manage datasets (tables) used for building charts.", "Datasources": "Query datasource metadata and column values.", "Embedded Dashboard": "Configure embedded dashboard settings.", "Explore": "Chart exploration and data querying endpoints.", "Explore Form Data": "Manage temporary form data for chart exploration.", "Explore Permanent Link": "Permanent links to chart explore states.", "Import/export": "Import and export Superset assets.", "LogRestApi": "Access audit logs and activity history.", "Menu": "Get the Superset menu structure.", "OpenApi": "Access the OpenAPI specification.", "Queries": "View and manage SQL Lab query history.", "Report Schedules": "Configure scheduled reports and alerts.", "Row Level Security": "Manage row-level security rules for data access.", "SQL Lab": "Execute SQL queries and manage SQL Lab sessions.", "SQL Lab Permanent Link": "Permanent links to SQL Lab states.", "Security": "Authentication and token management.", "Security Permissions": "View available permissions.", "Security Permissions on Resources (View Menus)": "Permission-resource mappings.", "Security Resources (View Menus)": "Manage security resources (view menus).", "Security Roles": "Manage security roles and their permissions.", "Security Users": "Manage user accounts.", "Tags": "Organize assets with tags.", "User": "User profile and preferences.", } def generate_code_sample( method: str, path: str, has_body: bool = False ) -> list[dict[str, str]]: """Generate code samples for an endpoint in multiple languages.""" # Clean up path for display example_path = path.replace("{pk}", "1").replace("{id_or_slug}", "1") samples = [] # cURL sample curl_cmd = f'curl -X {method.upper()} "http://localhost:8088{example_path}"' curl_cmd += ' \\\n -H "Authorization: Bearer $ACCESS_TOKEN"' if has_body: curl_cmd += ' \\\n -H "Content-Type: application/json"' curl_cmd += ' \\\n -d \'{"key": "value"}\'' samples.append( { "lang": "cURL", "label": "cURL", "source": curl_cmd, } ) # Python sample if method.lower() == "get": python_code = f"""import requests response = requests.get( "http://localhost:8088{example_path}", headers={{"Authorization": "Bearer " + access_token}} ) print(response.json())""" elif method.lower() == "post": python_code = f"""import requests response = requests.post( "http://localhost:8088{example_path}", headers={{"Authorization": "Bearer " + access_token}}, json={{"key": "value"}} ) print(response.json())""" elif method.lower() == "put": python_code = f"""import requests response = requests.put( "http://localhost:8088{example_path}", headers={{"Authorization": "Bearer " + access_token}}, json={{"key": "value"}} ) print(response.json())""" elif method.lower() == "delete": python_code = f"""import requests response = requests.delete( "http://localhost:8088{example_path}", headers={{"Authorization": "Bearer " + access_token}} ) print(response.status_code)""" else: python_code = f"""import requests response = requests.{method.lower()}( "http://localhost:8088{example_path}", headers={{"Authorization": "Bearer " + access_token}} ) print(response.json())""" samples.append( { "lang": "Python", "label": "Python", "source": python_code, } ) # JavaScript sample if method.lower() == "get": js_code = f"""const response = await fetch( "http://localhost:8088{example_path}", {{ headers: {{ "Authorization": `Bearer ${{accessToken}}` }} }} ); const data = await response.json(); console.log(data);""" elif method.lower() in ["post", "put", "patch"]: js_code = f"""const response = await fetch( "http://localhost:8088{example_path}", {{ method: "{method.upper()}", headers: {{ "Authorization": `Bearer ${{accessToken}}`, "Content-Type": "application/json" }}, body: JSON.stringify({{ key: "value" }}) }} ); const data = await response.json(); console.log(data);""" else: js_code = f"""const response = await fetch( "http://localhost:8088{example_path}", {{ method: "{method.upper()}", headers: {{ "Authorization": `Bearer ${{accessToken}}` }} }} ); console.log(response.status);""" samples.append( { "lang": "JavaScript", "label": "JavaScript", "source": js_code, } ) return samples def add_code_samples(spec: dict[str, Any]) -> int: """Add code samples to all endpoints.""" count = 0 for path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue # Skip if already has code samples if "x-codeSamples" in details: continue # Check if endpoint has a request body has_body = "requestBody" in details details["x-codeSamples"] = generate_code_sample(method, path, has_body) count += 1 return count def configure_servers(spec: dict[str, Any]) -> bool: """Configure server URLs with variables for flexible API testing.""" new_servers = [ { "url": "http://localhost:8088", "description": "Local development server", }, { "url": "{protocol}://{host}:{port}", "description": "Custom server", "variables": { "protocol": { "default": "http", "enum": ["http", "https"], "description": "HTTP protocol", }, "host": { "default": "localhost", "description": "Server hostname or IP", }, "port": { "default": "8088", "description": "Server port", }, }, }, ] # Check if already configured existing = spec.get("servers", []) if len(existing) >= 2 and any("variables" in s for s in existing): return False spec["servers"] = new_servers return True def add_tag_definitions(spec: dict[str, Any]) -> int: """Add tag definitions with descriptions to the OpenAPI spec.""" # Collect all unique tags used in operations used_tags: set[str] = set() for _path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue tags = details.get("tags", []) used_tags.update(tags) # Create tag definitions tag_definitions = [] for tag in sorted(used_tags): tag_def = {"name": tag} if tag in TAG_DESCRIPTIONS: tag_def["description"] = TAG_DESCRIPTIONS[tag] else: # Generate a generic description tag_def["description"] = f"Endpoints related to {tag}." tag_definitions.append(tag_def) # Only update if we have new tags existing_tags = {t.get("name") for t in spec.get("tags", [])} new_tags = [t for t in tag_definitions if t["name"] not in existing_tags] if new_tags or not spec.get("tags"): spec["tags"] = tag_definitions return len(tag_definitions) return 0 def generate_example_from_schema( # noqa: C901 schema: dict[str, Any], spec: dict[str, Any], depth: int = 0, max_depth: int = 5, ) -> dict[str, Any] | list[Any] | str | int | float | bool | None: """Generate an example value from an OpenAPI schema definition.""" if depth > max_depth: return None # Handle $ref if "$ref" in schema: ref_path = schema["$ref"] if ref_path.startswith("#/components/schemas/"): schema_name = ref_path.split("/")[-1] ref_schema = ( spec.get("components", {}).get("schemas", {}).get(schema_name, {}) ) return generate_example_from_schema(ref_schema, spec, depth + 1, max_depth) return None # If schema already has an example, use it if "example" in schema: return schema["example"] schema_type = schema.get("type", "object") if schema_type == "object": properties = schema.get("properties", {}) if not properties: # Check for additionalProperties if schema.get("additionalProperties"): return {"key": "value"} return {} result = {} for prop_name, prop_schema in properties.items(): # Limit object depth and skip large nested objects if depth < max_depth: example_val = generate_example_from_schema( prop_schema, spec, depth + 1, max_depth ) if example_val is not None: result[prop_name] = example_val return result elif schema_type == "array": items_schema = schema.get("items", {}) if items_schema: item_example = generate_example_from_schema( items_schema, spec, depth + 1, max_depth ) if item_example is not None: return [item_example] return [] elif schema_type == "string": # Check for enum if "enum" in schema: return schema["enum"][0] # Check for format fmt = schema.get("format", "") if fmt == "date-time": return "2024-01-15T10:30:00Z" elif fmt == "date": return "2024-01-15" elif fmt == "email": return "user@example.com" elif fmt == "uri" or fmt == "url": return "https://example.com" elif fmt == "uuid": return "550e8400-e29b-41d4-a716-446655440000" # Use description hints or prop name return "string" elif schema_type == "integer": if "minimum" in schema: return schema["minimum"] return 1 elif schema_type == "number": if "minimum" in schema: return schema["minimum"] return 1.0 elif schema_type == "boolean": return True elif schema_type == "null": return None # Handle oneOf, anyOf if "oneOf" in schema and schema["oneOf"]: return generate_example_from_schema( schema["oneOf"][0], spec, depth + 1, max_depth ) if "anyOf" in schema and schema["anyOf"]: return generate_example_from_schema( schema["anyOf"][0], spec, depth + 1, max_depth ) return None def add_response_examples(spec: dict[str, Any]) -> int: # noqa: C901 """Add example values to API responses for better documentation.""" count = 0 # First, add examples to standard error responses in components standard_errors = { "400": {"message": "Bad request: Invalid parameters provided"}, "401": {"message": "Unauthorized: Authentication required"}, "403": { "message": "Forbidden: You don't have permission to access this resource" }, "404": {"message": "Not found: The requested resource does not exist"}, "422": {"message": "Unprocessable entity: Validation error"}, "500": {"message": "Internal server error: An unexpected error occurred"}, } responses = spec.get("components", {}).get("responses", {}) for code, example_value in standard_errors.items(): if code in responses: response = responses[code] content = response.get("content", {}).get("application/json", {}) if content and "example" not in content: content["example"] = example_value count += 1 # Now add examples to inline response schemas in operations for _path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue responses_dict = details.get("responses", {}) for _status_code, response in responses_dict.items(): # Skip $ref responses (already handled above) if "$ref" in response: continue content = response.get("content", {}).get("application/json", {}) if not content: continue # Skip if already has an example if "example" in content: continue schema = content.get("schema", {}) if schema: example = generate_example_from_schema( schema, spec, depth=0, max_depth=3 ) if example is not None and example != {}: content["example"] = example count += 1 return count def add_request_body_examples(spec: dict[str, Any]) -> int: """Add example values to API request bodies for better documentation.""" count = 0 for _path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["post", "put", "patch"]: continue if not isinstance(details, dict): continue request_body = details.get("requestBody", {}) if not request_body or "$ref" in request_body: continue content = request_body.get("content", {}).get("application/json", {}) if not content: continue # Skip if already has an example if "example" in content: continue schema = content.get("schema", {}) if schema: example = generate_example_from_schema( schema, spec, depth=0, max_depth=4 ) if example is not None and example != {}: content["example"] = example count += 1 return count def make_summaries_unique(spec: dict[str, Any]) -> int: # noqa: C901 """Make duplicate summaries unique by adding context from the path.""" summary_info: dict[str, list[tuple[str, str]]] = {} fixed_count = 0 # First pass: collect all summaries and their paths (regardless of method) for path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue summary = details.get("summary") if summary: if summary not in summary_info: summary_info[summary] = [] summary_info[summary].append((path, method)) # Second pass: make duplicate summaries unique for path, methods in spec.get("paths", {}).items(): for method, details in methods.items(): if method not in ["get", "post", "put", "delete", "patch"]: continue if not isinstance(details, dict): continue summary = details.get("summary") if summary and len(summary_info.get(summary, [])) > 1: # Create a unique suffix from the full path # e.g., /api/v1/chart/{pk}/cache_screenshot/ -> "chart-cache-screenshot" clean_path = path.replace("/api/v1/", "").strip("/") # Remove parameter placeholders and convert to slug clean_path = clean_path.replace("{", "").replace("}", "") path_slug = clean_path.replace("/", "-").replace("_", "-") # Check if this suffix is already in the summary if path_slug not in summary.lower(): new_summary = f"{summary} ({path_slug})" details["summary"] = new_summary fixed_count += 1 return fixed_count def main() -> None: # noqa: C901 """Main function to fix the OpenAPI spec.""" script_dir = Path(__file__).parent spec_path = script_dir.parent / "static" / "resources" / "openapi.json" if not spec_path.exists(): print(f"Error: OpenAPI spec not found at {spec_path}", file=sys.stderr) sys.exit(1) print(f"Reading OpenAPI spec from {spec_path}") with open(spec_path, encoding="utf-8") as f: spec = json.load(f) spec, fixed_schemas = add_missing_schemas(spec) fixed_ops = add_missing_operation_ids(spec) fixed_tags = add_tag_definitions(spec) fixed_servers = configure_servers(spec) changes_made = False if fixed_servers: print("Configured server URLs with variables for flexible API testing") changes_made = True if fixed_samples := add_code_samples(spec): print(f"Added code samples to {fixed_samples} endpoints") changes_made = True if fixed_examples := add_response_examples(spec): print(f"Added example JSON responses to {fixed_examples} response schemas") changes_made = True if fixed_request_examples := add_request_body_examples(spec): print(f"Added example JSON to {fixed_request_examples} request bodies") changes_made = True if fixed_schemas: print(f"Added missing schemas: {', '.join(fixed_schemas)}") changes_made = True if fixed_ops: print(f"Added operationId/summary to {fixed_ops} operations") changes_made = True if fixed_tags: print(f"Added {fixed_tags} tag definitions with descriptions") changes_made = True if fixed_summaries := make_summaries_unique(spec): print(f"Made {fixed_summaries} duplicate summaries unique") changes_made = True if changes_made: with open(spec_path, "w", encoding="utf-8") as f: json.dump(spec, f, indent=2) f.write("\n") # Ensure trailing newline for pre-commit print(f"Updated {spec_path}") else: print("No fixes needed") if __name__ == "__main__": main()