Files
talemate/tests/test_nodegraph_writer.py
veguAI f5d41c04c8 0.37.0 (#267)
0.37.0

- **Director Planning** — Multi-step todo lists in director chat plus a Generate long progress action for multi-beat scene arcs.
- **Auto Narration** — Unified auto-narration replacing the old Narrate after Dialogue toggle, with a chance slider and weighted action mix.
- **LLM Prompt Templates Manager** — Dedicated UI tab for viewing, creating, editing, and deleting prompt templates.
- **Character Folders** — Collapsible folders in the World Editor character list, synced across linked scenes.
- **OpenAI Compatible TTS** — Connect any number of OpenAI-compatible TTS servers in parallel.
- **KoboldCpp TTS Auto-Setup** — KoboldCpp clients with a TTS model loaded register themselves as a TTS backend.
- **Model Testing Harness** — Bundled scene that runs basic capability tests against any connected LLM.

Plus 27 improvements and 28 bug fixes
2026-05-12 21:01:51 +03:00

1238 lines
38 KiB
Python

"""
Tests for ``talemate.game.engine.nodes.tools`` mutation + layout APIs.
See also ``tests/test_nodegraph_tools.py`` for the analysis layer.
"""
from __future__ import annotations
import uuid
from pathlib import Path
import pytest
from talemate.game.engine.nodes.tools import (
AlreadyConnectedError,
CycleError,
DynamicInputError,
GraphWriter,
GroupColor,
GroupError,
LayoutError,
LayoutOptions,
NodeNotFoundError,
NotConnectedError,
UnknownPropertyError,
UnknownRegistryError,
UnknownSocketError,
add_group,
analysis,
ensure_registry_loaded,
get_node_metadata,
layout_graph,
load_graph,
)
from talemate.game.engine.nodes.tools import writer as writer_mod
from talemate.game.engine.nodes.tools.layout import (
WCCKind,
_apply_estimated_heights,
_classify_wcc,
_estimate_height,
_wcc_over_subset,
)
REPO_ROOT = Path(__file__).resolve().parent.parent
DIRECTOR_FIXTURE = (
REPO_ROOT
/ "src/talemate/agents/director/modules/director-action-direct-story-arc.json"
)
ROLL_DICE_FIXTURE = REPO_ROOT / "scenes/infinity-quest/nodes/roll-dice.json"
# ---------------------------------------------------------------------------
# Helpers / fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True, scope="module")
def _load_registry():
ensure_registry_loaded()
yield
def _empty_graph() -> dict:
"""Minimal valid graph dict for writer tests."""
return {
"title": "Test",
"id": str(uuid.uuid4()),
"registry": "test/WriterGraph",
"base_type": "core/Graph",
"extends": None,
"properties": {},
"nodes": {},
"edges": {},
"groups": [],
"comments": [],
"inputs": [],
"outputs": [],
"module_properties": {},
}
def _node_at(graph: dict, nid: str) -> dict:
return graph["nodes"][nid]
# ---------------------------------------------------------------------------
# add_node / remove_node
# ---------------------------------------------------------------------------
def test_add_node_happy_path():
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node(
"data/number/Random",
title="Roll",
properties={"method": "integer"},
)
# id is a valid uuid4 string
uuid.UUID(nid) # raises if invalid
node = _node_at(g, nid)
assert node["registry"] == "data/number/Random"
assert node["title"] == "Roll"
assert node["properties"] == {"method": "integer"}
assert node["base_type"] == "core/Node"
assert node["x"] == 0
assert node["y"] == 0
assert node["width"] == 210
# Writer intentionally does NOT set a default height; layout owns
# height estimation so collision avoidance has a realistic rect.
assert "height" not in node
def test_add_node_respects_explicit_height():
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node(
"core/Watch",
title="Pinned",
height=200,
)
node = _node_at(g, nid)
assert node["height"] == 200
def test_add_node_unknown_registry_raises():
g = _empty_graph()
w = GraphWriter(g)
with pytest.raises(UnknownRegistryError, match="not registered"):
w.add_node("totally/Fake")
def test_add_node_dynamic_class_gets_dynamic_inputs_list():
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node("data/string/AdvancedFormat", title="Format")
assert _node_at(g, nid).get("dynamic_inputs") == []
def test_remove_node_prunes_edges():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch", title="A")
b = w.add_node("core/Watch", title="B")
c = w.add_node("core/Watch", title="C")
w.connect(a, "value", b, "value")
w.connect(b, "value", c, "value")
writer_mod.remove_node(g, b)
assert b not in g["nodes"]
# Every edge that referenced b should be gone
for src, targets in g["edges"].items():
assert src.split(".", 1)[0] != b
for tgt in targets:
assert tgt.split(".", 1)[0] != b
def test_remove_node_missing_raises():
g = _empty_graph()
GraphWriter(g)
with pytest.raises(NodeNotFoundError):
writer_mod.remove_node(g, "no-such-node")
# ---------------------------------------------------------------------------
# connect / disconnect
# ---------------------------------------------------------------------------
def test_connect_happy_path_creates_edge():
g = _empty_graph()
w = GraphWriter(g)
get_min = w.add_node(
"state/GetState",
title="GET min",
properties={"name": "min", "scope": "local"},
)
roll = w.add_node(
"data/number/Random",
title="Roll",
properties={"method": "integer"},
)
w.connect(get_min, "value", roll, "min")
edges = g["edges"]
key = f"{get_min}.value"
assert key in edges
assert f"{roll}.min" in edges[key]
def test_connect_dotted_shortcut():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
w.connect(f"{a}.value", f"{b}.value")
assert f"{a}.value" in g["edges"]
def test_connect_unknown_source_socket():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
with pytest.raises(UnknownSocketError, match="output socket 'nope'"):
w.connect(a, "nope", b, "value")
def test_connect_unknown_target_socket():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
with pytest.raises(UnknownSocketError, match="input socket 'nope'"):
w.connect(a, "value", b, "nope")
def test_connect_already_connected_raises():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
c = w.add_node("core/Watch")
w.connect(a, "value", b, "value")
with pytest.raises(AlreadyConnectedError):
w.connect(c, "value", b, "value")
def test_connect_cycle_rolls_back_edge():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
w.connect(a, "value", b, "value")
with pytest.raises(CycleError):
w.connect(b, "value", a, "value")
# The rolled-back edge must not be present.
assert f"{b}.value" not in g["edges"]
# The original edge must still be there.
assert f"{a}.value" in g["edges"]
def test_connect_self_loop_is_cycle():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
with pytest.raises(CycleError):
w.connect(a, "value", a, "value")
assert f"{a}.value" not in g["edges"]
def test_disconnect_happy_path_and_missing():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
w.connect(a, "value", b, "value")
w.disconnect(a, "value", b, "value")
assert f"{a}.value" not in g["edges"]
with pytest.raises(NotConnectedError):
w.disconnect(a, "value", b, "value")
# ---------------------------------------------------------------------------
# dynamic inputs
# ---------------------------------------------------------------------------
def test_add_dynamic_input_on_dynamic_node():
g = _empty_graph()
w = GraphWriter(g)
fmt = w.add_node("data/string/AdvancedFormat", title="Format")
w.add_dynamic_input(fmt, "item0", "any")
w.add_dynamic_input(fmt, "item1", "str")
dyn = _node_at(g, fmt)["dynamic_inputs"]
assert [d["name"] for d in dyn] == ["item0", "item1"]
assert dyn[1]["type"] == "str"
def test_add_dynamic_input_duplicate_raises():
g = _empty_graph()
w = GraphWriter(g)
fmt = w.add_node("data/string/AdvancedFormat")
w.add_dynamic_input(fmt, "item0")
with pytest.raises(DynamicInputError, match="already exists"):
w.add_dynamic_input(fmt, "item0")
def test_add_dynamic_input_on_non_dynamic_raises():
g = _empty_graph()
w = GraphWriter(g)
# core/Watch is a regular Node, not a DynamicSocketNodeBase subclass.
a = w.add_node("core/Watch")
with pytest.raises(DynamicInputError, match="DynamicSocketNodeBase"):
w.add_dynamic_input(a, "item0")
def test_remove_dynamic_input_sweeps_edges():
g = _empty_graph()
w = GraphWriter(g)
fmt = w.add_node("data/string/AdvancedFormat")
src = w.add_node("core/Watch")
w.add_dynamic_input(fmt, "item0")
w.connect(src, "value", fmt, "item0")
assert f"{src}.value" in g["edges"]
w.remove_dynamic_input(fmt, "item0")
assert f"{src}.value" not in g["edges"]
assert _node_at(g, fmt)["dynamic_inputs"] == []
with pytest.raises(DynamicInputError, match="No dynamic input"):
w.remove_dynamic_input(fmt, "item0")
# ---------------------------------------------------------------------------
# round-trip via load / save
# ---------------------------------------------------------------------------
def test_graphwriter_load_save_round_trip(tmp_path):
src = tmp_path / "orig.json"
g = _empty_graph()
a = str(uuid.uuid4())
b = str(uuid.uuid4())
g["nodes"] = {
a: {
"title": "A",
"id": a,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 10,
"y": 10,
"width": 210,
"height": 100,
},
b: {
"title": "B",
"id": b,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 300,
"y": 10,
"width": 210,
"height": 100,
},
}
g["edges"] = {f"{a}.value": [f"{b}.value"]}
import json
src.write_text(json.dumps(g), encoding="utf-8")
w = GraphWriter.load(src)
dest = tmp_path / "out.json"
saved = w.save(dest)
assert saved == dest
reloaded = load_graph(dest)
summary = analysis.summarize(reloaded)
assert summary.node_count == 2
def test_graphwriter_modify_and_reload(tmp_path):
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
w.connect(a, "value", b, "value")
dest = tmp_path / "mod.json"
w.save(dest)
reloaded = load_graph(dest)
assert analysis.summarize(reloaded).node_count == 2
assert f"{a}.value" in reloaded["edges"]
def test_save_without_path_raises():
w = GraphWriter(_empty_graph())
with pytest.raises(writer_mod.WriterError):
w.save()
# ---------------------------------------------------------------------------
# layout
# ---------------------------------------------------------------------------
def _make_layout_graph_with_existing() -> tuple[dict, list[str]]:
"""Graph with one existing node at (50, 20) plus three new nodes at origin."""
g = _empty_graph()
existing = str(uuid.uuid4())
g["nodes"][existing] = {
"title": "Existing",
"id": existing,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 50,
"y": 20,
"width": 200,
"height": 100,
}
w = GraphWriter(g)
a = w.add_node("core/Watch", title="A")
b = w.add_node("core/Watch", title="B")
c = w.add_node("core/Watch", title="C")
w.connect(a, "value", b, "value")
w.connect(b, "value", c, "value")
return g, [a, b, c]
def test_layout_anchor_right_places_offset_from_existing():
g, target = _make_layout_graph_with_existing()
a, b, c = target
existing_x = 50
existing_right = existing_x + 200 # 250
opts = LayoutOptions()
layout_graph(g, new_node_ids=target, anchor="right", options=opts)
# All target nodes end up at x >= existing_right + 300
first_col = existing_right + 300
assert g["nodes"][a]["x"] == first_col
assert g["nodes"][b]["x"] == first_col + opts.col_width
assert g["nodes"][c]["x"] == first_col + 2 * opts.col_width
# And they share min_y with the existing node.
assert g["nodes"][a]["y"] == 20
def test_layout_anchor_below_places_below_existing():
g, target = _make_layout_graph_with_existing()
a, b, c = target
layout_graph(g, new_node_ids=target, anchor="below")
# Existing node ends at y = 120; target should be at y = 320.
assert g["nodes"][a]["y"] == 320
assert g["nodes"][a]["x"] == 50 # same min_x as existing
def test_layout_anchor_full_starts_at_origin_and_relays_everything():
g, target = _make_layout_graph_with_existing()
# anchor=full ignores new_node_ids and relays everything.
existing_id = next(nid for nid in g["nodes"] if nid not in target)
layout_graph(g, anchor="full")
xs = [n["x"] for n in g["nodes"].values()]
ys = [n["y"] for n in g["nodes"].values()]
assert min(xs) == 0
assert min(ys) == 0
# The existing node is now in the target set.
assert g["nodes"][existing_id]["x"] >= 0
def test_layout_topological_ordering():
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch", title="A")
b = w.add_node("core/Watch", title="B")
c = w.add_node("core/Watch", title="C")
w.connect(a, "value", b, "value")
w.connect(b, "value", c, "value")
layout_graph(g, new_node_ids=[a, b, c], anchor="full")
ax = g["nodes"][a]["x"]
bx = g["nodes"][b]["x"]
cx = g["nodes"][c]["x"]
assert ax < bx < cx
def test_layout_collision_avoidance_within_column():
"""Two isolated nodes both at depth 0 must land in different rows."""
g = _empty_graph()
w = GraphWriter(g)
a = w.add_node("core/Watch")
b = w.add_node("core/Watch")
layout_graph(g, new_node_ids=[a, b], anchor="full")
assert g["nodes"][a]["x"] == g["nodes"][b]["x"]
assert g["nodes"][a]["y"] != g["nodes"][b]["y"]
def test_layout_isolated_node_gets_column_zero():
g = _empty_graph()
w = GraphWriter(g)
solo = w.add_node("core/Watch")
layout_graph(g, new_node_ids=[solo], anchor="full")
assert g["nodes"][solo]["x"] == 0
assert g["nodes"][solo]["y"] == 0
def test_layout_does_not_move_nodes_outside_target_set():
g, target = _make_layout_graph_with_existing()
existing_id = next(nid for nid in g["nodes"] if nid not in target)
orig_x = g["nodes"][existing_id]["x"]
orig_y = g["nodes"][existing_id]["y"]
layout_graph(g, new_node_ids=target, anchor="right")
assert g["nodes"][existing_id]["x"] == orig_x
assert g["nodes"][existing_id]["y"] == orig_y
def test_layout_unknown_anchor_raises():
g = _empty_graph()
with pytest.raises(LayoutError):
layout_graph(g, anchor="sideways")
def test_layout_unknown_target_raises():
g = _empty_graph()
w = GraphWriter(g)
w.add_node("core/Watch") # layout needs at least one real node
with pytest.raises(LayoutError):
layout_graph(g, new_node_ids=["no-such-node"], anchor="right")
# ---------------------------------------------------------------------------
# combined smoke: writer + layout + analysis
# ---------------------------------------------------------------------------
def test_combined_smoke_build_layout_reload(tmp_path):
w = GraphWriter(_empty_graph())
in_node = w.add_node(
"core/Input",
title="IN value",
properties={"input_type": "int", "input_name": "value"},
)
get_state = w.add_node(
"state/GetState",
title="GET local.max",
properties={"name": "max", "scope": "local"},
)
roll = w.add_node(
"data/number/Random",
title="Roll",
properties={"method": "integer"},
)
out_node = w.add_node(
"core/Output",
title="OUT result",
properties={"output_type": "int", "output_name": "result"},
)
w.connect(in_node, "value", roll, "min")
w.connect(get_state, "value", roll, "max")
w.connect(roll, "result", out_node, "value")
layout_graph(
w.graph,
new_node_ids=[in_node, get_state, roll, out_node],
anchor="full",
)
dest = tmp_path / "combined.json"
w.save(dest)
reloaded = load_graph(dest)
summary = analysis.summarize(reloaded)
assert summary.node_count == 4
# Layout sanity: Random must sit right of both of its inputs.
nodes = reloaded["nodes"]
assert nodes[roll]["x"] > nodes[in_node]["x"]
assert nodes[roll]["x"] > nodes[get_state]["x"]
# And the output must sit right of Random.
assert nodes[out_node]["x"] > nodes[roll]["x"]
# ---------------------------------------------------------------------------
# layout v2: height estimation
# ---------------------------------------------------------------------------
def _expected_height(registry: str, options: LayoutOptions) -> int:
"""Replicate the layout _estimate_height formula for test assertions.
Mirrors ``layout._estimate_height`` (v2.1 formula):
title_bar_height
+ (max(num_inputs, num_outputs) + num_properties) * socket_row_height
+ padding
+ dynamic_socket_bonus (iff class is a DynamicSocketNodeBase)
Floored at ``options.min_height``.
"""
meta = get_node_metadata(registry)
rows = max(len(meta.inputs), len(meta.outputs)) + len(meta.properties)
estimated = (
options.title_bar_height + rows * options.socket_row_height + options.padding
)
if meta.is_dynamic:
estimated += options.dynamic_socket_bonus
return max(estimated, options.min_height)
def test_layout_estimates_height_for_target_nodes():
"""A target node with no height gets an estimate matching the formula."""
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node("state/SetState")
# Writer must not have set a height.
assert "height" not in g["nodes"][nid]
opts = LayoutOptions()
layout_graph(g, new_node_ids=[nid], anchor="full", options=opts)
expected = _expected_height("state/SetState", opts)
assert g["nodes"][nid]["height"] == expected
assert expected > opts.min_height # sanity: SetState has real sockets
def test_layout_height_formula_is_deterministic():
"""Direct unit test on _estimate_height to pin the v2.1 formula."""
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node("state/SetState")
opts = LayoutOptions()
meta = get_node_metadata("state/SetState")
rows = max(len(meta.inputs), len(meta.outputs)) + len(meta.properties)
manual = opts.title_bar_height + rows * opts.socket_row_height + opts.padding
if meta.is_dynamic:
manual += opts.dynamic_socket_bonus
manual = max(manual, opts.min_height)
assert _estimate_height(g["nodes"][nid], opts) == manual
def test_layout_overwrites_target_height_even_when_preset():
"""Target-set height is always replaced, even if the caller pinned one."""
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node("core/Watch", height=999)
assert g["nodes"][nid]["height"] == 999 # pinned by writer argument
opts = LayoutOptions()
layout_graph(g, new_node_ids=[nid], anchor="full", options=opts)
expected = _expected_height("core/Watch", opts)
assert g["nodes"][nid]["height"] == expected
assert g["nodes"][nid]["height"] != 999
def test_layout_does_not_touch_heights_outside_target_set():
"""Non-target nodes keep whatever height they had before layout ran."""
g = _empty_graph()
# Hand-build a non-target existing node with a pinned height.
existing_id = str(uuid.uuid4())
g["nodes"][existing_id] = {
"title": "Existing",
"id": existing_id,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 0,
"y": 0,
"width": 210,
"height": 777,
}
w = GraphWriter(g)
new_id = w.add_node("core/Watch")
layout_graph(g, new_node_ids=[new_id], anchor="right")
# Existing node keeps its pinned height
assert g["nodes"][existing_id]["height"] == 777
# New node got an estimate
assert g["nodes"][new_id]["height"] != 777
assert g["nodes"][new_id]["height"] == _expected_height(
"core/Watch", LayoutOptions()
)
def test_apply_estimated_heights_skips_unknown_registry():
"""Helper falls back to min_height for a registry not in NODES."""
g = _empty_graph()
bogus = str(uuid.uuid4())
g["nodes"][bogus] = {
"title": "Mystery",
"id": bogus,
"properties": {},
"registry": "totally/Fake",
"base_type": "core/Node",
"x": 0,
"y": 0,
"width": 210,
}
opts = LayoutOptions()
_apply_estimated_heights(g, {bogus}, opts)
assert g["nodes"][bogus]["height"] == opts.min_height
# ---------------------------------------------------------------------------
# layout v2: WCC classification
# ---------------------------------------------------------------------------
def test_wcc_classify_input_output_passthrough_is_top():
"""A 2-node Input->Output chain is TOP (passthrough)."""
g = _empty_graph()
w = GraphWriter(g)
in_node = w.add_node(
"core/Input",
properties={"input_type": "any", "input_name": "state"},
)
out_node = w.add_node(
"core/Output",
properties={"output_type": "any", "output_name": "state"},
)
w.connect(in_node, "value", out_node, "value")
components = _wcc_over_subset(g, {in_node, out_node})
assert len(components) == 1
info = _classify_wcc(g, components[0])
assert info.kind is WCCKind.TOP
def test_wcc_classify_stage_wins_over_input():
"""A chain containing both Input and Stage classifies as STAGE."""
g = _empty_graph()
w = GraphWriter(g)
in_node = w.add_node(
"core/Input",
properties={"input_type": "int", "input_name": "value"},
)
stage = w.add_node("core/Stage", properties={"stage": 3})
w.connect(in_node, "value", stage, "state")
info = _classify_wcc(g, sorted([in_node, stage]))
assert info.kind is WCCKind.STAGE
assert info.stage_value == 3
def test_wcc_classify_stage_value_is_min_across_multiple_stage_nodes():
"""If a WCC contains multiple Stage nodes, stage_value is the min."""
g = _empty_graph()
w = GraphWriter(g)
s_low = w.add_node("core/Stage", properties={"stage": 1})
s_high = w.add_node("core/Stage", properties={"stage": 5})
# Link them so they're one WCC.
w.connect(s_low, "state", s_high, "state")
info = _classify_wcc(g, sorted([s_low, s_high]))
assert info.kind is WCCKind.STAGE
assert info.stage_value == 1
def test_wcc_classify_output_only_is_bottom():
"""A chain with only Output (no Input, no Stage) is BOTTOM."""
g = _empty_graph()
w = GraphWriter(g)
out_node = w.add_node(
"core/Output",
properties={"output_type": "int", "output_name": "result"},
)
watch = w.add_node("core/Watch")
w.connect(watch, "value", out_node, "value")
info = _classify_wcc(g, sorted([out_node, watch]))
assert info.kind is WCCKind.BOTTOM
def test_wcc_classify_plain_watch_is_middle():
"""A lone Watch (no Input / Output / Stage) is MIDDLE."""
g = _empty_graph()
w = GraphWriter(g)
watch = w.add_node("core/Watch")
info = _classify_wcc(g, [watch])
assert info.kind is WCCKind.MIDDLE
# ---------------------------------------------------------------------------
# layout v2: vertical band ordering
# ---------------------------------------------------------------------------
def test_layout_vertical_band_ordering():
"""TOP on top, STAGE(0) above STAGE(1), BOTTOM on the bottom."""
g = _empty_graph()
w = GraphWriter(g)
# TOP component: Input -> Watch (passthrough)
top_in = w.add_node(
"core/Input",
title="IN top",
properties={"input_type": "any", "input_name": "x"},
)
top_watch = w.add_node("core/Watch", title="top watch")
w.connect(top_in, "value", top_watch, "value")
# STAGE(0) component
stage0 = w.add_node("core/Stage", title="Stage 0", properties={"stage": 0})
stage0_watch = w.add_node("core/Watch", title="s0 watch")
w.connect(stage0_watch, "value", stage0, "state")
# STAGE(1) component
stage1 = w.add_node("core/Stage", title="Stage 1", properties={"stage": 1})
stage1_watch = w.add_node("core/Watch", title="s1 watch")
w.connect(stage1_watch, "value", stage1, "state")
# BOTTOM component: Watch -> Output
bottom_watch = w.add_node("core/Watch", title="bottom watch")
bottom_out = w.add_node(
"core/Output",
title="OUT bottom",
properties={"output_type": "any", "output_name": "x"},
)
w.connect(bottom_watch, "value", bottom_out, "value")
opts = LayoutOptions()
layout_graph(g, anchor="full", options=opts)
def _comp_y(node_ids: list[str]) -> int:
return min(g["nodes"][nid]["y"] for nid in node_ids)
def _comp_bottom(node_ids: list[str]) -> int:
return max(g["nodes"][nid]["y"] + g["nodes"][nid]["height"] for nid in node_ids)
top_y = _comp_y([top_in, top_watch])
stage0_y = _comp_y([stage0, stage0_watch])
stage1_y = _comp_y([stage1, stage1_watch])
bottom_y = _comp_y([bottom_watch, bottom_out])
# Strict vertical ordering
assert top_y < stage0_y < stage1_y < bottom_y
# Each band separated from the next by at least band_gap.
assert stage0_y - _comp_bottom([top_in, top_watch]) >= opts.band_gap
assert stage1_y - _comp_bottom([stage0, stage0_watch]) >= opts.band_gap
assert bottom_y - _comp_bottom([stage1, stage1_watch]) >= opts.band_gap
# Bands don't vertically overlap.
assert _comp_bottom([top_in, top_watch]) < stage0_y
assert _comp_bottom([stage0, stage0_watch]) < stage1_y
assert _comp_bottom([stage1, stage1_watch]) < bottom_y
# ---------------------------------------------------------------------------
# layout v2.1: new formula, dynamic bonus, height-aware + predecessor-aware
# packing
# ---------------------------------------------------------------------------
def test_layout_v21_height_formula_matches_explicit_math():
"""Spell out the v2.1 height math for state/SetState explicitly."""
g = _empty_graph()
w = GraphWriter(g)
nid = w.add_node("state/SetState")
opts = LayoutOptions()
meta = get_node_metadata("state/SetState")
rows = max(len(meta.inputs), len(meta.outputs)) + len(meta.properties)
expected = opts.title_bar_height + rows * opts.socket_row_height + opts.padding
# state/SetState is a regular node, not a DynamicSocketNodeBase.
assert meta.is_dynamic is False
expected = max(expected, opts.min_height)
assert _estimate_height(g["nodes"][nid], opts) == expected
def test_layout_v21_dynamic_socket_bonus_applies_to_dynamic_class():
"""data/string/AdvancedFormat is a DynamicSocketNodeBase subclass."""
g = _empty_graph()
w = GraphWriter(g)
dyn_nid = w.add_node("data/string/AdvancedFormat")
opts = LayoutOptions()
meta = get_node_metadata("data/string/AdvancedFormat")
assert meta.is_dynamic is True
rows = max(len(meta.inputs), len(meta.outputs)) + len(meta.properties)
base = opts.title_bar_height + rows * opts.socket_row_height + opts.padding
expected = max(base + opts.dynamic_socket_bonus, opts.min_height)
assert _estimate_height(g["nodes"][dyn_nid], opts) == expected
# Matched non-dynamic control: a node without the bonus should be
# exactly ``dynamic_socket_bonus`` shorter for the same row count.
# Compute a plain height using the same formula minus the bonus and
# assert the delta.
without_bonus = max(base, opts.min_height)
assert expected - without_bonus == opts.dynamic_socket_bonus
def test_layout_v21_height_aware_column_packing():
"""Two isolated nodes in the same column respect variable heights."""
from talemate.game.engine.nodes.tools.layout import _place_band
opts = LayoutOptions()
g = _empty_graph()
# Hand-build a tall pre-sized node so we can assert the packer
# clears its actual height rather than the legacy 160 stride.
tall_id = str(uuid.uuid4())
g["nodes"][tall_id] = {
"title": "Tall",
"id": tall_id,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 0,
"y": 0,
"width": 210,
"height": 200,
}
short_id = str(uuid.uuid4())
g["nodes"][short_id] = {
"title": "Short",
"id": short_id,
"properties": {},
"registry": "core/Watch",
"base_type": "core/Node",
"x": 0,
"y": 0,
"width": 210,
"height": 100,
}
# No edges — both nodes land in column 0. Sorted id order
# determines which goes first.
placement = _place_band(
g,
sorted([tall_id, short_id]),
origin_x=0,
origin_y=0,
options=opts,
)
y_tall = g["nodes"][tall_id]["y"]
y_short = g["nodes"][short_id]["y"]
assert y_tall != y_short
top_id, bottom_id = (tall_id, short_id) if y_tall < y_short else (short_id, tall_id)
top_y = g["nodes"][top_id]["y"]
top_h = g["nodes"][top_id]["height"]
bottom_y = g["nodes"][bottom_id]["y"]
assert bottom_y >= top_y + top_h + opts.min_vertical_gap
# If the "top" node is the tall one, bottom_y must clear 200 + gap.
if top_id == tall_id:
assert bottom_y >= 200 + opts.min_vertical_gap
# Legacy stride would have put the second node at y=160; make sure
# we're not doing that anymore.
assert bottom_y != 160
# placement.max_y covers both rects.
assert placement.max_y >= bottom_y + g["nodes"][bottom_id]["height"]
def test_layout_v21_predecessor_aware_row_placement_uncrosses_wires():
"""Crossed Input->SetState edges should be un-crossed by the layout pass.
Fixture::
IN a ---> SET local.a
IN b ---> SET local.b
With the original IDs chosen so that ``a`` and ``b`` would sort in
the "wrong" order in column 1 under pure stable-id ordering, we
should still see ``IN a`` and ``SET local.a`` share a y coordinate
after layout, and likewise for ``b``.
"""
g = _empty_graph()
w = GraphWriter(g)
# Inputs in column 0
in_a = w.add_node(
"core/Input",
title="IN a",
properties={"input_type": "any", "input_name": "a"},
)
in_b = w.add_node(
"core/Input",
title="IN b",
properties={"input_type": "any", "input_name": "b"},
)
# SetStates in column 1 -- connect a -> set_a, b -> set_b, but add
# them to the graph in reverse-dependency order so that stable-id
# sort inside the column puts set_b before set_a.
set_b = w.add_node(
"state/SetState",
title="SET local.b",
properties={"name": "b", "scope": "local"},
)
set_a = w.add_node(
"state/SetState",
title="SET local.a",
properties={"name": "a", "scope": "local"},
)
w.connect(in_a, "value", set_a, "value")
w.connect(in_b, "value", set_b, "value")
layout_graph(g, anchor="full")
# Each SetState must land at the same y as its source Input.
assert g["nodes"][in_a]["y"] == g["nodes"][set_a]["y"], (
"IN a should line up with SET local.a after predecessor-aware placement"
)
assert g["nodes"][in_b]["y"] == g["nodes"][set_b]["y"], (
"IN b should line up with SET local.b after predecessor-aware placement"
)
def test_layout_v21_col_width_default_is_wider():
"""col_width default bumped from 260 to 360 in v2.1."""
opts = LayoutOptions()
assert opts.col_width == 360
# ---------------------------------------------------------------------------
# add_group / GroupColor
# ---------------------------------------------------------------------------
def _empty_graph() -> dict:
return {
"title": "Test",
"id": str(uuid.uuid4()),
"registry": "test/Test",
"base_type": "core/Graph",
"properties": {},
"x": 0,
"y": 0,
"width": 200,
"height": 100,
"collapsed": False,
"inherited": False,
"nodes": {},
"edges": {},
"groups": [],
"comments": [],
"extends": None,
"inputs": [],
"outputs": [],
"module_properties": {},
}
def test_add_group_creates_group_around_nodes():
"""add_group computes bbox from node positions and produces a group dict."""
g = GraphWriter(_empty_graph())
a = g.add_node(
"core/Input", properties={"input_name": "a", "input_type": "any", "num": 0}
)
b = g.add_node(
"core/Output", properties={"output_name": "a", "output_type": "any", "num": 0}
)
g.connect(a, "value", b, "value")
layout_graph(g.graph, anchor="full")
group = g.add_group("Input", GroupColor.INPUT, [a, b])
assert group in g.graph["groups"]
assert group["title"] == "Input"
assert group["color"] == "#88A"
assert group["color"] == GroupColor.INPUT
assert group["font_size"] == 24
assert group["inherited"] is False
# Bounding box should contain both placed nodes (with padding).
a_node = g.graph["nodes"][a]
b_node = g.graph["nodes"][b]
assert group["x"] <= min(a_node["x"], b_node["x"])
assert group["y"] <= min(a_node["y"], b_node["y"])
assert group["x"] + group["width"] >= max(
a_node["x"] + a_node["width"], b_node["x"] + b_node["width"]
)
assert group["y"] + group["height"] >= max(
a_node["y"] + a_node["height"], b_node["y"] + b_node["height"]
)
def test_add_group_resolves_short_prefix_ids():
"""GraphWriter.add_group accepts short-prefix node ids like other methods."""
g = GraphWriter(_empty_graph())
a = g.add_node(
"core/Input", properties={"input_name": "a", "input_type": "any", "num": 0}
)
layout_graph(g.graph, anchor="full")
group = g.add_group("X", GroupColor.SPECIAL, [a[:8]])
assert group["title"] == "X"
def test_add_group_empty_node_ids_raises():
g = GraphWriter(_empty_graph())
with pytest.raises(GroupError):
g.add_group("Empty", GroupColor.INPUT, [])
def test_add_group_unknown_node_raises():
g = GraphWriter(_empty_graph())
with pytest.raises(GroupError):
add_group(g.graph, "X", GroupColor.INPUT, ["does-not-exist"])
def test_add_group_appends_to_existing_groups():
"""add_group never clobbers user-drawn groups; it appends."""
g = GraphWriter(_empty_graph())
g.graph["groups"].append(
{"title": "User Group", "x": 0, "y": 0, "width": 100, "height": 50}
)
a = g.add_node(
"core/Input", properties={"input_name": "a", "input_type": "any", "num": 0}
)
layout_graph(g.graph, anchor="full")
g.add_group("Auto", GroupColor.OUTPUT, [a])
titles = [grp["title"] for grp in g.graph["groups"]]
assert "User Group" in titles
assert "Auto" in titles
assert len(g.graph["groups"]) == 2
def test_group_color_palette_matches_litegraph_presets():
"""Sanity check the constants. These are the LiteGraph groupcolor hex
values; if any drift, the visual match with the frontend breaks."""
assert GroupColor.INPUT == "#88A"
assert GroupColor.OUTPUT == "#8A8"
assert GroupColor.PROCESS == "#3f789e"
assert GroupColor.PREPARE == "#8AA"
assert GroupColor.VALIDATION == "#b58b2a"
assert GroupColor.FUNCTION == "#b06634"
assert GroupColor.SPECIAL == "#a1309b"
assert GroupColor.ERROR_HANDLING == "#A88"
assert GroupColor.UX == "#207e7e"
# ---------------------------------------------------------------------------
# add_node property validation (UnknownPropertyError)
# ---------------------------------------------------------------------------
def test_add_node_accepts_known_property():
"""Sanity check: a property that exists on the class is accepted.
``state/SetState`` declares ``name`` and ``scope`` as Fields — passing
either should work without raising.
"""
g = GraphWriter(_empty_graph())
nid = g.add_node(
"state/SetState",
properties={"name": "foo", "scope": "local"},
)
assert g.graph["nodes"][nid]["properties"] == {"name": "foo", "scope": "local"}
def test_add_node_unknown_property_raises():
"""A property key not declared on the class raises UnknownPropertyError.
This is the exact failure mode the agent hit when it set
``properties={"agent": "summarizer"}`` on ``agents/GetAgent`` instead
of the real property name ``agent_name``.
"""
g = GraphWriter(_empty_graph())
with pytest.raises(UnknownPropertyError) as excinfo:
g.add_node(
"agents/GetAgent",
properties={"agent": "summarizer"}, # bogus key
)
msg = str(excinfo.value)
assert "'agent'" in msg
assert "agent_name" in msg # the real property name should appear in the hint
def test_add_node_unknown_property_lists_valid_options_in_message():
"""The error message must list the valid property names so the caller
can fix the typo without further lookup."""
g = GraphWriter(_empty_graph())
with pytest.raises(UnknownPropertyError) as excinfo:
g.add_node(
"state/SetState",
properties={"naem": "foo"}, # typo for "name"
)
msg = str(excinfo.value)
assert "'naem'" in msg
assert "'name'" in msg
assert "'scope'" in msg
def test_add_node_no_properties_still_works():
"""Calling add_node without a properties arg should NOT trigger
validation (there's nothing to validate)."""
g = GraphWriter(_empty_graph())
nid = g.add_node("core/Stage")
assert g.graph["nodes"][nid]["properties"] == {}
def test_add_node_empty_properties_dict_still_works():
"""Same as above but with an explicit empty dict."""
g = GraphWriter(_empty_graph())
nid = g.add_node("core/Stage", properties={})
assert g.graph["nodes"][nid]["properties"] == {}
def test_add_node_validation_runs_before_mutation():
"""When add_node raises UnknownPropertyError, no node should have
been added to the graph (no partial-mutation footprint)."""
g = GraphWriter(_empty_graph())
before_count = len(g.graph["nodes"])
with pytest.raises(UnknownPropertyError):
g.add_node("state/SetState", properties={"bogus_key": "value"})
after_count = len(g.graph["nodes"])
assert after_count == before_count