__all__ = [
"parse_keys",
"parse_md",
"parse_fc",
]
from .debug_utils import raise_mismatch
import ast
import re
from typing import Literal, Optional, List, Dict
def parse_keys(response: str, keys: Optional[List[str]] = None, mode: Literal["list", "dict"] = "dict"):
    """\
    Parse keys from an LLM response based on the provided mode.
    The LLM response is expected to be formatted as "<key>: <value>" pairs.
    A key occurrence is only counted when it is not immediately preceded by a
    word character, so searching for ``age`` no longer matches inside ``page:``.
    Args:
        response (str): The LLM response containing key-value pairs.
        keys (list, optional): A list of keys to parse from the response. If None, all keys will be parsed.
        mode (Literal['list', 'dict'], optional): The mode of parsing. 'list' returns a list of key-value pairs, while 'dict' returns a dictionary with keys and their corresponding values.
    Returns:
        list or dict: Parsed key-value pairs in the specified mode.
    Examples:
        >>> parse_keys("name: John Doe\\nage: 30", keys=["name", "age", "height"], mode="list")
        [{'key': 'name', 'value': 'John Doe'}, {'key': 'age', 'value': '30'}]
        >>> parse_keys("name: John Doe\\nage: 30", keys=["name", "age", "height"], mode="dict")
        {'name': 'John Doe', 'age': '30', 'height': None}
    """
    key_occurs = list()
    if keys is None:
        # No explicit keys: any word at the start of a line followed by ':' counts as a key.
        for match in re.finditer(r"^(\w+):", response, re.MULTILINE):
            key_occurs.append({"key": match.group(1), "start": match.start(), "end": match.end()})
    else:
        keys = list(keys)
        for key in keys:
            # (?<!\w) guards against substring hits (e.g. key "age" inside "page:").
            for match in re.finditer(r"(?<!\w)" + re.escape(key) + r":", response):
                key_occurs.append({"key": key, "start": match.start(), "end": match.end()})
    # Each value runs from the end of its key marker to the start of the next one
    # (or the end of the response for the last key).
    sorted_key_occurs = sorted(key_occurs, key=lambda x: x["start"])
    blocks = list()
    for i, key_occurrence in enumerate(sorted_key_occurs):
        end = key_occurrence["end"]
        next_start = sorted_key_occurs[i + 1]["start"] if i + 1 < len(sorted_key_occurs) else len(response)
        value = response[end:next_start].strip()
        blocks.append({"key": key_occurrence["key"], "value": value})
    if mode == "list":
        return blocks
    elif mode == "dict":
        # Requested-but-missing keys map to None; duplicate keys keep the last value.
        parsed = {key: None for key in ([block["key"] for block in blocks] if keys is None else keys)}
        for block in blocks:
            parsed[block["key"]] = block["value"]
        return parsed
    raise_mismatch(["list", "dict"], got=mode, name="mode")
