From f3d02530d55490a135cc45905be0dc62673a8d0f Mon Sep 17 00:00:00 2001 From: vegu-ai-tools <152010387+vegu-ai-tools@users.noreply.github.com> Date: Sat, 30 Aug 2025 00:39:13 +0300 Subject: [PATCH] linting --- src/talemate/auto_backup.py | 77 ++++---- .../game/engine/nodes/core/__init__.py | 29 ++- .../game/engine/nodes/core/dynamic.py | 17 +- src/talemate/game/engine/nodes/data.py | 63 +++--- src/talemate/game/engine/nodes/layout.py | 14 +- src/talemate/game/engine/nodes/scene.py | 2 +- tests/test_auto_backup.py | 180 ++++++++++-------- 7 files changed, 208 insertions(+), 174 deletions(-) diff --git a/src/talemate/auto_backup.py b/src/talemate/auto_backup.py index 5149cc0b..ac8b894f 100644 --- a/src/talemate/auto_backup.py +++ b/src/talemate/auto_backup.py @@ -3,6 +3,7 @@ Auto backup functionality for scenes. Similar to auto save, but creates rolling backup files instead of overwriting the main save. """ + import json import os import datetime @@ -22,61 +23,63 @@ log = structlog.get_logger("talemate.auto_backup") async def auto_backup(scene: "Scene") -> None: """ Creates an automatic backup of the scene. - + Unlike save_as, this function: - Does not create a new memory database - Saves to a dedicated backups directory - Maintains a rolling set of backup files - Only proceeds if auto_backup is enabled in config - + Args: scene: The scene instance to backup """ config = get_config() - + # Check if auto backup is enabled if not config.game.general.auto_backup: log.debug("Auto backup disabled, skipping") return - + # Skip if scene has never been saved (no filename or name) if not scene.filename or not scene.name: log.debug("Scene has never been saved, skipping auto backup") return - + max_backups = config.game.general.auto_backup_max_backups log.debug("Creating auto backup", filename=scene.filename, max_backups=max_backups) - + # Create backups directory structure backups_dir = os.path.join(scene.save_dir, "backups") os.makedirs(backups_dir, exist_ok=True) - + # Generate backup filename with timestamp timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") base_name = os.path.splitext(scene.filename)[0] backup_filename = f"{base_name}_backup_{timestamp}.json" backup_filepath = os.path.join(backups_dir, backup_filename) - + # Get scene data (same as normal save) scene_data = scene.serialize - + # Write backup file try: with open(backup_filepath, "w") as f: json.dump(scene_data, f, indent=2, cls=save.SceneEncoder) - + log.info("Auto backup created", backup_file=backup_filename) - + # Clean up old backups await _cleanup_old_backups(backups_dir, base_name, max_backups) except OSError as e: log.error("Failed to create auto backup", backup_file=backup_filename, error=e) -async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: int) -> None: +async def _cleanup_old_backups( + backups_dir: str, base_name: str, max_backups: int +) -> None: """ Removes old backup files, keeping only the most recent max_backups files. - + Args: backups_dir: Directory containing backup files base_name: Base name of the scene file (without extension) @@ -84,20 +87,22 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in """ if max_backups <= 0: return - + try: # Find all backup files for this scene backup_files = [] for filename in os.listdir(backups_dir): - if filename.startswith(f"{base_name}_backup_") and filename.endswith(".json"): + if filename.startswith(f"{base_name}_backup_") and filename.endswith( + ".json" + ): filepath = os.path.join(backups_dir, filename) # Get modification time for sorting mtime = os.path.getmtime(filepath) backup_files.append((mtime, filepath)) - + # Sort by modification time (newest first) backup_files.sort(reverse=True) - + # Remove excess backup files for _, filepath in backup_files[max_backups:]: try: @@ -105,7 +110,7 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in log.debug("Removed old backup", filepath=filepath) except OSError as e: log.warning("Failed to remove old backup", filepath=filepath, error=e) - + except OSError as e: log.warning("Failed to cleanup old backups", backups_dir=backups_dir, error=e) @@ -113,39 +118,43 @@ async def _cleanup_old_backups(backups_dir: str, base_name: str, max_backups: in def get_backup_files(scene: "Scene") -> list[dict]: """ Returns a list of backup files for the given scene. - + Args: scene: The scene instance - + Returns: List of dicts with backup file information (name, path, timestamp) """ if not scene.filename or not scene.name: return [] - + backups_dir = os.path.join(scene.save_dir, "backups") if not os.path.exists(backups_dir): return [] - + base_name = os.path.splitext(scene.filename)[0] backup_files = [] - + try: for filename in os.listdir(backups_dir): - if filename.startswith(f"{base_name}_backup_") and filename.endswith(".json"): + if filename.startswith(f"{base_name}_backup_") and filename.endswith( + ".json" + ): filepath = os.path.join(backups_dir, filename) mtime = os.path.getmtime(filepath) - backup_files.append({ - "name": filename, - "path": filepath, - "timestamp": datetime.datetime.fromtimestamp(mtime).isoformat(), - "size": os.path.getsize(filepath) - }) - + backup_files.append( + { + "name": filename, + "path": filepath, + "timestamp": datetime.datetime.fromtimestamp(mtime).isoformat(), + "size": os.path.getsize(filepath), + } + ) + # Sort by modification time (newest first) backup_files.sort(key=lambda x: x["timestamp"], reverse=True) - + except OSError as e: log.warning("Failed to list backup files", backups_dir=backups_dir, error=e) - - return backup_files \ No newline at end of file + + return backup_files diff --git a/src/talemate/game/engine/nodes/core/__init__.py b/src/talemate/game/engine/nodes/core/__init__.py index 08473262..3a7af614 100644 --- a/src/talemate/game/engine/nodes/core/__init__.py +++ b/src/talemate/game/engine/nodes/core/__init__.py @@ -76,6 +76,7 @@ RESERVED_PROPERTY_NAMES = [ "title", ] + def get_type_class(type_str: str) -> Any: if TYPE_TO_CLASS.get(type_str): return TYPE_TO_CLASS[type_str] @@ -503,7 +504,6 @@ class PropertyField(pydantic.BaseModel): data["choices"] = self.generate_choices() return data - # validate name - cannot be in FORBIDDEN_PROPERTY_NAMES @pydantic.model_validator(mode="before") @classmethod @@ -1389,30 +1389,41 @@ class Graph(NodeBase): # Control which fields are serialized for nodes - None means serialize all fields _node_serialization_fields: ClassVar[set[str] | None] = { - "title", "id", "properties", "x", "y", "width", "height", - "collapsed", "inherited", "registry", "base_type", "dynamic_inputs" + "title", + "id", + "properties", + "x", + "y", + "width", + "height", + "collapsed", + "inherited", + "registry", + "base_type", + "dynamic_inputs", } - @pydantic.field_serializer('nodes') + @pydantic.field_serializer("nodes") def serialize_nodes(self, nodes_dict): """ - Custom serializer that calls model_dump on each node directly to preserve + Custom serializer that calls model_dump on each node directly to preserve all derived class fields. Uses _node_serialization_fields to control which fields are included. """ result = {} for node_id, node in nodes_dict.items(): node_data = node.model_dump() - + # Filter fields if _node_serialization_fields is set if self._node_serialization_fields is not None: node_data = { - k: v for k, v in node_data.items() + k: v + for k, v in node_data.items() if k in self._node_serialization_fields } - + result[node_id] = node_data - + return result @property diff --git a/src/talemate/game/engine/nodes/core/dynamic.py b/src/talemate/game/engine/nodes/core/dynamic.py index 13ff8522..ba52c8e6 100644 --- a/src/talemate/game/engine/nodes/core/dynamic.py +++ b/src/talemate/game/engine/nodes/core/dynamic.py @@ -1,24 +1,27 @@ import pydantic -from typing import ClassVar from . import Node from talemate.game.engine.nodes.registry import base_node_type + @base_node_type("core/DynamicSocketNodeBase") class DynamicSocketNodeBase(Node): """ Base class for nodes that support dynamic sockets. - Dynamic sockets are stored in the dynamic_inputs property and + Dynamic sockets are stored in the dynamic_inputs property and automatically included in the inputs computed property. """ + dynamic_input_label: str = "input{i}" dynamic_inputs: list[dict] = pydantic.Field(default_factory=list) - + def setup(self): super().setup() - + self.add_static_inputs() for dynamic_input in self.dynamic_inputs: - self.add_input(dynamic_input["name"], socket_type=dynamic_input["type"], optional=True) - + self.add_input( + dynamic_input["name"], socket_type=dynamic_input["type"], optional=True + ) + def add_static_inputs(self): - pass \ No newline at end of file + pass diff --git a/src/talemate/game/engine/nodes/data.py b/src/talemate/game/engine/nodes/data.py index 62116a49..7239eb4e 100644 --- a/src/talemate/game/engine/nodes/data.py +++ b/src/talemate/game/engine/nodes/data.py @@ -851,11 +851,11 @@ class DictCollector(DynamicSocketNodeBase): Collects key-value pairs into a dictionary with dynamic inputs. Connect tuple outputs like (key, value) to the dynamic input slots. """ - + dynamic_input_label: str = "item{i}" supports_dynamic_sockets: bool = True # Frontend flag - dynamic_input_type: str = "key/value" # Type for dynamic sockets - + dynamic_input_type: str = "key/value" # Type for dynamic sockets + @pydantic.computed_field(description="Node style") @property def style(self) -> NodeStyle: @@ -863,7 +863,7 @@ class DictCollector(DynamicSocketNodeBase): icon="F1C83", title_color="#4f413a", ) - + def __init__(self, title="Dict Collector", **kwargs): super().__init__(title=title, **kwargs) @@ -874,16 +874,15 @@ class DictCollector(DynamicSocketNodeBase): super().setup() # Start with just the output - inputs added dynamically self.add_output("dict", socket_type="dict") - + async def run(self, state: GraphState): result_dict = self.normalized_input_value("dict") or {} - + # Process all inputs for socket in self.inputs: - if socket.name in ["dict"]: continue - + if socket.source and socket.value is not UNRESOLVED: value = socket.value if isinstance(value, tuple) and len(value) == 2: @@ -900,21 +899,25 @@ class DictCollector(DynamicSocketNodeBase): # fallback to socket name result_dict[socket.name] = value if state.verbosity >= NodeVerbosity.VERBOSE: - log.debug("Source node has no name or key property, falling back to socket name", node=source_node) - + log.debug( + "Source node has no name or key property, falling back to socket name", + node=source_node, + ) + self.set_output_values({"dict": result_dict}) + @register("data/ListCollector") class ListCollector(DynamicSocketNodeBase): """ Collects items into a list with dynamic inputs. Connect tuple outputs like (key, value) to the dynamic input slots. """ - + dynamic_input_label: str = "item{i}" supports_dynamic_sockets: bool = True # Frontend flag - dynamic_input_type: str = "any" # Type for dynamic sockets - + dynamic_input_type: str = "any" # Type for dynamic sockets + @pydantic.computed_field(description="Node style") @property def style(self) -> NodeStyle: @@ -922,37 +925,37 @@ class ListCollector(DynamicSocketNodeBase): icon="F1C84", title_color="#4f413a", ) - - + def __init__(self, title="List Collector", **kwargs): super().__init__(title=title, **kwargs) - + def add_static_inputs(self): self.add_input("list", socket_type="list", optional=True) - + def setup(self): super().setup() self.add_output("list", socket_type="list") - + async def run(self, state: GraphState): result_list = self.normalized_input_value("list") or [] - + for socket in self.inputs: if socket.name in ["list"]: continue - + if socket.source and socket.value is not UNRESOLVED: result_list.append(socket.value) - + self.set_output_values({"list": result_list}) + @register("data/MakeKeyValuePair") class MakeKeyValuePair(Node): """ Creates a key-value pair tuple from separate key and value inputs. Outputs a tuple (key, value) that can be connected to DictCollector. """ - + class Fields: key = PropertyField( name="key", @@ -960,7 +963,7 @@ class MakeKeyValuePair(Node): type="str", default="", ) - + value = PropertyField( name="value", description="Value", @@ -971,9 +974,7 @@ class MakeKeyValuePair(Node): @pydantic.computed_field(description="Node style") @property def style(self) -> NodeStyle: - return NodeStyle( - auto_title="KV {key}" - ) + return NodeStyle(auto_title="KV {key}") def __init__(self, title="Make Key-Value Pair", **kwargs): super().__init__(title=title, **kwargs) @@ -981,19 +982,19 @@ class MakeKeyValuePair(Node): def setup(self): self.add_input("key", socket_type="str", optional=True) self.add_input("value", socket_type="any", optional=True) - + self.set_property("key", "") self.set_property("value", "") - + self.add_output("kv", socket_type="key/value") self.add_output("key", socket_type="str") self.add_output("value", socket_type="any") - + async def run(self, state: GraphState): key = self.get_input_value("key") value = self.get_input_value("value") - + # Create tuple from key and value result_tuple = (key, value) - + self.set_output_values({"kv": result_tuple, "key": key, "value": value}) diff --git a/src/talemate/game/engine/nodes/layout.py b/src/talemate/game/engine/nodes/layout.py index 673ca810..e09ce01c 100644 --- a/src/talemate/game/engine/nodes/layout.py +++ b/src/talemate/game/engine/nodes/layout.py @@ -111,7 +111,6 @@ def export_flat_graph(graph: "Graph") -> dict: graph.ensure_connections() for node in graph.nodes.values(): - flat_node: dict = { "id": node.id, "registry": node.registry, @@ -125,13 +124,11 @@ def export_flat_graph(graph: "Graph") -> dict: "collapsed": node.collapsed, "inherited": node.inherited, } - + # Export dynamic sockets from dynamic_inputs property - if getattr(node, 'dynamic_inputs', None) is not None: - flat_node["dynamic_sockets"] = { - "inputs": node.dynamic_inputs - } - + if getattr(node, "dynamic_inputs", None) is not None: + flat_node["dynamic_sockets"] = {"inputs": node.dynamic_inputs} + flat["nodes"].append(flat_node) for input in node.inputs: @@ -217,7 +214,6 @@ def import_flat_graph(flat_data: dict, main_graph: "Graph" = None) -> Graph: if not node_cls: raise ValueError(f"Unknown node type: {node_data['registry']}") - dynamic_inputs = node_data.get("dynamic_sockets", {}).get("inputs", []) node = node_cls( @@ -286,7 +282,7 @@ def import_flat_graph(flat_data: dict, main_graph: "Graph" = None) -> Graph: graph_data = main_graph.model_dump() load_extended_components(main_graph.extends, graph_data) main_graph = main_graph.__class__(**graph_data) - + # Initialize the graph return main_graph.reinitialize() diff --git a/src/talemate/game/engine/nodes/scene.py b/src/talemate/game/engine/nodes/scene.py index 275bf4b9..df7c0d84 100644 --- a/src/talemate/game/engine/nodes/scene.py +++ b/src/talemate/game/engine/nodes/scene.py @@ -1403,7 +1403,7 @@ class SceneLoop(Loop): scene: "Scene" = state.outer.data["scene"] if scene.auto_save: await scene.save(auto=True) - + if scene.auto_backup: await auto_backup.auto_backup(scene) diff --git a/tests/test_auto_backup.py b/tests/test_auto_backup.py index 7e17c8aa..ee330229 100644 --- a/tests/test_auto_backup.py +++ b/tests/test_auto_backup.py @@ -2,11 +2,9 @@ import os import json import tempfile import shutil -import asyncio import types import pytest -from unittest.mock import patch, MagicMock, AsyncMock -from datetime import datetime +from unittest.mock import patch, MagicMock from talemate import auto_backup from talemate.config import Config @@ -37,7 +35,7 @@ def mock_scene(temp_save_dir): "name": "Test Scene", "description": "A test scene", "characters": [], - "history": [] + "history": [], } return scene @@ -57,7 +55,7 @@ def mock_config(): def disabled_auto_backup_config(): """Create a mock config with auto backup disabled.""" config = Config() - config.game = Game() + config.game = Game() config.game.general = General() config.game.general.auto_backup = False config.game.general.auto_backup_max_backups = 3 @@ -72,9 +70,11 @@ def disabled_auto_backup_config(): @pytest.mark.asyncio async def test_auto_backup_disabled(mock_scene, disabled_auto_backup_config): """Test that auto backup does nothing when disabled.""" - with patch('talemate.auto_backup.get_config', return_value=disabled_auto_backup_config): + with patch( + "talemate.auto_backup.get_config", return_value=disabled_auto_backup_config + ): await auto_backup.auto_backup(mock_scene) - + # No backup directory should be created backups_dir = os.path.join(mock_scene.save_dir, "backups") assert not os.path.exists(backups_dir) @@ -86,10 +86,10 @@ async def test_auto_backup_no_filename(mock_config): scene = types.SimpleNamespace() scene.filename = None scene.name = "Test Scene" - - with patch('talemate.auto_backup.get_config', return_value=mock_config): + + with patch("talemate.auto_backup.get_config", return_value=mock_config): await auto_backup.auto_backup(scene) - + # No backup should be created since scene has no filename @@ -100,10 +100,10 @@ async def test_auto_backup_no_name(mock_config, temp_save_dir): scene.filename = "test.json" scene.name = None scene.save_dir = temp_save_dir - - with patch('talemate.auto_backup.get_config', return_value=mock_config): + + with patch("talemate.auto_backup.get_config", return_value=mock_config): await auto_backup.auto_backup(scene) - + # No backup directory should be created backups_dir = os.path.join(temp_save_dir, "backups") assert not os.path.exists(backups_dir) @@ -112,22 +112,24 @@ async def test_auto_backup_no_name(mock_config, temp_save_dir): @pytest.mark.asyncio async def test_auto_backup_creates_backup(mock_scene, mock_config): """Test that auto backup creates a backup file.""" - with patch('talemate.auto_backup.get_config', return_value=mock_config): + with patch("talemate.auto_backup.get_config", return_value=mock_config): await auto_backup.auto_backup(mock_scene) - + # Check that backup directory was created backups_dir = os.path.join(mock_scene.save_dir, "backups") assert os.path.exists(backups_dir) - + # Check that a backup file was created - backup_files = [f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")] + backup_files = [ + f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_") + ] assert len(backup_files) == 1 - + # Check backup file content backup_path = os.path.join(backups_dir, backup_files[0]) - with open(backup_path, 'r') as f: + with open(backup_path, "r") as f: backup_data = json.load(f) - + assert backup_data["name"] == "Test Scene" assert backup_data["description"] == "A test scene" @@ -135,19 +137,19 @@ async def test_auto_backup_creates_backup(mock_scene, mock_config): @pytest.mark.asyncio async def test_auto_backup_filename_format(mock_scene, mock_config): """Test that backup filenames follow the expected format.""" - with patch('talemate.auto_backup.get_config', return_value=mock_config): + with patch("talemate.auto_backup.get_config", return_value=mock_config): # Mock datetime.now() to return a consistent timestamp mock_now = MagicMock() mock_now.strftime.return_value = "20250829_143022" - - with patch('talemate.auto_backup.datetime.datetime') as mock_datetime: + + with patch("talemate.auto_backup.datetime.datetime") as mock_datetime: mock_datetime.now.return_value = mock_now - + await auto_backup.auto_backup(mock_scene) - + backups_dir = os.path.join(mock_scene.save_dir, "backups") backup_files = os.listdir(backups_dir) - + assert len(backup_files) == 1 assert backup_files[0] == "test_scene_backup_20250829_143022.json" @@ -162,24 +164,26 @@ async def test_cleanup_old_backups(mock_scene, mock_config): """Test that old backups are cleaned up when exceeding max_backups.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + # Create 5 backup files (more than max_backups = 3) for i in range(5): backup_name = f"test_scene_backup_2025082{i}_143022.json" backup_path = os.path.join(backups_dir, backup_name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"backup": i}, f) - - with patch('talemate.auto_backup.get_config', return_value=mock_config): + + with patch("talemate.auto_backup.get_config", return_value=mock_config): await auto_backup.auto_backup(mock_scene) - + # Should have max_backups + 1 (newly created) = 4 files initially # But cleanup should reduce it to max_backups = 3 - backup_files = [f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")] + backup_files = [ + f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_") + ] assert len(backup_files) <= 3 -@pytest.mark.asyncio +@pytest.mark.asyncio async def test_cleanup_zero_max_backups(mock_scene): """Test that cleanup handles zero max_backups correctly.""" config = Config() @@ -187,22 +191,24 @@ async def test_cleanup_zero_max_backups(mock_scene): config.game.general = General() config.game.general.auto_backup = True config.game.general.auto_backup_max_backups = 0 - + backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + # Create some backup files for i in range(3): backup_name = f"test_scene_backup_2025082{i}_143022.json" backup_path = os.path.join(backups_dir, backup_name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"backup": i}, f) - - with patch('talemate.auto_backup.get_config', return_value=config): + + with patch("talemate.auto_backup.get_config", return_value=config): await auto_backup.auto_backup(mock_scene) - + # All old backups should remain since max_backups=0 means no cleanup - backup_files = [f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")] + backup_files = [ + f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_") + ] assert len(backup_files) == 4 # 3 original + 1 new @@ -211,29 +217,33 @@ async def test_cleanup_different_scenes(mock_config, temp_save_dir): """Test that cleanup only affects backups for the same scene.""" backups_dir = os.path.join(temp_save_dir, "backups") os.makedirs(backups_dir) - + # Create backups for different scenes for scene_name in ["scene1", "scene2", "test_scene"]: for i in range(2): backup_name = f"{scene_name}_backup_2025082{i}_143022.json" backup_path = os.path.join(backups_dir, backup_name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"scene": scene_name, "backup": i}, f) - + # Create scene for test_scene scene = types.SimpleNamespace() scene.name = "Test Scene" scene.filename = "test_scene.json" scene.save_dir = temp_save_dir scene.serialize = {"name": "Test Scene"} - - with patch('talemate.auto_backup.get_config', return_value=mock_config): + + with patch("talemate.auto_backup.get_config", return_value=mock_config): await auto_backup.auto_backup(scene) - + # Check that other scene backups weren't affected - scene1_backups = [f for f in os.listdir(backups_dir) if f.startswith("scene1_backup_")] - scene2_backups = [f for f in os.listdir(backups_dir) if f.startswith("scene2_backup_")] - + scene1_backups = [ + f for f in os.listdir(backups_dir) if f.startswith("scene1_backup_") + ] + scene2_backups = [ + f for f in os.listdir(backups_dir) if f.startswith("scene2_backup_") + ] + assert len(scene1_backups) == 2 assert len(scene2_backups) == 2 @@ -249,7 +259,7 @@ def test_get_backup_files_no_scene_filename(temp_save_dir): scene.filename = None scene.name = "Test Scene" scene.save_dir = temp_save_dir - + result = auto_backup.get_backup_files(scene) assert result == [] @@ -264,7 +274,7 @@ def test_get_backup_files_empty_backups_dir(mock_scene): """Test get_backup_files returns empty list when backups directory is empty.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + result = auto_backup.get_backup_files(mock_scene) assert result == [] @@ -273,26 +283,26 @@ def test_get_backup_files_with_backups(mock_scene): """Test get_backup_files returns correct backup file information.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + # Create backup files backup_names = [ "test_scene_backup_20250829_143022.json", - "test_scene_backup_20250829_143045.json" + "test_scene_backup_20250829_143045.json", ] - + for name in backup_names: backup_path = os.path.join(backups_dir, name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"test": "data"}, f) - + result = auto_backup.get_backup_files(mock_scene) - + assert len(result) == 2 - + # Check that results are sorted by timestamp (newest first) timestamps = [backup["timestamp"] for backup in result] assert timestamps == sorted(timestamps, reverse=True) - + # Check that each backup has required fields for backup in result: assert "name" in backup @@ -306,22 +316,22 @@ def test_get_backup_files_filters_other_scenes(mock_scene): """Test get_backup_files only returns backups for the correct scene.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + # Create backup files for different scenes files = [ "test_scene_backup_20250829_143022.json", # Should be included "other_scene_backup_20250829_143022.json", # Should be excluded "test_scene_backup_20250829_143045.json", # Should be included - "random_file.json" # Should be excluded + "random_file.json", # Should be excluded ] - + for name in files: backup_path = os.path.join(backups_dir, name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"test": "data"}, f) - + result = auto_backup.get_backup_files(mock_scene) - + # Should only return the 2 backups for test_scene assert len(result) == 2 for backup in result: @@ -336,11 +346,11 @@ def test_get_backup_files_filters_other_scenes(mock_scene): @pytest.mark.asyncio async def test_auto_backup_file_write_error(mock_scene, mock_config): """Test auto backup handles file write errors gracefully.""" - with patch('talemate.auto_backup.get_config', return_value=mock_config): - with patch('builtins.open', side_effect=PermissionError("Permission denied")): + with patch("talemate.auto_backup.get_config", return_value=mock_config): + with patch("builtins.open", side_effect=PermissionError("Permission denied")): # Should not raise an exception, just log the error await auto_backup.auto_backup(mock_scene) - + # Verify no backup was created due to the error backups_dir = os.path.join(mock_scene.save_dir, "backups") if os.path.exists(backups_dir): @@ -353,29 +363,31 @@ async def test_cleanup_handles_removal_errors(mock_scene, mock_config): """Test that cleanup handles file removal errors gracefully.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - + # Create more backup files than max_backups for i in range(5): backup_name = f"test_scene_backup_2025082{i}_143022.json" backup_path = os.path.join(backups_dir, backup_name) - with open(backup_path, 'w') as f: + with open(backup_path, "w") as f: json.dump({"backup": i}, f) - - with patch('talemate.auto_backup.get_config', return_value=mock_config): - with patch('os.remove', side_effect=OSError("Cannot remove file")): + + with patch("talemate.auto_backup.get_config", return_value=mock_config): + with patch("os.remove", side_effect=OSError("Cannot remove file")): # Should not raise an exception, just log the error try: await auto_backup.auto_backup(mock_scene) except OSError: - pytest.fail("auto_backup cleanup should handle file removal errors gracefully") + pytest.fail( + "auto_backup cleanup should handle file removal errors gracefully" + ) def test_get_backup_files_handles_listdir_error(mock_scene): """Test get_backup_files handles directory listing errors gracefully.""" backups_dir = os.path.join(mock_scene.save_dir, "backups") os.makedirs(backups_dir) - - with patch('os.listdir', side_effect=OSError("Cannot list directory")): + + with patch("os.listdir", side_effect=OSError("Cannot list directory")): result = auto_backup.get_backup_files(mock_scene) assert result == [] @@ -388,22 +400,24 @@ def test_get_backup_files_handles_listdir_error(mock_scene): @pytest.mark.asyncio async def test_multiple_backup_cycles(mock_scene, mock_config): """Test multiple backup cycles work correctly.""" - with patch('talemate.auto_backup.get_config', return_value=mock_config): + with patch("talemate.auto_backup.get_config", return_value=mock_config): # Create multiple backups for i in range(5): mock_scene.serialize = {"backup_number": i, "name": "Test Scene"} await auto_backup.auto_backup(mock_scene) - + backups_dir = os.path.join(mock_scene.save_dir, "backups") - backup_files = [f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_")] - + backup_files = [ + f for f in os.listdir(backups_dir) if f.startswith("test_scene_backup_") + ] + # Should respect max_backups = 3 assert len(backup_files) <= 3 - + # Verify the content of remaining backups for backup_file in backup_files: backup_path = os.path.join(backups_dir, backup_file) - with open(backup_path, 'r') as f: + with open(backup_path, "r") as f: backup_data = json.load(f) assert "backup_number" in backup_data - assert backup_data["name"] == "Test Scene" \ No newline at end of file + assert backup_data["name"] == "Test Scene"