# villsim/backend/core/logger.py

"""Simulation logger for detailed step-by-step logging.
Performance-optimized with non-blocking I/O:
- Logging can be disabled or reduced via config
- File writes happen in a background thread (producer-consumer pattern)
- Agent lookups use O(1) dict instead of O(n) list search
- No in-memory accumulation of all entries
"""
import json
import logging
import threading
import queue
import atexit
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional, TextIO


@dataclass
class AgentLogEntry:
"""Log entry for a single agent's turn."""
agent_id: str
agent_name: str
profession: str
position: dict
stats_before: dict
stats_after: dict
decision: dict
action_result: dict
inventory_before: list
inventory_after: list
money_before: int
money_after: int


@dataclass
class TurnLogEntry:
"""Complete log entry for a simulation turn."""
turn: int
day: int
step_in_day: int
time_of_day: str
timestamp: str
agent_entries: list[AgentLogEntry] = field(default_factory=list)
market_orders_before: list = field(default_factory=list)
market_orders_after: list = field(default_factory=list)
trades: list = field(default_factory=list)
deaths: list = field(default_factory=list)
statistics: dict = field(default_factory=dict)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
"turn": self.turn,
"day": self.day,
"step_in_day": self.step_in_day,
"time_of_day": self.time_of_day,
"timestamp": self.timestamp,
"agent_entries": [asdict(e) for e in self.agent_entries],
"market_orders_before": self.market_orders_before,
"market_orders_after": self.market_orders_after,
"trades": self.trades,
"deaths": self.deaths,
"statistics": self.statistics,
}
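
# On disk, each finished turn becomes one JSONL record, written by
# SimulationLogger.end_turn() below; values here are illustrative:
#   {"type": "turn", "data": {"turn": 3, "day": 1, "step_in_day": 3,
#    "time_of_day": "morning", "agent_entries": [...], "trades": [...], ...}}
# A single {"type": "config", "data": {...}} record opens each session file.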


class AsyncLogWriter:
"""Background thread that handles file I/O asynchronously.
Uses a producer-consumer pattern to decouple log generation
from file writing, preventing I/O from blocking the simulation.
"""
def __init__(self, max_queue_size: int = 1000):
self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._files: dict[str, TextIO] = {}
self._lock = threading.Lock()
def start(self) -> None:
"""Start the background writer thread."""
if self._thread is not None and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._writer_loop, daemon=True)
self._thread.start()
def stop(self, timeout: float = 2.0) -> None:
"""Stop the background writer thread and flush remaining items."""
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=timeout)
self._thread = None
# Process any remaining items in the queue
self._drain_queue()
# Close all files
with self._lock:
for f in self._files.values():
try:
f.close()
except Exception:
pass
self._files.clear()
def _writer_loop(self) -> None:
"""Main loop for the background writer thread."""
while not self._stop_event.is_set():
try:
# Wait for items with timeout to allow stop checks
item = self._queue.get(timeout=0.1)
self._process_item(item)
self._queue.task_done()
except queue.Empty:
continue
except Exception as e:
# Log errors but don't crash the thread
logging.getLogger("simulation").warning(f"Async log writer error: {e}")
def _process_item(self, item: dict) -> None:
"""Process a single log item."""
action = item.get("action")
if action == "open":
self._open_file(item["path"], item["file_id"])
elif action == "write":
self._write_to_file(item["file_id"], item["data"])
elif action == "flush":
self._flush_file(item.get("file_id"))
elif action == "close":
self._close_file(item["file_id"])
def _open_file(self, path: str, file_id: str) -> None:
"""Open a file for writing."""
with self._lock:
if file_id not in self._files:
self._files[file_id] = open(path, "w", encoding="utf-8")
def _write_to_file(self, file_id: str, data: str) -> None:
"""Write data to a file."""
with self._lock:
f = self._files.get(file_id)
if f:
f.write(data)
def _flush_file(self, file_id: Optional[str] = None) -> None:
"""Flush file(s) to disk."""
with self._lock:
if file_id:
f = self._files.get(file_id)
if f:
f.flush()
else:
for f in self._files.values():
f.flush()
def _close_file(self, file_id: str) -> None:
"""Close a file."""
with self._lock:
f = self._files.pop(file_id, None)
if f:
f.close()
def _drain_queue(self) -> None:
"""Process all remaining items in the queue."""
while True:
try:
item = self._queue.get_nowait()
self._process_item(item)
self._queue.task_done()
except queue.Empty:
break
    def enqueue(self, item: dict) -> bool:
        """Add an item to the write queue.
        Returns False if the queue is full (the item is dropped). Dropping
        rather than blocking keeps a slow disk from stalling the simulation,
        at the cost of losing log lines under sustained backpressure.
        """
try:
self._queue.put_nowait(item)
return True
except queue.Full:
return False
def open_file(self, path: str, file_id: str) -> None:
"""Queue a file open operation."""
self.enqueue({"action": "open", "path": path, "file_id": file_id})
    def write(self, file_id: str, data: str) -> bool:
        """Queue a write operation. Returns False if the item was dropped."""
        return self.enqueue({"action": "write", "file_id": file_id, "data": data})
def flush(self, file_id: Optional[str] = None) -> None:
"""Queue a flush operation."""
self.enqueue({"action": "flush", "file_id": file_id})
def close_file(self, file_id: str) -> None:
"""Queue a file close operation."""
self.enqueue({"action": "close", "file_id": file_id})


class SimulationLogger:
"""Logger that dumps detailed simulation data to files.
Performance optimized:
- Logging can be disabled entirely via config
- File writes happen in background thread (non-blocking)
- Agent lookups use O(1) dict instead of O(n) list search
- No in-memory accumulation of all entries
"""
def __init__(self, log_dir: str = "logs"):
self.log_dir = Path(log_dir)
# Load performance config
from backend.config import get_config
perf_config = get_config().performance
self.logging_enabled = perf_config.logging_enabled
self.detailed_logging = perf_config.detailed_logging
self.flush_interval = perf_config.log_flush_interval
        # Async logging config (defaults to True when the option is absent)
        self.async_logging = getattr(perf_config, "async_logging", True)
# Async writer (only created if logging enabled)
self._async_writer: Optional[AsyncLogWriter] = None
# File IDs for async writing
self._json_file_id: Optional[str] = None
self._summary_file_id: Optional[str] = None
# Fallback: synchronous file handles (used if async disabled)
self._json_file: Optional[TextIO] = None
self._summary_file: Optional[TextIO] = None
# Standard Python logging (minimal overhead even when enabled)
self.logger = logging.getLogger("simulation")
self.logger.setLevel(logging.WARNING) # Only warnings by default
# Current turn tracking
self._current_entry: Optional[TurnLogEntry] = None
# O(1) lookup for agent entries by ID
self._agent_entry_map: dict[str, AgentLogEntry] = {}
# Turn counter for flush batching
self._turns_since_flush = 0
# Stats
self._items_queued = 0
self._items_dropped = 0
def start_session(self, config: dict) -> None:
"""Start a new logging session."""
if not self.logging_enabled:
return
        self.log_dir.mkdir(parents=True, exist_ok=True)
# Create session-specific log file paths
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
json_path = self.log_dir / f"sim_{timestamp}.jsonl"
summary_path = self.log_dir / f"sim_{timestamp}_summary.txt"
if self.async_logging:
# Use async writer
self._async_writer = AsyncLogWriter()
self._async_writer.start()
self._json_file_id = f"json_{timestamp}"
self._summary_file_id = f"summary_{timestamp}"
self._async_writer.open_file(str(json_path), self._json_file_id)
self._async_writer.open_file(str(summary_path), self._summary_file_id)
# Write initial data
self._async_writer.write(
self._json_file_id,
json.dumps({"type": "config", "data": config}) + "\n"
)
self._async_writer.write(
self._summary_file_id,
f"Simulation Session Started: {datetime.now()}\n" + "=" * 60 + "\n\n"
)
else:
# Use synchronous file handles
            self._json_file = open(json_path, "w", encoding="utf-8")
            self._summary_file = open(summary_path, "w", encoding="utf-8")
self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n")
self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n")
self._summary_file.write("=" * 60 + "\n\n")
        if self.detailed_logging:
            # Set up a file handler for detailed logs; drop stale handlers
            # first, since the "simulation" logger is process-global and a
            # restarted session would otherwise duplicate every line.
            for handler in list(self.logger.handlers):
                if isinstance(handler, logging.FileHandler):
                    self.logger.removeHandler(handler)
                    handler.close()
            file_handler = logging.FileHandler(self.log_dir / f"sim_{timestamp}.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s"
))
self.logger.addHandler(file_handler)
self.logger.setLevel(logging.DEBUG)
def start_turn(self, turn: int, day: int, step_in_day: int, time_of_day: str) -> None:
"""Start logging a new turn."""
if not self.logging_enabled:
return
self._current_entry = TurnLogEntry(
turn=turn,
day=day,
step_in_day=step_in_day,
time_of_day=time_of_day,
timestamp=datetime.now().isoformat(),
)
self._agent_entry_map.clear()
if self.detailed_logging:
self.logger.debug(f"Turn {turn} started (Day {day}, Step {step_in_day}, {time_of_day})")
def log_agent_before(
self,
agent_id: str,
agent_name: str,
profession: str,
position: dict,
stats: dict,
inventory: list,
money: int,
) -> None:
"""Log agent state before action."""
if not self.logging_enabled or self._current_entry is None:
return
# Create entry and add to both list and map
entry = AgentLogEntry(
agent_id=agent_id,
agent_name=agent_name,
profession=profession,
position=position,
stats_before=stats,
stats_after={},
decision={},
action_result={},
inventory_before=inventory,
inventory_after=[],
money_before=money,
money_after=money,
)
self._current_entry.agent_entries.append(entry)
self._agent_entry_map[agent_id] = entry
def log_agent_decision(self, agent_id: str, decision: dict) -> None:
"""Log agent's AI decision."""
if not self.logging_enabled or self._current_entry is None:
return
# O(1) lookup instead of O(n) search
entry = self._agent_entry_map.get(agent_id)
if entry:
entry.decision = decision
if self.detailed_logging:
self.logger.debug(
f" {entry.agent_name}: decided to {decision.get('action', '?')} "
f"- {decision.get('reason', '')}"
)
def log_agent_after(
self,
agent_id: str,
stats: dict,
inventory: list,
money: int,
position: dict,
action_result: dict,
) -> None:
"""Log agent state after action."""
if not self.logging_enabled or self._current_entry is None:
return
# O(1) lookup instead of O(n) search
entry = self._agent_entry_map.get(agent_id)
if entry:
entry.stats_after = stats
entry.inventory_after = inventory
entry.money_after = money
entry.position = position
entry.action_result = action_result
def log_market_state(self, orders_before: list, orders_after: list) -> None:
"""Log market state."""
if not self.logging_enabled or self._current_entry is None:
return
self._current_entry.market_orders_before = orders_before
self._current_entry.market_orders_after = orders_after
def log_trade(self, trade: dict) -> None:
"""Log a trade transaction."""
if not self.logging_enabled or self._current_entry is None:
return
self._current_entry.trades.append(trade)
if self.detailed_logging:
self.logger.debug(f" Trade: {trade.get('message', 'Unknown trade')}")
def log_death(self, agent_name: str, cause: str) -> None:
"""Log an agent death."""
if not self.logging_enabled or self._current_entry is None:
return
self._current_entry.deaths.append({"name": agent_name, "cause": cause})
        # Deaths should always be visible: the logger defaults to WARNING,
        # so log at warning level rather than info, which would be filtered.
        self.logger.warning(f"  DEATH: {agent_name} died from {cause}")
def log_event(self, event_type: str, event_data: dict) -> None:
"""Log a general event (births, random events, etc.)."""
if not self.logging_enabled or self._current_entry is None:
return
if event_type == "birth":
self.logger.info(
f" BIRTH: {event_data.get('child_name', '?')} born to {event_data.get('parent_name', '?')}"
)
elif event_type == "random_event" and self.detailed_logging:
self.logger.info(
f" EVENT: {event_data.get('type', '?')} affecting {event_data.get('affected', [])}"
)
elif self.detailed_logging:
self.logger.debug(f" Event [{event_type}]: {event_data}")
def log_statistics(self, stats: dict) -> None:
"""Log end-of-turn statistics."""
if not self.logging_enabled or self._current_entry is None:
return
self._current_entry.statistics = stats
def end_turn(self) -> None:
"""Finish logging the current turn and write to file."""
if not self.logging_enabled or self._current_entry is None:
return
entry = self._current_entry
# Prepare data
json_data = json.dumps({"type": "turn", "data": entry.to_dict()}) + "\n"
summary_lines = [f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} ({entry.time_of_day})\n"]
if self.detailed_logging:
for agent in entry.agent_entries:
action = agent.decision.get("action", "?")
result = "+" if agent.action_result.get("success", False) else "-"
summary_lines.append(
f" [{agent.agent_name}] {action} {result} | "
f"E:{agent.stats_after.get('energy', '?')} "
f"H:{agent.stats_after.get('hunger', '?')} "
f"T:{agent.stats_after.get('thirst', '?')} "
f"${agent.money_after}\n"
)
if entry.deaths:
for death in entry.deaths:
summary_lines.append(f" X {death['name']} died: {death['cause']}\n")
summary_lines.append("\n")
summary_data = "".join(summary_lines)
        if self.async_logging and self._async_writer:
            # Non-blocking writes; count drops so get_stats() reflects reality
            for file_id, data in (
                (self._json_file_id, json_data),
                (self._summary_file_id, summary_data),
            ):
                if self._async_writer.write(file_id, data):
                    self._items_queued += 1
                else:
                    self._items_dropped += 1
else:
# Synchronous write
if self._json_file:
self._json_file.write(json_data)
if self._summary_file:
self._summary_file.write(summary_data)
# Batched flush - only flush every N turns
self._turns_since_flush += 1
if self._turns_since_flush >= self.flush_interval:
self._flush_files()
self._turns_since_flush = 0
# Clear current entry (don't accumulate in memory)
self._current_entry = None
self._agent_entry_map.clear()
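
    # The summary text produced above looks like this (illustrative values):
    #   Turn 12 | Day 2 Step 4 (evening)
    #     [Ada] farm + | E:90 H:5 T:5 $10
    #     X Bob died: starvation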
def _flush_files(self) -> None:
"""Flush file buffers to disk."""
if self.async_logging and self._async_writer:
self._async_writer.flush()
else:
if self._json_file:
self._json_file.flush()
if self._summary_file:
self._summary_file.flush()
def close(self) -> None:
"""Close log files."""
if self.async_logging and self._async_writer:
# Write final message before closing
if self._summary_file_id:
self._async_writer.write(
self._summary_file_id,
f"\nSession ended: {datetime.now()}\n"
)
# Close files
if self._json_file_id:
self._async_writer.close_file(self._json_file_id)
if self._summary_file_id:
self._async_writer.close_file(self._summary_file_id)
# Stop the writer thread
self._async_writer.stop()
self._async_writer = None
else:
if self._json_file:
self._json_file.close()
self._json_file = None
if self._summary_file:
self._summary_file.write(f"\nSession ended: {datetime.now()}\n")
self._summary_file.close()
self._summary_file = None
    def get_entries(self) -> list[TurnLogEntry]:
        """Get all logged entries.
        Kept for API compatibility: entries are no longer accumulated in
        memory, so this always returns an empty list.
        """
        return []
def get_stats(self) -> dict:
"""Get logging statistics."""
return {
"logging_enabled": self.logging_enabled,
"async_logging": self.async_logging,
"items_queued": self._items_queued,
"items_dropped": self._items_dropped,
}


# Global logger instance
_logger: Optional[SimulationLogger] = None


def get_simulation_logger() -> SimulationLogger:
"""Get the global simulation logger."""
global _logger
if _logger is None:
_logger = SimulationLogger()
return _logger


def reset_simulation_logger() -> SimulationLogger:
"""Reset and create a new simulation logger."""
global _logger
if _logger:
_logger.close()
_logger = SimulationLogger()
return _logger


# Ensure logger is closed on exit
@atexit.register
def _cleanup_logger():
global _logger
if _logger:
_logger.close()
_logger = None
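

if __name__ == "__main__":
    # Minimal smoke-test sketch: drive one illustrative turn through the
    # logger and shut down. Assumes backend.config is importable and that
    # logging is enabled there; all field values below are made up.
    sim_log = reset_simulation_logger()
    sim_log.start_session({"seed": 42})
    sim_log.start_turn(turn=1, day=1, step_in_day=1, time_of_day="morning")
    sim_log.log_agent_before(
        agent_id="a1",
        agent_name="Ada",
        profession="farmer",
        position={"x": 0, "y": 0},
        stats={"energy": 100, "hunger": 0, "thirst": 0},
        inventory=[],
        money=10,
    )
    sim_log.log_agent_decision("a1", {"action": "farm", "reason": "stock up on food"})
    sim_log.log_agent_after(
        agent_id="a1",
        stats={"energy": 90, "hunger": 5, "thirst": 5},
        inventory=["wheat"],
        money=10,
        position={"x": 1, "y": 0},
        action_result={"success": True},
    )
    sim_log.log_statistics({"population": 1})
    sim_log.end_turn()
    sim_log.close()
    print(sim_log.get_stats())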