"""Simulation logger for detailed step-by-step logging. Performance-optimized: logging can be disabled or reduced via config. """ import json import logging from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path from typing import Optional, TextIO @dataclass class AgentLogEntry: """Log entry for a single agent's turn.""" agent_id: str agent_name: str profession: str position: dict stats_before: dict stats_after: dict decision: dict action_result: dict inventory_before: list inventory_after: list money_before: int money_after: int @dataclass class TurnLogEntry: """Complete log entry for a simulation turn.""" turn: int day: int step_in_day: int time_of_day: str timestamp: str agent_entries: list[AgentLogEntry] = field(default_factory=list) market_orders_before: list = field(default_factory=list) market_orders_after: list = field(default_factory=list) trades: list = field(default_factory=list) deaths: list = field(default_factory=list) statistics: dict = field(default_factory=dict) def to_dict(self) -> dict: """Convert to dictionary for JSON serialization.""" return { "turn": self.turn, "day": self.day, "step_in_day": self.step_in_day, "time_of_day": self.time_of_day, "timestamp": self.timestamp, "agent_entries": [asdict(e) for e in self.agent_entries], "market_orders_before": self.market_orders_before, "market_orders_after": self.market_orders_after, "trades": self.trades, "deaths": self.deaths, "statistics": self.statistics, } class SimulationLogger: """Logger that dumps detailed simulation data to files. Performance optimized: - Logging can be disabled entirely via config - File flushing is batched (not every turn) - Agent lookups use O(1) dict instead of O(n) list search - No in-memory accumulation of all entries """ def __init__(self, log_dir: str = "logs"): self.log_dir = Path(log_dir) # Load performance config from backend.config import get_config perf_config = get_config().performance self.logging_enabled = perf_config.logging_enabled self.detailed_logging = perf_config.detailed_logging self.flush_interval = perf_config.log_flush_interval # File handles (only created if logging enabled) self._json_file: Optional[TextIO] = None self._summary_file: Optional[TextIO] = None # Standard Python logging (minimal overhead even when enabled) self.logger = logging.getLogger("simulation") self.logger.setLevel(logging.WARNING) # Only warnings by default # Current turn tracking self._current_entry: Optional[TurnLogEntry] = None # O(1) lookup for agent entries by ID self._agent_entry_map: dict[str, AgentLogEntry] = {} # Turn counter for flush batching self._turns_since_flush = 0 def start_session(self, config: dict) -> None: """Start a new logging session.""" if not self.logging_enabled: return self.log_dir.mkdir(exist_ok=True) # Create session-specific log file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") session_file = self.log_dir / f"sim_{timestamp}.jsonl" summary_file = self.log_dir / f"sim_{timestamp}_summary.txt" self._json_file = open(session_file, "w") self._summary_file = open(summary_file, "w") # Write config as first line self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n") self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n") self._summary_file.write("=" * 60 + "\n\n") if self.detailed_logging: # Set up file handler for detailed logs file_handler = logging.FileHandler(self.log_dir / f"sim_{timestamp}.log") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(logging.Formatter( "%(asctime)s | %(levelname)s | %(message)s" )) self.logger.addHandler(file_handler) self.logger.setLevel(logging.DEBUG) def start_turn(self, turn: int, day: int, step_in_day: int, time_of_day: str) -> None: """Start logging a new turn.""" if not self.logging_enabled: return self._current_entry = TurnLogEntry( turn=turn, day=day, step_in_day=step_in_day, time_of_day=time_of_day, timestamp=datetime.now().isoformat(), ) self._agent_entry_map.clear() if self.detailed_logging: self.logger.debug(f"Turn {turn} started (Day {day}, Step {step_in_day}, {time_of_day})") def log_agent_before( self, agent_id: str, agent_name: str, profession: str, position: dict, stats: dict, inventory: list, money: int, ) -> None: """Log agent state before action.""" if not self.logging_enabled or self._current_entry is None: return # Create entry and add to both list and map entry = AgentLogEntry( agent_id=agent_id, agent_name=agent_name, profession=profession, position=position, stats_before=stats, stats_after={}, decision={}, action_result={}, inventory_before=inventory, inventory_after=[], money_before=money, money_after=money, ) self._current_entry.agent_entries.append(entry) self._agent_entry_map[agent_id] = entry def log_agent_decision(self, agent_id: str, decision: dict) -> None: """Log agent's AI decision.""" if not self.logging_enabled or self._current_entry is None: return # O(1) lookup instead of O(n) search entry = self._agent_entry_map.get(agent_id) if entry: entry.decision = decision if self.detailed_logging: self.logger.debug( f" {entry.agent_name}: decided to {decision.get('action', '?')} " f"- {decision.get('reason', '')}" ) def log_agent_after( self, agent_id: str, stats: dict, inventory: list, money: int, position: dict, action_result: dict, ) -> None: """Log agent state after action.""" if not self.logging_enabled or self._current_entry is None: return # O(1) lookup instead of O(n) search entry = self._agent_entry_map.get(agent_id) if entry: entry.stats_after = stats entry.inventory_after = inventory entry.money_after = money entry.position = position entry.action_result = action_result def log_market_state(self, orders_before: list, orders_after: list) -> None: """Log market state.""" if not self.logging_enabled or self._current_entry is None: return self._current_entry.market_orders_before = orders_before self._current_entry.market_orders_after = orders_after def log_trade(self, trade: dict) -> None: """Log a trade transaction.""" if not self.logging_enabled or self._current_entry is None: return self._current_entry.trades.append(trade) if self.detailed_logging: self.logger.debug(f" Trade: {trade.get('message', 'Unknown trade')}") def log_death(self, agent_name: str, cause: str) -> None: """Log an agent death.""" if not self.logging_enabled or self._current_entry is None: return self._current_entry.deaths.append({"name": agent_name, "cause": cause}) # Always log deaths even without detailed logging self.logger.info(f" DEATH: {agent_name} died from {cause}") def log_event(self, event_type: str, event_data: dict) -> None: """Log a general event (births, random events, etc.).""" if not self.logging_enabled or self._current_entry is None: return if event_type == "birth": self.logger.info( f" BIRTH: {event_data.get('child_name', '?')} born to {event_data.get('parent_name', '?')}" ) elif event_type == "random_event" and self.detailed_logging: self.logger.info( f" EVENT: {event_data.get('type', '?')} affecting {event_data.get('affected', [])}" ) elif self.detailed_logging: self.logger.debug(f" Event [{event_type}]: {event_data}") def log_statistics(self, stats: dict) -> None: """Log end-of-turn statistics.""" if not self.logging_enabled or self._current_entry is None: return self._current_entry.statistics = stats def end_turn(self) -> None: """Finish logging the current turn and write to file.""" if not self.logging_enabled or self._current_entry is None: return # Write to JSON lines file (without flush every time) if self._json_file: self._json_file.write( json.dumps({"type": "turn", "data": self._current_entry.to_dict()}) + "\n" ) # Write summary (without flush every time) if self._summary_file: entry = self._current_entry self._summary_file.write( f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} ({entry.time_of_day})\n" ) if self.detailed_logging: for agent in entry.agent_entries: action = agent.decision.get("action", "?") result = "✓" if agent.action_result.get("success", False) else "✗" self._summary_file.write( f" [{agent.agent_name}] {action} {result} | " f"E:{agent.stats_after.get('energy', '?')} " f"H:{agent.stats_after.get('hunger', '?')} " f"T:{agent.stats_after.get('thirst', '?')} " f"${agent.money_after}\n" ) if entry.deaths: for death in entry.deaths: self._summary_file.write(f" 💀 {death['name']} died: {death['cause']}\n") self._summary_file.write("\n") # Batched flush - only flush every N turns self._turns_since_flush += 1 if self._turns_since_flush >= self.flush_interval: self._flush_files() self._turns_since_flush = 0 # Clear current entry (don't accumulate in memory) self._current_entry = None self._agent_entry_map.clear() def _flush_files(self) -> None: """Flush file buffers to disk.""" if self._json_file: self._json_file.flush() if self._summary_file: self._summary_file.flush() def close(self) -> None: """Close log files.""" if self._json_file: self._json_file.close() self._json_file = None if self._summary_file: self._summary_file.write(f"\nSession ended: {datetime.now()}\n") self._summary_file.close() self._summary_file = None def get_entries(self) -> list[TurnLogEntry]: """Get all logged entries. Note: Returns empty list when logging optimized (entries not kept in memory). """ return [] # Global logger instance _logger: Optional[SimulationLogger] = None def get_simulation_logger() -> SimulationLogger: """Get the global simulation logger.""" global _logger if _logger is None: _logger = SimulationLogger() return _logger def reset_simulation_logger() -> SimulationLogger: """Reset and create a new simulation logger.""" global _logger if _logger: _logger.close() _logger = SimulationLogger() return _logger