def parse_md(response: str, recurse: bool = False, mode: Literal["list", "dict"] = "dict"):
    """\
    Parse a markdown-like LLM response into structured blocks.
    Recognized structures are XML-style tags (e.g. <tag>...</tag>), fenced code
    blocks with three or more backticks (language label optional and
    case-sensitive, defaulting to "markdown"), and the plain text between them.
    Parsing is tolerant of truncated/streaming input: unterminated structures
    never raise and are closed implicitly at the end of the string.
    Args:
        response (str): The input string to parse.
        recurse (bool, optional): If True, tag contents are parsed recursively into nested blocks. Defaults to False.
        mode (Literal["list", "dict"], optional):
            - "list": Returns the blocks as a list of dicts with 'key' and 'value'.
            - "dict": Returns a flat mapping whose keys join nested block names with dots; later duplicates overwrite earlier ones.
            Defaults to "dict".
    Returns:
        Union[list[dict], dict]: The parsed structure, shaped according to ``mode``.
    Examples:
        >>> parse_md("<think>Hello!</think>\\nSome textual output.\\n```sql\\nSELECT *\\nFROM table;\\n```\\n<rating>\\n```json\\n{\\\"rating\\\": 5}\\n```</rating>")
        {'think': 'Hello!', 'text': 'Some textual output.', 'sql': 'SELECT *\\nFROM table;', 'rating': '```json\\n{"rating": 5}\\n```'}
        >>> parse_md("<think>Hello!</think>\\nSome textual output.\\n```sql\\nSELECT *\\nFROM table;\\n```\\n<rating>\\n```json\\n{\\\"rating\\\": 5}\\n```</rating>", recurse=True)
        {'think.text': 'Hello!', 'text': 'Some textual output.', 'sql': 'SELECT *\\nFROM table;', 'rating.json': '{"rating": 5}'}
        >>> parse_md("<think>Hello!</think>\\nSome textual output.\\n```sql\\nSELECT *\\nFROM table;\\n```\\n<rating>\\n```json\\n{\\\"rating\\\": 5}\\n```</rating>", mode="list")
        [{'key': 'think', 'value': 'Hello!'}, {'key': 'text', 'value': 'Some textual output.'}, {'key': 'sql', 'value': 'SELECT *\\nFROM table;'}, {'key': 'rating', 'value': '```json\\n{"rating": 5}\\n```'}]
    """
    md_blocks = _parse_md_blocks(response, recurse=recurse)
    if mode == "list":
        return md_blocks
    if mode == "dict":
        flattened = dict()

        def _flatten(items, path):
            # Pre-order walk: leaf values are recorded under dot-joined paths.
            for item in items:
                dotted = path + [item["key"]]
                if isinstance(item["value"], list):
                    _flatten(item["value"], dotted)
                else:
                    flattened[".".join(dotted)] = item["value"]

        _flatten(md_blocks, list())
        return flattened
    raise_mismatch(["list", "dict"], got=mode, name="mode")
def _parse_md_blocks(response: str, recurse: bool = False) -> List[Dict]:
    """Internal function to parse markdown blocks with streaming support.

    Scans ``response`` left to right with a cursor, emitting blocks for
    XML-like tags (``<tag>...</tag>``), fenced code blocks (3+ backticks),
    and the runs of plain text in between. Unterminated tags/fences never
    raise: the remainder of the input becomes the block's content.

    Args:
        response (str): The raw text to scan.
        recurse (bool, optional): If True, tag contents are themselves parsed
            into nested block lists. Fenced code content is never recursed.

    Returns:
        List[Dict]: Blocks of the form ``{"key": <name>, "value": <content>}``;
        with ``recurse=True`` a tag's value may be a list of nested blocks.
    """
    blocks = list()
    i = 0  # scan cursor into response
    n = len(response)
    text_buffer = list()  # accumulates plain characters until a block boundary

    def flush_text():
        # Emit any buffered plain text as a "text" block (dropped if blank).
        nonlocal text_buffer
        if text_buffer:
            content = "".join(text_buffer).strip()
            if content:
                blocks.append({"key": "text", "value": content})
            text_buffer = list()

    while i < n:
        # Check for XML-like tag opening: <tag>
        # (isalpha is a cheap pre-filter before the regex match)
        if response[i] == "<" and i + 1 < n and response[i + 1].isalpha():
            tag_match = re.match(r"<(\w+)>", response[i:])
            if tag_match:
                tag = tag_match.group(1)
                tag_open_end = i + len(tag_match.group(0))
                # Find matching closing tag </tag>, honoring nesting of the same tag
                close_pos = _find_matching_close_tag(response, tag, tag_open_end)
                if close_pos != -1:
                    # Found matching close tag
                    flush_text()
                    content = response[tag_open_end:close_pos].strip()
                    if recurse and content:
                        nested = _parse_md_blocks(content, recurse=True)
                        blocks.append({"key": tag, "value": nested if nested else list()})
                    else:
                        blocks.append({"key": tag, "value": content})
                    i = close_pos + len(tag) + 3  # len("</tag>") = len(tag) + 3
                    continue
                else:
                    # No matching close tag found (streaming case) - treat rest as content inside tag
                    flush_text()
                    content = response[tag_open_end:].strip()
                    if recurse and content:
                        nested = _parse_md_blocks(content, recurse=True)
                        blocks.append({"key": tag, "value": nested if nested else list()})
                    else:
                        blocks.append({"key": tag, "value": content})
                    i = n
                    continue
        # Check for fenced code block with 3+ backticks
        if response[i] == "`":
            backtick_count = 0
            j = i
            while j < n and response[j] == "`":
                backtick_count += 1
                j += 1
            # Fewer than 3 backticks falls through to the plain-text branch below.
            if backtick_count >= 3:
                # Parse the language identifier (optional, until newline)
                lang_start = j
                while j < n and response[j] != "\n":
                    j += 1
                lang = response[lang_start:j].strip() if lang_start < j else ""
                if not lang:
                    lang = "markdown"
                # Find the closing fence (same number of backticks)
                fence = "`" * backtick_count
                code_start = j + 1 if j < n else j
                close_pos = _find_code_fence_close(response, fence, code_start)
                if close_pos != -1:
                    # Found closing fence
                    flush_text()
                    content = response[code_start:close_pos].strip()
                    blocks.append({"key": lang, "value": content})
                    i = close_pos + len(fence)
                    # Skip trailing newline if present
                    if i < n and response[i] == "\n":
                        i += 1
                    continue
                else:
                    # No closing fence found (streaming case) - treat rest as code content
                    flush_text()
                    content = response[code_start:].strip()
                    blocks.append({"key": lang, "value": content})
                    i = n
                    continue
        # Regular character - add to text buffer
        text_buffer.append(response[i])
        i += 1
    flush_text()
    return blocks
