mirror of
https://github.com/vegu-ai/talemate.git
synced 2025-12-16 11:47:48 +01:00
1369 lines
44 KiB
Python
1369 lines
44 KiB
Python
|
|
import os
|
||
|
|
import json
|
||
|
|
import tempfile
|
||
|
|
import shutil
|
||
|
|
import pytest
|
||
|
|
from unittest.mock import Mock
|
||
|
|
|
||
|
|
from talemate.changelog import (
|
||
|
|
save_changelog,
|
||
|
|
append_scene_delta,
|
||
|
|
reconstruct_scene_data,
|
||
|
|
reconstruct_cleanup,
|
||
|
|
write_reconstructed_scene,
|
||
|
|
list_revisions,
|
||
|
|
rollback_scene_to_revision,
|
||
|
|
delete_changelog_files,
|
||
|
|
_changelog_log_path,
|
||
|
|
_base_path,
|
||
|
|
_latest_path,
|
||
|
|
_read_json_or_default,
|
||
|
|
_write_json,
|
||
|
|
_ensure_log_initialized,
|
||
|
|
_load_base_scene_data,
|
||
|
|
_load_latest_scene_data,
|
||
|
|
_ensure_latest_initialized,
|
||
|
|
_apply_delta,
|
||
|
|
_compute_delta,
|
||
|
|
_serialize_scene_plain,
|
||
|
|
_get_changelog_files,
|
||
|
|
_get_latest_changelog_file,
|
||
|
|
_get_overall_latest_revision,
|
||
|
|
_get_file_size,
|
||
|
|
MAX_CHANGELOG_FILE_SIZE,
|
||
|
|
InMemoryChangelog,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def temp_dir():
|
||
|
|
"""Create a temporary directory for tests."""
|
||
|
|
temp_dir = tempfile.mkdtemp()
|
||
|
|
yield temp_dir
|
||
|
|
shutil.rmtree(temp_dir)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
|
||
|
|
def mock_scene(temp_dir):
|
||
|
|
"""Create a mock scene object."""
|
||
|
|
scene = Mock()
|
||
|
|
scene.filename = "test_scene.json"
|
||
|
|
scene.save_dir = temp_dir
|
||
|
|
scene.changelog_dir = os.path.join(temp_dir, "changelog")
|
||
|
|
scene.backups_dir = os.path.join(temp_dir, "backups")
|
||
|
|
scene.serialize = {"characters": [], "entries": [], "metadata": {"version": "1.0"}}
|
||
|
|
scene.rev = 0 # Initialize revision to 0
|
||
|
|
scene._changelog = None # Explicitly set to None to avoid Mock auto-creation
|
||
|
|
return scene
|
||
|
|
|
||
|
|
|
||
|
|
def test_changelog_log_path(mock_scene):
|
||
|
|
"""Test changelog log path generation."""
|
||
|
|
expected = os.path.join(
|
||
|
|
mock_scene.changelog_dir, "test_scene.json.changelog.0.json"
|
||
|
|
)
|
||
|
|
result = _changelog_log_path(mock_scene, 0)
|
||
|
|
assert result == expected
|
||
|
|
assert os.path.exists(mock_scene.changelog_dir)
|
||
|
|
|
||
|
|
# Test with different start revision
|
||
|
|
expected = os.path.join(
|
||
|
|
mock_scene.changelog_dir, "test_scene.json.changelog.123.json"
|
||
|
|
)
|
||
|
|
result = _changelog_log_path(mock_scene, 123)
|
||
|
|
assert result == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_base_path(mock_scene):
|
||
|
|
"""Test base path generation."""
|
||
|
|
expected = os.path.join(mock_scene.changelog_dir, "test_scene.json.base.json")
|
||
|
|
result = _base_path(mock_scene)
|
||
|
|
assert result == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_latest_path(mock_scene):
|
||
|
|
"""Test latest path generation."""
|
||
|
|
expected = os.path.join(mock_scene.changelog_dir, "test_scene.json.latest.json")
|
||
|
|
result = _latest_path(mock_scene)
|
||
|
|
assert result == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_read_json_or_default_file_exists(temp_dir):
|
||
|
|
"""Test reading existing JSON file."""
|
||
|
|
test_data = {"test": "data"}
|
||
|
|
test_file = os.path.join(temp_dir, "test.json")
|
||
|
|
|
||
|
|
with open(test_file, "w") as f:
|
||
|
|
json.dump(test_data, f)
|
||
|
|
|
||
|
|
result = _read_json_or_default(test_file, {})
|
||
|
|
assert result == test_data
|
||
|
|
|
||
|
|
|
||
|
|
def test_read_json_or_default_file_missing(temp_dir):
|
||
|
|
"""Test reading non-existent JSON file returns default."""
|
||
|
|
test_file = os.path.join(temp_dir, "missing.json")
|
||
|
|
default = {"default": "value"}
|
||
|
|
|
||
|
|
result = _read_json_or_default(test_file, default)
|
||
|
|
assert result == default
|
||
|
|
|
||
|
|
|
||
|
|
def test_read_json_or_default_invalid_json(temp_dir):
|
||
|
|
"""Test reading invalid JSON file returns default."""
|
||
|
|
test_file = os.path.join(temp_dir, "invalid.json")
|
||
|
|
default = {"default": "value"}
|
||
|
|
|
||
|
|
with open(test_file, "w") as f:
|
||
|
|
f.write("invalid json content")
|
||
|
|
|
||
|
|
result = _read_json_or_default(test_file, default)
|
||
|
|
assert result == default
|
||
|
|
|
||
|
|
|
||
|
|
def test_write_json(temp_dir):
|
||
|
|
"""Test writing JSON data to file."""
|
||
|
|
test_data = {"test": "data", "number": 42}
|
||
|
|
test_file = os.path.join(temp_dir, "subdir", "test.json")
|
||
|
|
|
||
|
|
_write_json(test_file, test_data)
|
||
|
|
|
||
|
|
assert os.path.exists(test_file)
|
||
|
|
with open(test_file, "r") as f:
|
||
|
|
result = json.load(f)
|
||
|
|
assert result == test_data
|
||
|
|
|
||
|
|
|
||
|
|
def test_ensure_log_initialized_new_log(mock_scene):
|
||
|
|
"""Test initializing a new changelog log."""
|
||
|
|
result = _ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
expected_structure = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"start_rev": 0,
|
||
|
|
"deltas": [],
|
||
|
|
"latest_rev": 0,
|
||
|
|
}
|
||
|
|
|
||
|
|
assert result == expected_structure
|
||
|
|
|
||
|
|
# Verify file was created
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
assert os.path.exists(log_path)
|
||
|
|
|
||
|
|
|
||
|
|
def test_ensure_log_initialized_existing_log(mock_scene):
|
||
|
|
"""Test loading existing changelog log."""
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
existing_data = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"start_rev": 0,
|
||
|
|
"deltas": [{"rev": 1}, {"rev": 3}, {"rev": 2}],
|
||
|
|
"latest_rev": 2,
|
||
|
|
}
|
||
|
|
|
||
|
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||
|
|
with open(log_path, "w") as f:
|
||
|
|
json.dump(existing_data, f)
|
||
|
|
|
||
|
|
result = _ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
# Should update latest_rev to max of deltas
|
||
|
|
assert result["latest_rev"] == 3
|
||
|
|
assert result["deltas"] == existing_data["deltas"]
|
||
|
|
assert result["start_rev"] == 0
|
||
|
|
|
||
|
|
|
||
|
|
def test_load_base_scene_data(mock_scene):
|
||
|
|
"""Test loading base scene data."""
|
||
|
|
base_data = {"test": "base_data"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
result = _load_base_scene_data(mock_scene)
|
||
|
|
assert result == base_data
|
||
|
|
|
||
|
|
|
||
|
|
def test_load_latest_scene_data_exists(mock_scene):
|
||
|
|
"""Test loading existing latest scene data."""
|
||
|
|
latest_data = {"test": "latest_data"}
|
||
|
|
latest_path = _latest_path(mock_scene)
|
||
|
|
|
||
|
|
os.makedirs(os.path.dirname(latest_path), exist_ok=True)
|
||
|
|
with open(latest_path, "w") as f:
|
||
|
|
json.dump(latest_data, f)
|
||
|
|
|
||
|
|
result = _load_latest_scene_data(mock_scene)
|
||
|
|
assert result == latest_data
|
||
|
|
|
||
|
|
|
||
|
|
def test_load_latest_scene_data_missing(mock_scene):
|
||
|
|
"""Test loading non-existent latest scene data returns None."""
|
||
|
|
result = _load_latest_scene_data(mock_scene)
|
||
|
|
assert result is None
|
||
|
|
|
||
|
|
|
||
|
|
def test_ensure_latest_initialized_creates_file(mock_scene):
|
||
|
|
"""Test that ensure_latest_initialized creates latest file from base."""
|
||
|
|
base_data = {"test": "base_data"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
latest_path = _latest_path(mock_scene)
|
||
|
|
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
_ensure_latest_initialized(mock_scene)
|
||
|
|
|
||
|
|
assert os.path.exists(latest_path)
|
||
|
|
with open(latest_path, "r") as f:
|
||
|
|
result = json.load(f)
|
||
|
|
assert result == base_data
|
||
|
|
|
||
|
|
|
||
|
|
def test_compute_delta_no_change():
|
||
|
|
"""Test computing delta when there are no changes."""
|
||
|
|
data1 = {"a": 1, "b": {"c": 2}}
|
||
|
|
data2 = {"a": 1, "b": {"c": 2}}
|
||
|
|
|
||
|
|
delta = _compute_delta(data1, data2)
|
||
|
|
assert delta == {}
|
||
|
|
|
||
|
|
|
||
|
|
def test_compute_delta_with_changes():
|
||
|
|
"""Test computing delta when there are changes."""
|
||
|
|
data1 = {"a": 1, "b": {"c": 2}}
|
||
|
|
data2 = {"a": 1, "b": {"c": 3}, "d": 4}
|
||
|
|
|
||
|
|
delta = _compute_delta(data1, data2)
|
||
|
|
assert delta != {}
|
||
|
|
|
||
|
|
|
||
|
|
def test_apply_delta():
|
||
|
|
"""Test applying a delta to data."""
|
||
|
|
import deepdiff
|
||
|
|
|
||
|
|
original = {"a": 1, "b": {"c": 2}}
|
||
|
|
modified = {"a": 1, "b": {"c": 3}, "d": 4}
|
||
|
|
|
||
|
|
diff = deepdiff.DeepDiff(original, modified)
|
||
|
|
delta_dict = diff._to_delta_dict()
|
||
|
|
|
||
|
|
result = _apply_delta(original, delta_dict)
|
||
|
|
assert result == modified
|
||
|
|
|
||
|
|
|
||
|
|
def test_serialize_scene_plain(mock_scene):
|
||
|
|
"""Test serializing scene to plain dict."""
|
||
|
|
mock_scene.serialize = {"test": "data"}
|
||
|
|
|
||
|
|
result = _serialize_scene_plain(mock_scene)
|
||
|
|
assert result == {"test": "data"}
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_save_changelog_new_scene(mock_scene):
|
||
|
|
"""Test saving changelog for a new scene."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Check that base and latest files were created
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
latest_path = _latest_path(mock_scene)
|
||
|
|
|
||
|
|
assert os.path.exists(base_path)
|
||
|
|
assert os.path.exists(latest_path)
|
||
|
|
|
||
|
|
# Both should contain the serialized scene data
|
||
|
|
with open(base_path, "r") as f:
|
||
|
|
base_data = json.load(f)
|
||
|
|
with open(latest_path, "r") as f:
|
||
|
|
latest_data = json.load(f)
|
||
|
|
|
||
|
|
assert base_data == mock_scene.serialize
|
||
|
|
assert latest_data == mock_scene.serialize
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_save_changelog_existing_scene(mock_scene):
|
||
|
|
"""Test that save_changelog doesn't overwrite existing base."""
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
original_data = {"original": "data"}
|
||
|
|
|
||
|
|
# Create existing base file
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(original_data, f)
|
||
|
|
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Base file should be unchanged
|
||
|
|
with open(base_path, "r") as f:
|
||
|
|
result = json.load(f)
|
||
|
|
assert result == original_data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_append_scene_delta_no_change(mock_scene):
|
||
|
|
"""Test appending delta when scene hasn't changed."""
|
||
|
|
# Initialize scene
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Try to append delta with same data
|
||
|
|
result = await append_scene_delta(mock_scene, {"action": "test"})
|
||
|
|
|
||
|
|
# Should return None (no change)
|
||
|
|
assert result is None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_append_scene_delta_with_change(mock_scene):
|
||
|
|
"""Test appending delta when scene has changed."""
|
||
|
|
# Initialize scene
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Change the scene data
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
result = await append_scene_delta(mock_scene, {"action": "add_character"})
|
||
|
|
|
||
|
|
# Should return revision 1
|
||
|
|
assert result == 1
|
||
|
|
|
||
|
|
# Check that changelog log was updated
|
||
|
|
_, log_path = _get_latest_changelog_file(mock_scene)
|
||
|
|
with open(log_path, "r") as f:
|
||
|
|
log_data = json.load(f)
|
||
|
|
|
||
|
|
assert log_data["latest_rev"] == 1
|
||
|
|
assert len(log_data["deltas"]) == 1
|
||
|
|
assert log_data["deltas"][0]["rev"] == 1
|
||
|
|
assert log_data["deltas"][0]["meta"] == {"action": "add_character"}
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_scene_data_base_only(mock_scene):
|
||
|
|
"""Test reconstructing scene data at revision 0."""
|
||
|
|
base_data = {"test": "base"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
result = await reconstruct_scene_data(mock_scene, to_rev=0)
|
||
|
|
assert result == base_data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_scene_data_with_deltas(mock_scene):
|
||
|
|
"""Test reconstructing scene data with deltas applied."""
|
||
|
|
# Setup base data
|
||
|
|
base_data = {"characters": [], "entries": []}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Setup changelog with delta
|
||
|
|
import deepdiff
|
||
|
|
|
||
|
|
modified_data = {"characters": [{"name": "Alice"}], "entries": []}
|
||
|
|
diff = deepdiff.DeepDiff(base_data, modified_data)
|
||
|
|
delta = diff._to_delta_dict()
|
||
|
|
|
||
|
|
log_data = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"deltas": [{"rev": 1, "ts": 1672531200, "delta": delta, "meta": {}}],
|
||
|
|
"latest_rev": 1,
|
||
|
|
}
|
||
|
|
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
with open(log_path, "w") as f:
|
||
|
|
json.dump(log_data, f)
|
||
|
|
|
||
|
|
result = await reconstruct_scene_data(mock_scene, to_rev=1)
|
||
|
|
assert result == modified_data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_write_reconstructed_scene(mock_scene):
|
||
|
|
"""Test writing reconstructed scene to file."""
|
||
|
|
# Setup base data
|
||
|
|
base_data = {"test": "data"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Initialize log
|
||
|
|
_ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
output_path = await write_reconstructed_scene(mock_scene, 0, "custom_output.json")
|
||
|
|
|
||
|
|
expected_path = os.path.join(mock_scene.save_dir, "custom_output.json")
|
||
|
|
assert output_path == expected_path
|
||
|
|
assert os.path.exists(output_path)
|
||
|
|
|
||
|
|
with open(output_path, "r") as f:
|
||
|
|
result = json.load(f)
|
||
|
|
assert result == base_data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_write_reconstructed_scene_default_filename(mock_scene):
|
||
|
|
"""Test writing reconstructed scene with default filename."""
|
||
|
|
base_data = {"test": "data"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
_ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
output_path = await write_reconstructed_scene(mock_scene, 0)
|
||
|
|
|
||
|
|
expected_filename = "test_scene-rev-0.json"
|
||
|
|
expected_path = os.path.join(mock_scene.save_dir, expected_filename)
|
||
|
|
assert output_path == expected_path
|
||
|
|
|
||
|
|
|
||
|
|
def test_list_revisions_empty(mock_scene):
|
||
|
|
"""Test listing revisions when there are none."""
|
||
|
|
result = list_revisions(mock_scene)
|
||
|
|
assert result == []
|
||
|
|
|
||
|
|
|
||
|
|
def test_list_revisions_with_deltas(mock_scene):
|
||
|
|
"""Test listing revisions with existing deltas."""
|
||
|
|
log_data = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"deltas": [{"rev": 1}, {"rev": 3}, {"rev": 2}],
|
||
|
|
"latest_rev": 3,
|
||
|
|
}
|
||
|
|
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||
|
|
with open(log_path, "w") as f:
|
||
|
|
json.dump(log_data, f)
|
||
|
|
|
||
|
|
result = list_revisions(mock_scene)
|
||
|
|
assert result == [
|
||
|
|
3,
|
||
|
|
2,
|
||
|
|
1,
|
||
|
|
] # Should be sorted by revision number descending (newest first)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_rollback_scene_to_revision_invalid_rev(mock_scene):
|
||
|
|
"""Test rollback with invalid revision number."""
|
||
|
|
# Setup some revisions
|
||
|
|
log_data = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"deltas": [{"rev": 1}, {"rev": 2}],
|
||
|
|
"latest_rev": 2,
|
||
|
|
}
|
||
|
|
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||
|
|
with open(log_path, "w") as f:
|
||
|
|
json.dump(log_data, f)
|
||
|
|
|
||
|
|
with pytest.raises(ValueError, match="Invalid revision: 5. Latest available is 2."):
|
||
|
|
await rollback_scene_to_revision(mock_scene, 5)
|
||
|
|
|
||
|
|
with pytest.raises(
|
||
|
|
ValueError, match="Invalid revision: -1. Latest available is 2."
|
||
|
|
):
|
||
|
|
await rollback_scene_to_revision(mock_scene, -1)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_rollback_scene_to_revision_with_backup(mock_scene):
|
||
|
|
"""Test rollback creates backup and updates scene file."""
|
||
|
|
# Setup base data
|
||
|
|
base_data = {"version": "base"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Setup current scene file
|
||
|
|
current_data = {"version": "current"}
|
||
|
|
current_path = os.path.join(mock_scene.save_dir, mock_scene.filename)
|
||
|
|
with open(current_path, "w") as f:
|
||
|
|
json.dump(current_data, f)
|
||
|
|
|
||
|
|
# Setup log with revision 1
|
||
|
|
_ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
result_path = await rollback_scene_to_revision(mock_scene, 0, create_backup=True)
|
||
|
|
|
||
|
|
# Check that current file was updated to base data
|
||
|
|
assert result_path == current_path
|
||
|
|
with open(current_path, "r") as f:
|
||
|
|
result = json.load(f)
|
||
|
|
assert result == base_data
|
||
|
|
|
||
|
|
# Check that backup was created
|
||
|
|
assert os.path.exists(mock_scene.backups_dir)
|
||
|
|
backup_files = os.listdir(mock_scene.backups_dir)
|
||
|
|
assert len(backup_files) == 1
|
||
|
|
assert backup_files[0].startswith("test_scene_pre_rollback_")
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_rollback_scene_to_revision_no_backup(mock_scene):
|
||
|
|
"""Test rollback without creating backup."""
|
||
|
|
# Setup base data
|
||
|
|
base_data = {"version": "base"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Setup current scene file
|
||
|
|
current_path = os.path.join(mock_scene.save_dir, mock_scene.filename)
|
||
|
|
with open(current_path, "w") as f:
|
||
|
|
json.dump({"version": "current"}, f)
|
||
|
|
|
||
|
|
_ensure_log_initialized(mock_scene, 0)
|
||
|
|
|
||
|
|
await rollback_scene_to_revision(mock_scene, 0, create_backup=False)
|
||
|
|
|
||
|
|
# Should not create backup directory
|
||
|
|
assert not os.path.exists(mock_scene.backups_dir)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_append_scene_delta_with_exclusions(mock_scene):
|
||
|
|
"""Test that paths in EXCLUDE_FROM_DELTAS are not included in deltas."""
|
||
|
|
# Initialize scene
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Change both excluded and included fields
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}], # Should be tracked
|
||
|
|
"memory_session_id": "abc123", # Should be excluded (in EXCLUDE_FROM_DELTAS)
|
||
|
|
"saved_memory_session_id": "def456", # Should be excluded (in EXCLUDE_FROM_DELTAS)
|
||
|
|
"entries": [],
|
||
|
|
}
|
||
|
|
|
||
|
|
# Append delta (exclusions happen automatically)
|
||
|
|
rev = await append_scene_delta(mock_scene, {"action": "test"})
|
||
|
|
|
||
|
|
# Should create a revision since characters changed
|
||
|
|
assert rev == 1
|
||
|
|
|
||
|
|
# Check the delta only contains character changes
|
||
|
|
_, log_path = _get_latest_changelog_file(mock_scene)
|
||
|
|
with open(log_path, "r") as f:
|
||
|
|
log_data = json.load(f)
|
||
|
|
|
||
|
|
delta_entry = log_data["deltas"][0]
|
||
|
|
delta = delta_entry["delta"]
|
||
|
|
|
||
|
|
# Should have character changes but not memory session changes
|
||
|
|
# Get all changed paths from different delta types
|
||
|
|
all_changed_paths = []
|
||
|
|
for delta_type in [
|
||
|
|
"values_changed",
|
||
|
|
"iterable_item_added",
|
||
|
|
"dictionary_item_added",
|
||
|
|
]:
|
||
|
|
if delta_type in delta:
|
||
|
|
all_changed_paths.extend(delta[delta_type].keys())
|
||
|
|
|
||
|
|
# Should contain character change
|
||
|
|
assert any("characters" in path for path in all_changed_paths)
|
||
|
|
|
||
|
|
# Should NOT contain memory session changes (automatically excluded)
|
||
|
|
assert not any("memory_session_id" in path for path in all_changed_paths)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_append_scene_delta_with_regex_exclusions(mock_scene):
|
||
|
|
"""Test that regex patterns in EXCLUDE_FROM_DELTAS_REGEX work correctly."""
|
||
|
|
# Initialize scene with world_state containing reinforce array
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [],
|
||
|
|
"world_state": {
|
||
|
|
"reinforce": [
|
||
|
|
{"due": "2024-01-01", "id": 1},
|
||
|
|
{"due": "2024-01-02", "id": 2},
|
||
|
|
]
|
||
|
|
},
|
||
|
|
}
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Change both the 'due' fields and add character (both should be tracked since no regex patterns are defined)
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}], # Should be tracked
|
||
|
|
"world_state": {
|
||
|
|
"reinforce": [
|
||
|
|
{
|
||
|
|
"due": "2024-02-01",
|
||
|
|
"id": 1,
|
||
|
|
}, # Will be tracked since no regex exclusion is defined
|
||
|
|
{
|
||
|
|
"due": "2024-02-02",
|
||
|
|
"id": 2,
|
||
|
|
}, # Will be tracked since no regex exclusion is defined
|
||
|
|
]
|
||
|
|
},
|
||
|
|
}
|
||
|
|
|
||
|
|
rev = await append_scene_delta(mock_scene, {"action": "test_regex"})
|
||
|
|
|
||
|
|
# Should create revision for both character and due field changes
|
||
|
|
assert rev == 1
|
||
|
|
|
||
|
|
# Check that delta contains both character and due field changes since no regex exclusion is active
|
||
|
|
_, log_path = _get_latest_changelog_file(mock_scene)
|
||
|
|
with open(log_path, "r") as f:
|
||
|
|
log_data = json.load(f)
|
||
|
|
|
||
|
|
delta_entry = log_data["deltas"][0]
|
||
|
|
delta = delta_entry["delta"]
|
||
|
|
|
||
|
|
# Get all changed paths
|
||
|
|
all_changed_paths = []
|
||
|
|
for delta_type in [
|
||
|
|
"values_changed",
|
||
|
|
"iterable_item_added",
|
||
|
|
"dictionary_item_added",
|
||
|
|
]:
|
||
|
|
if delta_type in delta:
|
||
|
|
all_changed_paths.extend(delta[delta_type].keys())
|
||
|
|
|
||
|
|
# Should contain character change
|
||
|
|
assert any("characters" in path for path in all_changed_paths)
|
||
|
|
|
||
|
|
# Since no regex exclusions are defined, 'due' field changes should be included
|
||
|
|
# This test verifies the system works correctly when EXCLUDE_FROM_DELTAS_REGEX is empty
|
||
|
|
assert len(all_changed_paths) > 0 # Should have changes tracked
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_full_workflow(mock_scene):
|
||
|
|
"""Test complete workflow: save -> append -> reconstruct -> rollback."""
|
||
|
|
# Initial save
|
||
|
|
initial_data = {"characters": [], "version": "1.0"}
|
||
|
|
mock_scene.serialize = initial_data
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# First change
|
||
|
|
mock_scene.serialize = {"characters": [{"name": "Alice"}], "version": "1.0"}
|
||
|
|
rev1 = await append_scene_delta(mock_scene, {"action": "add_alice"})
|
||
|
|
assert rev1 == 1
|
||
|
|
|
||
|
|
# Second change
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}, {"name": "Bob"}],
|
||
|
|
"version": "1.0",
|
||
|
|
}
|
||
|
|
rev2 = await append_scene_delta(mock_scene, {"action": "add_bob"})
|
||
|
|
assert rev2 == 2
|
||
|
|
|
||
|
|
# List revisions
|
||
|
|
revisions = list_revisions(mock_scene)
|
||
|
|
assert revisions == [2, 1] # Sorted descending (newest first)
|
||
|
|
|
||
|
|
# Reconstruct at different points
|
||
|
|
rev0_data = await reconstruct_scene_data(mock_scene, to_rev=0)
|
||
|
|
assert rev0_data == initial_data
|
||
|
|
|
||
|
|
rev1_data = await reconstruct_scene_data(mock_scene, to_rev=1)
|
||
|
|
assert len(rev1_data["characters"]) == 1
|
||
|
|
assert rev1_data["characters"][0]["name"] == "Alice"
|
||
|
|
|
||
|
|
rev2_data = await reconstruct_scene_data(mock_scene, to_rev=2)
|
||
|
|
assert len(rev2_data["characters"]) == 2
|
||
|
|
|
||
|
|
# Write reconstructed scene
|
||
|
|
output_path = await write_reconstructed_scene(mock_scene, 1)
|
||
|
|
assert os.path.exists(output_path)
|
||
|
|
|
||
|
|
# Create current scene file for rollback test
|
||
|
|
current_path = os.path.join(mock_scene.save_dir, mock_scene.filename)
|
||
|
|
with open(current_path, "w") as f:
|
||
|
|
json.dump({"current": "state"}, f)
|
||
|
|
|
||
|
|
# Rollback to revision 1
|
||
|
|
await rollback_scene_to_revision(mock_scene, 1)
|
||
|
|
|
||
|
|
with open(current_path, "r") as f:
|
||
|
|
rolled_back = json.load(f)
|
||
|
|
assert len(rolled_back["characters"]) == 1
|
||
|
|
assert rolled_back["characters"][0]["name"] == "Alice"
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_changelog_files_empty(mock_scene):
|
||
|
|
"""Test getting changelog files when none exist."""
|
||
|
|
result = _get_changelog_files(mock_scene)
|
||
|
|
assert result == []
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_changelog_files_multiple(mock_scene):
|
||
|
|
"""Test getting multiple changelog files."""
|
||
|
|
# Create multiple changelog files
|
||
|
|
os.makedirs(mock_scene.changelog_dir, exist_ok=True)
|
||
|
|
|
||
|
|
files = [
|
||
|
|
(0, _changelog_log_path(mock_scene, 0)),
|
||
|
|
(100, _changelog_log_path(mock_scene, 100)),
|
||
|
|
(50, _changelog_log_path(mock_scene, 50)),
|
||
|
|
]
|
||
|
|
|
||
|
|
for start_rev, path in files:
|
||
|
|
with open(path, "w") as f:
|
||
|
|
json.dump({"start_rev": start_rev, "deltas": []}, f)
|
||
|
|
|
||
|
|
result = _get_changelog_files(mock_scene)
|
||
|
|
|
||
|
|
# Should be sorted by start_rev
|
||
|
|
expected = [(0, files[0][1]), (50, files[2][1]), (100, files[1][1])]
|
||
|
|
assert result == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_latest_changelog_file_empty(mock_scene):
|
||
|
|
"""Test getting latest changelog file when none exist."""
|
||
|
|
result = _get_latest_changelog_file(mock_scene)
|
||
|
|
expected_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
assert result == (0, expected_path)
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_latest_changelog_file_multiple(mock_scene):
|
||
|
|
"""Test getting latest changelog file from multiple files."""
|
||
|
|
os.makedirs(mock_scene.changelog_dir, exist_ok=True)
|
||
|
|
|
||
|
|
# Create multiple files
|
||
|
|
for start_rev in [0, 50, 100]:
|
||
|
|
path = _changelog_log_path(mock_scene, start_rev)
|
||
|
|
with open(path, "w") as f:
|
||
|
|
json.dump({"start_rev": start_rev, "deltas": []}, f)
|
||
|
|
|
||
|
|
result = _get_latest_changelog_file(mock_scene)
|
||
|
|
expected_path = _changelog_log_path(mock_scene, 100)
|
||
|
|
assert result == (100, expected_path)
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_overall_latest_revision_empty(mock_scene):
|
||
|
|
"""Test getting overall latest revision when no files exist."""
|
||
|
|
result = _get_overall_latest_revision(mock_scene)
|
||
|
|
assert result == 0
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_overall_latest_revision_multiple_files(mock_scene):
|
||
|
|
"""Test getting overall latest revision across multiple files."""
|
||
|
|
os.makedirs(mock_scene.changelog_dir, exist_ok=True)
|
||
|
|
|
||
|
|
# Create files with different latest revisions
|
||
|
|
files_data = [
|
||
|
|
(0, {"start_rev": 0, "latest_rev": 10, "deltas": []}),
|
||
|
|
(50, {"start_rev": 50, "latest_rev": 75, "deltas": []}),
|
||
|
|
(100, {"start_rev": 100, "latest_rev": 150, "deltas": []}),
|
||
|
|
]
|
||
|
|
|
||
|
|
for start_rev, data in files_data:
|
||
|
|
path = _changelog_log_path(mock_scene, start_rev)
|
||
|
|
with open(path, "w") as f:
|
||
|
|
json.dump(data, f)
|
||
|
|
|
||
|
|
result = _get_overall_latest_revision(mock_scene)
|
||
|
|
assert result == 150
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_file_size_existing(temp_dir):
|
||
|
|
"""Test getting size of existing file."""
|
||
|
|
test_file = os.path.join(temp_dir, "test.txt")
|
||
|
|
content = "Hello, World!"
|
||
|
|
|
||
|
|
with open(test_file, "w") as f:
|
||
|
|
f.write(content)
|
||
|
|
|
||
|
|
result = _get_file_size(test_file)
|
||
|
|
assert result == len(content)
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_file_size_missing(temp_dir):
|
||
|
|
"""Test getting size of non-existent file."""
|
||
|
|
test_file = os.path.join(temp_dir, "missing.txt")
|
||
|
|
result = _get_file_size(test_file)
|
||
|
|
assert result == 0
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_append_scene_delta_creates_new_file_when_size_exceeded(mock_scene):
|
||
|
|
"""Test that a new changelog file is created when size limit is exceeded."""
|
||
|
|
# Initialize scene
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Mock the file size to exceed limit
|
||
|
|
original_get_file_size = _get_file_size
|
||
|
|
|
||
|
|
def mock_get_file_size(path):
|
||
|
|
if "changelog.0.json" in path:
|
||
|
|
return MAX_CHANGELOG_FILE_SIZE + 1000 # Exceeds limit
|
||
|
|
return original_get_file_size(path)
|
||
|
|
|
||
|
|
# Patch the function temporarily
|
||
|
|
import talemate.changelog
|
||
|
|
|
||
|
|
talemate.changelog._get_file_size = mock_get_file_size
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Change the scene data to trigger delta creation
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
# This should create a new file because current file "exceeds" size limit
|
||
|
|
rev = await append_scene_delta(mock_scene, {"action": "test"})
|
||
|
|
assert rev == 1
|
||
|
|
|
||
|
|
# Should have created a new file starting at revision 1
|
||
|
|
files = _get_changelog_files(mock_scene)
|
||
|
|
assert len(files) >= 1
|
||
|
|
|
||
|
|
# The new revision should be in a file starting at rev 1
|
||
|
|
rev1_file_exists = any(start_rev == 1 for start_rev, _ in files)
|
||
|
|
assert rev1_file_exists
|
||
|
|
|
||
|
|
finally:
|
||
|
|
# Restore original function
|
||
|
|
talemate.changelog._get_file_size = original_get_file_size
|
||
|
|
|
||
|
|
|
||
|
|
# InMemoryChangelog tests
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_basic_usage(mock_scene):
|
||
|
|
"""Test basic usage of InMemoryChangelog."""
|
||
|
|
# Initialize scene
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# Initially no pending changes
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
assert changelog.pending_count == 0
|
||
|
|
|
||
|
|
# Make a change to the scene
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
# Append delta
|
||
|
|
real_rev = await changelog.append_delta({"action": "add_character"})
|
||
|
|
|
||
|
|
# Should have pending changes now
|
||
|
|
assert changelog.has_pending_changes
|
||
|
|
assert changelog.pending_count == 1
|
||
|
|
assert real_rev == 1 # First real revision (base scene is rev 0)
|
||
|
|
|
||
|
|
# Make another change
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}, {"name": "Bob"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
real_rev2 = await changelog.append_delta({"action": "add_bob"})
|
||
|
|
assert changelog.pending_count == 2
|
||
|
|
assert real_rev2 == 2 # Second real revision
|
||
|
|
|
||
|
|
# Manually commit the changes
|
||
|
|
await changelog.commit()
|
||
|
|
|
||
|
|
# After context exit, changes should be committed
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
assert changelog.pending_count == 0
|
||
|
|
|
||
|
|
# Check that real revisions were created
|
||
|
|
revisions = list_revisions(mock_scene)
|
||
|
|
assert len(revisions) == 2
|
||
|
|
assert revisions == [2, 1] # Should be in descending order
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_no_changes(mock_scene):
|
||
|
|
"""Test InMemoryChangelog with no scene changes."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# Try to append delta with no changes
|
||
|
|
virtual_rev = await changelog.append_delta({"action": "no_change"})
|
||
|
|
|
||
|
|
# Should return None for no changes
|
||
|
|
assert virtual_rev is None
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
assert changelog.pending_count == 0
|
||
|
|
|
||
|
|
# No revisions should be created
|
||
|
|
revisions = list_revisions(mock_scene)
|
||
|
|
assert revisions == []
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_manual_commit(mock_scene):
|
||
|
|
"""Test manually committing changes before context exit."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# Make a change
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
await changelog.append_delta({"action": "add_character"})
|
||
|
|
assert changelog.has_pending_changes
|
||
|
|
|
||
|
|
# Manually commit
|
||
|
|
committed_revs = await changelog.commit()
|
||
|
|
|
||
|
|
# Should have committed the change
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
assert len(committed_revs) == 1
|
||
|
|
assert committed_revs[0] == 1
|
||
|
|
|
||
|
|
# Should be able to append after commit (changelog can be reused)
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}, {"name": "Bob"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
rev = await changelog.append_delta({"action": "add_bob"})
|
||
|
|
assert (
|
||
|
|
rev == 2
|
||
|
|
) # Real revision for new pending delta (after first committed rev)
|
||
|
|
assert changelog.has_pending_changes
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_empty_commit(mock_scene):
|
||
|
|
"""Test committing when there are no pending deltas."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# Commit with no pending deltas
|
||
|
|
committed_revs = await changelog.commit()
|
||
|
|
|
||
|
|
# Should return empty list
|
||
|
|
assert committed_revs == []
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_multiple_commits(mock_scene):
|
||
|
|
"""Test that multiple commits work correctly (changelog can be reused)."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
await changelog.append_delta({"action": "add_character"})
|
||
|
|
committed_revs = await changelog.commit()
|
||
|
|
assert len(committed_revs) == 1
|
||
|
|
assert not changelog.has_pending_changes
|
||
|
|
|
||
|
|
# Second commit with no changes should return empty list
|
||
|
|
committed_revs2 = await changelog.commit()
|
||
|
|
assert len(committed_revs2) == 0
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_preserves_metadata(mock_scene):
|
||
|
|
"""Test that metadata is preserved during commit."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
test_meta = {
|
||
|
|
"action": "add_character",
|
||
|
|
"user": "test",
|
||
|
|
"timestamp": "2024-01-01",
|
||
|
|
}
|
||
|
|
await changelog.append_delta(test_meta)
|
||
|
|
|
||
|
|
# Manually commit the changes
|
||
|
|
await changelog.commit()
|
||
|
|
|
||
|
|
# Check that metadata was preserved in the changelog
|
||
|
|
_, log_path = _get_latest_changelog_file(mock_scene)
|
||
|
|
with open(log_path, "r") as f:
|
||
|
|
log_data = json.load(f)
|
||
|
|
|
||
|
|
assert len(log_data["deltas"]) == 1
|
||
|
|
delta_entry = log_data["deltas"][0]
|
||
|
|
assert delta_entry["meta"] == test_meta
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_updates_latest_snapshot(mock_scene):
|
||
|
|
"""Test that the latest snapshot file is updated after commit."""
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
final_scene_data = {
|
||
|
|
"characters": [{"name": "Alice"}, {"name": "Bob"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# First change
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
await changelog.append_delta({"action": "add_alice"})
|
||
|
|
|
||
|
|
# Second change
|
||
|
|
mock_scene.serialize = final_scene_data
|
||
|
|
await changelog.append_delta({"action": "add_bob"})
|
||
|
|
|
||
|
|
# Manually commit the changes
|
||
|
|
await changelog.commit()
|
||
|
|
|
||
|
|
# Check that latest snapshot was updated
|
||
|
|
latest_path = _latest_path(mock_scene)
|
||
|
|
assert os.path.exists(latest_path)
|
||
|
|
|
||
|
|
with open(latest_path, "r") as f:
|
||
|
|
latest_data = json.load(f)
|
||
|
|
|
||
|
|
# Should match the final scene state
|
||
|
|
assert latest_data == final_scene_data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_integration_with_existing_revisions(mock_scene):
|
||
|
|
"""Test that InMemoryChangelog works correctly with existing revisions."""
|
||
|
|
# Create some initial revisions using the traditional method
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
mock_scene.serialize = {"characters": [{"name": "Existing"}], "entries": []}
|
||
|
|
await append_scene_delta(mock_scene, {"action": "existing_change"})
|
||
|
|
|
||
|
|
# Verify we have revision 1 and update mock_scene.rev to match
|
||
|
|
assert _get_overall_latest_revision(mock_scene) == 1
|
||
|
|
mock_scene.rev = 1 # In real usage, scene.rev would be updated
|
||
|
|
|
||
|
|
# Now use InMemoryChangelog to add more changes
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Existing"}, {"name": "New"}],
|
||
|
|
"entries": [],
|
||
|
|
}
|
||
|
|
await changelog.append_delta({"action": "add_new_character"})
|
||
|
|
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"characters": [{"name": "Existing"}, {"name": "New"}, {"name": "Another"}],
|
||
|
|
"entries": [],
|
||
|
|
}
|
||
|
|
await changelog.append_delta({"action": "add_another_character"})
|
||
|
|
|
||
|
|
# Manually commit the changes
|
||
|
|
await changelog.commit()
|
||
|
|
|
||
|
|
# Should now have revisions 1, 2, 3
|
||
|
|
revisions = list_revisions(mock_scene)
|
||
|
|
assert revisions == [3, 2, 1]
|
||
|
|
assert _get_overall_latest_revision(mock_scene) == 3
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_scene_data_disconnects_shared_context(mock_scene):
|
||
|
|
"""Test that shared_context is automatically disconnected during reconstruction."""
|
||
|
|
# Setup base scene with shared_context
|
||
|
|
base_data = {
|
||
|
|
"characters": [],
|
||
|
|
"entries": [],
|
||
|
|
"shared_context": "world.json", # Scene has shared context
|
||
|
|
}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Test reconstruction at revision 0 (base only)
|
||
|
|
result = await reconstruct_scene_data(mock_scene, to_rev=0)
|
||
|
|
|
||
|
|
# Verify shared_context was disconnected
|
||
|
|
assert result["shared_context"] == ""
|
||
|
|
assert result["characters"] == []
|
||
|
|
assert result["entries"] == []
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_scene_data_with_deltas_disconnects_shared_context(
|
||
|
|
mock_scene,
|
||
|
|
):
|
||
|
|
"""Test that shared_context is disconnected even when applying deltas."""
|
||
|
|
# Setup base scene with shared_context
|
||
|
|
base_data = {"characters": [], "entries": [], "shared_context": "world.json"}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Setup changelog with delta that adds a character (but keeps shared_context)
|
||
|
|
import deepdiff
|
||
|
|
|
||
|
|
modified_data = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"shared_context": "world.json", # Still has shared context in the delta
|
||
|
|
}
|
||
|
|
diff = deepdiff.DeepDiff(base_data, modified_data)
|
||
|
|
delta = diff._to_delta_dict()
|
||
|
|
|
||
|
|
log_data = {
|
||
|
|
"version": 1,
|
||
|
|
"base": "test_scene.json.base.json",
|
||
|
|
"start_rev": 0,
|
||
|
|
"deltas": [
|
||
|
|
{
|
||
|
|
"rev": 1,
|
||
|
|
"ts": 1672531200,
|
||
|
|
"delta": delta,
|
||
|
|
"meta": {"action": "add_character"},
|
||
|
|
}
|
||
|
|
],
|
||
|
|
"latest_rev": 1,
|
||
|
|
}
|
||
|
|
|
||
|
|
log_path = _changelog_log_path(mock_scene, 0)
|
||
|
|
with open(log_path, "w") as f:
|
||
|
|
json.dump(log_data, f)
|
||
|
|
|
||
|
|
# Reconstruct at revision 1
|
||
|
|
result = await reconstruct_scene_data(mock_scene, to_rev=1)
|
||
|
|
|
||
|
|
# Verify character was added but shared_context was disconnected
|
||
|
|
assert result["characters"] == [{"name": "Alice"}]
|
||
|
|
assert result["shared_context"] == "" # Should be disconnected
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_scene_data_no_shared_context_unchanged(mock_scene):
|
||
|
|
"""Test that scenes without shared_context are unchanged during reconstruction."""
|
||
|
|
# Setup base scene without shared_context
|
||
|
|
base_data = {
|
||
|
|
"characters": [],
|
||
|
|
"entries": [],
|
||
|
|
# No shared_context field
|
||
|
|
}
|
||
|
|
base_path = _base_path(mock_scene)
|
||
|
|
os.makedirs(os.path.dirname(base_path), exist_ok=True)
|
||
|
|
with open(base_path, "w") as f:
|
||
|
|
json.dump(base_data, f)
|
||
|
|
|
||
|
|
# Test reconstruction
|
||
|
|
result = await reconstruct_scene_data(mock_scene, to_rev=0)
|
||
|
|
|
||
|
|
# Should be unchanged since there was no shared_context
|
||
|
|
assert result == base_data
|
||
|
|
assert "shared_context" not in result
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_cleanup_removes_shared_context():
|
||
|
|
"""Test that reconstruct_cleanup properly removes shared_context."""
|
||
|
|
# Data with shared_context
|
||
|
|
data_with_shared = {
|
||
|
|
"characters": [{"name": "Alice"}],
|
||
|
|
"entries": [],
|
||
|
|
"shared_context": "world.json",
|
||
|
|
}
|
||
|
|
|
||
|
|
result = await reconstruct_cleanup(data_with_shared)
|
||
|
|
|
||
|
|
# Should remove shared_context but keep other data
|
||
|
|
assert result["characters"] == [{"name": "Alice"}]
|
||
|
|
assert result["entries"] == []
|
||
|
|
assert result["shared_context"] == ""
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_reconstruct_cleanup_preserves_data_without_shared_context():
|
||
|
|
"""Test that reconstruct_cleanup preserves data when no shared_context exists."""
|
||
|
|
# Data without shared_context
|
||
|
|
data_without_shared = {
|
||
|
|
"characters": [{"name": "Bob"}],
|
||
|
|
"entries": [{"id": 1, "content": "test"}],
|
||
|
|
"metadata": {"version": "1.0"},
|
||
|
|
}
|
||
|
|
|
||
|
|
result = await reconstruct_cleanup(data_without_shared)
|
||
|
|
|
||
|
|
# Should be unchanged
|
||
|
|
assert result == data_without_shared
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_in_memory_changelog_with_save_changelog_bug(mock_scene):
|
||
|
|
"""
|
||
|
|
Test that reproduces the bug where save_changelog() is called while
|
||
|
|
InMemoryChangelog has pending changes, causing duplication.
|
||
|
|
|
||
|
|
This simulates:
|
||
|
|
1. Scene starts with InMemoryChangelog context
|
||
|
|
2. Messages are added (pending in memory)
|
||
|
|
3. save_changelog() is called (creates base with pending changes baked in)
|
||
|
|
4. InMemoryChangelog.commit() is called (writes same changes as deltas)
|
||
|
|
5. Reconstruction duplicates messages
|
||
|
|
"""
|
||
|
|
# Start with a scene that has history
|
||
|
|
initial_history = [
|
||
|
|
{"id": 1, "message": "First message", "rev": 1},
|
||
|
|
{"id": 2, "message": "Second message", "rev": 2},
|
||
|
|
]
|
||
|
|
mock_scene.serialize = {"history": initial_history, "characters": []}
|
||
|
|
mock_scene.rev = 2
|
||
|
|
|
||
|
|
# Open InMemoryChangelog context (captures current state)
|
||
|
|
async with InMemoryChangelog(mock_scene) as changelog:
|
||
|
|
# Set _changelog on mock_scene to simulate real scene behavior
|
||
|
|
mock_scene._changelog = changelog
|
||
|
|
|
||
|
|
# Add more messages
|
||
|
|
mock_scene.serialize = {
|
||
|
|
"history": initial_history
|
||
|
|
+ [
|
||
|
|
{"id": 3, "message": "Third message", "rev": 3},
|
||
|
|
{"id": 4, "message": "Fourth message", "rev": 4},
|
||
|
|
],
|
||
|
|
"characters": [],
|
||
|
|
}
|
||
|
|
|
||
|
|
# Append deltas for the new messages
|
||
|
|
await changelog.append_delta({"action": "add_message_3"})
|
||
|
|
await changelog.append_delta({"action": "add_message_4"})
|
||
|
|
|
||
|
|
# Now save_changelog() is called (e.g., during a fork operation)
|
||
|
|
# This saves the base with ALL 4 messages already in it
|
||
|
|
await save_changelog(mock_scene)
|
||
|
|
|
||
|
|
# Commit the pending deltas
|
||
|
|
# This writes deltas that try to add messages 3 and 4 again
|
||
|
|
await changelog.commit()
|
||
|
|
|
||
|
|
# Now reconstruct to revision 4
|
||
|
|
reconstructed = await reconstruct_scene_data(mock_scene, to_rev=4)
|
||
|
|
|
||
|
|
# BUG: The reconstructed history will have duplicates
|
||
|
|
# Base has 4 messages + deltas add 2 more = 6 messages (with duplicates)
|
||
|
|
history = reconstructed.get("history", [])
|
||
|
|
|
||
|
|
# This assertion SHOULD pass (4 messages) but currently FAILS (6 messages due to bug)
|
||
|
|
assert len(history) == 4, (
|
||
|
|
f"Expected 4 messages, got {len(history)}: {[m['message'] for m in history]}"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Verify no duplicates by checking message IDs
|
||
|
|
message_ids = [msg["id"] for msg in history]
|
||
|
|
assert len(message_ids) == len(set(message_ids)), (
|
||
|
|
f"Duplicate message IDs found: {message_ids}"
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_delete_changelog_files_with_wrong_scene_reference(temp_dir):
|
||
|
|
"""
|
||
|
|
Test that reproduces the bug where delete_changelog_files is called with
|
||
|
|
the wrong scene reference (or self.scene from the wrong context).
|
||
|
|
|
||
|
|
This simulates the bug in server/config.py:handle_delete_scene where
|
||
|
|
delete_changelog_files(self.scene) is called instead of constructing
|
||
|
|
a proper scene reference from the deleted file path.
|
||
|
|
"""
|
||
|
|
# Create a scene with changelogs
|
||
|
|
scene1 = Mock()
|
||
|
|
scene1.filename = "scene_to_delete.json"
|
||
|
|
scene1.save_dir = os.path.join(temp_dir, "project1")
|
||
|
|
scene1.changelog_dir = os.path.join(scene1.save_dir, "changelog")
|
||
|
|
scene1.serialize = {"characters": [], "data": "scene1"}
|
||
|
|
scene1.rev = 0
|
||
|
|
scene1._changelog = None
|
||
|
|
|
||
|
|
# Initialize changelog for scene1
|
||
|
|
await save_changelog(scene1)
|
||
|
|
|
||
|
|
# Verify changelog files were created
|
||
|
|
assert os.path.exists(_base_path(scene1))
|
||
|
|
assert os.path.exists(_latest_path(scene1))
|
||
|
|
|
||
|
|
# Now simulate the bug: calling delete_changelog_files with a different scene
|
||
|
|
# (or None, which would be self.scene when no scene is loaded)
|
||
|
|
wrong_scene = Mock()
|
||
|
|
wrong_scene.filename = "different_scene.json" # Wrong filename!
|
||
|
|
wrong_scene.save_dir = os.path.join(temp_dir, "project2") # Wrong directory!
|
||
|
|
wrong_scene.changelog_dir = os.path.join(wrong_scene.save_dir, "changelog")
|
||
|
|
|
||
|
|
# Try to delete with wrong scene reference
|
||
|
|
result = delete_changelog_files(wrong_scene)
|
||
|
|
|
||
|
|
# BUG: This will try to delete files for "different_scene.json" in "project2"
|
||
|
|
# but scene1's files are in "project1" with filename "scene_to_delete.json"
|
||
|
|
# So scene1's changelog files will NOT be deleted
|
||
|
|
|
||
|
|
# Verify scene1's changelog files still exist (they were not deleted)
|
||
|
|
assert os.path.exists(_base_path(scene1)), "Base file should still exist due to bug"
|
||
|
|
assert os.path.exists(_latest_path(scene1)), (
|
||
|
|
"Latest file should still exist due to bug"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Now show the correct way: construct scene reference from the file path
|
||
|
|
scene_path = os.path.join(scene1.save_dir, scene1.filename)
|
||
|
|
scene_dir = os.path.dirname(scene_path)
|
||
|
|
scene_filename = os.path.basename(scene_path)
|
||
|
|
correct_scene_ref = type(
|
||
|
|
"Scene",
|
||
|
|
(),
|
||
|
|
{
|
||
|
|
"save_dir": scene_dir,
|
||
|
|
"filename": scene_filename,
|
||
|
|
"changelog_dir": os.path.join(scene_dir, "changelog"),
|
||
|
|
},
|
||
|
|
)()
|
||
|
|
|
||
|
|
# Delete with correct reference
|
||
|
|
result = delete_changelog_files(correct_scene_ref)
|
||
|
|
|
||
|
|
# Now the files should be deleted
|
||
|
|
assert not os.path.exists(_base_path(scene1)), "Base file should be deleted"
|
||
|
|
assert not os.path.exists(_latest_path(scene1)), "Latest file should be deleted"
|
||
|
|
assert len(result.get("deleted", [])) >= 2, (
|
||
|
|
"Should have deleted at least base and latest"
|
||
|
|
)
|