"""Game Engine for the Village Simulation.""" import random import threading import time from dataclasses import dataclass, field from enum import Enum from typing import Optional from backend.domain.agent import Agent from backend.domain.action import ActionType, ActionResult, ACTION_CONFIG from backend.domain.resources import Resource, ResourceType from backend.domain.personality import get_action_skill_modifier from backend.core.world import World, WorldConfig, TimeOfDay from backend.core.market import OrderBook from backend.core.ai import get_ai_decision, AIDecision from backend.core.logger import get_simulation_logger, reset_simulation_logger from backend.config import get_config class SimulationMode(Enum): """Simulation run mode.""" MANUAL = "manual" # Wait for explicit next_step call AUTO = "auto" # Run automatically with timer @dataclass class TurnLog: """Log of events that happened in a turn.""" turn: int agent_actions: list[dict] = field(default_factory=list) deaths: list[str] = field(default_factory=list) trades: list[dict] = field(default_factory=list) # Resource tracking for this turn resources_produced: dict = field(default_factory=dict) resources_consumed: dict = field(default_factory=dict) resources_spoiled: dict = field(default_factory=dict) def to_dict(self) -> dict: return { "turn": self.turn, "agent_actions": self.agent_actions, "deaths": self.deaths, "trades": self.trades, "resources_produced": self.resources_produced, "resources_consumed": self.resources_consumed, "resources_spoiled": self.resources_spoiled, } class GameEngine: """Main game engine singleton.""" _instance: Optional["GameEngine"] = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.world = World() self.market = OrderBook() self.mode = SimulationMode.MANUAL self.is_running = False # Load auto_step_interval from config self.auto_step_interval = get_config().auto_step_interval self._auto_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self.turn_logs: list[TurnLog] = [] self.logger = get_simulation_logger() # Resource statistics tracking (cumulative) self.resource_stats = { "produced": {}, # Total resources produced "consumed": {}, # Total resources consumed "spoiled": {}, # Total resources spoiled "traded": {}, # Total resources traded (bought/sold) "in_market": {}, # Currently in market "in_inventory": {}, # Currently in all inventories } self._initialized = True def reset(self, config: Optional[WorldConfig] = None) -> None: """Reset the simulation to initial state.""" # Stop auto mode if running self._stop_auto_mode() if config: self.world = World(config=config) else: self.world = World() self.market = OrderBook() self.turn_logs = [] # Reset resource statistics self.resource_stats = { "produced": {}, "consumed": {}, "spoiled": {}, "traded": {}, "in_market": {}, "in_inventory": {}, } # Reset and start new logging session self.logger = reset_simulation_logger() sim_config = get_config() self.logger.start_session(sim_config.to_dict()) self.world.initialize() self.is_running = True def initialize(self, num_agents: Optional[int] = None) -> None: """Initialize the simulation with agents. Args: num_agents: Number of agents to spawn. If None, uses config.json value. """ if num_agents is not None: self.world.config.initial_agents = num_agents # Otherwise use the value already loaded from config.json self.world.initialize() # Start logging session self.logger = reset_simulation_logger() sim_config = get_config() self.logger.start_session(sim_config.to_dict()) self.is_running = True def next_step(self) -> TurnLog: """Advance the simulation by one step.""" if not self.is_running: return TurnLog(turn=-1) turn_log = TurnLog(turn=self.world.current_turn + 1) current_turn = self.world.current_turn + 1 # Start logging this turn self.logger.start_turn( turn=current_turn, day=self.world.current_day, step_in_day=self.world.step_in_day + 1, time_of_day=self.world.time_of_day.value, ) # Log market state before market_orders_before = [o.to_dict() for o in self.market.get_active_orders()] # 0. Remove corpses from previous turn (agents who died last turn) self._remove_old_corpses(current_turn) # 1. Collect AI decisions for all living agents (not corpses) decisions: list[tuple[Agent, AIDecision]] = [] for agent in self.world.get_living_agents(): # Log agent state before self.logger.log_agent_before( agent_id=agent.id, agent_name=agent.name, profession=agent.profession.value, position=agent.position.to_dict(), stats=agent.stats.to_dict(), inventory=[r.to_dict() for r in agent.inventory], money=agent.money, ) if self.world.is_night(): # Force sleep at night decision = AIDecision( action=ActionType.SLEEP, reason="Night time: sleeping", ) else: # Pass time info so AI can prepare for night decision = get_ai_decision( agent, self.market, step_in_day=self.world.step_in_day, day_steps=self.world.config.day_steps, current_turn=current_turn, ) decisions.append((agent, decision)) # Log decision self.logger.log_agent_decision(agent.id, decision.to_dict()) # 2. Calculate movement targets and move agents for agent, decision in decisions: action_name = decision.action.value agent.set_action( action_type=action_name, world_width=self.world.config.width, world_height=self.world.config.height, message=decision.reason, target_resource=decision.target_resource.value if decision.target_resource else None, ) agent.update_movement() # 3. Execute all actions and update action indicators with results for agent, decision in decisions: result = self._execute_action(agent, decision) # Complete agent action with result - this updates the indicator to show what was done if result: agent.complete_action(result.success, result.message) # Log to agent's personal history agent.log_action( turn=current_turn, action_type=decision.action.value, result=result.message, success=result.success, ) # Track resources produced for res in result.resources_gained: res_type = res.type.value turn_log.resources_produced[res_type] = turn_log.resources_produced.get(res_type, 0) + res.quantity self.resource_stats["produced"][res_type] = self.resource_stats["produced"].get(res_type, 0) + res.quantity # Track resources consumed for res in result.resources_consumed: res_type = res.type.value turn_log.resources_consumed[res_type] = turn_log.resources_consumed.get(res_type, 0) + res.quantity self.resource_stats["consumed"][res_type] = self.resource_stats["consumed"].get(res_type, 0) + res.quantity turn_log.agent_actions.append({ "agent_id": agent.id, "agent_name": agent.name, "decision": decision.to_dict(), "result": result.to_dict() if result else None, }) # Log agent state after action self.logger.log_agent_after( agent_id=agent.id, stats=agent.stats.to_dict(), inventory=[r.to_dict() for r in agent.inventory], money=agent.money, position=agent.position.to_dict(), action_result=result.to_dict() if result else {}, ) # 4. Resolve pending market orders (price updates) self.market.update_prices(current_turn) # Log market state after market_orders_after = [o.to_dict() for o in self.market.get_active_orders()] self.logger.log_market_state(market_orders_before, market_orders_after) # 5. Apply passive decay to all living agents for agent in self.world.get_living_agents(): agent.apply_passive_decay() # 6. Decay resources in inventories for agent in self.world.get_living_agents(): expired = agent.decay_inventory(current_turn) # Track spoiled resources for res in expired: res_type = res.type.value turn_log.resources_spoiled[res_type] = turn_log.resources_spoiled.get(res_type, 0) + res.quantity self.resource_stats["spoiled"][res_type] = self.resource_stats["spoiled"].get(res_type, 0) + res.quantity # 7. Mark newly dead agents as corpses (don't remove yet for visualization) newly_dead = self._mark_dead_agents(current_turn) for dead_agent in newly_dead: cause = dead_agent.death_reason self.logger.log_death(dead_agent.name, cause) # Cancel their market orders immediately self.market.cancel_seller_orders(dead_agent.id) turn_log.deaths = [a.name for a in newly_dead] # Log statistics self.logger.log_statistics(self.world.get_statistics()) # End turn logging self.logger.end_turn() # 8. Advance time self.world.advance_time() # 9. Check win/lose conditions (count only truly living agents, not corpses) if len(self.world.get_living_agents()) == 0: self.is_running = False self.logger.close() self.turn_logs.append(turn_log) return turn_log def _mark_dead_agents(self, current_turn: int) -> list[Agent]: """Mark agents who just died as corpses. Returns list of newly dead agents.""" newly_dead = [] for agent in self.world.agents: if not agent.is_alive() and not agent.is_corpse(): # Agent just died this turn cause = agent.stats.get_critical_stat() or "unknown" agent.mark_dead(current_turn, cause) # Clear their action to show death state agent.current_action.action_type = "dead" agent.current_action.message = f"Died: {cause}" newly_dead.append(agent) return newly_dead def _remove_old_corpses(self, current_turn: int) -> list[Agent]: """Remove corpses that have been visible for one turn.""" to_remove = [] for agent in self.world.agents: if agent.is_corpse() and agent.death_turn < current_turn: # Corpse has been visible for one turn, remove it to_remove.append(agent) for agent in to_remove: self.world.agents.remove(agent) self.world.total_agents_died += 1 return to_remove def _execute_action(self, agent: Agent, decision: AIDecision) -> Optional[ActionResult]: """Execute an action for an agent.""" action = decision.action config = ACTION_CONFIG[action] # Handle different action types if action == ActionType.SLEEP: agent.restore_energy(config.energy_cost) return ActionResult( action_type=action, success=True, energy_spent=-config.energy_cost, message="Sleeping soundly", ) elif action == ActionType.REST: agent.restore_energy(config.energy_cost) return ActionResult( action_type=action, success=True, energy_spent=-config.energy_cost, message="Resting", ) elif action == ActionType.CONSUME: if decision.target_resource: success = agent.consume(decision.target_resource) consumed_list = [] if success: consumed_list.append(Resource( type=decision.target_resource, quantity=1, created_turn=self.world.current_turn, )) return ActionResult( action_type=action, success=success, resources_consumed=consumed_list, message=f"Consumed {decision.target_resource.value}" if success else "Nothing to consume", ) return ActionResult(action_type=action, success=False, message="No resource specified") elif action == ActionType.BUILD_FIRE: if agent.has_resource(ResourceType.WOOD): agent.remove_from_inventory(ResourceType.WOOD, 1) if not agent.spend_energy(abs(config.energy_cost)): return ActionResult(action_type=action, success=False, message="Not enough energy") # Fire heat from config from backend.domain.resources import get_fire_heat fire_heat = get_fire_heat() agent.apply_heat(fire_heat) return ActionResult( action_type=action, success=True, energy_spent=abs(config.energy_cost), heat_gained=fire_heat, resources_consumed=[Resource(type=ResourceType.WOOD, quantity=1, created_turn=self.world.current_turn)], message="Built a warm fire", ) return ActionResult(action_type=action, success=False, message="No wood for fire") elif action == ActionType.TRADE: return self._execute_trade(agent, decision) elif action in [ActionType.HUNT, ActionType.GATHER, ActionType.CHOP_WOOD, ActionType.GET_WATER, ActionType.WEAVE]: return self._execute_work(agent, action, config) return ActionResult(action_type=action, success=False, message="Unknown action") def _execute_work(self, agent: Agent, action: ActionType, config) -> ActionResult: """Execute a work action (hunting, gathering, etc.). Skills now affect outcomes: - Hunting skill affects hunt success rate - Gathering skill affects gather output - Woodcutting skill affects wood output - Skills improve with use """ # Check energy energy_cost = abs(config.energy_cost) if not agent.spend_energy(energy_cost): return ActionResult( action_type=action, success=False, message="Not enough energy", ) # Check required materials resources_consumed = [] if config.requires_resource: if not agent.has_resource(config.requires_resource, config.requires_quantity): agent.restore_energy(energy_cost) # Refund energy return ActionResult( action_type=action, success=False, message=f"Missing required {config.requires_resource.value}", ) agent.remove_from_inventory(config.requires_resource, config.requires_quantity) resources_consumed.append(Resource( type=config.requires_resource, quantity=config.requires_quantity, created_turn=self.world.current_turn, )) # Get relevant skill for this action skill_name = self._get_skill_for_action(action) skill_value = getattr(agent.skills, skill_name, 1.0) if skill_name else 1.0 skill_modifier = get_action_skill_modifier(skill_value) # Check success chance (modified by skill) # Higher skill = higher effective success chance effective_success_chance = min(0.98, config.success_chance * skill_modifier) if random.random() > effective_success_chance: # Record action attempt (skill still improves on failure, just less) agent.record_action(action.value) if skill_name: agent.skills.improve(skill_name, 0.005) # Small improvement on failure return ActionResult( action_type=action, success=False, energy_spent=energy_cost, message="Action failed", ) # Generate output (modified by skill for quantity) resources_gained = [] if config.output_resource: # Skill affects output quantity base_quantity = random.randint(config.min_output, config.max_output) quantity = max(config.min_output, int(base_quantity * skill_modifier)) if quantity > 0: resource = Resource( type=config.output_resource, quantity=quantity, created_turn=self.world.current_turn, ) added = agent.add_to_inventory(resource) if added > 0: resources_gained.append(Resource( type=config.output_resource, quantity=added, created_turn=self.world.current_turn, )) # Secondary output (e.g., hide from hunting) - also affected by skill if config.secondary_output: base_quantity = random.randint(config.secondary_min, config.secondary_max) quantity = max(0, int(base_quantity * skill_modifier)) if quantity > 0: resource = Resource( type=config.secondary_output, quantity=quantity, created_turn=self.world.current_turn, ) added = agent.add_to_inventory(resource) if added > 0: resources_gained.append(Resource( type=config.secondary_output, quantity=added, created_turn=self.world.current_turn, )) # Record action and improve skill agent.record_action(action.value) if skill_name: agent.skills.improve(skill_name, 0.015) # Skill improves with successful use # Build success message with details gained_str = ", ".join(f"+{r.quantity} {r.type.value}" for r in resources_gained) message = f"{action.value}: {gained_str}" if gained_str else f"{action.value} (nothing gained)" return ActionResult( action_type=action, success=True, energy_spent=energy_cost, resources_gained=resources_gained, resources_consumed=resources_consumed, message=message, ) def _get_skill_for_action(self, action: ActionType) -> Optional[str]: """Get the skill name that affects a given action.""" skill_map = { ActionType.HUNT: "hunting", ActionType.GATHER: "gathering", ActionType.CHOP_WOOD: "woodcutting", ActionType.WEAVE: "crafting", } return skill_map.get(action) def _execute_trade(self, agent: Agent, decision: AIDecision) -> ActionResult: """Execute a trade action (buy, sell, or price adjustment). Supports multi-item trades. Trading skill improves with successful trades and affects prices slightly. """ config = ACTION_CONFIG[ActionType.TRADE] # Handle price adjustments (no energy cost) if decision.adjust_order_id and decision.new_price is not None: return self._execute_price_adjustment(agent, decision) # Handle multi-item trades if decision.trade_items: return self._execute_multi_buy(agent, decision) if decision.order_id: # Buying single item from market result = self.market.execute_buy( buyer_id=agent.id, order_id=decision.order_id, quantity=decision.quantity, buyer_money=agent.money, ) if result.success: # Log the trade self.logger.log_trade(result.to_dict()) # Record sale for price history tracking self.market._record_sale( result.resource_type, result.total_paid // result.quantity, result.quantity, self.world.current_turn, ) # Track traded resources res_type = result.resource_type.value self.resource_stats["traded"][res_type] = self.resource_stats["traded"].get(res_type, 0) + result.quantity # Deduct money from buyer agent.money -= result.total_paid # Add resources to buyer resource = Resource( type=result.resource_type, quantity=result.quantity, created_turn=self.world.current_turn, ) agent.add_to_inventory(resource) # Add money to seller and record their trade seller = self.world.get_agent(result.seller_id) if seller: seller.money += result.total_paid seller.record_trade(result.total_paid) seller.skills.improve("trading", 0.02) # Seller skill improves agent.spend_energy(abs(config.energy_cost)) # Record buyer's trade and improve skill agent.record_action("trade") agent.skills.improve("trading", 0.01) # Buyer skill improves less return ActionResult( action_type=ActionType.TRADE, success=True, energy_spent=abs(config.energy_cost), resources_gained=[resource], message=f"Bought {result.quantity} {result.resource_type.value} for {result.total_paid}c", ) else: return ActionResult( action_type=ActionType.TRADE, success=False, message=result.message, ) elif decision.target_resource and decision.quantity > 0: # Selling to market (listing) if agent.has_resource(decision.target_resource, decision.quantity): agent.remove_from_inventory(decision.target_resource, decision.quantity) order = self.market.place_order( seller_id=agent.id, resource_type=decision.target_resource, quantity=decision.quantity, price_per_unit=decision.price, current_turn=self.world.current_turn, ) agent.spend_energy(abs(config.energy_cost)) agent.record_action("trade") # Track listing action return ActionResult( action_type=ActionType.TRADE, success=True, energy_spent=abs(config.energy_cost), message=f"Listed {decision.quantity} {decision.target_resource.value} @ {decision.price}c each", ) else: return ActionResult( action_type=ActionType.TRADE, success=False, message="Not enough resources to sell", ) return ActionResult( action_type=ActionType.TRADE, success=False, message="Invalid trade parameters", ) def _execute_price_adjustment(self, agent: Agent, decision: AIDecision) -> ActionResult: """Execute a price adjustment on an existing order (no energy cost).""" success = self.market.adjust_order_price( order_id=decision.adjust_order_id, seller_id=agent.id, new_price=decision.new_price, current_turn=self.world.current_turn, ) if success: return ActionResult( action_type=ActionType.TRADE, success=True, energy_spent=0, # Price adjustments are free message=f"Adjusted {decision.target_resource.value} price to {decision.new_price}c", ) else: return ActionResult( action_type=ActionType.TRADE, success=False, message="Failed to adjust price", ) def _execute_multi_buy(self, agent: Agent, decision: AIDecision) -> ActionResult: """Execute a multi-item buy trade.""" config = ACTION_CONFIG[ActionType.TRADE] # Build list of purchases purchases = [(item.order_id, item.quantity) for item in decision.trade_items] # Execute all purchases results = self.market.execute_multi_buy( buyer_id=agent.id, purchases=purchases, buyer_money=agent.money, ) # Process results total_paid = 0 resources_gained = [] items_bought = [] for result in results: if result.success: self.logger.log_trade(result.to_dict()) agent.money -= result.total_paid total_paid += result.total_paid # Record sale for price history self.market._record_sale( result.resource_type, result.total_paid // result.quantity, result.quantity, self.world.current_turn ) # Track traded resources res_type = result.resource_type.value self.resource_stats["traded"][res_type] = self.resource_stats["traded"].get(res_type, 0) + result.quantity resource = Resource( type=result.resource_type, quantity=result.quantity, created_turn=self.world.current_turn, ) agent.add_to_inventory(resource) resources_gained.append(resource) items_bought.append(f"{result.quantity} {result.resource_type.value}") # Add money to seller seller = self.world.get_agent(result.seller_id) if seller: seller.money += result.total_paid if resources_gained: agent.spend_energy(abs(config.energy_cost)) message = f"Bought {', '.join(items_bought)} for {total_paid}c" return ActionResult( action_type=ActionType.TRADE, success=True, energy_spent=abs(config.energy_cost), resources_gained=resources_gained, message=message, ) else: return ActionResult( action_type=ActionType.TRADE, success=False, message="Failed to buy any items", ) def set_mode(self, mode: SimulationMode) -> None: """Set the simulation mode.""" if mode == self.mode: return if mode == SimulationMode.AUTO: self._start_auto_mode() else: self._stop_auto_mode() self.mode = mode def _start_auto_mode(self) -> None: """Start automatic step advancement.""" self._stop_event.clear() def auto_step(): while not self._stop_event.is_set() and self.is_running: self.next_step() time.sleep(self.auto_step_interval) self._auto_thread = threading.Thread(target=auto_step, daemon=True) self._auto_thread.start() def _stop_auto_mode(self) -> None: """Stop automatic step advancement.""" self._stop_event.set() if self._auto_thread: self._auto_thread.join(timeout=2.0) self._auto_thread = None def get_state(self) -> dict: """Get the full simulation state for API.""" return { **self.world.get_state_snapshot(), "market": self.market.get_state_snapshot(), "mode": self.mode.value, "is_running": self.is_running, "recent_logs": [ log.to_dict() for log in self.turn_logs[-5:] ], "resource_stats": self._get_resource_stats(), } def _get_resource_stats(self) -> dict: """Get comprehensive resource statistics.""" # Calculate current inventory totals in_inventory = {} for agent in self.world.get_living_agents(): for res in agent.inventory: res_type = res.type.value in_inventory[res_type] = in_inventory.get(res_type, 0) + res.quantity # Calculate current market totals in_market = {} for order in self.market.get_active_orders(): res_type = order.resource_type.value in_market[res_type] = in_market.get(res_type, 0) + order.quantity return { "produced": self.resource_stats["produced"].copy(), "consumed": self.resource_stats["consumed"].copy(), "spoiled": self.resource_stats["spoiled"].copy(), "traded": self.resource_stats["traded"].copy(), "in_inventory": in_inventory, "in_market": in_market, } # Global engine instance def get_engine() -> GameEngine: """Get the global game engine instance.""" return GameEngine()