def _find_matching_close_tag(response: str, tag: str, start: int) -> int:
"""Find the matching closing tag, handling nested tags of the same name."""
open_tag_pattern = re.compile(rf"<{re.escape(tag)}>")
close_tag_pattern = re.compile(rf"</{re.escape(tag)}>")
depth = 1
i = start
n = len(response)
while i < n and depth > 0:
# Look for next open or close tag
open_match = open_tag_pattern.search(response, i)
close_match = close_tag_pattern.search(response, i)
if close_match is None:
# No closing tag found
return -1
if open_match is None or close_match.start() < open_match.start():
# Closing tag comes first
depth -= 1
if depth == 0:
return close_match.start()
i = close_match.end()
else:
# Opening tag comes first
depth += 1
i = open_match.end()
return -1
def _find_code_fence_close(response: str, fence: str, start: int) -> int:
"""Find the closing code fence, must be at the start of a line."""
i = start
n = len(response)
while i < n:
# Look for the fence pattern
pos = response.find(fence, i)
if pos == -1:
return -1
# Check if fence is at start of line (or preceded by newline)
if pos == 0 or response[pos - 1] == "\n":
# Verify it's the exact fence (not more backticks)
fence_len = len(fence)
end_pos = pos + fence_len
if end_pos >= n or response[end_pos] != "`":
return pos
i = pos + 1
return -1
def _resolve_positional_args(tool_args: List[str], positional_args: List, keyword_args: Dict):
final_args = dict()
positional_args = positional_args.copy()
for idx, param in enumerate(tool_args):
if param in keyword_args:
final_args[param] = keyword_args[param]
continue
if not len(positional_args):
raise ValueError("Not enough positional arguments provided.")
final_args[param] = positional_args.pop(0)
return final_args
def parse_fc(call: str, tools_args: Optional[Dict] = None):
    """Parse a simple function call string into a name and keyword arguments.
    Supported syntax mirrors typical Python-style calls with both positional and keyword arguments. Examples:
    - ``"fibonacci(n=32)"`` -> ``{"name": "fibonacci", "arguments": {"n": 32}}``
    - ``"fibonacci(32)"`` with ``tools_args={"fibonacci": ["n"]}`` -> ``{"name": "fibonacci", "arguments": {"n": 32}}`` (positional values are resolved onto parameter names via ``tools_args``).
    - ``"foo(1, 'baz', qux=true, nada=None)"`` -> booleans and ``None``/``null`` are normalized (case-insensitively).
    - ``"foo(1, bar='baz', 2)"`` -> mixed positional and keyword arguments.
    - Empty argument lists like ``"ping()"`` yield ``{"name": "ping", "arguments": {}}``.
    Args:
        call: The function call string, e.g., ``"func(1, a='x')"``.
        tools_args: Optional dictionary mapping function names to their argument names (list of strings). Required whenever ``call`` contains positional arguments, so they can be resolved into keyword arguments.
    Returns:
        dict: ``{"name": <function_name>, "arguments": {<key>: <parsed_value>, ...}}``
    Raises:
        ValueError: If the call string cannot be parsed, or if positional
            arguments are present but ``tools_args`` does not describe the
            called function.
    """
    def _split_args(arg_str: str):
        # Split the argument string on top-level commas only; commas inside
        # quotes or inside (), [], {} nesting belong to the current argument.
        parts = list()
        current = list()
        depth = 0
        in_single = False
        in_double = False
        for ch in arg_str:
            if ch == "'" and not in_double:
                in_single = not in_single
            elif ch == '"' and not in_single:
                in_double = not in_double
            elif ch in "([{" and not in_single and not in_double:
                depth += 1
            elif ch in ")]}" and not in_single and not in_double and depth > 0:
                depth -= 1
            if ch == "," and depth == 0 and not in_single and not in_double:
                part = "".join(current).strip()
                if part:
                    parts.append(part)
                current = list()
                continue
            current.append(ch)
        tail = "".join(current).strip()
        if tail:
            parts.append(tail)
        return parts
    def _split_kv(item: str):
        # Split "key=value" at the first top-level '='. Returns (None, item)
        # when no such '=' exists, i.e. the item is a positional argument.
        depth = 0
        in_single = False
        in_double = False
        for idx, ch in enumerate(item):
            if ch == "'" and not in_double:
                in_single = not in_single
            elif ch == '"' and not in_single:
                in_double = not in_double
            elif ch in "([{" and not in_single and not in_double:
                depth += 1
            elif ch in ")]}" and not in_single and not in_double and depth > 0:
                depth -= 1
            if ch == "=" and depth == 0 and not in_single and not in_double:
                return item[:idx].strip(), item[idx + 1 :].strip()
        return None, item
    def _convert(value: str):
        # Normalize common literal spellings first, then fall back to Python
        # literal parsing; anything unparseable is kept as the raw string.
        lowered = value.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
        if lowered in {"none", "null"}:
            return None
        try:
            return ast.literal_eval(value)
        except Exception:
            return value
    # Name plus an optional parenthesized argument list; '.*' does not span
    # newlines, so multi-line argument lists are rejected here.
    match = re.match(r"^\s*([A-Za-z_]\w*)\s*(?:\((.*)\))?\s*$", call)
    if not match:
        raise ValueError("Invalid function call format")
    name = match.group(1)
    arg_str = match.group(2)
    if arg_str is None or not arg_str.strip():
        # Bare name or empty parentheses: no arguments at all.
        return {"name": name, "arguments": dict()}
    positional_args = list()
    keyword_args = dict()
    for part in _split_args(arg_str):
        key, value = _split_kv(part)
        if key is not None:
            keyword_args[key] = _convert(value)
        else:
            positional_args.append(_convert(value))
    if positional_args:
        # Positional values can only be named via the tool's declared signature.
        if (tools_args is None) or (name not in tools_args):
            raise ValueError(f"Positional arguments parsing requires tool definitions for function '{name}', which is not provided.")
        keyword_args = _resolve_positional_args(tools_args.get(name), positional_args, keyword_args)
    return {"name": name, "arguments": keyword_args}