Files
talemate/tests/test_scene_methods.py
veguAI f5d41c04c8 0.37.0 (#267)
0.37.0

- **Director Planning** — Multi-step todo lists in director chat plus a Generate long progress action for multi-beat scene arcs.
- **Auto Narration** — Unified auto-narration replacing the old Narrate after Dialogue toggle, with a chance slider and weighted action mix.
- **LLM Prompt Templates Manager** — Dedicated UI tab for viewing, creating, editing, and deleting prompt templates.
- **Character Folders** — Collapsible folders in the World Editor character list, synced across linked scenes.
- **OpenAI Compatible TTS** — Connect any number of OpenAI-compatible TTS servers in parallel.
- **KoboldCpp TTS Auto-Setup** — KoboldCpp clients with a TTS model loaded register themselves as a TTS backend.
- **Model Testing Harness** — Bundled scene that runs basic capability tests against any connected LLM.

Plus 27 improvements and 28 bug fixes
2026-05-12 21:01:51 +03:00

1530 lines
62 KiB
Python

"""Unit tests for sync (and a few simple async) methods on talemate.tale_mate.
Covers Actor/Player constructors plus a wide range of Scene methods that don't
require LLM round-trips: history mutation, message lookup/edit/delete, actor
add/remove, character lookups, time advancement, snapshot, serialize, simple
setters, and the connect/disconnect signal lifecycle.
LLM-driven flows (start, save/load, narrate, conversation) are intentionally
out of scope. Real Scene/Character/Actor objects are used throughout — no
MagicMock for domain objects.
"""
from __future__ import annotations
import os
import pytest
import talemate.emit.async_signals as async_signals
from conftest import MockScene, bootstrap_scene
from talemate.character import Character
from talemate.exceptions import GenerationCancelled
from talemate.scene_message import (
CharacterMessage,
DirectorMessage,
NarratorMessage,
ReinforcementMessage,
TimePassageMessage,
reset_message_id,
)
from talemate.tale_mate import Actor, Player, Scene
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _char_msg(
text: str, character: str = "Alice", source: str = "ai"
) -> CharacterMessage:
return CharacterMessage(message=f"{character}: {text}", source=source)
def _add_char(
scene,
name: str,
is_player: bool = False,
base_attributes: dict | None = None,
greeting: str = "",
) -> Character:
"""Attach a real Character + Actor/Player to a scene synchronously
(does NOT touch memory). Mirrors the helper in test_history_helpers."""
character = Character(
name=name,
is_player=is_player,
base_attributes=base_attributes or {},
greeting_text=greeting,
)
actor_cls = Player if is_player else Actor
actor = actor_cls(character=character, agent=None)
actor.scene = scene
scene.actors.append(actor)
scene.character_data[name] = character
if name not in scene.active_characters:
scene.active_characters.append(name)
return character
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _reset_message_ids():
reset_message_id()
yield
reset_message_id()
@pytest.fixture
def real_scene():
"""Bootstrapped MockScene with all real agents wired in."""
scene = MockScene()
bootstrap_scene(scene)
return scene
@pytest.fixture
def isolated_scene(tmp_path, monkeypatch):
"""Real Scene whose save_dir lives under tmp_path. Useful for tests
that touch project_name/save_dir/full_path/save_files."""
monkeypatch.setattr(
Scene, "scenes_dir", classmethod(lambda cls: str(tmp_path)), raising=True
)
scene = Scene()
scene.project_name = "test_project"
os.makedirs(scene.save_dir, exist_ok=True)
return scene
# ---------------------------------------------------------------------------
# Actor / Player wiring
# ---------------------------------------------------------------------------
class TestActorConstructor:
def test_actor_links_character_and_agent_attributes(self):
char = Character(name="Alice")
# A truthy "agent" placeholder; we only check the back-reference plumbing.
agent = type("StubAgent", (), {})()
actor = Actor(character=char, agent=agent)
# Actor stores both, and the back-references on character are set.
assert actor.character is char
assert actor.agent is agent
assert char.agent is agent
assert char.actor is actor
# And the agent receives the character pointer back.
assert agent.character is char
def test_actor_with_none_agent_skips_agent_back_reference(self):
char = Character(name="Bob")
actor = Actor(character=char, agent=None)
# No agent.character was set (no agent), but character.agent is still None.
assert actor.agent is None
assert char.agent is None
assert char.actor is actor
def test_actor_history_property_delegates_to_scene(self, real_scene):
msg = _char_msg("hi")
real_scene.history = [msg]
actor = Actor(character=Character(name="X"), agent=None)
actor.scene = real_scene
assert actor.history is real_scene.history
assert actor.history[0] is msg
def test_player_inherits_actor_and_has_default_class_attrs(self):
char = Character(name="P", is_player=True)
player = Player(character=char, agent=None)
# Class-level defaults inherited from the Player declaration.
assert player.muted == 0
assert player.ai_controlled == 0
# Player IS an Actor.
assert isinstance(player, Actor)
# ---------------------------------------------------------------------------
# Actor add/remove and character lookups
# ---------------------------------------------------------------------------
class TestAddActorAndCharacterLookups:
@pytest.mark.asyncio
async def test_add_actor_registers_in_scene_state(self, real_scene):
char = Character(name="Alice")
actor = Actor(character=char, agent=None)
await real_scene.add_actor(actor, commit_to_memory=False)
assert actor in real_scene.actors
assert actor.scene is real_scene
assert "Alice" in real_scene.character_data
assert real_scene.character_data["Alice"] is char
@pytest.mark.asyncio
async def test_add_actor_replaces_duplicate_character(self, real_scene):
# Adding the same character twice via two Actor wrappers must not leave
# two Actor entries for the same character.
char = Character(name="Alice")
a1 = Actor(character=char, agent=None)
a2 = Actor(character=char, agent=None)
await real_scene.add_actor(a1, commit_to_memory=False)
await real_scene.add_actor(a2, commit_to_memory=False)
assert real_scene.actors == [a2]
@pytest.mark.asyncio
async def test_add_actor_marks_player_character_is_player(self, real_scene):
char = Character(name="Player", is_player=False)
player = Player(character=char, agent=None)
await real_scene.add_actor(player, commit_to_memory=False)
assert char.is_player is True
@pytest.mark.asyncio
async def test_add_actor_seeds_intro_and_description_from_character(
self, real_scene
):
char = Character(
name="Alice",
greeting_text="Welcome to the inn.",
base_attributes={
"scenario_context": "fantasy",
"scenario overview": "An inn at sunset.",
},
)
actor = Actor(character=char, agent=None)
# Ensure scene starts unset so the seeding paths fire.
real_scene.context = ""
real_scene.intro = ""
real_scene.description = ""
real_scene.name = ""
await real_scene.add_actor(actor, commit_to_memory=False)
assert real_scene.context == "fantasy"
assert real_scene.intro == "Welcome to the inn."
assert real_scene.description == "An inn at sunset."
# An unnamed scene takes the first NPC's name.
assert real_scene.name == "Alice"
@pytest.mark.asyncio
async def test_add_actor_does_not_overwrite_existing_intro(self, real_scene):
real_scene.intro = "Pre-existing intro"
char = Character(name="Alice", greeting_text="ignored greeting")
await real_scene.add_actor(
Actor(character=char, agent=None), commit_to_memory=False
)
assert real_scene.intro == "Pre-existing intro"
@pytest.mark.asyncio
async def test_remove_character_clears_state(self, real_scene):
char = Character(name="Alice")
await real_scene.add_actor(
Actor(character=char, agent=None), commit_to_memory=False
)
# add_actor populates character_data; active_characters is managed
# separately (e.g. via load_scene). Manually mark active for this test.
real_scene.active_characters.append("Alice")
await real_scene.remove_character(char, purge_from_memory=False)
assert "Alice" not in real_scene.character_data
assert "Alice" not in real_scene.active_characters
assert all(a.character is not char for a in real_scene.actors)
@pytest.mark.asyncio
async def test_remove_actor_detaches_character(self, real_scene):
char = Character(name="Alice")
actor = Actor(character=char, agent=None)
await real_scene.add_actor(actor, commit_to_memory=False)
await real_scene.remove_actor(actor)
assert actor not in real_scene.actors
assert actor.character is None
@pytest.mark.asyncio
async def test_remove_all_actors_clears_actors_list(self, real_scene):
await real_scene.add_actor(
Actor(character=Character(name="A"), agent=None), commit_to_memory=False
)
await real_scene.add_actor(
Actor(character=Character(name="B"), agent=None), commit_to_memory=False
)
await real_scene.remove_all_actors()
assert real_scene.actors == []
class TestCharacterAccessors:
def test_get_character_exact_match_case_insensitive(self, real_scene):
_add_char(real_scene, "Alice")
assert real_scene.get_character("alice").name == "Alice"
assert real_scene.get_character("ALICE").name == "Alice"
def test_get_character_returns_none_for_missing(self, real_scene):
assert real_scene.get_character("ghost") is None
assert real_scene.get_character("") is None
def test_get_character_partial_match_either_direction(self, real_scene):
_add_char(real_scene, "Alice the Brave")
# Search-term contained in character name.
assert real_scene.get_character("Alice", partial=True).name == "Alice the Brave"
# Character name contained in search-term.
assert (
real_scene.get_character("Alice the Brave Warrior", partial=True).name
== "Alice the Brave"
)
def test_get_character_returns_narrator_object(self, real_scene):
narrator = real_scene.get_character("__narrator__")
assert narrator is real_scene.narrator_character_object
assert narrator.name == "__narrator__"
def test_get_character_finds_inactive_characters(self, real_scene):
char = Character(name="Ghost")
# Inactive: registered in character_data but not in active_characters.
real_scene.character_data["Ghost"] = char
assert real_scene.get_character("Ghost") is char
def test_get_explicit_player_character_only_returns_player(self, real_scene):
_add_char(real_scene, "Alice")
# No Player yet -> returns None.
assert real_scene.get_explicit_player_character() is None
_add_char(real_scene, "Hero", is_player=True)
assert real_scene.get_explicit_player_character().name == "Hero"
def test_get_player_character_falls_back_to_first_actor(self, real_scene):
_add_char(real_scene, "Alice")
# No Player; falls back to first NPC.
assert real_scene.get_player_character().name == "Alice"
def test_get_player_character_prefers_player_when_present(self, real_scene):
_add_char(real_scene, "Alice")
_add_char(real_scene, "Hero", is_player=True)
assert real_scene.get_player_character().name == "Hero"
def test_main_character_and_player_character_exists(self, real_scene):
# No characters yet
assert real_scene.main_character is None
assert real_scene.player_character_exists is False
_add_char(real_scene, "Alice") # NPC only
assert real_scene.player_character_exists is False # not is_player
_add_char(real_scene, "Hero", is_player=True)
# main_character returns the actor, not the character.
main = real_scene.main_character
assert main is not None
assert main.character.name == "Hero"
assert real_scene.player_character_exists is True
def test_npc_helpers(self, real_scene):
_add_char(real_scene, "Hero", is_player=True)
_add_char(real_scene, "Alice")
_add_char(real_scene, "Bob")
assert sorted(real_scene.npc_character_names) == ["Alice", "Bob"]
assert real_scene.has_active_npcs is True
assert real_scene.num_npc_characters() == 2
npc_names = [c.name for c in real_scene.get_npc_characters()]
assert sorted(npc_names) == ["Alice", "Bob"]
def test_character_is_active_accepts_string_or_object(self, real_scene):
char = _add_char(real_scene, "Alice")
assert real_scene.character_is_active("Alice") is True
assert real_scene.character_is_active(char) is True
# Inactive character returns False.
ghost = Character(name="Ghost")
real_scene.character_data["Ghost"] = ghost
assert real_scene.character_is_active("Ghost") is False
def test_character_is_active_returns_false_for_unknown_name(self, real_scene):
# get_character returns None for unknown names. character_is_active
# should treat that as "not active" rather than crashing on
# `None.name`.
assert real_scene.character_is_active("Nonexistent") is False
def test_inactive_characters_excludes_active(self, real_scene):
_add_char(real_scene, "Alice")
ghost = Character(name="Ghost")
real_scene.character_data["Ghost"] = ghost
# active_characters is ["Alice"]; Ghost is in character_data but not active.
inactive = real_scene.inactive_characters
assert "Ghost" in inactive
assert inactive["Ghost"] is ghost
assert "Alice" not in inactive
def test_all_character_names_includes_inactive(self, real_scene):
_add_char(real_scene, "Alice")
real_scene.character_data["Ghost"] = Character(name="Ghost")
assert sorted(real_scene.all_character_names) == ["Alice", "Ghost"]
def test_character_names_and_characters_only_yield_active(self, real_scene):
_add_char(real_scene, "Alice")
real_scene.character_data["Ghost"] = Character(name="Ghost")
assert real_scene.character_names == ["Alice"]
assert [c.name for c in real_scene.characters] == ["Alice"]
# Generator-based get_characters mirrors `characters`.
assert [c.name for c in real_scene.get_characters()] == ["Alice"]
# ---------------------------------------------------------------------------
# parse_character_from_line / parse_characters_from_text
# ---------------------------------------------------------------------------
class TestParseCharacters:
def test_parse_character_from_line_returns_first_match(self, real_scene):
_add_char(real_scene, "Alice")
_add_char(real_scene, "Bob")
# Both match "Alice and Bob"; the first actor wins.
result = real_scene.parse_character_from_line("Alice and Bob walk in.")
assert result.name == "Alice"
def test_parse_character_from_line_returns_none_if_no_match(self, real_scene):
_add_char(real_scene, "Alice")
assert real_scene.parse_character_from_line("the wizard appears") is None
def test_parse_characters_from_text_includes_active_and_inactive(self, real_scene):
_add_char(real_scene, "Alice")
# Inactive
real_scene.character_data["Ghost"] = Character(name="Ghost")
result = real_scene.parse_characters_from_text("Alice meets the Ghost.")
names = [c.name for c in result]
# Sorted by len(name): "Alice"(5), "Ghost"(5) -> stable order in py3.7+ but
# both should be present.
assert set(names) == {"Alice", "Ghost"}
def test_parse_characters_from_text_can_exclude_active(self, real_scene):
_add_char(real_scene, "Alice")
real_scene.character_data["Ghost"] = Character(name="Ghost")
result = real_scene.parse_characters_from_text(
"Alice meets the Ghost.", exclude_active=True
)
names = [c.name for c in result]
assert names == ["Ghost"]
def test_parse_characters_from_text_uses_word_boundaries(self, real_scene):
# "Bob" should not match "Bobcat".
_add_char(real_scene, "Bob")
result = real_scene.parse_characters_from_text("the bobcat is hungry")
assert result == []
# ---------------------------------------------------------------------------
# Setters and simple property surface
# ---------------------------------------------------------------------------
class TestSimpleSetters:
def test_set_intro_name_title_description(self, real_scene):
real_scene.set_intro("the intro")
real_scene.set_name("the name")
real_scene.set_title("the title")
real_scene.set_description("the description")
assert real_scene.intro == "the intro"
assert real_scene.name == "the name"
assert real_scene.title == "the title"
assert real_scene.description == "the description"
@pytest.mark.asyncio
async def test_set_environment_writes_attribute(self, real_scene):
# set_environment also calls emit_status which requires a running loop;
# active=False makes the debounced task a no-op anyway, but we still
# need a loop in scope.
real_scene.set_environment("creative")
assert real_scene.environment == "creative"
@pytest.mark.asyncio
async def test_set_content_context_writes_attribute(self, real_scene):
real_scene.set_content_context("dark fantasy")
assert real_scene.context == "dark fantasy"
def test_project_name_default_normalizes_scene_name(self, real_scene):
real_scene.name = "My Awesome Scene"
real_scene._project_name = ""
assert real_scene.project_name == "my-awesome-scene"
def test_project_name_strips_apostrophes(self, real_scene):
real_scene.name = "Alice's Adventure"
real_scene._project_name = ""
assert real_scene.project_name == "alices-adventure"
def test_project_name_explicit_takes_precedence(self, real_scene):
real_scene.name = "Some Other Name"
real_scene.project_name = "explicit-project"
assert real_scene.project_name == "explicit-project"
def test_nodes_filename_defaults_and_setter(self, real_scene):
# Default
assert real_scene.nodes_filename == "scene-loop.json"
real_scene.nodes_filename = "alt.json"
assert real_scene.nodes_filename == "alt.json"
# Empty string falls back to default again.
real_scene.nodes_filename = ""
assert real_scene.nodes_filename == "scene-loop.json"
def test_creative_nodes_filename_defaults_and_setter(self, real_scene):
assert real_scene.creative_nodes_filename == "creative-loop.json"
real_scene.creative_nodes_filename = "creative.json"
assert real_scene.creative_nodes_filename == "creative.json"
real_scene.creative_nodes_filename = None
assert real_scene.creative_nodes_filename == "creative-loop.json"
class TestPathProperties:
def test_save_dir_creates_directory(self, isolated_scene):
path = isolated_scene.save_dir
assert os.path.isdir(path)
assert path.endswith("test_project")
def test_full_path_combines_save_dir_and_filename(self, isolated_scene):
# No filename -> None.
assert isolated_scene.full_path is None
isolated_scene.filename = "scene.json"
assert isolated_scene.full_path == os.path.join(
isolated_scene.save_dir, "scene.json"
)
def test_subdir_paths_relative_to_save_dir(self, isolated_scene):
save_dir = isolated_scene.save_dir
assert isolated_scene.template_dir == os.path.join(save_dir, "templates")
assert isolated_scene.nodes_dir == os.path.join(save_dir, "nodes")
assert isolated_scene.info_dir == os.path.join(save_dir, "info")
assert isolated_scene.backups_dir == os.path.join(save_dir, "backups")
assert isolated_scene.changelog_dir == os.path.join(save_dir, "changelog")
assert isolated_scene.shared_context_dir == os.path.join(
save_dir, "shared-context"
)
def test_nodes_filepath_combines_nodes_dir_and_filename(self, isolated_scene):
isolated_scene.nodes_filename = "loop.json"
assert isolated_scene.nodes_filepath == os.path.join(
isolated_scene.nodes_dir, "loop.json"
)
def test_creative_nodes_filepath_combines_nodes_dir_and_filename(
self, isolated_scene
):
isolated_scene.creative_nodes_filename = "creative.json"
assert isolated_scene.creative_nodes_filepath == os.path.join(
isolated_scene.nodes_dir, "creative.json"
)
def test_save_files_lists_only_json_files_sorted(self, isolated_scene):
# Set up files in save_dir
with open(os.path.join(isolated_scene.save_dir, "z_save.json"), "w") as f:
f.write("{}")
with open(os.path.join(isolated_scene.save_dir, "a_save.json"), "w") as f:
f.write("{}")
with open(os.path.join(isolated_scene.save_dir, "ignore.txt"), "w") as f:
f.write("ignored")
files = isolated_scene.save_files
assert files == ["a_save.json", "z_save.json"]
def test_save_files_caches_after_first_read(self, isolated_scene):
# Initially empty.
assert isolated_scene.save_files == []
# Adding a file after the cache is populated should not be visible.
with open(os.path.join(isolated_scene.save_dir, "later.json"), "w") as f:
f.write("{}")
assert isolated_scene.save_files == []
# ---------------------------------------------------------------------------
# History helpers: recent_history, find_message, message_index, get_message
# ---------------------------------------------------------------------------
class TestHistoryAccessors:
def test_num_history_entries_matches_history_length(self, real_scene):
real_scene.history = [_char_msg("a"), _char_msg("b"), _char_msg("c")]
assert real_scene.num_history_entries == 3
def test_recent_history_walks_back_from_tail(self, real_scene):
msgs = [_char_msg(f"m{i}") for i in range(5)]
real_scene.history = list(msgs)
# With a generous budget, all messages are returned in original order.
result = real_scene.recent_history(max_tokens=100_000)
assert result == msgs
def test_recent_history_respects_token_budget(self, real_scene):
# Each message is small but token-counted; with a tiny budget we should
# get fewer than the full set, but at least one.
real_scene.history = [_char_msg(f"m{i}") for i in range(20)]
result = real_scene.recent_history(max_tokens=1)
assert 0 < len(result) < 20
def test_recent_history_empty_history_returns_empty_list(self, real_scene):
real_scene.history = []
assert real_scene.recent_history() == []
def test_prev_actor_returns_last_character_message_speaker(self, real_scene):
a = _char_msg("hi", character="Alice")
b = _char_msg("hi back", character="Bob")
real_scene.history = [a, b, NarratorMessage(message="ignored", source="ai")]
# Walks from the end skipping non-character messages.
assert real_scene.prev_actor == "Bob"
def test_prev_actor_returns_none_when_no_character_messages(self, real_scene):
real_scene.history = [NarratorMessage(message="x", source="ai")]
assert real_scene.prev_actor is None
def test_find_message_returns_last_matching_typ(self, real_scene):
n1 = NarratorMessage(message="n1", source="ai")
n2 = NarratorMessage(message="n2", source="ai")
real_scene.history = [n1, _char_msg("c"), n2]
assert real_scene.find_message("narrator") is n2
def test_find_message_returns_none_when_max_iterations_blocks(self, real_scene):
# With max_iterations=1, the loop returns None on the very first non-match.
real_scene.history = [_char_msg("c"), NarratorMessage(message="n", source="ai")]
assert real_scene.find_message("narrator", max_iterations=1) is None
def test_message_index_finds_id(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
real_scene.history = [a, b]
assert real_scene.message_index(a.id) == 0
assert real_scene.message_index(b.id) == 1
def test_message_index_returns_neg_one_when_missing(self, real_scene):
real_scene.history = [_char_msg("a")]
assert real_scene.message_index(98765) == -1
def test_get_message_returns_message_or_none(self, real_scene):
a = _char_msg("a")
real_scene.history = [a]
assert real_scene.get_message(a.id) is a
assert real_scene.get_message(99999) is None
def test_last_player_message_returns_player_source(self, real_scene):
ai_msg = _char_msg("hi", character="Alice", source="ai")
player_msg = _char_msg("hello", character="Hero", source="player")
real_scene.history = [
ai_msg,
player_msg,
NarratorMessage(message="N", source="ai"),
]
assert real_scene.last_player_message() is player_msg
def test_last_player_message_returns_none_when_absent(self, real_scene):
real_scene.history = [_char_msg("hi", source="ai")]
assert real_scene.last_player_message() is None
def test_last_message_by_character_walks_from_end(self, real_scene):
a1 = _char_msg("first", character="Alice")
b1 = _char_msg("hi", character="Bob")
a2 = _char_msg("last", character="Alice")
real_scene.history = [a1, b1, a2]
assert real_scene.last_message_by_character("Alice") is a2
assert real_scene.last_message_by_character("Bob") is b1
assert real_scene.last_message_by_character("Charlie") is None
# ---------------------------------------------------------------------------
# count_character_messages_since_director
# ---------------------------------------------------------------------------
class TestCountCharacterMessagesSinceDirector:
def test_returns_zero_when_no_director_message(self, real_scene):
real_scene.history = [
_char_msg("a", character="Alice"),
_char_msg("b", character="Alice"),
]
assert real_scene.count_character_messages_since_director("Alice") == 0
def test_counts_messages_since_director_for_character(self, real_scene):
director = DirectorMessage(
message="Be brave!",
source="ai",
meta={"character": "Alice"},
)
real_scene.history = [
director,
_char_msg("doing it", character="Alice"),
_char_msg("still going", character="Alice"),
]
assert real_scene.count_character_messages_since_director("Alice") == 2
def test_other_character_messages_do_not_count(self, real_scene):
director = DirectorMessage(
message="Be brave!",
source="ai",
meta={"character": "Alice"},
)
real_scene.history = [
director,
_char_msg("hello", character="Bob"),
_char_msg("alice acts", character="Alice"),
]
assert real_scene.count_character_messages_since_director("Alice") == 1
def test_stop_on_time_passage_short_circuits(self, real_scene):
director = DirectorMessage(
message="Be brave!", source="ai", meta={"character": "Alice"}
)
# Layout (oldest -> newest): director, alice-msg, time-passage, alice-msg
# Walking from the end: alice-msg(+1), time-passage(stop) -> returns 0
# because the director was never reached.
real_scene.history = [
director,
_char_msg("a1", character="Alice"),
TimePassageMessage(ts="PT1H", message="an hour later"),
_char_msg("a2", character="Alice"),
]
result = real_scene.count_character_messages_since_director(
"Alice", stop_on_time_passage=True
)
assert result == 0
# ---------------------------------------------------------------------------
# last_message_of_type / collect_messages / count_messages
# ---------------------------------------------------------------------------
class TestLastMessageOfType:
def test_returns_last_matching_type(self, real_scene):
n1 = NarratorMessage(message="n1", source="ai")
n2 = NarratorMessage(message="n2", source="ai")
real_scene.history = [n1, _char_msg("c"), n2]
assert real_scene.last_message_of_type("narrator") is n2
def test_accepts_list_of_types(self, real_scene):
c = _char_msg("hi")
n = NarratorMessage(message="n", source="ai")
real_scene.history = [c, n]
# Both types are valid; the LAST matching message wins.
assert real_scene.last_message_of_type(["character", "narrator"]) is n
def test_filters_by_source(self, real_scene):
n_ai = NarratorMessage(message="ai-narr", source="ai")
n_manual = NarratorMessage(message="manual-narr", source="manual")
real_scene.history = [n_ai, n_manual]
assert real_scene.last_message_of_type("narrator", source="ai") is n_ai
def test_max_iterations_limit(self, real_scene):
# Tail has 3 character messages and one narrator at index 0.
# max_iterations=2 only inspects the last 2 messages -> never reaches narrator.
n = NarratorMessage(message="n", source="ai")
real_scene.history = [
n,
_char_msg("a"),
_char_msg("b"),
_char_msg("c"),
]
assert real_scene.last_message_of_type("narrator", max_iterations=2) is None
def test_stop_on_time_passage(self, real_scene):
n = NarratorMessage(message="n", source="ai")
tp = TimePassageMessage(ts="PT1H", message="later")
real_scene.history = [n, tp, _char_msg("c")]
# Walking from the end: char (no match), time-passage (stop) -> None.
assert (
real_scene.last_message_of_type("narrator", stop_on_time_passage=True)
is None
)
def test_on_iterate_callback_invoked_per_message(self, real_scene):
seen = []
real_scene.history = [
_char_msg("a"),
NarratorMessage(message="n", source="ai"),
]
real_scene.last_message_of_type("narrator", on_iterate=seen.append)
# We start from the tail, so the first observed message is the narrator.
assert len(seen) >= 1
class TestCollectMessages:
def test_collects_in_reverse_chronological_order(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
n = NarratorMessage(message="n", source="ai")
real_scene.history = [a, n, b]
# No type filter -> all messages, walking from end.
result = real_scene.collect_messages()
assert result == [b, n, a]
def test_filters_by_type(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
n = NarratorMessage(message="n", source="ai")
real_scene.history = [a, n, b]
result = real_scene.collect_messages(typ="character")
assert result == [b, a]
def test_max_messages_caps_collection(self, real_scene):
msgs = [_char_msg(f"m{i}") for i in range(5)]
real_scene.history = msgs
result = real_scene.collect_messages(max_messages=2)
# Only the latest 2.
assert result == [msgs[-1], msgs[-2]]
def test_stop_on_time_passage_breaks(self, real_scene):
a = _char_msg("a")
tp = TimePassageMessage(ts="PT1H", message="later")
b = _char_msg("b")
real_scene.history = [a, tp, b]
result = real_scene.collect_messages(stop_on_time_passage=True)
# Walking from the end: b (collected), tp (collected then break).
# `a` (older than the passage) is NOT collected.
assert result == [b, tp]
assert a not in result
def test_start_idx_anchors_search(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
c = _char_msg("c")
real_scene.history = [a, b, c]
# Start at index 1 -> walks back from b.
result = real_scene.collect_messages(start_idx=1)
assert result == [b, a]
class TestCountMessages:
def test_counts_all_when_no_filters(self, real_scene):
real_scene.history = [
_char_msg("a"),
NarratorMessage(message="n", source="ai"),
_char_msg("b"),
]
assert real_scene.count_messages() == 3
def test_counts_by_type(self, real_scene):
real_scene.history = [
_char_msg("a"),
NarratorMessage(message="n", source="ai"),
_char_msg("b"),
]
assert real_scene.count_messages(message_type="character") == 2
assert real_scene.count_messages(message_type="narrator") == 1
def test_counts_by_source(self, real_scene):
ai = _char_msg("a", source="ai")
player = _char_msg("p", character="Hero", source="player")
real_scene.history = [ai, player]
assert real_scene.count_messages(source="ai") == 1
assert real_scene.count_messages(source="player") == 1
def test_counts_match_secondary_source(self, real_scene):
# CharacterMessage.secondary_source is the character name.
ai = _char_msg("hi", character="Alice", source="ai")
real_scene.history = [ai]
# Source filter matches *either* source or secondary_source.
assert real_scene.count_messages(source="Alice") == 1
class TestSnapshot:
def test_returns_movie_script_format_by_default(self, real_scene):
msgs = [
_char_msg("hi", character="Alice"),
_char_msg("hello", character="Bob"),
]
real_scene.history = msgs
result = real_scene.snapshot(lines=2)
# Script format includes character names in upper case and END-OF-LINE markers.
assert "ALICE" in result
assert "BOB" in result
assert "END-OF-LINE" in result
def test_returns_list_when_requested(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
real_scene.history = [a, b]
result = real_scene.snapshot(lines=2, return_as_list=True)
assert result == [a, b]
def test_default_ignores_director_and_reinforcement(self, real_scene):
a = _char_msg("hi")
director = DirectorMessage(message="cut!", source="ai")
reinf = ReinforcementMessage(message="reinforced", source="ai")
b = _char_msg("bye")
real_scene.history = [a, director, reinf, b]
result = real_scene.snapshot(lines=4, return_as_list=True)
# Director and reinforcement are filtered out; we get only the chars.
assert result == [a, b]
def test_custom_ignore_with_string_types(self, real_scene):
a = _char_msg("a")
n = NarratorMessage(message="n", source="ai")
real_scene.history = [a, n]
# Pass type as string; it should be resolved through MESSAGES.
result = real_scene.snapshot(lines=2, ignore=["narrator"], return_as_list=True)
assert result == [a]
def test_custom_ignore_invalid_type_raises(self, real_scene):
real_scene.history = [_char_msg("a")]
with pytest.raises(ValueError):
real_scene.snapshot(lines=1, ignore=[123]) # type: ignore[list-item]
# ---------------------------------------------------------------------------
# pop_message / pop_history / edit_message / delete_message
# ---------------------------------------------------------------------------
class TestPopMessage:
def test_pop_by_message_object(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
real_scene.history = [a, b]
assert real_scene.pop_message(a) is True
assert real_scene.history == [b]
def test_pop_by_int_returns_false_when_no_typ_match(self, real_scene):
# The int branch calls find_message(message) treating the int as a typ
# filter. Since message.typ is always a string, no message will match
# and pop_message returns False without mutating history.
a = _char_msg("a")
real_scene.history = [a]
assert real_scene.pop_message(123) is False
assert real_scene.history == [a]
def test_pop_nonexistent_returns_false(self, real_scene):
ghost = _char_msg("ghost")
real_scene.history = [_char_msg("a")]
assert real_scene.pop_message(ghost) is False
def test_pop_invalid_type_raises_value_error(self, real_scene):
with pytest.raises(ValueError):
real_scene.pop_message("not a message") # type: ignore[arg-type]
class TestPopHistory:
def test_pop_last_matching_by_default(self, real_scene):
n1 = NarratorMessage(message="n1", source="ai")
n2 = NarratorMessage(message="n2", source="ai")
real_scene.history = [n1, _char_msg("c"), n2]
real_scene.pop_history(typ="narrator")
# Only the last narrator is popped.
assert n2 not in real_scene.history
assert n1 in real_scene.history
def test_pop_all_matching(self, real_scene):
n1 = NarratorMessage(message="n1", source="ai")
n2 = NarratorMessage(message="n2", source="ai")
c = _char_msg("c")
real_scene.history = [n1, c, n2]
real_scene.pop_history(typ="narrator", all=True)
assert n1 not in real_scene.history
assert n2 not in real_scene.history
assert c in real_scene.history
def test_pop_filtered_by_source(self, real_scene):
n_ai = NarratorMessage(message="ai", source="ai")
n_manual = NarratorMessage(message="man", source="manual")
real_scene.history = [n_ai, n_manual]
real_scene.pop_history(typ="narrator", source="manual", all=True)
assert n_ai in real_scene.history
assert n_manual not in real_scene.history
def test_pop_filtered_by_meta_hash(self, real_scene):
n_a = NarratorMessage(message="a", source="ai", meta={"k": 1})
n_b = NarratorMessage(message="b", source="ai", meta={"k": 2})
real_scene.history = [n_a, n_b]
real_scene.pop_history(typ="narrator", meta_hash=n_b.meta_hash, all=True)
# Only n_b is popped.
assert n_a in real_scene.history
assert n_b not in real_scene.history
def test_pop_filtered_by_arbitrary_attribute(self, real_scene):
a_player = _char_msg("a", character="Hero", source="player")
a_ai = _char_msg("b", character="Alice", source="ai")
real_scene.history = [a_player, a_ai]
# Custom filter: source="ai" is already supported via the source param,
# but additional kwargs are passed as attribute filters too.
real_scene.pop_history(typ="character", all=True, source="player")
assert a_player not in real_scene.history
assert a_ai in real_scene.history
def test_pop_reverse_pops_oldest_match_first(self, real_scene):
n_old = NarratorMessage(message="old", source="ai")
n_new = NarratorMessage(message="new", source="ai")
real_scene.history = [n_old, _char_msg("c"), n_new]
real_scene.pop_history(typ="narrator", reverse=True)
# Reverse pops the oldest match.
assert n_old not in real_scene.history
assert n_new in real_scene.history
class TestEditMessage:
def test_edit_message_updates_message_text(self, real_scene):
a = _char_msg("hi")
real_scene.history = [a]
real_scene.edit_message(a.id, "Alice: bye")
assert real_scene.history[0].message == "Alice: bye"
def test_edit_message_unknown_id_is_silent_noop(self, real_scene):
a = _char_msg("hi")
real_scene.history = [a]
# Non-matching id should not raise; history is untouched.
real_scene.edit_message(98765, "ignored")
assert real_scene.history[0].message == "Alice: hi"
class TestDeleteMessage:
def test_delete_removes_matching_message(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
real_scene.history = [a, b]
real_scene.delete_message(a.id)
assert real_scene.history == [b]
def test_delete_unknown_id_is_silent_noop(self, real_scene):
a = _char_msg("a")
real_scene.history = [a]
real_scene.delete_message(98765)
assert real_scene.history == [a]
@pytest.mark.asyncio
async def test_delete_time_passage_resyncs_time(self, real_scene):
# When a TimePassageMessage is removed, sync_time runs and emit_status
# is called -- the latter requires a running event loop.
tp = TimePassageMessage(ts="PT1H", message="later")
real_scene.history = [tp]
real_scene.ts = "PT1H"
real_scene.delete_message(tp.id)
# sync_time without remaining time-passage messages resets to PT0S.
assert real_scene.ts == "PT0S"
# ---------------------------------------------------------------------------
# advance_time / sync_time / calc_time
# ---------------------------------------------------------------------------
class TestAdvanceTime:
def test_advance_time_adds_durations(self, real_scene):
real_scene.ts = "PT1H"
real_scene.advance_time("PT30M")
assert real_scene.ts == "PT1H30M"
def test_advance_time_supports_zero(self, real_scene):
real_scene.ts = "PT5M"
real_scene.advance_time("PT0S")
assert real_scene.ts == "PT5M"
class TestSyncTime:
def test_sync_time_sums_history_passages(self, real_scene):
real_scene.history = [
_char_msg("a"),
TimePassageMessage(ts="PT1H", message="later"),
_char_msg("b"),
TimePassageMessage(ts="PT30M", message="more later"),
]
# No archived_history, so we start from PT0S and sum.
real_scene.sync_time()
assert real_scene.ts == "PT1H30M"
def test_sync_time_uses_archived_baseline(self, real_scene):
real_scene.archived_history = [{"ts": "PT2H", "end": 0}]
# No additional time passages after end=0.
real_scene.history = [_char_msg("a")]
real_scene.sync_time()
assert real_scene.ts == "PT2H"
def test_sync_time_with_no_data_resets_to_zero(self, real_scene):
real_scene.ts = "PT5H" # leftover
real_scene.history = []
real_scene.archived_history = []
real_scene.sync_time()
assert real_scene.ts == "PT0S"
class TestCalcTime:
def test_returns_none_when_no_passages(self, real_scene):
real_scene.history = [_char_msg("a"), _char_msg("b")]
assert real_scene.calc_time(0, 2) is None
def test_sums_passages_in_range(self, real_scene):
real_scene.history = [
TimePassageMessage(ts="PT1H", message="later"),
_char_msg("a"),
TimePassageMessage(ts="PT30M", message="more"),
]
# Full range
assert real_scene.calc_time() == "PT1H30M"
def test_partial_range(self, real_scene):
real_scene.history = [
TimePassageMessage(ts="PT1H", message="later"),
TimePassageMessage(ts="PT30M", message="more"),
]
# Only the first passage included.
assert real_scene.calc_time(0, 1) == "PT1H"
# ---------------------------------------------------------------------------
# get_intro
# ---------------------------------------------------------------------------
class TestGetIntro:
def test_get_intro_returns_intro_when_no_player(self, real_scene):
real_scene.intro = "Welcome adventurer."
# editor.fix_exposition_enabled defaults to whatever config.example sets;
# the intro string contains no quotes/asterisks, but we don't assert the
# exact post-processed value — only that the string is preserved as a
# substring (since fix_exposition may wrap it in asterisks).
result = real_scene.get_intro()
assert "Welcome adventurer." in result
def test_get_intro_substitutes_user_placeholder(self, real_scene):
_add_char(real_scene, "Hero", is_player=True)
real_scene.intro = "Hello, {{user}}!"
result = real_scene.get_intro()
assert "Hero" in result
assert "{{user}}" not in result
def test_get_intro_can_take_explicit_intro_argument(self, real_scene):
_add_char(real_scene, "Hero", is_player=True)
result = real_scene.get_intro("Greetings, {{char}}!")
assert "Hero" in result
# ---------------------------------------------------------------------------
# can_auto_save / interrupt / continue_actions / reset / serialize / json
# ---------------------------------------------------------------------------
class TestCanAutoSave:
def test_no_filename_means_cannot_auto_save(self, real_scene):
real_scene.filename = ""
# Implementation returns ``self.filename and not self.immutable_save``,
# which short-circuits to the falsy filename string. Compare via bool.
assert not real_scene.can_auto_save()
def test_immutable_save_blocks_auto_save(self, real_scene):
real_scene.filename = "x.json"
real_scene.immutable_save = True
assert real_scene.can_auto_save() is False
def test_filename_and_not_immutable_can_auto_save(self, real_scene):
real_scene.filename = "x.json"
real_scene.immutable_save = False
assert real_scene.can_auto_save() is True
class TestInterruptAndContinueActions:
def test_interrupt_sets_cancel_requested(self, real_scene):
assert real_scene.cancel_requested is False
real_scene.interrupt()
assert real_scene.cancel_requested is True
def test_continue_actions_raises_when_cancel_requested(self, real_scene):
real_scene.cancel_requested = True
with pytest.raises(GenerationCancelled):
real_scene.continue_actions()
# The flag is reset after the raise so a subsequent call is a no-op.
assert real_scene.cancel_requested is False
def test_continue_actions_is_noop_when_not_requested(self, real_scene):
real_scene.cancel_requested = False
# Should not raise.
real_scene.continue_actions()
class TestReset:
def test_reset_clears_history_and_filename(self, real_scene):
real_scene.history = [_char_msg("a")]
real_scene.filename = "scene.json"
real_scene.archived_history = [
{"text": "static", "end": None},
{"text": "dynamic", "end": 5},
]
real_scene.reset()
assert real_scene.history == []
assert real_scene.filename == ""
# Static (end=None) is preserved; dynamic is wiped.
assert len(real_scene.archived_history) == 1
assert real_scene.archived_history[0]["text"] == "static"
def test_reset_preserves_pre_established_archived_entries(self, real_scene):
real_scene.archived_history = [
{"text": "intro", "end": None}, # static, kept
{"text": "summary", "end": 3}, # dynamic, removed
]
real_scene.reset()
assert real_scene.archived_history == [{"text": "intro", "end": None}]
class TestSerialize:
def test_serialize_returns_dict_with_core_fields(self, real_scene):
real_scene.name = "Test"
real_scene.intro = "Hi"
real_scene.history = [_char_msg("a")]
data = real_scene.serialize
# Spot-check key fields.
assert data["name"] == "Test"
assert data["intro"] == "Hi"
assert data["history"] is real_scene.history
assert data["archived_history"] is real_scene.archived_history
assert "id" in data
assert "memory_id" in data
def test_serialize_handles_inactive_characters(self, real_scene):
real_scene.character_data["Ghost"] = Character(name="Ghost")
data = real_scene.serialize
assert "Ghost" in data["character_data"]
# active_characters list is empty since we only registered character_data.
assert data["active_characters"] == real_scene.active_characters
def test_json_property_returns_valid_json_string(self, real_scene):
import json as _json
real_scene.name = "Test"
text = real_scene.json
decoded = _json.loads(text)
assert decoded["name"] == "Test"
# ---------------------------------------------------------------------------
# push_history (sync portion + signals) / push_archive
# ---------------------------------------------------------------------------
class TestPushHistory:
@pytest.mark.asyncio
async def test_push_history_appends_single_message(self, real_scene):
msg = _char_msg("hi")
await real_scene.push_history(msg)
assert msg in real_scene.history
assert real_scene.history[-1] is msg
@pytest.mark.asyncio
async def test_push_history_extends_with_list(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
await real_scene.push_history([a, b])
assert real_scene.history[-2:] == [a, b]
@pytest.mark.asyncio
async def test_push_history_dedupes_director_by_source(self, real_scene):
d1 = DirectorMessage(message="first", source="director-source-1")
d2 = DirectorMessage(message="second", source="director-source-1")
await real_scene.push_history(d1)
await real_scene.push_history(d2)
# Same source -> the older one is removed.
directors = [m for m in real_scene.history if isinstance(m, DirectorMessage)]
assert directors == [d2]
@pytest.mark.asyncio
async def test_push_history_keeps_director_with_distinct_sources(self, real_scene):
d1 = DirectorMessage(message="from-A", source="A")
d2 = DirectorMessage(message="from-B", source="B")
await real_scene.push_history(d1)
await real_scene.push_history(d2)
directors = [m for m in real_scene.history if isinstance(m, DirectorMessage)]
assert directors == [d1, d2]
@pytest.mark.asyncio
async def test_push_history_drops_empty_director_messages(self, real_scene):
d_empty = DirectorMessage(message=" ", source="x")
d_real = DirectorMessage(message="real", source="x")
await real_scene.push_history([d_empty, d_real])
directors = [m for m in real_scene.history if isinstance(m, DirectorMessage)]
# Only the real one is kept.
assert directors == [d_real]
@pytest.mark.asyncio
async def test_push_history_advances_time_for_time_passage(self, real_scene):
real_scene.ts = "PT0S"
tp = TimePassageMessage(ts="PT1H", message="later")
await real_scene.push_history(tp)
assert real_scene.ts == "PT1H"
@pytest.mark.asyncio
async def test_push_history_emits_push_history_signal(self, real_scene):
captured: list = []
async def listener(event):
captured.append(event)
signal = async_signals.get("push_history")
signal.connect(listener)
try:
await real_scene.push_history(_char_msg("hi"))
finally:
signal.disconnect(listener)
assert len(captured) == 1
evt = captured[0]
assert evt.scene is real_scene
assert evt.event_type == "push_history"
assert len(evt.messages) == 1
class TestPushArchive:
@pytest.mark.asyncio
async def test_push_archive_appends_dict_form(self, real_scene):
from talemate.history import ArchiveEntry
entry = ArchiveEntry(text="my entry", id="abc", ts="PT5M")
captured: list = []
async def listener(event):
captured.append(event)
signal = async_signals.get("archive_add")
signal.connect(listener)
try:
await real_scene.push_archive(entry)
finally:
signal.disconnect(listener)
# The archive list should contain the dict form (no None fields).
assert len(real_scene.archived_history) == 1
stored = real_scene.archived_history[0]
assert stored["text"] == "my entry"
assert stored["id"] == "abc"
# And the archive_add signal fired.
assert len(captured) == 1
# ---------------------------------------------------------------------------
# story_intent / intent / active_node_graph / max_backscroll / conversation_format
# ---------------------------------------------------------------------------
class TestSimpleProperties:
def test_story_intent_delegates_to_intent_state(self, real_scene):
real_scene.intent_state.intent = "high adventure"
assert real_scene.story_intent == "high adventure"
def test_intent_returns_empty_dict_when_no_phase(self, real_scene):
# Force the no-phase branch of Scene.intent.
real_scene.intent_state.phase = None
assert real_scene.intent == {}
def test_intent_returns_phase_data_when_phase_set(self, real_scene):
# The default factory leaves us with a 'roleplay' SceneType + phase.
# The intent property returns name + intent (which can be None).
assert real_scene.intent_state.phase is not None
result = real_scene.intent
assert "name" in result
assert "intent" in result
def test_active_node_graph_returns_none_when_neither_set(self, real_scene):
# Neither node_graph nor creative_node_graph is set on a fresh scene.
assert real_scene.active_node_graph is None
def test_active_node_graph_prefers_node_graph(self, real_scene):
real_scene.node_graph = "primary"
real_scene.creative_node_graph = "creative"
assert real_scene.active_node_graph == "primary"
def test_max_backscroll_reads_from_config(self, real_scene):
# Just verify it returns the int from config.example.yaml's general section.
assert isinstance(real_scene.max_backscroll, int)
assert real_scene.max_backscroll > 0
def test_auto_save_reads_from_config(self, real_scene):
# Boolean from config.example.yaml.
assert isinstance(real_scene.auto_save, bool)
def test_auto_backup_is_always_false(self, real_scene):
assert real_scene.auto_backup is False
def test_conversation_format_property_resolves(self, real_scene):
# Pulls from instance.get_agent("conversation").conversation_format.
# We just need the access to succeed and return a non-empty string.
fmt = real_scene.conversation_format
assert fmt
assert isinstance(fmt, str)
# ---------------------------------------------------------------------------
# agent_persona / agent_persona_names / writing_style (None paths)
# ---------------------------------------------------------------------------
class TestPersonaResolvers:
def test_agent_persona_returns_none_when_unset(self, real_scene):
assert real_scene.agent_personas == {}
assert real_scene.agent_persona_names == {}
assert real_scene.agent_persona("anything") is None
def test_agent_persona_returns_none_for_invalid_uid(self, real_scene):
# An entry without `__` separator returns None instead of raising.
real_scene.agent_persona_templates = {"narrator": "no-double-underscore"}
assert real_scene.agent_persona("narrator") is None
# And the helpers stay empty for unresolved templates.
assert real_scene.agent_persona_names == {}
assert real_scene.agent_personas == {}
def test_writing_style_returns_none_when_unset(self, real_scene):
real_scene.writing_style_template = None
assert real_scene.writing_style is None
def test_writing_style_returns_none_for_invalid_uid(self, real_scene):
real_scene.writing_style_template = "no-underscore"
# split('__', 1) raises ValueError -> swallowed, returns None.
assert real_scene.writing_style is None
# ---------------------------------------------------------------------------
# set_new_memory_session_id
# ---------------------------------------------------------------------------
class TestSetNewMemorySessionId:
@pytest.mark.asyncio
async def test_rotates_session_id_and_saves_previous(self, real_scene):
# set_new_memory_session_id calls emit_status which requires a loop.
prev = real_scene.memory_session_id
real_scene.set_new_memory_session_id()
# The old id is preserved as saved_memory_session_id.
assert real_scene.saved_memory_session_id == prev
# And a new id is generated (10-char uuid prefix).
assert real_scene.memory_session_id != prev
assert len(real_scene.memory_session_id) == 10
# ---------------------------------------------------------------------------
# connect / disconnect signal lifecycle
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Misc additional coverage: scenes_dir default, snapshot start, attempt_auto_save,
# fix_time error handling, push_archive ts-only path, recent_history extras
# ---------------------------------------------------------------------------
class TestScenesDirDefault:
def test_scenes_dir_resolves_to_absolute_path(self):
# The classmethod returns an absolute path under the repo root.
path = Scene.scenes_dir()
assert os.path.isabs(path)
assert path.endswith("scenes")
class TestSnapshotStartIndex:
def test_snapshot_with_start_anchors_to_earlier_segment(self, real_scene):
a = _char_msg("a")
b = _char_msg("b")
c = _char_msg("c")
real_scene.history = [a, b, c]
# start=1 => segment is history[:2][-3:] = [a, b]. We then collect
# in reverse and insert at the front, so the final list preserves
# input order.
result = real_scene.snapshot(lines=3, start=1, return_as_list=True)
assert result == [a, b]
class TestAttemptAutoSave:
@pytest.mark.asyncio
async def test_no_filename_marks_unsaved_and_skips(self, real_scene):
# Without a filename and with auto_save=False semantics, the scene
# should be flagged unsaved and not raise.
real_scene.filename = ""
real_scene.saved = True
# Force auto_save to False via an attribute override on the instance.
# The method short-circuits to setting saved=False, then emit_status.
# auto_save is a property that reads from config; the example config's
# default is False, but be defensive and assert the post-condition.
await real_scene.attempt_auto_save()
# In the auto_save=False branch, saved is set to False.
if not real_scene.auto_save:
assert real_scene.saved is False
class TestFixTimeErrorHandling:
def test_fix_time_swallows_exception_and_restores_ts(self, real_scene):
# Force _fix_time to raise by giving it malformed archived data and
# confirm fix_time restores the original ts and does not propagate.
real_scene.ts = "PT3H"
# archived_history is iterated as dicts; non-dict entries crash the
# `"ts" in archived_entry` membership test.
real_scene.archived_history = [None] # type: ignore[list-item]
real_scene.history = []
# Should not raise.
real_scene.fix_time()
# ts is restored to the snapshot taken at the start of fix_time.
assert real_scene.ts == "PT3H"
class TestRecentHistoryNoBudget:
def test_recent_history_default_max_tokens_returns_all(self, real_scene):
msgs = [_char_msg(f"m{i}") for i in range(3)]
real_scene.history = list(msgs)
# Default budget (2048) is more than enough for 3 short messages.
assert real_scene.recent_history() == msgs
# ---------------------------------------------------------------------------
# last_message_of_type: count_only_types branch
# ---------------------------------------------------------------------------
class TestLastMessageCountOnlyTypes:
def test_count_only_types_excludes_other_types_from_max_iterations(
self, real_scene
):
# The narrator we want to find is past the max_iterations boundary if
# all messages count toward it. With count_only_types=["character"],
# only character messages tick the counter, so the narrator is found.
n = NarratorMessage(message="goal", source="ai")
real_scene.history = [
n,
_char_msg("c1"),
_char_msg("c2"),
]
# Without count_only_types and max_iterations=2: walk c2(+1), c1(+2),
# boundary -> None.
assert real_scene.last_message_of_type("narrator", max_iterations=2) is None
# With count_only_types=["character"]: characters tick, narrator does not.
# walk c2(+1), c1(+2) -> still doesn't reach the narrator (max_iterations=2),
# so we further increase max to 3 to exercise the branch.
assert (
real_scene.last_message_of_type(
"narrator",
max_iterations=3,
count_only_types=["character"],
)
is n
)
# ---------------------------------------------------------------------------
# push_archive emits ts on archived_history regardless of None ts argument
# ---------------------------------------------------------------------------
class TestPushArchiveDictShape:
@pytest.mark.asyncio
async def test_archived_dict_excludes_none_fields(self, real_scene):
from talemate.history import ArchiveEntry
entry = ArchiveEntry(text="x", id="i1", ts="PT1S")
await real_scene.push_archive(entry)
stored = real_scene.archived_history[0]
# Optional fields like start/end/ts_start/ts_end aren't on the entry,
# so they shouldn't appear in the stored dict.
for opt in ("start", "end", "ts_start", "ts_end"):
assert opt not in stored
# ---------------------------------------------------------------------------
# Scene.connect/disconnect signal lifecycle
# ---------------------------------------------------------------------------
class TestConnectDisconnect:
def test_connect_then_disconnect_does_not_double_subscribe(self, real_scene):
signal = real_scene.signals["config.changed"]
# Disconnect any leftover connections from __init__/prior tests; the
# important thing is we can reconnect safely.
try:
signal.disconnect(real_scene.on_config_changed)
except Exception:
pass
# Idempotent connect: connect twice is fine and disconnect must remove it.
real_scene.connect()
# Disconnect should remove the listener (no exception).
real_scene.disconnect()