villsim/backend/core/logger.py

351 lines
12 KiB
Python

"""Simulation logger for detailed step-by-step logging.
Performance-optimized: logging can be disabled or reduced via config.
"""
import json
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional, TextIO
@dataclass
class AgentLogEntry:
    """Record of one agent's activity within a single turn.

    Holds the agent's identity, the state captured before it acted, the
    decision it made, and the state/result captured afterwards. Field order
    is significant: the generated ``__init__`` accepts them positionally.
    """

    # --- identity ---------------------------------------------------------
    agent_id: str
    agent_name: str
    profession: str

    # --- location ---------------------------------------------------------
    position: dict

    # --- vital statistics, before and after the action --------------------
    stats_before: dict
    stats_after: dict

    # --- what the agent chose to do and how it turned out -----------------
    decision: dict
    action_result: dict

    # --- possessions, before and after the action -------------------------
    inventory_before: list
    inventory_after: list
    money_before: int
    money_after: int
@dataclass
class TurnLogEntry:
    """Everything recorded for one simulation turn.

    The first five fields are required and identify the turn; the remaining
    collections start empty and are filled in while the turn is processed.
    """

    # --- turn identification ----------------------------------------------
    turn: int
    day: int
    step_in_day: int
    time_of_day: str
    timestamp: str

    # --- data gathered during the turn (each instance gets its own) -------
    agent_entries: list[AgentLogEntry] = field(default_factory=list)
    market_orders_before: list = field(default_factory=list)
    market_orders_after: list = field(default_factory=list)
    trades: list = field(default_factory=list)
    deaths: list = field(default_factory=list)
    statistics: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return a plain-dict view of this turn, ready for ``json.dumps``.

        Agent entries are expanded into dictionaries via ``asdict``; every
        other field is placed in the result as-is (not copied).
        """
        serialized_agents = [asdict(agent) for agent in self.agent_entries]
        return dict(
            turn=self.turn,
            day=self.day,
            step_in_day=self.step_in_day,
            time_of_day=self.time_of_day,
            timestamp=self.timestamp,
            agent_entries=serialized_agents,
            market_orders_before=self.market_orders_before,
            market_orders_after=self.market_orders_after,
            trades=self.trades,
            deaths=self.deaths,
            statistics=self.statistics,
        )
class SimulationLogger:
    """Logger that dumps detailed simulation data to files.

    Per session it writes a JSON-lines file (one config line, then one line
    per turn), a human-readable summary file and, when detailed logging is
    on, a ``.log`` file fed by the standard ``logging`` module.

    Performance optimized:
    - Logging can be disabled entirely via config
    - File flushing is batched (not every turn)
    - Agent lookups use O(1) dict instead of O(n) list search
    - No in-memory accumulation of all entries
    """

    def __init__(self, log_dir: str = "logs"):
        """Create a logger that writes its files under *log_dir*.

        No file is opened here; that happens in ``start_session``.
        """
        self.log_dir = Path(log_dir)

        # Load performance config. The import is kept function-local as in
        # the original (presumably to avoid an import cycle -- confirm).
        from backend.config import get_config
        perf_config = get_config().performance
        self.logging_enabled = perf_config.logging_enabled
        self.detailed_logging = perf_config.detailed_logging
        self.flush_interval = perf_config.log_flush_interval

        # File handles (only created if logging enabled)
        self._json_file: Optional[TextIO] = None
        self._summary_file: Optional[TextIO] = None
        # Handler attached to the shared "simulation" logger for the current
        # session; tracked so close() can detach it again.
        self._file_handler: Optional[logging.Handler] = None

        # Standard Python logging (minimal overhead even when enabled)
        self.logger = logging.getLogger("simulation")
        self.logger.setLevel(logging.WARNING)  # Only warnings by default

        # Current turn tracking
        self._current_entry: Optional[TurnLogEntry] = None
        # O(1) lookup for agent entries by ID
        self._agent_entry_map: dict[str, AgentLogEntry] = {}
        # Turn counter for flush batching
        self._turns_since_flush = 0

    def start_session(self, config: dict) -> None:
        """Start a new logging session and write *config* as its first record.

        Does nothing when logging is disabled. Any session still open on
        this instance is closed first so file handles and log handlers do
        not accumulate.
        """
        if not self.logging_enabled:
            return

        # Release files/handler left over from a previous start_session().
        self.close()
        self._turns_since_flush = 0

        # parents=True so a nested log directory such as "out/logs" works.
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Create session-specific log files
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        session_file = self.log_dir / f"sim_{timestamp}.jsonl"
        summary_file = self.log_dir / f"sim_{timestamp}_summary.txt"

        # Explicit UTF-8: the summary contains non-ASCII symbols, which would
        # raise UnicodeEncodeError under a non-UTF-8 locale default.
        self._json_file = open(session_file, "w", encoding="utf-8")
        self._summary_file = open(summary_file, "w", encoding="utf-8")

        # Write config as first line
        self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n")
        self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n")
        self._summary_file.write("=" * 60 + "\n\n")

        if self.detailed_logging:
            # Set up file handler for detailed logs
            file_handler = logging.FileHandler(
                self.log_dir / f"sim_{timestamp}.log", encoding="utf-8"
            )
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter(
                "%(asctime)s | %(levelname)s | %(message)s"
            ))
            self.logger.addHandler(file_handler)
            self.logger.setLevel(logging.DEBUG)
            self._file_handler = file_handler

    def start_turn(self, turn: int, day: int, step_in_day: int, time_of_day: str) -> None:
        """Start logging a new turn, replacing any turn still in progress."""
        if not self.logging_enabled:
            return

        self._current_entry = TurnLogEntry(
            turn=turn,
            day=day,
            step_in_day=step_in_day,
            time_of_day=time_of_day,
            timestamp=datetime.now().isoformat(),
        )
        self._agent_entry_map.clear()

        if self.detailed_logging:
            self.logger.debug(
                "Turn %s started (Day %s, Step %s, %s)",
                turn, day, step_in_day, time_of_day,
            )

    def log_agent_before(
        self,
        agent_id: str,
        agent_name: str,
        profession: str,
        position: dict,
        stats: dict,
        inventory: list,
        money: int,
    ) -> None:
        """Log agent state before action.

        The arguments are stored by reference, not copied.
        """
        if not self.logging_enabled or self._current_entry is None:
            return

        # Create entry and add to both list and map; the "after" fields are
        # placeholders until log_agent_after() fills them in.
        entry = AgentLogEntry(
            agent_id=agent_id,
            agent_name=agent_name,
            profession=profession,
            position=position,
            stats_before=stats,
            stats_after={},
            decision={},
            action_result={},
            inventory_before=inventory,
            inventory_after=[],
            money_before=money,
            money_after=money,
        )
        self._current_entry.agent_entries.append(entry)
        self._agent_entry_map[agent_id] = entry

    def log_agent_decision(self, agent_id: str, decision: dict) -> None:
        """Log agent's AI decision; ignored for agents with no entry this turn."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # O(1) lookup instead of O(n) search
        entry = self._agent_entry_map.get(agent_id)
        if entry:
            entry.decision = decision
            if self.detailed_logging:
                self.logger.debug(
                    " %s: decided to %s - %s",
                    entry.agent_name,
                    decision.get("action", "?"),
                    decision.get("reason", ""),
                )

    def log_agent_after(
        self,
        agent_id: str,
        stats: dict,
        inventory: list,
        money: int,
        position: dict,
        action_result: dict,
    ) -> None:
        """Log agent state after action; ignored for agents with no entry this turn."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # O(1) lookup instead of O(n) search
        entry = self._agent_entry_map.get(agent_id)
        if entry:
            entry.stats_after = stats
            entry.inventory_after = inventory
            entry.money_after = money
            # Overwrites the position recorded by log_agent_before().
            entry.position = position
            entry.action_result = action_result

    def log_market_state(self, orders_before: list, orders_after: list) -> None:
        """Log market state."""
        if not self.logging_enabled or self._current_entry is None:
            return

        self._current_entry.market_orders_before = orders_before
        self._current_entry.market_orders_after = orders_after

    def log_trade(self, trade: dict) -> None:
        """Log a trade transaction."""
        if not self.logging_enabled or self._current_entry is None:
            return

        self._current_entry.trades.append(trade)
        if self.detailed_logging:
            self.logger.debug(" Trade: %s", trade.get("message", "Unknown trade"))

    def log_death(self, agent_name: str, cause: str) -> None:
        """Log an agent death."""
        if not self.logging_enabled or self._current_entry is None:
            return

        # Always recorded in the turn entry (and therefore in the JSON and
        # summary files). The logger call is INFO level, so it is only
        # emitted when the logger level permits it (DEBUG is set by
        # start_session() when detailed logging is on).
        self._current_entry.deaths.append({"name": agent_name, "cause": cause})
        self.logger.info(" DEATH: %s died from %s", agent_name, cause)

    def log_event(self, event_type: str, event_data: dict) -> None:
        """Log a general event (births, random events, etc.).

        Events go to the standard logger only; they are not stored in the
        turn entry.
        """
        if not self.logging_enabled or self._current_entry is None:
            return

        if event_type == "birth":
            self.logger.info(
                " BIRTH: %s born to %s",
                event_data.get("child_name", "?"),
                event_data.get("parent_name", "?"),
            )
        elif event_type == "random_event" and self.detailed_logging:
            self.logger.info(
                " EVENT: %s affecting %s",
                event_data.get("type", "?"),
                event_data.get("affected", []),
            )
        elif self.detailed_logging:
            self.logger.debug(" Event [%s]: %s", event_type, event_data)

    def log_statistics(self, stats: dict) -> None:
        """Log end-of-turn statistics."""
        if not self.logging_enabled or self._current_entry is None:
            return

        self._current_entry.statistics = stats

    def end_turn(self) -> None:
        """Finish logging the current turn and write to file."""
        if not self.logging_enabled or self._current_entry is None:
            return

        entry = self._current_entry

        # Write to JSON lines file (without flush every time)
        if self._json_file:
            self._json_file.write(
                json.dumps({"type": "turn", "data": entry.to_dict()}) + "\n"
            )

        # Write summary (without flush every time)
        if self._summary_file:
            self._summary_file.write(
                f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} ({entry.time_of_day})\n"
            )

            if self.detailed_logging:
                for agent in entry.agent_entries:
                    action = agent.decision.get("action", "?")
                    # Success/failure marker. Both branches used to be the
                    # empty string, so the outcome was never shown.
                    result = "✓" if agent.action_result.get("success", False) else "✗"
                    self._summary_file.write(
                        f" [{agent.agent_name}] {action} {result} | "
                        f"E:{agent.stats_after.get('energy', '?')} "
                        f"H:{agent.stats_after.get('hunger', '?')} "
                        f"T:{agent.stats_after.get('thirst', '?')} "
                        f"${agent.money_after}\n"
                    )

            for death in entry.deaths:
                self._summary_file.write(f" 💀 {death['name']} died: {death['cause']}\n")

            self._summary_file.write("\n")

        # Batched flush - only flush every N turns
        self._turns_since_flush += 1
        if self._turns_since_flush >= self.flush_interval:
            self._flush_files()
            self._turns_since_flush = 0

        # Clear current entry (don't accumulate in memory)
        self._current_entry = None
        self._agent_entry_map.clear()

    def _flush_files(self) -> None:
        """Flush file buffers to disk."""
        if self._json_file:
            self._json_file.flush()
        if self._summary_file:
            self._summary_file.flush()

    def close(self) -> None:
        """Close log files and detach this session's log handler.

        Safe to call repeatedly and when no session was ever started.
        """
        if self._json_file:
            self._json_file.close()
            self._json_file = None
        if self._summary_file:
            try:
                self._summary_file.write(f"\nSession ended: {datetime.now()}\n")
            finally:
                # Close even if the final write fails.
                self._summary_file.close()
                self._summary_file = None
        if self._file_handler is not None:
            # The "simulation" logger is process-global; without this every
            # new session would leave another handler (and open file) on it.
            self.logger.removeHandler(self._file_handler)
            self._file_handler.close()
            self._file_handler = None

    def get_entries(self) -> "list[TurnLogEntry]":
        """Get all logged entries.

        Note: Returns empty list when logging optimized (entries not kept in memory).
        """
        return []
# Global logger instance: None until get_simulation_logger() creates it on
# first use; reset_simulation_logger() closes and replaces it.
_logger: Optional[SimulationLogger] = None
def get_simulation_logger() -> SimulationLogger:
    """Return the shared simulation logger, creating it on first use."""
    global _logger
    # Fast path: an instance already exists.
    if _logger is not None:
        return _logger
    _logger = SimulationLogger()
    return _logger
def reset_simulation_logger() -> SimulationLogger:
    """Close the current shared logger (if any) and install a fresh one.

    Returns the newly created logger.
    """
    global _logger
    previous = _logger
    if previous:
        # Close the old instance before its replacement is constructed.
        previous.close()
    replacement = SimulationLogger()
    _logger = replacement
    return replacement