Files
talemate/tests/test_world_state_init.py

566 lines
20 KiB
Python
Raw Normal View History

"""
Unit tests for `talemate.world_state.__init__`.
Covers the pure pydantic models and helper methods on `WorldState`,
`Reinforcement`, `Suggestion`, `ContextPin`, `ManualContext`. The async
methods that simply mutate state are also exercised.
Tests use real `MockScene` + bootstrapped agents; they do NOT mock the
function under test.
"""
import pytest
from _world_state_helpers import (
install_tracking_memory,
make_actor,
manager, # noqa: F401 - pytest fixture
manager_with_memory, # noqa: F401 - pytest fixture
scene, # noqa: F401 - pytest fixture
scene_with_memory, # noqa: F401 - pytest fixture
world_state, # noqa: F401 - pytest fixture
)
from talemate.game.focal.schema import Call
from talemate.world_state import (
ANY_CHARACTER,
CharacterState,
ContextPin,
InsertionMode,
ManualContext,
ObjectState,
Reinforcement,
Suggestion,
WorldState,
)
# ---------------------------------------------------------------------------
# Reinforcement.as_context_line
# ---------------------------------------------------------------------------
class TestReinforcementAsContextLine:
def test_question_with_question_mark_no_character(self):
r = Reinforcement(question="What is the weather?", answer="Sunny.")
assert r.as_context_line == "What is the weather? Sunny."
def test_question_with_question_mark_with_character(self):
r = Reinforcement(
question="What is the weather?", answer="Sunny.", character="Alice"
)
assert r.as_context_line == "Alice: What is the weather? Sunny."
def test_statement_no_character(self):
r = Reinforcement(question="weather", answer="sunny")
assert r.as_context_line == "weather: sunny"
def test_statement_with_character(self):
r = Reinforcement(question="mood", answer="happy", character="Alice")
assert r.as_context_line == "Alice's mood: happy"
def test_question_with_trailing_whitespace_still_treated_as_question(self):
r = Reinforcement(question="What time? ", answer="3pm")
# strip().endswith("?") is true for "What time? "
assert r.as_context_line == "What time? 3pm"
# ---------------------------------------------------------------------------
# WorldState.normalize_name
# ---------------------------------------------------------------------------
class TestNormalizeName:
def test_normalize_underscore_name(self):
ws = WorldState()
assert ws.normalize_name("john_doe") == "John Doe"
def test_normalize_preserves_possessive_lowercase_s(self):
"""title() converts 's to 'S; we expect the regex fix-up."""
ws = WorldState()
assert ws.normalize_name("john's hat") == "John's Hat"
def test_normalize_strips_whitespace(self):
ws = WorldState()
assert ws.normalize_name(" alice ") == "Alice"
def test_normalize_already_titled(self):
ws = WorldState()
assert ws.normalize_name("Bob") == "Bob"
# ---------------------------------------------------------------------------
# WorldState.filter_reinforcements
# ---------------------------------------------------------------------------
class TestFilterReinforcements:
def _ws(self) -> WorldState:
ws = WorldState()
ws.reinforce = [
Reinforcement(
question="q1", answer="a1", character="Alice", insert="sequential"
),
Reinforcement(
question="q2",
answer="a2",
character="Alice",
insert="conversation-context",
),
Reinforcement(
question="q3", answer="a3", character=None, insert="sequential"
),
Reinforcement(
question="q4", answer=None, character="Bob", insert="sequential"
),
]
return ws
def test_skips_reinforcement_with_no_answer(self):
ws = self._ws()
results = ws.filter_reinforcements()
questions = [r.question for r in results]
# q4 has answer=None so it must be excluded
assert "q4" not in questions
def test_default_returns_all_with_answer(self):
ws = self._ws()
results = ws.filter_reinforcements()
assert {r.question for r in results} == {"q1", "q2", "q3"}
def test_filter_by_character(self):
ws = self._ws()
results = ws.filter_reinforcements(character="Alice")
assert {r.question for r in results} == {"q1", "q2"}
def test_filter_by_none_character(self):
ws = self._ws()
results = ws.filter_reinforcements(character=None)
# character=None filters to those whose character is None
assert {r.question for r in results} == {"q3"}
def test_filter_by_insert(self):
ws = self._ws()
results = ws.filter_reinforcements(insert=["sequential"])
assert {r.question for r in results} == {"q1", "q3"}
def test_filter_by_character_and_insert(self):
ws = self._ws()
results = ws.filter_reinforcements(
character="Alice", insert=["conversation-context"]
)
assert {r.question for r in results} == {"q2"}
def test_any_character_constant_includes_all(self):
ws = self._ws()
results = ws.filter_reinforcements(character=ANY_CHARACTER)
# Should match both Alice's, world's; q4 still skipped (no answer)
assert {r.question for r in results} == {"q1", "q2", "q3"}
# ---------------------------------------------------------------------------
# WorldState.reset
# ---------------------------------------------------------------------------
class TestReset:
def test_reset_clears_characters_items_location(self):
ws = WorldState()
ws.characters = {"Alice": CharacterState(snapshot="x", emotion="happy")}
ws.items = {"key": ObjectState(snapshot="rusty")}
ws.location = "library"
# reinforce / pins / manual_context should NOT be reset
ws.reinforce = [Reinforcement(question="q", answer="a")]
ws.reset()
assert ws.characters == {}
assert ws.items == {}
assert ws.location is None
# reset only resets these three; others remain
assert len(ws.reinforce) == 1
# ---------------------------------------------------------------------------
# WorldState.reinforcements_for_character / world / manual_context_for_world
# ---------------------------------------------------------------------------
class TestReinforcementGroups:
def _ws(self) -> WorldState:
ws = WorldState()
ws.reinforce = [
Reinforcement(question="qA1", answer="x", character="Alice"),
Reinforcement(question="qA2", answer="y", character="Alice"),
Reinforcement(question="qB", answer="z", character="Bob"),
Reinforcement(question="qWorld", answer="w", character=None),
]
return ws
def test_reinforcements_for_character(self):
ws = self._ws()
results = ws.reinforcements_for_character("Alice")
assert set(results.keys()) == {"qA1", "qA2"}
assert all(r.character == "Alice" for r in results.values())
def test_reinforcements_for_character_unknown_returns_empty(self):
ws = self._ws()
assert ws.reinforcements_for_character("Nobody") == {}
def test_reinforcements_for_world(self):
ws = self._ws()
results = ws.reinforcements_for_world()
assert set(results.keys()) == {"qWorld"}
def test_manual_context_for_world_filters_by_typ(self):
ws = WorldState()
ws.manual_context["w1"] = ManualContext(
id="w1", text="t1", meta={"typ": "world_state"}
)
ws.manual_context["d1"] = ManualContext(
id="d1", text="t2", meta={"typ": "details"}
)
ws.manual_context["x"] = ManualContext(id="x", text="t3", meta={})
result = ws.manual_context_for_world()
assert set(result.keys()) == {"w1"}
# ---------------------------------------------------------------------------
# WorldState.character_emotion
# ---------------------------------------------------------------------------
class TestCharacterEmotion:
def test_returns_emotion_when_character_exists(self):
ws = WorldState()
ws.characters = {"Alice": CharacterState(emotion="elated")}
assert ws.character_emotion("Alice") == "elated"
def test_returns_none_when_character_missing(self):
ws = WorldState()
assert ws.character_emotion("Ghost") is None
# ---------------------------------------------------------------------------
# Suggestion model
# ---------------------------------------------------------------------------
class TestSuggestion:
@staticmethod
def _proposal(uid: str, name: str = "n") -> Call:
return Call(uid=uid, name=name, arguments={})
def test_remove_proposal(self):
s = Suggestion(
type="character",
name="Alice",
id="sug1",
proposals=[self._proposal("a"), self._proposal("b")],
)
s.remove_proposal("a")
assert [p.uid for p in s.proposals] == ["b"]
def test_remove_proposal_unknown_is_noop(self):
s = Suggestion(
type="character",
name="Alice",
id="sug1",
proposals=[self._proposal("a")],
)
s.remove_proposal("does-not-exist")
assert [p.uid for p in s.proposals] == ["a"]
def test_merge_appends_new_proposal(self):
s1 = Suggestion(
type="character", name="Alice", id="sug1", proposals=[self._proposal("a")]
)
s2 = Suggestion(
type="character", name="Alice", id="sug1", proposals=[self._proposal("b")]
)
s1.merge(s2)
assert [p.uid for p in s1.proposals] == ["a", "b"]
def test_merge_overrides_matching_proposal(self):
first = self._proposal("a", name="old")
replacement = self._proposal("a", name="new")
s1 = Suggestion(type="character", name="Alice", id="sug1", proposals=[first])
s2 = Suggestion(
type="character", name="Alice", id="sug1", proposals=[replacement]
)
s1.merge(s2)
assert len(s1.proposals) == 1
assert s1.proposals[0].name == "new"
def test_merge_assert_id_match(self):
s1 = Suggestion(type="character", name="Alice", id="sug1", proposals=[])
s2 = Suggestion(type="character", name="Alice", id="sug2", proposals=[])
with pytest.raises(AssertionError):
s1.merge(s2)
# ---------------------------------------------------------------------------
# WorldState.add_reinforcement (existing-update branch coverage)
# ---------------------------------------------------------------------------
class TestAddReinforcementUpdates:
@pytest.mark.asyncio
async def test_creating_world_reinforcement_appends(self, world_state):
r = await world_state.add_reinforcement(
question="What is the weather?", answer="sunny"
)
assert len(world_state.reinforce) == 1
assert world_state.reinforce[0] is r
@pytest.mark.asyncio
async def test_updating_existing_world_reinforcement_in_place(self, world_state):
r1 = await world_state.add_reinforcement(question="q", answer="a", interval=5)
r2 = await world_state.add_reinforcement(question="q", answer="b", interval=12)
# Same object updated, not a duplicate
assert r1 is r2
assert len(world_state.reinforce) == 1
assert r2.answer == "b"
assert r2.interval == 12
@pytest.mark.asyncio
async def test_change_insert_method_from_other_to_sequential_resets_due(
self, world_state
):
r = await world_state.add_reinforcement(
question="q", answer="a", insert="conversation-context"
)
r.due = 7 # simulate countdown
# Switch to sequential -> due should be reset to 0 to run next loop
await world_state.add_reinforcement(
question="q", answer="a", insert="sequential"
)
assert r.due == 0
assert r.insert == "sequential"
@pytest.mark.asyncio
async def test_update_character_reinforcement_sets_detail(self, scene):
"""When the reinforcement is character-bound, updating it should also
write the answer to the character's detail map."""
character = make_actor(scene, "Alice")
install_tracking_memory(scene) # avoid memory side effects
ws = scene.world_state
await ws.add_reinforcement(
question="What is Alice's mood?", character="Alice", answer="curious"
)
# update the existing reinforcement - this should also call set_detail
await ws.add_reinforcement(
question="What is Alice's mood?", character="Alice", answer="happy"
)
assert character.get_detail("What is Alice's mood?") == "happy"
# ---------------------------------------------------------------------------
# WorldState.find_reinforcement
# ---------------------------------------------------------------------------
class TestFindReinforcement:
@pytest.mark.asyncio
async def test_finds_existing(self, world_state):
await world_state.add_reinforcement(question="q", answer="a", character="Alice")
idx, r = await world_state.find_reinforcement("q", "Alice")
assert idx == 0
assert r is not None
assert r.character == "Alice"
@pytest.mark.asyncio
async def test_returns_none_when_missing(self, world_state):
idx, r = await world_state.find_reinforcement("nope", "Alice")
assert idx is None
assert r is None
@pytest.mark.asyncio
async def test_distinguishes_character_vs_world(self, world_state):
await world_state.add_reinforcement(question="q", answer="a", character="Alice")
await world_state.add_reinforcement(question="q", answer="b", character=None)
idx_alice, r_alice = await world_state.find_reinforcement("q", "Alice")
idx_world, r_world = await world_state.find_reinforcement("q", None)
assert r_alice.answer == "a"
assert r_world.answer == "b"
assert idx_alice != idx_world
# ---------------------------------------------------------------------------
# WorldState.commit_to_memory
# ---------------------------------------------------------------------------
class TestCommitToMemory:
@pytest.mark.asyncio
async def test_commit_to_memory_strips_complex_meta_values(self, scene_with_memory):
scene, tracking = scene_with_memory
ws = scene.world_state
ws.manual_context["a"] = ManualContext(
id="a",
text="text-a",
meta={
"source": "manual",
"typ": "world_state",
"complex": {"nested": "dict"}, # should be filtered out
"list": [1, 2, 3], # should be filtered out
"count": 1,
"flag": True,
"ratio": 0.5,
},
)
await ws.commit_to_memory(tracking)
assert len(tracking.add_many_calls) == 1
items = tracking.add_many_calls[0]
assert len(items) == 1
meta = items[0]["meta"]
# Only simple-typed values survive
assert "complex" not in meta
assert "list" not in meta
assert meta["count"] == 1
assert meta["flag"] is True
assert meta["ratio"] == 0.5
assert meta["source"] == "manual"
@pytest.mark.asyncio
async def test_commit_to_memory_passes_all_entries(self, scene_with_memory):
scene, tracking = scene_with_memory
ws = scene.world_state
ws.manual_context["a"] = ManualContext(id="a", text="t1")
ws.manual_context["b"] = ManualContext(id="b", text="t2")
await ws.commit_to_memory(tracking)
assert len(tracking.add_many_calls) == 1
items = tracking.add_many_calls[0]
ids = sorted([i["id"] for i in items])
assert ids == ["a", "b"]
# ---------------------------------------------------------------------------
# WorldState.persist
# ---------------------------------------------------------------------------
class TestPersist:
@pytest.mark.asyncio
async def test_persist_no_chars_no_items_does_not_call_memory(
self, scene_with_memory
):
scene, tracking = scene_with_memory
await scene.world_state.persist()
assert tracking.add_many_calls == []
@pytest.mark.asyncio
async def test_persist_emits_one_state_per_character_and_item(
self, scene_with_memory
):
scene, tracking = scene_with_memory
ws = scene.world_state
ws.characters = {"Alice": CharacterState(snapshot="standing", emotion="calm")}
ws.items = {"Sword": ObjectState(snapshot="sheathed")}
await ws.persist()
assert len(tracking.add_many_calls) == 1
items = tracking.add_many_calls[0]
ids = sorted(i["id"] for i in items)
assert ids == ["Alice.world_state.snapshot", "Sword.world_state.snapshot"]
# Snapshots are concatenated into the text
text_by_id = {i["id"]: i["text"] for i in items}
assert "standing" in text_by_id["Alice.world_state.snapshot"]
assert "sheathed" in text_by_id["Sword.world_state.snapshot"]
# ---------------------------------------------------------------------------
# WorldState.render -- jinja2 template rendering
# ---------------------------------------------------------------------------
class TestRenderTemplate:
"""Verifies the world-state render Jinja2 template produces meaningful
structure for the data we feed it. This isn't a baseline snapshot;
it asserts the load-bearing structure (markers and expected fields)."""
def test_render_with_characters_and_items(self):
ws = WorldState()
ws.characters = {"Alice": CharacterState(snapshot="standing", emotion="calm")}
ws.items = {"Sword": ObjectState(snapshot="sheathed")}
ws.location = "Library"
rendered = str(ws.render())
assert "[world state]" in rendered
assert "[end of world state]" in rendered
assert "Name: Alice" in rendered
assert "Emotion: calm" in rendered
assert "Snapshot: standing" in rendered
assert "Name: Sword" in rendered
assert "Snapshot: sheathed" in rendered
def test_render_empty(self):
ws = WorldState()
rendered = str(ws.render())
assert "[world state]" in rendered
assert "[end of world state]" in rendered
def test_as_list_returns_lines_after_render(self):
ws = WorldState()
ws.characters = {"Alice": CharacterState(snapshot="x", emotion="y")}
prompt = ws.render()
# Need to actually render the template before .as_list is populated.
prompt.render()
lines = prompt.as_list
assert any("Name: Alice" in line for line in lines)
def test_world_state_as_list_property_renders(self):
ws = WorldState()
ws.characters = {"Alice": CharacterState(snapshot="x", emotion="y")}
# Property short-cut: WorldState.as_list directly returns rendered list
# (this can return empty string if prompt isn't rendered—we just verify
# accessing the property doesn't crash.)
result = ws.as_list
# Either a list (after render) or empty string (before render)
assert isinstance(result, (list, str))
# ---------------------------------------------------------------------------
# InsertionMode enum
# ---------------------------------------------------------------------------
class TestInsertionMode:
def test_enum_values(self):
assert InsertionMode.sequential.value == "sequential"
assert InsertionMode.conversation_context.value == "conversation-context"
assert InsertionMode.all_context.value == "all-context"
assert InsertionMode.never.value == "never"
def test_enum_is_string(self):
assert InsertionMode.sequential == "sequential"
# ---------------------------------------------------------------------------
# ContextPin defaults / construction
# ---------------------------------------------------------------------------
class TestContextPinModel:
def test_default_values(self):
pin = ContextPin(entry_id="x")
assert pin.entry_id == "x"
assert pin.condition is None
assert pin.condition_state is False
assert pin.gamestate_condition is None
assert pin.active is False
assert pin.decay is None
assert pin.decay_due is None
def test_with_decay(self):
pin = ContextPin(entry_id="x", decay=5, active=True, decay_due=5)
assert pin.decay == 5
assert pin.decay_due == 5