From 42d08e5ac92e2da4a4bf70823907cd42a29fe0cd Mon Sep 17 00:00:00 2001 From: vegu-ai-tools <152010387+vegu-ai-tools@users.noreply.github.com> Date: Thu, 2 Oct 2025 15:40:31 +0300 Subject: [PATCH] Refactor DirectorChatMixin to utilize standalone utility functions for parsing response sections and extracting action blocks. This improves code clarity and maintainability. Added tests for new utility functions in test_utils_prompt.py to ensure correct functionality. --- src/talemate/agents/director/chat/mixin.py | 88 +---- src/talemate/util/prompt.py | 168 ++++++++- tests/test_utils_prompt.py | 386 +++++++++++++++++++++ 3 files changed, 562 insertions(+), 80 deletions(-) create mode 100644 tests/test_utils_prompt.py diff --git a/src/talemate/agents/director/chat/mixin.py b/src/talemate/agents/director/chat/mixin.py index 61afd6d4..c7d97708 100644 --- a/src/talemate/agents/director/chat/mixin.py +++ b/src/talemate/agents/director/chat/mixin.py @@ -21,6 +21,7 @@ from talemate.game.engine.nodes.core import InputValueError from talemate.game.engine.context_id import get_meta_groups from talemate.util.data import extract_data_with_ai_fallback import talemate.util as util +from talemate.util.prompt import parse_response_section, extract_actions_block, clean_visible_response from talemate.instance import get_agent from talemate.agents.director.chat.nodes import DirectorChatActionArgument @@ -650,42 +651,7 @@ class DirectorChatMixin: 2) open-ended ... to end after 3) same two fallbacks over entire response. """ - try: - # Prefer only content after a closed analysis block. - # Find the index right after the first closing (case-insensitive). - tail_start = 0 - m_after = re.search(r"", response, re.IGNORECASE) - if m_after: - tail_start = m_after.end() - tail = response[tail_start:] - - # Step 1: Greedily capture the last closed ... after . - # (?is) enables DOTALL and IGNORECASE. We match lazily inside to find each pair, then take the last. - matches = re.findall(r"(?is)\s*([\s\S]*?)\s*", tail) - if matches: - return matches[-1].strip() - - # Step 2: If no closed block, capture everything after the first to the end (still after if present). - m_open = re.search(r"(?is)\s*([\s\S]*)$", tail) - if m_open: - return m_open.group(1).strip() - - # Step 3: Fall back to searching the entire response for a closed block and take the last one. - matches_all = re.findall( - r"(?is)\s*([\s\S]*?)\s*", response - ) - if matches_all: - return matches_all[-1].strip() - - # Step 4: Last resort, open-ended from to the end over the whole response. - m_open_all = re.search(r"(?is)\s*([\s\S]*)$", response) - if m_open_all: - return m_open_all.group(1).strip() - - return None - except Exception: - log.error("director.chat.parse_response_section.error", response=response) - return None + return parse_response_section(response) async def chat_extract_actions_block(self, response: str) -> list[dict] | None: """ @@ -694,39 +660,14 @@ class DirectorChatMixin: - Falls back to legacy ```actions ... ``` block - Tolerates a missing closing tag if the ACTIONS block is the final block - Tolerates a missing closing code fence ``` by capturing to or end-of-text + - Skips ACTIONS blocks that appear within ANALYSIS sections Returns a list of {name, instructions} dicts or None if not found/parsable. + + This method extends the standalone extract_actions_block with AI fallback support. """ try: - content: str | None = None - - # Prefer new ... ```(json|yaml) ... ``` ... - match = re.search( - r"\s*```(?:json|yaml)?\s*([\s\S]*?)\s*```\s*", - response, - re.IGNORECASE, - ) - if match: - content = match.group(1).strip() - - if content is None: - # Accept missing if it's the final block and we at least have a closing code fence - partial_fenced = re.search( - r"\s*```(?:json|yaml)?\s*([\s\S]*?)\s*```", - response, - re.IGNORECASE, - ) - if partial_fenced: - content = partial_fenced.group(1).strip() - - if content is None: - # Accept missing closing code fence by capturing to or end-of-text - open_fence_to_end = re.search( - r"\s*```(?:json|yaml)?\s*([\s\S]*?)(?:|$)", - response, - re.IGNORECASE, - ) - if open_fence_to_end: - content = open_fence_to_end.group(1).strip() + # Use standalone function to extract raw content + content = extract_actions_block(response) if not content: return None @@ -773,19 +714,8 @@ class DirectorChatMixin: return None def chat_clean_visible_response(self, text: str) -> str: - """Remove any action selection blocks from user-visible text and trim.""" - try: - # remove new-style blocks - cleaned = re.sub( - r"[\s\S]*?", "", text, flags=re.IGNORECASE - ).strip() - # also remove any legacy ```actions``` blocks if present - cleaned = re.sub( - r"```actions[\s\S]*?```", "", cleaned, flags=re.IGNORECASE - ).strip() - return cleaned - except Exception: - return text.strip() + """Remove any action selection blocks and decision blocks from user-visible text and trim.""" + return clean_visible_response(text) async def chat_build_prompt_vars( self, diff --git a/src/talemate/util/prompt.py b/src/talemate/util/prompt.py index 9d8a27c8..018036f7 100644 --- a/src/talemate/util/prompt.py +++ b/src/talemate/util/prompt.py @@ -1,6 +1,16 @@ import re +import structlog -__all__ = ["condensed", "no_chapters", "replace_special_tokens"] +log = structlog.get_logger("talemate.util.prompt") + +__all__ = [ + "condensed", + "no_chapters", + "replace_special_tokens", + "parse_response_section", + "extract_actions_block", + "clean_visible_response", +] def replace_special_tokens(prompt: str): @@ -80,3 +90,159 @@ def no_chapters(text: str, replacement: str = "chapter") -> str: pattern = r"(?i)chapter\s*(?:\d+(?:\.\d+)?)?" return re.sub(pattern, replace_with_case, text) + + +def parse_response_section(response: str) -> str | None: + """ + Extract the section using greedy regex preference: + 1) last ... after + 2) open-ended ... to end after + 3) same two fallbacks over entire response. + + Args: + response: The response text to parse + + Returns: + The extracted message content, or None if not found + """ + try: + # Prefer only content after a closed analysis block. + # Find the index right after the first closing (case-insensitive). + tail_start = 0 + m_after = re.search(r"", response, re.IGNORECASE) + if m_after: + tail_start = m_after.end() + tail = response[tail_start:] + + # Step 1: Greedily capture the last closed ... after . + # (?is) enables DOTALL and IGNORECASE. We match lazily inside to find each pair, then take the last. + matches = re.findall(r"(?is)\s*([\s\S]*?)\s*", tail) + if matches: + return matches[-1].strip() + + # Step 2: If no closed block, capture everything after the first to the end (still after if present). + m_open = re.search(r"(?is)\s*([\s\S]*)$", tail) + if m_open: + return m_open.group(1).strip() + + # Step 3: Fall back to searching the entire response for a closed block and take the last one. + matches_all = re.findall( + r"(?is)\s*([\s\S]*?)\s*", response + ) + if matches_all: + return matches_all[-1].strip() + + # Step 4: Last resort, open-ended from to the end over the whole response. + m_open_all = re.search(r"(?is)\s*([\s\S]*)$", response) + if m_open_all: + return m_open_all.group(1).strip() + + return None + except Exception: + log.error("parse_response_section.error", response=response) + return None + + +def extract_actions_block(response: str) -> str | None: + """ + Extract the raw content from an section containing a code block. + - Supports full ... + - Tolerates a missing closing tag if the ACTIONS block is the final block + - Tolerates a missing closing code fence ``` by capturing to or end-of-text + - Skips ACTIONS blocks that appear within ANALYSIS sections + + This function only extracts the raw string content - parsing is left to the caller. + + Args: + response: The response text to parse + + Returns: + The raw content string from within the ACTIONS code block, or None if not found + """ + try: + # First, prefer content after if present + tail_start = 0 + m_after = re.search(r"", response, re.IGNORECASE) + if m_after: + tail_start = m_after.end() + tail = response[tail_start:] + + content: str | None = None + + # Prefer new ... ```(json|yaml) ... ``` ... + match = re.search( + r"\s*```(?:json|yaml)?\s*([\s\S]*?)\s*```\s*", + tail, + re.IGNORECASE, + ) + if match: + content = match.group(1).strip() + + if content is None: + # Accept missing if it's the final block and we at least have a closing code fence + partial_fenced = re.search( + r"\s*```(?:json|yaml)?\s*([\s\S]*?)\s*```", + tail, + re.IGNORECASE, + ) + if partial_fenced: + content = partial_fenced.group(1).strip() + + if content is None: + # Accept missing closing code fence by capturing to or end-of-text + open_fence_to_end = re.search( + r"\s*```(?:json|yaml)?\s*([\s\S]*?)(?:|$)", + tail, + re.IGNORECASE, + ) + if open_fence_to_end: + content = open_fence_to_end.group(1).strip() + + # If still no content and no ANALYSIS block, fall back to searching entire response + if content is None and tail_start == 0: + match = re.search( + r"\s*```(?:json|yaml)?\s*([\s\S]*?)\s*```\s*", + response, + re.IGNORECASE, + ) + if match: + content = match.group(1).strip() + + return content if content else None + except Exception: + log.error("extract_actions_block.error", response=response) + return None + + +def clean_visible_response(text: str) -> str: + """ + Remove action selection blocks and decision blocks from user-visible text. + + This function strips: + - ... blocks + - Legacy ```actions...``` blocks + - Everything from tag onwards (including the tag) + + Args: + text: The text to clean + + Returns: + Cleaned text with special blocks removed + """ + try: + # remove new-style blocks + cleaned = re.sub( + r"[\s\S]*?", "", text, flags=re.IGNORECASE + ).strip() + # also remove any legacy ```actions``` blocks if present + cleaned = re.sub( + r"```actions[\s\S]*?```", "", cleaned, flags=re.IGNORECASE + ).strip() + # remove blocks (everything from tag onwards) + cleaned = re.sub( + r"[\s\S]*", "", cleaned, flags=re.IGNORECASE + ).strip() + return cleaned + except Exception: + log.error("clean_visible_response.error", text=text) + return text.strip() diff --git a/tests/test_utils_prompt.py b/tests/test_utils_prompt.py new file mode 100644 index 00000000..7c9ab514 --- /dev/null +++ b/tests/test_utils_prompt.py @@ -0,0 +1,386 @@ +import pytest +import json +from talemate.util.prompt import parse_response_section, extract_actions_block, clean_visible_response + + +# Helper to parse extracted content (since extract_actions_block now returns raw string) +def parse_actions_content(content: str | None) -> list[dict] | None: + """Parse the raw actions content string into a list of dicts.""" + if not content: + return None + try: + data = json.loads(content) + if isinstance(data, dict): + data = [data] + if not isinstance(data, list): + return None + + normalized = [] + for item in data: + if isinstance(item, list): + for sub in item: + if isinstance(sub, dict): + name = sub.get("name") or sub.get("function") + instructions = sub.get("instructions") or "" + if name: + normalized.append({"name": str(name), "instructions": str(instructions)}) + continue + if not isinstance(item, dict): + continue + name = item.get("name") or item.get("function") + instructions = item.get("instructions") or "" + if name: + normalized.append({"name": str(name), "instructions": str(instructions)}) + return normalized or None + except json.JSONDecodeError: + return None + + +# ============================================================================ +# Tests for parse_response_section +# ============================================================================ + + +class TestParseResponseSection: + """Tests for the parse_response_section function.""" + + def test_basic_message_with_analysis(self): + """Test extracting a MESSAGE after ANALYSIS block.""" + response = """ + +This is some analysis text. + + +This is the response message. + +""" + result = parse_response_section(response) + assert result == "This is the response message." + + def test_message_without_analysis(self): + """Test extracting MESSAGE when no ANALYSIS block present.""" + response = """ + +Simple message without analysis. + +""" + result = parse_response_section(response) + assert result == "Simple message without analysis." + + def test_actions_in_analysis_ignored(self): + """Test that tags within ANALYSIS don't interfere.""" + response = """ + +The best action would be test but we need to consider: +- Multiple tags here +- Even nested or malformed ones + + +This is the actual message. + +""" + result = parse_response_section(response) + assert result == "This is the actual message." + + def test_nested_message_like_text_in_analysis(self): + """Test MESSAGE-like text in ANALYSIS doesn't confuse parser.""" + response = """ + +The user might want to see this but that's just analysis. +We could also say "something else" in quotes. + + +Real message here. + +""" + result = parse_response_section(response) + assert result == "Real message here." + + def test_decision_tag_in_analysis_ignored(self): + """Test that tags within ANALYSIS don't interfere with MESSAGE parsing.""" + response = """ + +The best decision would be option_a based on: +- Multiple tags here +- Even nested ones like option_b + + +This is the actual message. + +""" + result = parse_response_section(response) + assert result == "This is the actual message." + + def test_decision_block_in_message_extracted(self): + """Test that DECISION blocks within MESSAGE are extracted (but will be stripped later).""" + response = """ + +Analysis text. + + +Here is my response with a decision: + +The character should proceed cautiously. + +More message text after decision. + +""" + result = parse_response_section(response) + assert "Here is my response" in result + assert "" in result + # Note: The actual stripping happens in chat_clean_visible_response + # This test just verifies the MESSAGE is extracted with DECISION intact + + +# ============================================================================ +# Tests for extract_actions_block +# ============================================================================ + + +class TestExtractActionsBlock: + """Tests for the extract_actions_block function.""" + + def test_basic_actions_json(self): + """Test extracting basic ACTIONS block with JSON.""" + response = """ + +```json +[ + {"name": "test_action", "instructions": "Do something"} +] +``` + +""" + content = extract_actions_block(response) + assert content is not None + result = parse_actions_content(content) + assert result is not None + assert len(result) == 1 + assert result[0]["name"] == "test_action" + assert result[0]["instructions"] == "Do something" + + def test_actions_in_analysis_ignored(self): + """Test that ACTIONS blocks within ANALYSIS are not extracted.""" + response = """ + +We could perform +```json +[{"name": "fake", "instructions": "This is just analysis"}] +``` + +but that's just analysis. + + +Here's the real response. + + +```json +[{"name": "real_action", "instructions": "This is the real action"}] +``` + +""" + content = extract_actions_block(response) + result = parse_actions_content(content) + assert result is not None + assert len(result) == 1 + assert result[0]["name"] == "real_action" + assert "real action" in result[0]["instructions"] + + def test_only_actions_in_analysis_returns_none(self): + """ + Test that if ACTIONS only appears within ANALYSIS (and not after), + we return None since those are not real actions. + """ + response = """ + +We could use +```json +[{"name": "fake_action", "instructions": "This is just theoretical"}] +``` + +but that's just analysis. + + +Let me think about this more. + +""" + result = extract_actions_block(response) + assert result is None + + def test_actions_with_action_tag_in_analysis(self): + """Test the edge case where tags appear in before .""" + response = """ + +Looking at the scene, I notice several things: +- The character could move to the door +- Or they could speak to the other character +- Maybe even hide behind something +Based on this analysis, I recommend we proceed. + + +I think the best course of action is to have them move cautiously. + + +```json +[ + {"name": "move", "instructions": "Move slowly toward the door"} +] +``` + +""" + content = extract_actions_block(response) + result = parse_actions_content(content) + assert result is not None + assert len(result) == 1 + assert result[0]["name"] == "move" + assert "door" in result[0]["instructions"] + + def test_decision_in_analysis_ignored(self): + """Test that DECISION blocks within ANALYSIS are ignored when extracting ACTIONS.""" + response = """ + +I need to decide on the approach cautious_approach. +Also considering aggressive_approach as alternative. + + +Based on my analysis, here's the plan. + + +```json +[{"name": "proceed", "instructions": "Move forward"}] +``` + +""" + content = extract_actions_block(response) + result = parse_actions_content(content) + assert result is not None + assert len(result) == 1 + assert result[0]["name"] == "proceed" + + def test_actions_and_decision_in_message(self): + """Test that both ACTIONS and DECISION can appear in MESSAGE after ANALYSIS.""" + response = """ + +Analyzing the scene with potential test and ```json +[{"name": "fake", "instructions": "fake"}] +``` blocks. + + +My decision: proceed_with_caution +And here are the actions to take: + +```json +[{"name": "real_action", "instructions": "Do this"}] +``` + + +""" + content = extract_actions_block(response) + result = parse_actions_content(content) + assert result is not None + assert len(result) == 1 + assert result[0]["name"] == "real_action" + + +# ============================================================================ +# Tests for clean_visible_response +# ============================================================================ + + +class TestCleanVisibleResponse: + """Tests for the clean_visible_response function.""" + + def test_removes_actions_block(self): + """Test that ACTIONS blocks are removed.""" + text = """Here is my response. + +```json +[{"name": "test"}] +``` + +More text after.""" + result = clean_visible_response(text) + assert result == "Here is my response.\n\nMore text after." + + def test_removes_decision_and_everything_after(self): + """Test that everything from DECISION tag onwards is removed.""" + text = """Here is my response. + +Choose option A + +This text should be removed too.""" + result = clean_visible_response(text) + assert result == "Here is my response." + + def test_removes_legacy_actions_block(self): + """Test that legacy ```actions``` blocks are removed.""" + text = """Here is my response. +```actions +some action data +``` +More text after.""" + result = clean_visible_response(text) + assert result == "Here is my response.\n\nMore text after." + + def test_removes_actions_then_decision(self): + """Test removing both ACTIONS and DECISION blocks.""" + text = """Here is my response. + +```json +[{"name": "test"}] +``` + +Everything from here onwards is removed""" + result = clean_visible_response(text) + assert result == "Here is my response." + + def test_no_special_tags(self): + """Test text without special tags is unchanged.""" + text = "Just a message with no decision or actions." + result = clean_visible_response(text) + assert result == "Just a message with no decision or actions." + + def test_case_insensitive(self): + """Test that tag matching is case insensitive.""" + text = """My response. + +```json +[{"name": "test"}] +``` + +removed""" + result = clean_visible_response(text) + assert result == "My response." + + def test_decision_without_closing_tag(self): + """Test that DECISION without closing tag removes everything after.""" + text = """Here is my response. + +This is my decision and all this text +continues for many lines +and should all be removed.""" + result = clean_visible_response(text) + assert result == "Here is my response." + + def test_multiple_actions_blocks(self): + """Test that multiple ACTIONS blocks are all removed.""" + text = """Start. + +```json +[{"name": "first"}] +``` + +Middle. + +```json +[{"name": "second"}] +``` + +End.""" + result = clean_visible_response(text) + assert "Start." in result + assert "Middle." in result + assert "End." in result + assert "" not in result + assert "first" not in result + assert "second" not in result