"""Simulation logger for detailed step-by-step logging. Performance-optimized with non-blocking I/O: - Logging can be disabled or reduced via config - File writes happen in a background thread (producer-consumer pattern) - Agent lookups use O(1) dict instead of O(n) list search - No in-memory accumulation of all entries """ import json import logging import threading import queue import atexit from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path from typing import Optional, TextIO @dataclass class AgentLogEntry: """Log entry for a single agent's turn.""" agent_id: str agent_name: str profession: str position: dict stats_before: dict stats_after: dict decision: dict action_result: dict inventory_before: list inventory_after: list money_before: int money_after: int @dataclass class TurnLogEntry: """Complete log entry for a simulation turn.""" turn: int day: int step_in_day: int time_of_day: str timestamp: str agent_entries: list[AgentLogEntry] = field(default_factory=list) market_orders_before: list = field(default_factory=list) market_orders_after: list = field(default_factory=list) trades: list = field(default_factory=list) deaths: list = field(default_factory=list) statistics: dict = field(default_factory=dict) def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { "turn": self.turn, "day": self.day, "step_in_day": self.step_in_day, "time_of_day": self.time_of_day, "timestamp": self.timestamp, "agent_entries": [asdict(e) for e in self.agent_entries], "market_orders_before": self.market_orders_before, "market_orders_after": self.market_orders_after, "trades": self.trades, "deaths": self.deaths, "statistics": self.statistics, } class AsyncLogWriter: """Background thread that handles file I/O asynchronously. Uses a producer-consumer pattern to decouple log generation from file writing, preventing I/O from blocking the simulation. 
""" def __init__(self, max_queue_size: int = 1000): self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size) self._stop_event = threading.Event() self._thread: Optional[threading.Thread] = None self._files: dict[str, TextIO] = {} self._lock = threading.Lock() def start(self) -> None: """Start the background writer thread.""" if self._thread is not None and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._writer_loop, daemon=True) self._thread.start() def stop(self, timeout: float = 2.0) -> None: """Stop the background writer thread and flush remaining items.""" self._stop_event.set() if self._thread is not None: self._thread.join(timeout=timeout) self._thread = None # Process any remaining items in the queue self._drain_queue() # Close all files with self._lock: for f in self._files.values(): try: f.close() except Exception: pass self._files.clear() def _writer_loop(self) -> None: """Main loop for the background writer thread.""" while not self._stop_event.is_set(): try: # Wait for items with timeout to allow stop checks item = self._queue.get(timeout=0.1) self._process_item(item) self._queue.task_done() except queue.Empty: continue except Exception as e: # Log errors but don't crash the thread logging.getLogger("simulation").warning(f"Async log writer error: {e}") def _process_item(self, item: dict) -> None: """Process a single log item.""" action = item.get("action") if action == "open": self._open_file(item["path"], item["file_id"]) elif action == "write": self._write_to_file(item["file_id"], item["data"]) elif action == "flush": self._flush_file(item.get("file_id")) elif action == "close": self._close_file(item["file_id"]) def _open_file(self, path: str, file_id: str) -> None: """Open a file for writing.""" with self._lock: if file_id not in self._files: self._files[file_id] = open(path, "w", encoding="utf-8") def _write_to_file(self, file_id: str, data: str) -> None: """Write data to a file.""" with self._lock: f = self._files.get(file_id) if f: f.write(data) def _flush_file(self, file_id: Optional[str] = None) -> None: """Flush file(s) to disk.""" with self._lock: if file_id: f = self._files.get(file_id) if f: f.flush() else: for f in self._files.values(): f.flush() def _close_file(self, file_id: str) -> None: """Close a file.""" with self._lock: f = self._files.pop(file_id, None) if f: f.close() def _drain_queue(self) -> None: """Process all remaining items in the queue.""" while True: try: item = self._queue.get_nowait() self._process_item(item) self._queue.task_done() except queue.Empty: break def enqueue(self, item: dict) -> bool: """Add an item to the write queue. Returns False if queue is full (item dropped). """ try: self._queue.put_nowait(item) return True except queue.Full: return False def open_file(self, path: str, file_id: str) -> None: """Queue a file open operation.""" self.enqueue({"action": "open", "path": path, "file_id": file_id}) def write(self, file_id: str, data: str) -> None: """Queue a write operation.""" self.enqueue({"action": "write", "file_id": file_id, "data": data}) def flush(self, file_id: Optional[str] = None) -> None: """Queue a flush operation.""" self.enqueue({"action": "flush", "file_id": file_id}) def close_file(self, file_id: str) -> None: """Queue a file close operation.""" self.enqueue({"action": "close", "file_id": file_id}) class SimulationLogger: """Logger that dumps detailed simulation data to files. 


class SimulationLogger:
    """Logger that dumps detailed simulation data to files.

    Performance optimized:
    - Logging can be disabled entirely via config
    - File writes happen in a background thread (non-blocking)
    - Agent lookups use O(1) dict instead of O(n) list search
    - No in-memory accumulation of all entries
    """

    def __init__(self, log_dir: str = "logs"):
        self.log_dir = Path(log_dir)

        # Load performance config
        from backend.config import get_config

        perf_config = get_config().performance
        self.logging_enabled = perf_config.logging_enabled
        self.detailed_logging = perf_config.detailed_logging
        self.flush_interval = perf_config.log_flush_interval
        # Async logging config (defaults to True if the field is absent)
        self.async_logging = getattr(perf_config, "async_logging", True)

        # Async writer (only created if logging is enabled)
        self._async_writer: Optional[AsyncLogWriter] = None

        # File IDs for async writing
        self._json_file_id: Optional[str] = None
        self._summary_file_id: Optional[str] = None

        # Fallback: synchronous file handles (used if async is disabled)
        self._json_file: Optional[TextIO] = None
        self._summary_file: Optional[TextIO] = None

        # Standard Python logging (minimal overhead even when enabled)
        self.logger = logging.getLogger("simulation")
        self.logger.setLevel(logging.WARNING)  # Only warnings by default

        # Current turn tracking
        self._current_entry: Optional[TurnLogEntry] = None

        # O(1) lookup for agent entries by ID
        self._agent_entry_map: dict[str, AgentLogEntry] = {}

        # Turn counter for flush batching
        self._turns_since_flush = 0

        # Stats
        self._items_queued = 0
        self._items_dropped = 0

    def start_session(self, config: dict) -> None:
        """Start a new logging session."""
        if not self.logging_enabled:
            return

        self.log_dir.mkdir(exist_ok=True)

        # Create session-specific log file paths
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        json_path = self.log_dir / f"sim_{timestamp}.jsonl"
        summary_path = self.log_dir / f"sim_{timestamp}_summary.txt"

        if self.async_logging:
            # Use the async writer
            self._async_writer = AsyncLogWriter()
            self._async_writer.start()

            self._json_file_id = f"json_{timestamp}"
            self._summary_file_id = f"summary_{timestamp}"

            self._async_writer.open_file(str(json_path), self._json_file_id)
            self._async_writer.open_file(str(summary_path), self._summary_file_id)

            # Write initial data
            self._async_writer.write(
                self._json_file_id,
                json.dumps({"type": "config", "data": config}) + "\n",
            )
            self._async_writer.write(
                self._summary_file_id,
                f"Simulation Session Started: {datetime.now()}\n" + "=" * 60 + "\n\n",
            )
        else:
            # Use synchronous file handles
            self._json_file = open(json_path, "w", encoding="utf-8")
            self._summary_file = open(summary_path, "w", encoding="utf-8")
            self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n")
            self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n")
            self._summary_file.write("=" * 60 + "\n\n")

        if self.detailed_logging:
            # Set up a file handler for detailed logs
            file_handler = logging.FileHandler(self.log_dir / f"sim_{timestamp}.log")
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(
                logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
            )
            self.logger.addHandler(file_handler)
            self.logger.setLevel(logging.DEBUG)

    def start_turn(self, turn: int, day: int, step_in_day: int, time_of_day: str) -> None:
        """Start logging a new turn."""
        if not self.logging_enabled:
            return

        self._current_entry = TurnLogEntry(
            turn=turn,
            day=day,
            step_in_day=step_in_day,
            time_of_day=time_of_day,
            timestamp=datetime.now().isoformat(),
        )
        self._agent_entry_map.clear()

        if self.detailed_logging:
            self.logger.debug(
                f"Turn {turn} started (Day {day}, Step {step_in_day}, {time_of_day})"
            )
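
    # Expected per-turn call sequence (driven by the simulation loop; the
    # logger does not enforce this ordering):
    #   start_turn -> log_agent_before -> log_agent_decision ->
    #   log_agent_after -> log_market_state / log_trade / log_death ->
    #   log_statistics -> end_turn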
    def log_agent_before(
        self,
        agent_id: str,
        agent_name: str,
        profession: str,
        position: dict,
        stats: dict,
        inventory: list,
        money: int,
    ) -> None:
        """Log agent state before action."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # Create the entry and add it to both the list and the lookup map
        entry = AgentLogEntry(
            agent_id=agent_id,
            agent_name=agent_name,
            profession=profession,
            position=position,
            stats_before=stats,
            stats_after={},
            decision={},
            action_result={},
            inventory_before=inventory,
            inventory_after=[],
            money_before=money,
            money_after=money,
        )
        self._current_entry.agent_entries.append(entry)
        self._agent_entry_map[agent_id] = entry

    def log_agent_decision(self, agent_id: str, decision: dict) -> None:
        """Log the agent's AI decision."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # O(1) lookup instead of O(n) search
        entry = self._agent_entry_map.get(agent_id)
        if entry:
            entry.decision = decision
            if self.detailed_logging:
                self.logger.debug(
                    f"  {entry.agent_name}: decided to {decision.get('action', '?')} "
                    f"- {decision.get('reason', '')}"
                )

    def log_agent_after(
        self,
        agent_id: str,
        stats: dict,
        inventory: list,
        money: int,
        position: dict,
        action_result: dict,
    ) -> None:
        """Log agent state after action."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # O(1) lookup instead of O(n) search
        entry = self._agent_entry_map.get(agent_id)
        if entry:
            entry.stats_after = stats
            entry.inventory_after = inventory
            entry.money_after = money
            entry.position = position
            entry.action_result = action_result

    def log_market_state(self, orders_before: list, orders_after: list) -> None:
        """Log market state."""
        if not self.logging_enabled or self._current_entry is None:
            return
        self._current_entry.market_orders_before = orders_before
        self._current_entry.market_orders_after = orders_after

    def log_trade(self, trade: dict) -> None:
        """Log a trade transaction."""
        if not self.logging_enabled or self._current_entry is None:
            return
        self._current_entry.trades.append(trade)
        if self.detailed_logging:
            self.logger.debug(f"  Trade: {trade.get('message', 'Unknown trade')}")

    def log_death(self, agent_name: str, cause: str) -> None:
        """Log an agent death."""
        if not self.logging_enabled or self._current_entry is None:
            return
        self._current_entry.deaths.append({"name": agent_name, "cause": cause})
        # Always log deaths, even without detailed logging. The logger level
        # defaults to WARNING, so this must be at WARNING or above to appear.
        self.logger.warning(f"  DEATH: {agent_name} died from {cause}")

    def log_event(self, event_type: str, event_data: dict) -> None:
        """Log a general event (births, random events, etc.)."""
        if not self.logging_enabled or self._current_entry is None:
            return
        if event_type == "birth":
            self.logger.info(
                f"  BIRTH: {event_data.get('child_name', '?')} born to "
                f"{event_data.get('parent_name', '?')}"
            )
        elif event_type == "random_event" and self.detailed_logging:
            self.logger.info(
                f"  EVENT: {event_data.get('type', '?')} affecting "
                f"{event_data.get('affected', [])}"
            )
        elif self.detailed_logging:
            self.logger.debug(f"  Event [{event_type}]: {event_data}")

    def log_statistics(self, stats: dict) -> None:
        """Log end-of-turn statistics."""
        if not self.logging_enabled or self._current_entry is None:
            return
        self._current_entry.statistics = stats

    def end_turn(self) -> None:
        """Finish logging the current turn and write it to file."""
        if not self.logging_enabled or self._current_entry is None:
            return

        entry = self._current_entry

        # Prepare data
        json_data = json.dumps({"type": "turn", "data": entry.to_dict()}) + "\n"
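        # Human-readable summary: one header line per turn, then (with
        # detailed logging) one line per agent, e.g. (illustrative values):
        #   Turn 12 | Day 2 Step 3 (evening)
        #     [Ada] gather + | E:74 H:31 T:18 $120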
        summary_lines = [
            f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} "
            f"({entry.time_of_day})\n"
        ]
        if self.detailed_logging:
            for agent in entry.agent_entries:
                action = agent.decision.get("action", "?")
                result = "+" if agent.action_result.get("success", False) else "-"
                summary_lines.append(
                    f"  [{agent.agent_name}] {action} {result} | "
                    f"E:{agent.stats_after.get('energy', '?')} "
                    f"H:{agent.stats_after.get('hunger', '?')} "
                    f"T:{agent.stats_after.get('thirst', '?')} "
                    f"${agent.money_after}\n"
                )
        if entry.deaths:
            for death in entry.deaths:
                summary_lines.append(f"  X {death['name']} died: {death['cause']}\n")
        summary_lines.append("\n")
        summary_data = "".join(summary_lines)

        if self.async_logging and self._async_writer:
            # Non-blocking writes; count drops so get_stats() reflects reality
            for ok in (
                self._async_writer.write(self._json_file_id, json_data),
                self._async_writer.write(self._summary_file_id, summary_data),
            ):
                self._items_queued += 1
                if not ok:
                    self._items_dropped += 1
        else:
            # Synchronous write
            if self._json_file:
                self._json_file.write(json_data)
            if self._summary_file:
                self._summary_file.write(summary_data)

        # Batched flush - only flush every N turns
        self._turns_since_flush += 1
        if self._turns_since_flush >= self.flush_interval:
            self._flush_files()
            self._turns_since_flush = 0

        # Clear the current entry (don't accumulate turns in memory)
        self._current_entry = None
        self._agent_entry_map.clear()

    def _flush_files(self) -> None:
        """Flush file buffers to disk."""
        if self.async_logging and self._async_writer:
            self._async_writer.flush()
        else:
            if self._json_file:
                self._json_file.flush()
            if self._summary_file:
                self._summary_file.flush()

    def close(self) -> None:
        """Close log files."""
        if self.async_logging and self._async_writer:
            # Write a final message before closing
            if self._summary_file_id:
                self._async_writer.write(
                    self._summary_file_id, f"\nSession ended: {datetime.now()}\n"
                )
            # Close files
            if self._json_file_id:
                self._async_writer.close_file(self._json_file_id)
            if self._summary_file_id:
                self._async_writer.close_file(self._summary_file_id)
            # Stop the writer thread (drains the queue first)
            self._async_writer.stop()
            self._async_writer = None
        else:
            if self._json_file:
                self._json_file.close()
                self._json_file = None
            if self._summary_file:
                self._summary_file.write(f"\nSession ended: {datetime.now()}\n")
                self._summary_file.close()
                self._summary_file = None

    def get_entries(self) -> list[TurnLogEntry]:
        """Get all logged entries.

        Note: returns an empty list; entries are not kept in memory for
        performance reasons.
        """
        return []

    def get_stats(self) -> dict:
        """Get logging statistics."""
        return {
            "logging_enabled": self.logging_enabled,
            "async_logging": self.async_logging,
            "items_queued": self._items_queued,
            "items_dropped": self._items_dropped,
        }


# Global logger instance
_logger: Optional[SimulationLogger] = None


def get_simulation_logger() -> SimulationLogger:
    """Get the global simulation logger."""
    global _logger
    if _logger is None:
        _logger = SimulationLogger()
    return _logger


def reset_simulation_logger() -> SimulationLogger:
    """Reset and create a new simulation logger."""
    global _logger
    if _logger:
        _logger.close()
    _logger = SimulationLogger()
    return _logger


# Ensure the logger is closed on exit
@atexit.register
def _cleanup_logger():
    global _logger
    if _logger:
        _logger.close()
        _logger = None
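

if __name__ == "__main__":
    # Standalone smoke test for the async writer path (a minimal sketch;
    # SimulationLogger itself is not exercised here because its __init__
    # imports backend.config, which may not be importable on its own).
    import os
    import tempfile

    tmp_path = os.path.join(tempfile.mkdtemp(), "smoke.jsonl")

    writer = AsyncLogWriter()
    writer.start()
    writer.open_file(tmp_path, "smoke")
    writer.write("smoke", json.dumps({"type": "smoke", "ok": True}) + "\n")
    writer.stop()  # joins the thread, drains the queue, closes the file

    with open(tmp_path, encoding="utf-8") as f:
        print(f.read().strip())  # expected: {"type": "smoke", "ok": true}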