vegu-ai-tools
2025-08-30 00:39:13 +03:00
parent 9bf08b1f00
commit f3d02530d5
7 changed files with 208 additions and 174 deletions

View File

@@ -3,6 +3,7 @@ Auto backup functionality for scenes.
Similar to auto save, but creates rolling backup files instead of overwriting the main save.
"""
import json
import os
import datetime
@@ -22,61 +23,63 @@ log = structlog.get_logger("talemate.auto_backup")
async def auto_backup(scene: "Scene") -> None:
"""
Creates an automatic backup of the scene.
Unlike save_as, this function:
- Does not create a new memory database
- Saves to a dedicated backups directory
- Maintains a rolling set of backup files
- Only proceeds if auto_backup is enabled in config
Args:
scene: The scene instance to backup
"""
config = get_config()
# Check if auto backup is enabled
if not config.game.general.auto_backup:
log.debug("Auto backup disabled, skipping")
return
# Skip if scene has never been saved (no filename or name)
if not scene.filename or not scene.name:
log.debug("Scene has never been saved, skipping auto backup")
return
max_backups = config.game.general.auto_backup_max_backups
log.debug("Creating auto backup", filename=scene.filename, max_backups=max_backups)
# Create backups directory structure
backups_dir = os.path.join(scene.save_dir, "backups")
os.makedirs(backups_dir, exist_ok=True)
# Generate backup filename with timestamp
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
base_name = os.path.splitext(scene.filename)[0]
backup_filename = f"{base_name}_backup_{timestamp}.json"
backup_filepath = os.path.join(backups_dir, backup_filename)
# Get scene data (same as normal save)
scene_data = scene.serialize
# Write backup file
try:
with open(backup_filepath, "w") as f:
json.dump(scene_data, f, indent=2, cls=save.SceneEncoder)
log.info("Auto backup created", backup_file=backup_filename)
# Clean up old backups
await _cleanup_old_backups(backups_dir, base_name, max_backups)
except OSError as e:
log.error("Failed to create auto backup", backup_file=backup_filename, error=e)
async def _cleanup_old_backups(
backups_dir: str, base_name: str, max_backups: int
) -> None:
"""
Removes old backup files, keeping only the most recent max_backups files.
Args:
backups_dir: Directory containing backup files
base_name: Base name of the scene file (without extension)
@@ -84,20 +87,22 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in
"""
if max_backups <= 0:
return
try:
# Find all backup files for this scene
backup_files = []
for filename in os.listdir(backups_dir):
if filename.startswith(f"{base_name}_backup_") and filename.endswith(
".json"
):
filepath = os.path.join(backups_dir, filename)
# Get modification time for sorting
mtime = os.path.getmtime(filepath)
backup_files.append((mtime, filepath))
# Sort by modification time (newest first)
backup_files.sort(reverse=True)
# Remove excess backup files
for _, filepath in backup_files[max_backups:]:
try:
@@ -105,7 +110,7 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in
log.debug("Removed old backup", filepath=filepath)
except OSError as e:
log.warning("Failed to remove old backup", filepath=filepath, error=e)
except OSError as e:
log.warning("Failed to cleanup old backups", backups_dir=backups_dir, error=e)
@@ -113,39 +118,43 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in
def get_backup_files(scene: "Scene") -> list[dict]:
"""
Returns a list of backup files for the given scene.
Args:
scene: The scene instance
Returns:
List of dicts with backup file information (name, path, timestamp, size)
"""
if not scene.filename or not scene.name:
return []
backups_dir = os.path.join(scene.save_dir, "backups")
if not os.path.exists(backups_dir):
return []
base_name = os.path.splitext(scene.filename)[0]
backup_files = []
try:
for filename in os.listdir(backups_dir):
if filename.startswith(f"{base_name}_backup_") and filename.endswith(
".json"
):
filepath = os.path.join(backups_dir, filename)
mtime = os.path.getmtime(filepath)
backup_files.append(
{
"name": filename,
"path": filepath,
"timestamp": datetime.datetime.fromtimestamp(mtime).isoformat(),
"size": os.path.getsize(filepath),
}
)
# Sort by modification time (newest first)
backup_files.sort(key=lambda x: x["timestamp"], reverse=True)
except OSError as e:
log.warning("Failed to list backup files", backups_dir=backups_dir, error=e)
return backup_files
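For reference, the naming scheme implemented above produces files of the form <scene>_backup_<YYYYMMDD_HHMMSS>.json. A minimal sketch of that scheme, assuming a scene saved as test_scene.json (the filename is illustrative):

import datetime
import os

scene_filename = "test_scene.json"  # hypothetical scene file name
base_name = os.path.splitext(scene_filename)[0]
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_filename = f"{base_name}_backup_{timestamp}.json"
print(backup_filename)  # e.g. test_scene_backup_20250829_143022.json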

View File

@@ -76,6 +76,7 @@ RESERVED_PROPERTY_NAMES = [
"title",
]
def get_type_class(type_str: str) -> Any:
if TYPE_TO_CLASS.get(type_str):
return TYPE_TO_CLASS[type_str]
@@ -503,7 +504,6 @@ class PropertyField(pydantic.BaseModel):
data["choices"] = self.generate_choices()
return data
# validate name - cannot be in FORBIDDEN_PROPERTY_NAMES
@pydantic.model_validator(mode="before")
@classmethod
@@ -1389,30 +1389,41 @@ class Graph(NodeBase):
# Control which fields are serialized for nodes - None means serialize all fields
_node_serialization_fields: ClassVar[set[str] | None] = {
"title", "id", "properties", "x", "y", "width", "height",
"collapsed", "inherited", "registry", "base_type", "dynamic_inputs"
"title",
"id",
"properties",
"x",
"y",
"width",
"height",
"collapsed",
"inherited",
"registry",
"base_type",
"dynamic_inputs",
}
@pydantic.field_serializer("nodes")
def serialize_nodes(self, nodes_dict):
"""
Custom serializer that calls model_dump on each node directly to preserve
all derived class fields. Uses _node_serialization_fields to control which
fields are included.
"""
result = {}
for node_id, node in nodes_dict.items():
node_data = node.model_dump()
# Filter fields if _node_serialization_fields is set
if self._node_serialization_fields is not None:
node_data = {
k: v
for k, v in node_data.items()
if k in self._node_serialization_fields
}
result[node_id] = node_data
return result
@property

View File

@@ -1,24 +1,27 @@
import pydantic
from typing import ClassVar
from . import Node
from talemate.game.engine.nodes.registry import base_node_type
@base_node_type("core/DynamicSocketNodeBase")
class DynamicSocketNodeBase(Node):
"""
Base class for nodes that support dynamic sockets.
Dynamic sockets are stored in the dynamic_inputs property and
automatically included in the inputs computed property.
"""
dynamic_input_label: str = "input{i}"
dynamic_inputs: list[dict] = pydantic.Field(default_factory=list)
def setup(self):
super().setup()
self.add_static_inputs()
for dynamic_input in self.dynamic_inputs:
self.add_input(
dynamic_input["name"], socket_type=dynamic_input["type"], optional=True
)
def add_static_inputs(self):
pass
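As a rough illustration of what the setup() loop above consumes, each dynamic_inputs entry is a plain dict carrying the socket name and type; the entries below are made up:

# Illustrative dynamic_inputs payload; each entry becomes an optional input socket
# via add_input(entry["name"], socket_type=entry["type"], optional=True).
dynamic_inputs = [
    {"name": "item1", "type": "key/value"},
    {"name": "item2", "type": "key/value"},
]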

View File

@@ -851,11 +851,11 @@ class DictCollector(DynamicSocketNodeBase):
Collects key-value pairs into a dictionary with dynamic inputs.
Connect tuple outputs like (key, value) to the dynamic input slots.
"""
dynamic_input_label: str = "item{i}"
supports_dynamic_sockets: bool = True # Frontend flag
dynamic_input_type: str = "key/value" # Type for dynamic sockets
@pydantic.computed_field(description="Node style")
@property
def style(self) -> NodeStyle:
@@ -863,7 +863,7 @@ class DictCollector(DynamicSocketNodeBase):
icon="F1C83",
title_color="#4f413a",
)
def __init__(self, title="Dict Collector", **kwargs):
super().__init__(title=title, **kwargs)
@@ -874,16 +874,15 @@ class DictCollector(DynamicSocketNodeBase):
super().setup()
# Start with just the output - inputs added dynamically
self.add_output("dict", socket_type="dict")
async def run(self, state: GraphState):
result_dict = self.normalized_input_value("dict") or {}
# Process all inputs
for socket in self.inputs:
if socket.name in ["dict"]:
continue
if socket.source and socket.value is not UNRESOLVED:
value = socket.value
if isinstance(value, tuple) and len(value) == 2:
@@ -900,21 +899,25 @@ class DictCollector(DynamicSocketNodeBase):
# fallback to socket name
result_dict[socket.name] = value
if state.verbosity >= NodeVerbosity.VERBOSE:
log.debug("Source node has no name or key property, falling back to socket name", node=source_node)
log.debug(
"Source node has no name or key property, falling back to socket name",
node=source_node,
)
self.set_output_values({"dict": result_dict})
@register("data/ListCollector")
class ListCollector(DynamicSocketNodeBase):
"""
Collects items into a list with dynamic inputs.
Connect any outputs to the dynamic input slots; each connected value is appended to the list.
"""
dynamic_input_label: str = "item{i}"
supports_dynamic_sockets: bool = True # Frontend flag
dynamic_input_type: str = "any" # Type for dynamic sockets
@pydantic.computed_field(description="Node style")
@property
def style(self) -> NodeStyle:
@@ -922,37 +925,37 @@ class ListCollector(DynamicSocketNodeBase):
icon="F1C84",
title_color="#4f413a",
)
def __init__(self, title="List Collector", **kwargs):
super().__init__(title=title, **kwargs)
def add_static_inputs(self):
self.add_input("list", socket_type="list", optional=True)
def setup(self):
super().setup()
self.add_output("list", socket_type="list")
async def run(self, state: GraphState):
result_list = self.normalized_input_value("list") or []
for socket in self.inputs:
if socket.name in ["list"]:
continue
if socket.source and socket.value is not UNRESOLVED:
result_list.append(socket.value)
self.set_output_values({"list": result_list})
@register("data/MakeKeyValuePair")
class MakeKeyValuePair(Node):
"""
Creates a key-value pair tuple from separate key and value inputs.
Outputs a tuple (key, value) that can be connected to DictCollector.
"""
class Fields:
key = PropertyField(
name="key",
@@ -960,7 +963,7 @@ class MakeKeyValuePair(Node):
type="str",
default="",
)
value = PropertyField(
name="value",
description="Value",
@@ -971,9 +974,7 @@ class MakeKeyValuePair(Node):
@pydantic.computed_field(description="Node style")
@property
def style(self) -> NodeStyle:
return NodeStyle(auto_title="KV {key}")
def __init__(self, title="Make Key-Value Pair", **kwargs):
super().__init__(title=title, **kwargs)
@@ -981,19 +982,19 @@ class MakeKeyValuePair(Node):
def setup(self):
self.add_input("key", socket_type="str", optional=True)
self.add_input("value", socket_type="any", optional=True)
self.set_property("key", "")
self.set_property("value", "")
self.add_output("kv", socket_type="key/value")
self.add_output("key", socket_type="str")
self.add_output("value", socket_type="any")
async def run(self, state: GraphState):
key = self.get_input_value("key")
value = self.get_input_value("value")
# Create tuple from key and value
result_tuple = (key, value)
self.set_output_values({"kv": result_tuple, "key": key, "value": value})

View File

@@ -111,7 +111,6 @@ def export_flat_graph(graph: "Graph") -> dict:
graph.ensure_connections()
for node in graph.nodes.values():
flat_node: dict = {
"id": node.id,
"registry": node.registry,
@@ -125,13 +124,11 @@ def export_flat_graph(graph: "Graph") -> dict:
"collapsed": node.collapsed,
"inherited": node.inherited,
}
# Export dynamic sockets from dynamic_inputs property
if getattr(node, "dynamic_inputs", None) is not None:
flat_node["dynamic_sockets"] = {"inputs": node.dynamic_inputs}
flat["nodes"].append(flat_node)
for input in node.inputs:
@@ -217,7 +214,6 @@ def import_flat_graph(flat_data: dict, main_graph: "Graph" = None) -> Graph:
if not node_cls:
raise ValueError(f"Unknown node type: {node_data['registry']}")
dynamic_inputs = node_data.get("dynamic_sockets", {}).get("inputs", [])
node = node_cls(
@@ -286,7 +282,7 @@ def import_flat_graph(flat_data: dict, main_graph: "Graph" = None) -> Graph:
graph_data = main_graph.model_dump()
load_extended_components(main_graph.extends, graph_data)
main_graph = main_graph.__class__(**graph_data)
# Initialize the graph
return main_graph.reinitialize()
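For orientation, the export path above yields flat node entries roughly of the shape below when a node defines dynamic sockets; every value here is invented:

flat_node = {
    "id": "node-1",  # hypothetical node id
    "registry": "data/DictCollector",
    "collapsed": False,
    "inherited": False,
    # only present when the node defines dynamic_inputs
    "dynamic_sockets": {"inputs": [{"name": "item1", "type": "key/value"}]},
}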

View File

@@ -1403,7 +1403,7 @@ class SceneLoop(Loop):
scene: "Scene" = state.outer.data["scene"]
if scene.auto_save:
await scene.save(auto=True)
if scene.auto_backup:
await auto_backup.auto_backup(scene)

View File

@@ -2,11 +2,9 @@ import os
import json
import tempfile
import shutil
import types
import pytest
from unittest.mock import patch, MagicMock
from talemate import auto_backup
from talemate.config import Config
@@ -37,7 +35,7 @@ def mock_scene(temp_save_dir):
"name": "Test Scene",
"description": "A test scene",
"characters": [],
"history": []
"history": [],
}
return scene
@@ -57,7 +55,7 @@ def mock_config():
def disabled_auto_backup_config():
"""Create a mock config with auto backup disabled."""
config = Config()
config.game = Game()
config.game.general = General()
config.game.general.auto_backup = False
config.game.general.auto_backup_max_backups = 3
@@ -72,9 +70,11 @@ def disabled_auto_backup_config():
@pytest.mark.asyncio
async def test_auto_backup_disabled(mock_scene, disabled_auto_backup_config):
"""Test that auto backup does nothing when disabled."""
with patch(
"talemate.auto_backup.get_config", return_value=disabled_auto_backup_config
):
await auto_backup.auto_backup(mock_scene)
# No backup directory should be created
backups_dir = os.path.join(mock_scene.save_dir, "backups")
assert not os.path.exists(backups_dir)
@@ -86,10 +86,10 @@ async def test_auto_backup_no_filename(mock_config):
scene = types.SimpleNamespace()
scene.filename = None
scene.name = "Test Scene"
with patch("talemate.auto_backup.get_config", return_value=mock_config):
await auto_backup.auto_backup(scene)
# No backup should be created since scene has no filename
@@ -100,10 +100,10 @@ async def test_auto_backup_no_name(mock_config, temp_save_dir):
scene.filename = "test.json"
scene.name = None
scene.save_dir = temp_save_dir
with patch("talemate.auto_backup.get_config", return_value=mock_config):
await auto_backup.auto_backup(scene)
# No backup directory should be created
backups_dir = os.path.join(temp_save_dir, "backups")
assert not os.path.exists(backups_dir)
@@ -112,22 +112,24 @@ async def test_auto_backup_no_name(mock_config, temp_save_dir):
@pytest.mark.asyncio
async def test_auto_backup_creates_backup(mock_scene, mock_config):
"""Test that auto backup creates a backup file."""
with patch("talemate.auto_backup.get_config", return_value=mock_config):
await auto_backup.auto_backup(mock_scene)
# Check that backup directory was created
backups_dir = os.path.join(mock_scene.save_dir, "backups")
assert os.path.exists(backups_dir)
# Check that a backup file was created
backup_files = [
f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")
]
assert len(backup_files) == 1
# Check backup file content
backup_path = os.path.join(backups_dir, backup_files[0])
with open(backup_path, "r") as f:
backup_data = json.load(f)
assert backup_data["name"] == "Test Scene"
assert backup_data["description"] == "A test scene"
@@ -135,19 +137,19 @@ async def test_auto_backup_creates_backup(mock_scene, mock_config):
@pytest.mark.asyncio
async def test_auto_backup_filename_format(mock_scene, mock_config):
"""Test that backup filenames follow the expected format."""
with patch("talemate.auto_backup.get_config", return_value=mock_config):
# Mock datetime.now() to return a consistent timestamp
mock_now = MagicMock()
mock_now.strftime.return_value = "20250829_143022"
with patch("talemate.auto_backup.datetime.datetime") as mock_datetime:
mock_datetime.now.return_value = mock_now
await auto_backup.auto_backup(mock_scene)
backups_dir = os.path.join(mock_scene.save_dir, "backups")
backup_files = os.listdir(backups_dir)
assert len(backup_files) == 1
assert backup_files[0] == "test_scene_backup_20250829_143022.json"
@@ -162,24 +164,26 @@ async def test_cleanup_old_backups(mock_scene, mock_config):
"""Test that old backups are cleaned up when exceeding max_backups."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
# Create 5 backup files (more than max_backups = 3)
for i in range(5):
backup_name = f"test_scene_backup_2025082{i}_143022.json"
backup_path = os.path.join(backups_dir, backup_name)
with open(backup_path, "w") as f:
json.dump({"backup": i}, f)
with patch("talemate.auto_backup.get_config", return_value=mock_config):
await auto_backup.auto_backup(mock_scene)
# 5 existing backups + 1 newly created = 6 files before cleanup
# Cleanup should reduce the count to max_backups = 3
backup_files = [
f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")
]
assert len(backup_files) <= 3
@pytest.mark.asyncio
async def test_cleanup_zero_max_backups(mock_scene):
"""Test that cleanup handles zero max_backups correctly."""
config = Config()
@@ -187,22 +191,24 @@ async def test_cleanup_zero_max_backups(mock_scene):
config.game.general = General()
config.game.general.auto_backup = True
config.game.general.auto_backup_max_backups = 0
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
# Create some backup files
for i in range(3):
backup_name = f"test_scene_backup_2025082{i}_143022.json"
backup_path = os.path.join(backups_dir, backup_name)
with open(backup_path, "w") as f:
json.dump({"backup": i}, f)
with patch("talemate.auto_backup.get_config", return_value=config):
await auto_backup.auto_backup(mock_scene)
# All old backups should remain since max_backups=0 means no cleanup
backup_files = [
f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")
]
assert len(backup_files) == 4 # 3 original + 1 new
@@ -211,29 +217,33 @@ async def test_cleanup_different_scenes(mock_config, temp_save_dir):
"""Test that cleanup only affects backups for the same scene."""
backups_dir = os.path.join(temp_save_dir, "backups")
os.makedirs(backups_dir)
# Create backups for different scenes
for scene_name in ["scene1", "scene2", "test_scene"]:
for i in range(2):
backup_name = f"{scene_name}_backup_2025082{i}_143022.json"
backup_path = os.path.join(backups_dir, backup_name)
with open(backup_path, "w") as f:
json.dump({"scene": scene_name, "backup": i}, f)
# Create scene for test_scene
scene = types.SimpleNamespace()
scene.name = "Test Scene"
scene.filename = "test_scene.json"
scene.save_dir = temp_save_dir
scene.serialize = {"name": "Test Scene"}
with patch("talemate.auto_backup.get_config", return_value=mock_config):
await auto_backup.auto_backup(scene)
# Check that other scene backups weren't affected
scene1_backups = [
f for f in os.listdir(backups_dir) if f.startswith("scene1_backup_")
]
scene2_backups = [
f for f in os.listdir(backups_dir) if f.startswith("scene2_backup_")
]
assert len(scene1_backups) == 2
assert len(scene2_backups) == 2
@@ -249,7 +259,7 @@ def test_get_backup_files_no_scene_filename(temp_save_dir):
scene.filename = None
scene.name = "Test Scene"
scene.save_dir = temp_save_dir
result = auto_backup.get_backup_files(scene)
assert result == []
@@ -264,7 +274,7 @@ def test_get_backup_files_empty_backups_dir(mock_scene):
"""Test get_backup_files returns empty list when backups directory is empty."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
result = auto_backup.get_backup_files(mock_scene)
assert result == []
@@ -273,26 +283,26 @@ def test_get_backup_files_with_backups(mock_scene):
"""Test get_backup_files returns correct backup file information."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
# Create backup files
backup_names = [
"test_scene_backup_20250829_143022.json",
"test_scene_backup_20250829_143045.json"
"test_scene_backup_20250829_143045.json",
]
for name in backup_names:
backup_path = os.path.join(backups_dir, name)
with open(backup_path, "w") as f:
json.dump({"test": "data"}, f)
result = auto_backup.get_backup_files(mock_scene)
assert len(result) == 2
# Check that results are sorted by timestamp (newest first)
timestamps = [backup["timestamp"] for backup in result]
assert timestamps == sorted(timestamps, reverse=True)
# Check that each backup has required fields
for backup in result:
assert "name" in backup
@@ -306,22 +316,22 @@ def test_get_backup_files_filters_other_scenes(mock_scene):
"""Test get_backup_files only returns backups for the correct scene."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
# Create backup files for different scenes
files = [
"test_scene_backup_20250829_143022.json", # Should be included
"other_scene_backup_20250829_143022.json", # Should be excluded
"test_scene_backup_20250829_143045.json", # Should be included
"random_file.json" # Should be excluded
"random_file.json", # Should be excluded
]
for name in files:
backup_path = os.path.join(backups_dir, name)
with open(backup_path, "w") as f:
json.dump({"test": "data"}, f)
result = auto_backup.get_backup_files(mock_scene)
# Should only return the 2 backups for test_scene
assert len(result) == 2
for backup in result:
@@ -336,11 +346,11 @@ def test_get_backup_files_filters_other_scenes(mock_scene):
@pytest.mark.asyncio
async def test_auto_backup_file_write_error(mock_scene, mock_config):
"""Test auto backup handles file write errors gracefully."""
with patch("talemate.auto_backup.get_config", return_value=mock_config):
with patch("builtins.open", side_effect=PermissionError("Permission denied")):
# Should not raise an exception, just log the error
await auto_backup.auto_backup(mock_scene)
# Verify no backup was created due to the error
backups_dir = os.path.join(mock_scene.save_dir, "backups")
if os.path.exists(backups_dir):
@@ -353,29 +363,31 @@ async def test_cleanup_handles_removal_errors(mock_scene, mock_config):
"""Test that cleanup handles file removal errors gracefully."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
# Create more backup files than max_backups
for i in range(5):
backup_name = f"test_scene_backup_2025082{i}_143022.json"
backup_path = os.path.join(backups_dir, backup_name)
with open(backup_path, "w") as f:
json.dump({"backup": i}, f)
with patch("talemate.auto_backup.get_config", return_value=mock_config):
with patch("os.remove", side_effect=OSError("Cannot remove file")):
# Should not raise an exception, just log the error
try:
await auto_backup.auto_backup(mock_scene)
except OSError:
pytest.fail("auto_backup cleanup should handle file removal errors gracefully")
pytest.fail(
"auto_backup cleanup should handle file removal errors gracefully"
)
def test_get_backup_files_handles_listdir_error(mock_scene):
"""Test get_backup_files handles directory listing errors gracefully."""
backups_dir = os.path.join(mock_scene.save_dir, "backups")
os.makedirs(backups_dir)
with patch("os.listdir", side_effect=OSError("Cannot list directory")):
result = auto_backup.get_backup_files(mock_scene)
assert result == []
@@ -388,22 +400,24 @@ def test_get_backup_files_handles_listdir_error(mock_scene):
@pytest.mark.asyncio
async def test_multiple_backup_cycles(mock_scene, mock_config):
"""Test multiple backup cycles work correctly."""
with patch("talemate.auto_backup.get_config", return_value=mock_config):
# Create multiple backups
for i in range(5):
mock_scene.serialize = {"backup_number": i, "name": "Test Scene"}
await auto_backup.auto_backup(mock_scene)
backups_dir = os.path.join(mock_scene.save_dir, "backups")
backup_files = [
f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")
]
# Should respect max_backups = 3
assert len(backup_files) <= 3
# Verify the content of remaining backups
for backup_file in backup_files:
backup_path = os.path.join(backups_dir, backup_file)
with open(backup_path, "r") as f:
backup_data = json.load(f)
assert "backup_number" in backup_data
assert backup_data["name"] == "Test Scene"
assert backup_data["name"] == "Test Scene"