__all__ = [
"code2func",
"funcwrap",
"parse_docstring",
"synthesize_docstring",
"synthesize_def",
"synthesize_signature",
]
from docstring_parser import parse
from typing import Callable, Optional, Dict, Any, List
import inspect
import functools
import re
from .type_utils import (
jsonschema_type,
autotype,
)
from .str_utils import indent
_DEFAULT_SENTINEL = object()
[docs]
def code2func(code: str, func_name: Optional[str] = None, env: Optional[Dict] = None):
"""\
Extract a callable function from a code snippet.
Warning:
This function uses `exec()` to execute the provided code snippet.
Executing arbitrary code is a security risk and can lead to remote code execution.
Only use this function with trusted code sources. Do not use it to process
untrusted user input.
Args:
code (str): The code snippet containing the function definition.
func_name (Optional[str], optional): The name of the function to extract. Defaults to None.
If None, and only one callable is found, that function will be used.
env (Optional[Dict], optional): The environment in which to execute the code. Defaults to None.
Returns:
Callable: The extracted callable function.
Raises:
ValueError: If no callable is found or multiple callables without specifying func_name.
"""
env = globals() | (env or dict())
locals_dict = dict()
exec(code, env, locals_dict)
funcs = {k: v for k, v in locals_dict.items() if callable(v)}
if not funcs:
raise ValueError("No callable found in the provided code.")
if (func_name is None) and (len(funcs) > 1):
raise ValueError("Multiple callables found in the provided code. Please provide a single function or specify the function name via `func_name`.")
if (func_name is None) and (len(funcs) == 1):
func_name = funcs[next(iter(funcs))].__name__
if (func_name not in funcs) or (not callable(funcs[func_name])):
raise ValueError(f"No callable named '{func_name}' found in the provided code.")
return funcs[func_name]
def _coerce_doc_default(value: Optional[str], schema_type: Optional[str]) -> Any:
"""Coerce a default value from docstring to the appropriate type."""
if value is None:
return _DEFAULT_SENTINEL
raw = value.strip()
if not raw:
return _DEFAULT_SENTINEL
# Use autotype for intelligent conversion
try:
converted = autotype(raw)
# For target types, ensure the converted value matches expectations
if schema_type == "integer" and isinstance(converted, (int, float)):
return int(converted)
elif schema_type == "number" and isinstance(converted, (int, float)):
return float(converted)
elif schema_type == "boolean" and isinstance(converted, bool):
return converted
return converted
except Exception:
return raw
def _coerce_signature_default(value: Any) -> Any:
"""Coerce a default value from function signature."""
if value is inspect._empty:
return _DEFAULT_SENTINEL
if isinstance(value, (int, float, bool, str)) or value is None:
return value
return _DEFAULT_SENTINEL
def _clean_text(text: Optional[str]) -> Optional[str]:
if text is None:
return None
stripped = text.strip()
return stripped or None
def _compose_description(parsed) -> Optional[str]:
parts = [_clean_text(parsed.short_description), _clean_text(parsed.long_description)]
parts = [part for part in parts if part]
if not parts:
return None
return "\n\n".join(parts)
def _extract_section_lines(docstring: str, section_name: str) -> List[str]:
lines = docstring.splitlines()
collected: List[str] = []
in_section = False
section_lower = f"{section_name.lower()}:"
for raw_line in lines:
stripped = raw_line.strip()
if not in_section:
if stripped.lower() == section_lower:
in_section = True
continue
if stripped.endswith(":") and not raw_line.startswith((" ", "\t")):
break
collected.append(stripped)
while collected and not collected[0]:
collected.pop(0)
while collected and not collected[-1]:
collected.pop()
return collected
def _summarize_docstring_args(parsed, signature) -> Optional[Dict[str, Any]]:
if not parsed.params:
return None
signature_params = signature.parameters if signature else {}
properties: Dict[str, Dict[str, Any]] = {}
required: List[str] = []
for param in parsed.params:
schema_entry = jsonschema_type(param.type_name)
if "type" not in schema_entry:
schema_entry["type"] = "string"
if param.type_name:
schema_entry.setdefault("x-original-type", param.type_name)
description = _clean_text(param.description)
if description:
schema_entry["description"] = description
doc_default = _coerce_doc_default(param.default, schema_entry.get("type"))
signature_default = _DEFAULT_SENTINEL
signature_has_default = False
if signature_params:
signature_param = signature_params.get(param.arg_name)
if signature_param is not None:
signature_has_default = signature_param.default is not inspect._empty
signature_default = _coerce_signature_default(signature_param.default)
effective_default = doc_default
if effective_default is _DEFAULT_SENTINEL and signature_default is not _DEFAULT_SENTINEL:
effective_default = signature_default
if effective_default is not _DEFAULT_SENTINEL:
schema_entry["default"] = effective_default
properties[param.arg_name] = schema_entry
is_optional = bool(param.is_optional)
if not is_optional and signature_has_default:
is_optional = True
if not is_optional and effective_default is not _DEFAULT_SENTINEL:
is_optional = True
if not is_optional:
required.append(param.arg_name)
args_schema: Dict[str, Any] = {"type": "object", "properties": properties}
if required:
args_schema["required"] = required
return args_schema
def _build_return_entry(type_name: Optional[str], description: Optional[str], is_generator: bool, name: Optional[str]) -> Dict[str, Any]:
schema = jsonschema_type(type_name)
if not schema:
schema = {"type": "string"}
elif "type" not in schema:
schema["type"] = "string"
if type_name:
schema.setdefault("x-original-type", type_name)
entry: Dict[str, Any] = {"schema": schema}
clean_description = _clean_text(description)
if clean_description:
entry["description"] = clean_description
clean_name = _clean_text(name)
if clean_name:
entry["name"] = clean_name
if is_generator:
entry["is_generator"] = True
return entry
def _parse_structured_return_description(description: Optional[str]) -> Optional[Dict[str, Dict[str, Any]]]:
"""\
Parse structured return description to extract field definitions.
Expects format like:
A dictionary containing:
- field_name (type): description
- another_field (type): description
Or:
field_name (type): description
another_field (type): description
Returns:
Dict mapping field names to their schema definitions, or None if not parseable.
"""
if not description:
return None
lines = description.strip().split("\n")
properties: Dict[str, Dict[str, Any]] = {}
for line in lines:
line = line.strip()
if not line or line.endswith(":"):
continue
# Remove leading dash/bullet
if line.startswith(("-", "*", "•")):
line = line[1:].strip()
# Try to match pattern: field_name (type): description
match = re.match(r"^(\w+)\s*\(([^)]+)\)\s*:\s*(.*)$", line)
if match:
field_name, type_str, field_desc = match.groups()
field_schema = jsonschema_type(type_str.strip())
if not field_schema:
field_schema = {"type": "string"}
if field_desc.strip():
field_schema["description"] = field_desc.strip()
properties[field_name] = field_schema
return properties if properties else None
def _build_returns(parsed) -> Optional[Dict[str, Any]]:
"""\
Build an output schema from the parsed return information.
MCP spec requires output schemas to be of type "object". This function:
1. For simple types: wraps in {"result": <type>}
2. For dict/object with structured description: parses fields into properties
3. For named returns: uses the name as the property key
"""
if not parsed.returns:
return None
return_type = parsed.returns.type_name
return_desc = parsed.returns.description
return_name = parsed.returns.return_name
# Normalize the return type
type_schema = jsonschema_type(return_type)
if not type_schema:
type_schema = {"type": "string"}
elif "type" not in type_schema:
type_schema["type"] = "string"
if return_type:
type_schema.setdefault("x-original-type", return_type)
# Try to parse structured fields from description for object/dict/array types
if type_schema.get("type") in ("object", "dict", "array"):
structured_fields = _parse_structured_return_description(return_desc)
if structured_fields:
# Build object schema with parsed properties
output_schema = {"type": "object", "properties": structured_fields}
if return_desc:
output_schema["description"] = _clean_text(return_desc)
return output_schema
# Determine the property name for the return value
property_name = _clean_text(return_name) or "result"
# Add description if available
if return_desc:
type_schema["description"] = _clean_text(return_desc)
# Wrap in object schema as required by MCP spec
output_schema = {"type": "object", "properties": {property_name: type_schema}}
return output_schema
def _split_many_returns(parsed, has_primary: bool) -> Dict[str, List[Dict[str, Any]]]:
entries = getattr(parsed, "many_returns", None)
if not entries:
return {}
yields_meta: List[Dict[str, Any]] = []
additional_returns: List[Dict[str, Any]] = []
for item in entries:
entry = _build_return_entry(
item.type_name,
item.description,
item.is_generator,
item.return_name,
)
if item.is_generator:
yields_meta.append(entry)
else:
additional_returns.append(entry)
result: Dict[str, List[Dict[str, Any]]] = {}
if yields_meta:
result["yields"] = yields_meta
if additional_returns and (not has_primary or len(additional_returns) > 1):
result["returns_list"] = additional_returns
return result
def _build_raises(parsed) -> Optional[List[Dict[str, Any]]]:
if not parsed.raises:
return None
return [
{
"type": raise_.type_name,
"description": _clean_text(raise_.description),
}
for raise_ in parsed.raises
]
def _build_examples(parsed, docstring: str) -> Optional[List[Dict[str, Any]]]:
if not parsed.examples:
return None
examples_meta = [
{
"description": _clean_text(example.description),
"snippet": example.snippet,
}
for example in parsed.examples
]
if not any(entry["description"] or entry["snippet"] for entry in examples_meta):
fallback_lines = _extract_section_lines(docstring, "Examples")
if fallback_lines:
examples_meta = [
{
"description": "\n".join(fallback_lines),
"snippet": None,
}
]
return examples_meta
def _build_deprecation(parsed) -> Optional[Dict[str, Any]]:
if not parsed.deprecation:
return None
description = _clean_text(getattr(parsed.deprecation, "description", None))
version = _clean_text(getattr(parsed.deprecation, "versions", None)) or _clean_text(getattr(parsed.deprecation, "version", None))
return {"description": description, "version": version}
[docs]
def parse_docstring(func: Callable) -> Dict[str, Any]:
"""\
Parse the docstring of a Python function.
Args:
func (Callable): The Python function whose docstring is to be parsed.
Returns:
Dict: A dictionary containing the parsed components of the docstring.
"""
docstring = inspect.getdoc(func)
if not docstring:
return {}
parsed = parse(docstring)
try:
signature = inspect.signature(func)
except (TypeError, ValueError):
signature = None
result: Dict[str, Any] = {}
description = _compose_description(parsed)
if description:
result["description"] = description
args_summary = _summarize_docstring_args(parsed, signature)
if args_summary:
result["args"] = args_summary
returns_meta = _build_returns(parsed)
if returns_meta:
result["returns"] = returns_meta
split_returns = _split_many_returns(parsed, has_primary=bool(returns_meta))
result.update(split_returns)
raises_meta = _build_raises(parsed)
if raises_meta:
result["raises"] = raises_meta
examples_meta = _build_examples(parsed, docstring)
if examples_meta:
result["examples"] = examples_meta
deprecation_meta = _build_deprecation(parsed)
if deprecation_meta:
result["deprecation"] = deprecation_meta
if parsed.style:
result["style"] = parsed.style.name
return result
def _jsonschema_type_to_python(schema: Dict[str, Any]) -> str:
"""\
Convert JSON schema type to Python type hint string.
Args:
schema (Dict[str, Any]): JSON schema definition.
Returns:
str: Python type hint string.
"""
schema_type = schema.get("type")
# Check for x-original-type first
if "x-original-type" in schema:
return schema["x-original-type"]
# Handle array type
if schema_type == "array":
items = schema.get("items", {})
item_type = _jsonschema_type_to_python(items) if items else "Any"
return f"List[{item_type}]"
# Handle object/dict type
if schema_type in ("object", "dict"):
return "Dict[str, Any]"
# Handle basic types
type_map = {
"string": "str",
"integer": "int",
"number": "float",
"boolean": "bool",
"null": "None",
}
return type_map.get(schema_type, "Any")
def _format_param_description(param_name: str, schema: Dict[str, Any], required: bool) -> str:
"""\
Format a parameter description for docstring.
Args:
param_name (str): Name of the parameter.
schema (Dict[str, Any]): JSON schema for the parameter.
required (bool): Whether the parameter is required.
Returns:
str: Formatted parameter description line.
"""
type_hint = _jsonschema_type_to_python(schema)
description = schema.get("description", "")
default = schema.get("default")
# Build the description line
parts = [f"{param_name} ({type_hint})"]
# Add description
if description:
parts.append(f": {description}")
else:
parts.append(":")
# Add default/optional info
if not required and default is not None:
parts.append(f" Defaults to {repr(default)}.")
elif not required:
parts.append(" Optional.")
return "".join(parts)
def _format_return_description(output_schema: Dict[str, Any]) -> str:
"""\
Format return value description for docstring.
Args:
output_schema (Dict[str, Any]): Output schema from tool.
Returns:
str: Formatted return description.
"""
if (output_schema or {}).get("type") != "object":
return "Any: The return value"
properties = (output_schema or {}).get("properties", {})
# Single property case - unwrap it
if len(properties) == 1:
prop_name, prop_schema = next(iter(properties.items()))
type_hint = _jsonschema_type_to_python(prop_schema)
description = prop_schema.get("description", "The return value")
return f"{type_hint}: {description}"
# Multiple properties - format as dict with fields
if len(properties) > 1:
first_line = "Dict[str, Any]: A dictionary containing:"
property_lines = []
for prop_name, prop_schema in properties.items():
type_hint = _jsonschema_type_to_python(prop_schema)
description = prop_schema.get("description", "")
if description:
property_lines.append(f"- {prop_name} ({type_hint}): {description}")
else:
property_lines.append(f"- {prop_name} ({type_hint})")
indented_properties = indent("\n".join(property_lines), tab=4)
return f"{first_line}\n{indented_properties}"
return "Any: The return value"
[docs]
def synthesize_docstring(
description: Optional[str] = None,
input_schema: Optional[Dict[str, Any]] = None,
output_schema: Optional[Dict[str, Any]] = None,
style: str = "google",
) -> str:
"""\
Synthesize a docstring from tool specification attributes.
Args:
description (Optional[str], optional): Tool description. Defaults to None.
input_schema (Optional[Dict[str, Any]], optional): Parameters schema (JSON schema object). Defaults to None.
output_schema (Optional[Dict[str, Any]], optional): Output schema (JSON schema object). Defaults to None.
style (str, optional): Docstring style ('google', 'numpy', 'rest'). Defaults to 'google'.
Returns:
str: The synthesized docstring.
"""
if style != "google":
raise NotImplementedError(f"Docstring style '{style}' is not yet supported. Only 'google' style is currently implemented.")
lines = []
# Add description
if description:
# Handle multi-line descriptions
desc_lines = description.strip().split("\n")
lines.extend(desc_lines)
lines.append("")
# Add Args section
properties = (input_schema or {}).get("properties", {})
if properties:
lines.append("Args:")
required_params = set((input_schema or {}).get("required", []))
param_lines = []
for param_name, param_schema in properties.items():
is_required = param_name in required_params
param_desc = _format_param_description(param_name, param_schema, is_required)
param_lines.append(param_desc)
if param_lines:
indented_params = indent("\n".join(param_lines), tab=4)
lines.append(indented_params)
lines.append("")
# Add Returns section
if output_schema:
lines.append("Returns:")
return_desc = _format_return_description(output_schema)
indented_return = indent(return_desc, tab=4)
lines.append(indented_return)
# Join all lines and ensure proper formatting
docstring = "\n".join(lines)
# Remove trailing empty line if present
docstring = docstring.rstrip("\n")
return docstring
[docs]
def synthesize_def(
name: str,
input_schema: Optional[Dict[str, Any]] = None,
output_schema: Optional[Dict[str, Any]] = None,
docstring: Optional[str] = None,
code: str = "pass",
) -> str:
"""Generate a Python function definition from schema metadata."""
param_list: List[str] = []
properties = (input_schema or {}).get("properties", {})
for param_name, param_schema in properties.items():
type_hint = _jsonschema_type_to_python(param_schema)
param_str = f"{param_name}: {type_hint}"
if "default" in param_schema:
param_str += f" = {repr(param_schema['default'])}"
param_list.append(param_str)
return_annotation = "Any"
if (output_schema or {}).get("type") == "object":
properties = (output_schema or {}).get("properties", {})
if len(properties) == 1:
prop_schema = next(iter(properties.values()))
return_annotation = _jsonschema_type_to_python(prop_schema)
elif properties:
return_annotation = "Dict[str, Any]"
elif output_schema:
return_annotation = _jsonschema_type_to_python(output_schema)
params_str = ", ".join(param_list)
lines = [f"def {name}({params_str}) -> {return_annotation}:"]
if docstring:
full_docstring = f'"""\\\n{docstring}\n"""'
indented_docstring_block = indent(full_docstring, tab=4)
lines.append(indented_docstring_block)
indented_code = indent(code, tab=4)
lines.append(indented_code)
return "\n".join(lines)
[docs]
def synthesize_signature(
name: str,
input_schema: Optional[Dict[str, Any]] = None,
arguments: Optional[Dict[str, Any]] = None,
) -> str:
"""Generate a Python function call signature with provided arguments and default values.
Args:
name: The function name.
input_schema: JSON schema for function input_schema (same format as synthesize_def).
arguments: Dict of argument values to include in the signature. Missing arguments
will use their default values from the schema.
Returns:
str: The function call signature, e.g., "f(a=1, b=5)".
Example:
>>> synthesize_signature("f", {"type": "object", "properties": {"a": {"type": "int"}, "b": {"type": "int", "default": 5}}}, {"a": 1})
'f(a=1, b=5)'
"""
properties = (input_schema or {}).get("properties", {})
arguments = arguments or {}
required_params = set((input_schema or {}).get("required", []))
arg_list: List[str] = []
for param_name, param_schema in properties.items():
if param_name in arguments:
arg_list.append(f"{param_name}={repr(arguments[param_name])}")
elif "default" in param_schema:
arg_list.append(f"{param_name}={repr(param_schema['default'])}")
elif param_name in required_params:
arg_list.append(param_name)
else:
raise ValueError(f"Missing value for optional parameter '{param_name}' with no default.")
args_str = ", ".join(arg_list)
return f"{name}({args_str})"
[docs]
def funcwrap(exec_func: Callable, sig_func: Callable) -> Callable:
"""\
Create a wrapper function that calls `exec_func` but has the signature and metadata of `sig_func`.
Args:
exec_func: The function to be called (the implementation).
sig_func: The function whose signature and metadata should be adopted.
Returns:
Callable: A wrapper function.
"""
sig = inspect.signature(sig_func)
@functools.wraps(sig_func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
return exec_func(**bound.arguments)
return wrapper