villsim/backend/core/engine.py

991 lines
39 KiB
Python

"""Game Engine for the Village Simulation."""
import random
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from backend.domain.agent import Agent
from backend.domain.action import ActionType, ActionResult, ACTION_CONFIG
from backend.domain.resources import Resource, ResourceType
from backend.domain.personality import get_action_skill_modifier
from backend.core.world import World, WorldConfig, TimeOfDay
from backend.core.market import OrderBook
from backend.core.ai import get_ai_decision, AIDecision
from backend.core.logger import get_simulation_logger, reset_simulation_logger
from backend.config import get_config
class SimulationMode(Enum):
"""Simulation run mode."""
MANUAL = "manual" # Wait for explicit next_step call
AUTO = "auto" # Run automatically with timer
@dataclass
class TurnLog:
"""Log of events that happened in a turn."""
turn: int
agent_actions: list[dict] = field(default_factory=list)
deaths: list[str] = field(default_factory=list)
births: list[str] = field(default_factory=list)
trades: list[dict] = field(default_factory=list)
# Resource tracking for this turn
resources_produced: dict = field(default_factory=dict)
resources_consumed: dict = field(default_factory=dict)
resources_spoiled: dict = field(default_factory=dict)
# New day events
day_events: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"turn": self.turn,
"agent_actions": self.agent_actions,
"deaths": self.deaths,
"births": self.births,
"trades": self.trades,
"resources_produced": self.resources_produced,
"resources_consumed": self.resources_consumed,
"resources_spoiled": self.resources_spoiled,
"day_events": self.day_events,
}
class GameEngine:
"""Main game engine singleton."""
_instance: Optional["GameEngine"] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self.world = World()
self.market = OrderBook()
self.mode = SimulationMode.MANUAL
self.is_running = False
# Load auto_step_interval from config
self.auto_step_interval = get_config().auto_step_interval
self._auto_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self.turn_logs: list[TurnLog] = []
self.logger = get_simulation_logger()
# Resource statistics tracking (cumulative)
self.resource_stats = {
"produced": {}, # Total resources produced
"consumed": {}, # Total resources consumed
"spoiled": {}, # Total resources spoiled
"traded": {}, # Total resources traded (bought/sold)
"in_market": {}, # Currently in market
"in_inventory": {}, # Currently in all inventories
}
self._initialized = True
def reset(self, config: Optional[WorldConfig] = None) -> None:
"""Reset the simulation to initial state."""
# Stop auto mode if running
self._stop_auto_mode()
if config:
self.world = World(config=config)
else:
self.world = World()
self.market = OrderBook()
self.turn_logs = []
# Reset resource statistics
self.resource_stats = {
"produced": {},
"consumed": {},
"spoiled": {},
"traded": {},
"in_market": {},
"in_inventory": {},
}
# Reset and start new logging session
self.logger = reset_simulation_logger()
sim_config = get_config()
self.logger.start_session(sim_config.to_dict())
self.world.initialize()
self.is_running = True
def initialize(self, num_agents: Optional[int] = None) -> None:
"""Initialize the simulation with agents.
Args:
num_agents: Number of agents to spawn. If None, uses config.json value.
"""
if num_agents is not None:
self.world.config.initial_agents = num_agents
# Otherwise use the value already loaded from config.json
self.world.initialize()
# Start logging session
self.logger = reset_simulation_logger()
sim_config = get_config()
self.logger.start_session(sim_config.to_dict())
self.is_running = True
def next_step(self) -> TurnLog:
"""Advance the simulation by one step."""
if not self.is_running:
return TurnLog(turn=-1)
turn_log = TurnLog(turn=self.world.current_turn + 1)
current_turn = self.world.current_turn + 1
# Start logging this turn
self.logger.start_turn(
turn=current_turn,
day=self.world.current_day,
step_in_day=self.world.step_in_day + 1,
time_of_day=self.world.time_of_day.value,
)
# Log market state before
market_orders_before = [o.to_dict() for o in self.market.get_active_orders()]
# 0. Remove corpses from previous turn (agents who died last turn)
self._remove_old_corpses(current_turn)
# 1. Collect AI decisions for all living agents (not corpses)
decisions: list[tuple[Agent, AIDecision]] = []
for agent in self.world.get_living_agents():
# Log agent state before
self.logger.log_agent_before(
agent_id=agent.id,
agent_name=agent.name,
profession=agent.profession.value,
position=agent.position.to_dict(),
stats=agent.stats.to_dict(),
inventory=[r.to_dict() for r in agent.inventory],
money=agent.money,
)
# Get AI config to determine which system to use
ai_config = get_config().ai
# GOAP AI handles night time automatically
decision = get_ai_decision(
agent,
self.market,
step_in_day=self.world.step_in_day,
day_steps=self.world.config.day_steps,
current_turn=current_turn,
use_goap=ai_config.use_goap,
is_night=self.world.is_night(),
)
decisions.append((agent, decision))
# Log decision
self.logger.log_agent_decision(agent.id, decision.to_dict())
# 2. Calculate movement targets and move agents
for agent, decision in decisions:
action_name = decision.action.value
agent.set_action(
action_type=action_name,
world_width=self.world.config.width,
world_height=self.world.config.height,
message=decision.reason,
target_resource=decision.target_resource.value if decision.target_resource else None,
)
agent.update_movement()
# 3. Execute all actions and update action indicators with results
for agent, decision in decisions:
result = self._execute_action(agent, decision)
# Complete agent action with result - this updates the indicator to show what was done
if result:
agent.complete_action(result.success, result.message)
# Log to agent's personal history
agent.log_action(
turn=current_turn,
action_type=decision.action.value,
result=result.message,
success=result.success,
)
# Track resources produced
for res in result.resources_gained:
res_type = res.type.value
turn_log.resources_produced[res_type] = turn_log.resources_produced.get(res_type, 0) + res.quantity
self.resource_stats["produced"][res_type] = self.resource_stats["produced"].get(res_type, 0) + res.quantity
# Track resources consumed
for res in result.resources_consumed:
res_type = res.type.value
turn_log.resources_consumed[res_type] = turn_log.resources_consumed.get(res_type, 0) + res.quantity
self.resource_stats["consumed"][res_type] = self.resource_stats["consumed"].get(res_type, 0) + res.quantity
turn_log.agent_actions.append({
"agent_id": agent.id,
"agent_name": agent.name,
"decision": decision.to_dict(),
"result": result.to_dict() if result else None,
})
# Log agent state after action
self.logger.log_agent_after(
agent_id=agent.id,
stats=agent.stats.to_dict(),
inventory=[r.to_dict() for r in agent.inventory],
money=agent.money,
position=agent.position.to_dict(),
action_result=result.to_dict() if result else {},
)
# 4. Resolve pending market orders (price updates)
self.market.update_prices(current_turn)
# Log market state after
market_orders_after = [o.to_dict() for o in self.market.get_active_orders()]
self.logger.log_market_state(market_orders_before, market_orders_after)
# 5. Apply passive decay to all living agents
for agent in self.world.get_living_agents():
agent.apply_passive_decay()
# 6. Decay resources in inventories
for agent in self.world.get_living_agents():
expired = agent.decay_inventory(current_turn)
# Track spoiled resources
for res in expired:
res_type = res.type.value
turn_log.resources_spoiled[res_type] = turn_log.resources_spoiled.get(res_type, 0) + res.quantity
self.resource_stats["spoiled"][res_type] = self.resource_stats["spoiled"].get(res_type, 0) + res.quantity
# 7. Mark newly dead agents as corpses (don't remove yet for visualization)
newly_dead = self._mark_dead_agents(current_turn)
for dead_agent in newly_dead:
cause = dead_agent.death_reason
self.logger.log_death(dead_agent.name, cause)
# Cancel their market orders immediately
self.market.cancel_seller_orders(dead_agent.id)
turn_log.deaths = [a.name for a in newly_dead]
# Log statistics
self.logger.log_statistics(self.world.get_statistics())
# End turn logging
self.logger.end_turn()
# 8. Advance time (returns True if new day started)
new_day = self.world.advance_time()
# 9. Process new day events (aging, births, sinks)
if new_day:
day_events = self._process_new_day(turn_log)
turn_log.day_events = day_events
# 10. Check win/lose conditions (count only truly living agents, not corpses)
if len(self.world.get_living_agents()) == 0:
self.is_running = False
self.logger.close()
# Keep turn_logs bounded to prevent memory growth
max_logs = get_config().performance.max_turn_logs
self.turn_logs.append(turn_log)
if len(self.turn_logs) > max_logs:
# Remove oldest logs, keep only recent ones
self.turn_logs = self.turn_logs[-max_logs:]
return turn_log
def _mark_dead_agents(self, current_turn: int) -> list[Agent]:
"""Mark agents who just died as corpses. Returns list of newly dead agents.
Also processes inheritance - distributing wealth to children.
"""
newly_dead = []
for agent in self.world.agents:
if not agent.is_alive() and not agent.is_corpse():
# Determine cause of death
if agent.is_too_old():
cause = "age"
else:
cause = agent.stats.get_critical_stat() or "unknown"
# Process inheritance BEFORE marking dead (while inventory still accessible)
inheritance = self.world.process_inheritance(agent)
if inheritance.get("beneficiaries"):
self.logger.log_event("inheritance", inheritance)
agent.mark_dead(current_turn, cause)
# Clear their action to show death state
agent.current_action.action_type = "dead"
agent.current_action.message = f"Died: {cause}"
# Record death statistics
self.world.record_death(agent, cause)
newly_dead.append(agent)
return newly_dead
def _remove_old_corpses(self, current_turn: int) -> list[Agent]:
"""Remove corpses that have been visible for one turn."""
to_remove = []
for agent in self.world.agents:
if agent.is_corpse() and agent.death_turn < current_turn:
# Corpse has been visible for one turn, remove it
to_remove.append(agent)
for agent in to_remove:
self.world.agents.remove(agent)
# Remove from index as well
self.world._agent_index.pop(agent.id, None)
# Note: death was already recorded in _mark_dead_agents
return to_remove
def _process_new_day(self, turn_log: TurnLog) -> dict:
"""Process all new-day events: aging, births, resource sinks.
Called when a new simulation day starts.
"""
events = {
"day": self.world.current_day,
"births": [],
"age_deaths": [],
"taxes_collected": 0,
"storage_decay": {},
"random_events": [],
}
sinks_config = get_config().sinks
age_config = get_config().age
# 1. Age all living agents
for agent in self.world.get_living_agents():
agent.age_one_day()
# 2. Check for age-related deaths (after aging)
current_turn = self.world.current_turn
for agent in self.world.agents:
if not agent.is_corpse() and agent.is_too_old() and not agent.is_alive():
# Will be caught by _mark_dead_agents in the next turn
pass
# 3. Process potential births
for agent in list(self.world.get_living_agents()): # Copy list since we modify it
if agent.can_give_birth(self.world.current_day):
child = self.world.spawn_child(agent)
if child:
birth_info = {
"parent_id": agent.id,
"parent_name": agent.name,
"child_id": child.id,
"child_name": child.name,
}
events["births"].append(birth_info)
turn_log.births.append(child.name)
self.logger.log_event("birth", birth_info)
# 4. Apply daily money tax (wealth redistribution/removal)
if sinks_config.daily_tax_rate > 0:
total_taxes = 0
for agent in self.world.get_living_agents():
tax = int(agent.money * sinks_config.daily_tax_rate)
if tax > 0:
agent.money -= tax
total_taxes += tax
events["taxes_collected"] = total_taxes
# 5. Apply village storage decay (resources spoil over time)
if sinks_config.daily_village_decay_rate > 0:
decay_rate = sinks_config.daily_village_decay_rate
for agent in self.world.get_living_agents():
for resource in agent.inventory[:]: # Copy list to allow modification
# Random chance for each resource to decay
if random.random() < decay_rate:
decay_amount = max(1, int(resource.quantity * decay_rate))
resource.quantity -= decay_amount
res_type = resource.type.value
events["storage_decay"][res_type] = events["storage_decay"].get(res_type, 0) + decay_amount
# Track as spoiled
turn_log.resources_spoiled[res_type] = turn_log.resources_spoiled.get(res_type, 0) + decay_amount
self.resource_stats["spoiled"][res_type] = self.resource_stats["spoiled"].get(res_type, 0) + decay_amount
if resource.quantity <= 0:
agent.inventory.remove(resource)
# 6. Random events (fires, theft, etc.)
if random.random() < sinks_config.random_event_chance:
event = self._generate_random_event(events, turn_log)
if event:
events["random_events"].append(event)
return events
def _generate_random_event(self, events: dict, turn_log: TurnLog) -> Optional[dict]:
"""Generate a random village event (disaster, theft, etc.)."""
sinks_config = get_config().sinks
living_agents = self.world.get_living_agents()
if not living_agents:
return None
event_types = ["fire", "theft", "blessing"]
event_type = random.choice(event_types)
event_info = {"type": event_type, "affected": []}
if event_type == "fire":
# Fire destroys some resources from random agents
num_affected = max(1, len(living_agents) // 5) # 20% of agents affected
affected_agents = random.sample(living_agents, min(num_affected, len(living_agents)))
for agent in affected_agents:
for resource in agent.inventory[:]:
loss = int(resource.quantity * sinks_config.fire_event_resource_loss)
if loss > 0:
resource.quantity -= loss
res_type = resource.type.value
turn_log.resources_spoiled[res_type] = turn_log.resources_spoiled.get(res_type, 0) + loss
if resource.quantity <= 0:
agent.inventory.remove(resource)
event_info["affected"].append(agent.name)
elif event_type == "theft":
# Some money is stolen from wealthy agents
wealthy_agents = [a for a in living_agents if a.money > 1000]
if wealthy_agents:
victim = random.choice(wealthy_agents)
stolen = int(victim.money * sinks_config.theft_event_money_loss)
victim.money -= stolen
event_info["affected"].append(victim.name)
event_info["amount_stolen"] = stolen
elif event_type == "blessing":
# Good harvest - some agents get bonus resources
lucky_agent = random.choice(living_agents)
from backend.domain.resources import Resource, ResourceType
bonus_type = random.choice([ResourceType.BERRIES, ResourceType.WOOD])
bonus = Resource(type=bonus_type, quantity=random.randint(2, 5), created_turn=self.world.current_turn)
lucky_agent.add_to_inventory(bonus)
event_info["affected"].append(lucky_agent.name)
event_info["bonus"] = f"+{bonus.quantity} {bonus_type.value}"
if event_info["affected"]:
self.logger.log_event("random_event", event_info)
return event_info
return None
def _execute_action(self, agent: Agent, decision: AIDecision) -> Optional[ActionResult]:
"""Execute an action for an agent."""
action = decision.action
config = ACTION_CONFIG[action]
# Handle different action types
if action == ActionType.SLEEP:
agent.restore_energy(config.energy_cost)
return ActionResult(
action_type=action,
success=True,
energy_spent=-config.energy_cost,
message="Sleeping soundly",
)
elif action == ActionType.REST:
agent.restore_energy(config.energy_cost)
return ActionResult(
action_type=action,
success=True,
energy_spent=-config.energy_cost,
message="Resting",
)
elif action == ActionType.CONSUME:
if decision.target_resource:
success = agent.consume(decision.target_resource)
consumed_list = []
if success:
consumed_list.append(Resource(
type=decision.target_resource,
quantity=1,
created_turn=self.world.current_turn,
))
return ActionResult(
action_type=action,
success=success,
resources_consumed=consumed_list,
message=f"Consumed {decision.target_resource.value}" if success else "Nothing to consume",
)
return ActionResult(action_type=action, success=False, message="No resource specified")
elif action == ActionType.BUILD_FIRE:
if agent.has_resource(ResourceType.WOOD):
agent.remove_from_inventory(ResourceType.WOOD, 1)
if not agent.spend_energy(abs(config.energy_cost)):
return ActionResult(action_type=action, success=False, message="Not enough energy")
# Fire heat from config
from backend.domain.resources import get_fire_heat
fire_heat = get_fire_heat()
agent.apply_heat(fire_heat)
return ActionResult(
action_type=action,
success=True,
energy_spent=abs(config.energy_cost),
heat_gained=fire_heat,
resources_consumed=[Resource(type=ResourceType.WOOD, quantity=1, created_turn=self.world.current_turn)],
message="Built a warm fire",
)
return ActionResult(action_type=action, success=False, message="No wood for fire")
elif action == ActionType.TRADE:
return self._execute_trade(agent, decision)
elif action in [ActionType.HUNT, ActionType.GATHER, ActionType.CHOP_WOOD,
ActionType.GET_WATER, ActionType.WEAVE]:
return self._execute_work(agent, action, config)
return ActionResult(action_type=action, success=False, message="Unknown action")
def _execute_work(self, agent: Agent, action: ActionType, config) -> ActionResult:
"""Execute a work action (hunting, gathering, etc.).
Skills now affect outcomes:
- Hunting skill affects hunt success rate
- Gathering skill affects gather output
- Woodcutting skill affects wood output
- Skills improve with use
Age modifies:
- Energy costs (young use less, old use more)
- Skill effectiveness (young less effective, old more effective "wisdom")
- Learning rate (young learn faster, old learn slower)
"""
# Calculate age-modified energy cost
base_energy_cost = abs(config.energy_cost)
energy_cost_modifier = agent.get_energy_cost_modifier()
energy_cost = max(1, int(base_energy_cost * energy_cost_modifier))
if not agent.spend_energy(energy_cost):
return ActionResult(
action_type=action,
success=False,
message="Not enough energy",
)
# Check required materials
resources_consumed = []
if config.requires_resource:
if not agent.has_resource(config.requires_resource, config.requires_quantity):
agent.restore_energy(energy_cost) # Refund energy
return ActionResult(
action_type=action,
success=False,
message=f"Missing required {config.requires_resource.value}",
)
agent.remove_from_inventory(config.requires_resource, config.requires_quantity)
resources_consumed.append(Resource(
type=config.requires_resource,
quantity=config.requires_quantity,
created_turn=self.world.current_turn,
))
# Get relevant skill for this action
skill_name = self._get_skill_for_action(action)
skill_value = getattr(agent.skills, skill_name, 1.0) if skill_name else 1.0
# Apply age-based skill modifier (young less effective, old more effective)
age_skill_modifier = agent.get_skill_modifier()
skill_modifier = get_action_skill_modifier(skill_value) * age_skill_modifier
# Check success chance (modified by skill and age)
# Higher skill = higher effective success chance
effective_success_chance = min(0.98, config.success_chance * skill_modifier)
if random.random() > effective_success_chance:
# Record action attempt (skill still improves on failure, just less)
agent.record_action(action.value)
if skill_name:
learning_modifier = agent.get_learning_modifier()
agent.skills.improve(skill_name, 0.005, learning_modifier) # Small improvement on failure
return ActionResult(
action_type=action,
success=False,
energy_spent=energy_cost,
message="Action failed",
)
# Generate output (modified by skill and age for quantity)
resources_gained = []
if config.output_resource:
# Check storage limit before producing
res_type = config.output_resource.value
storage_available = self.world.get_storage_available(res_type)
# Skill affects output quantity
base_quantity = random.randint(config.min_output, config.max_output)
quantity = max(config.min_output, int(base_quantity * skill_modifier))
# Limit by storage
quantity = min(quantity, storage_available)
if quantity > 0:
resource = Resource(
type=config.output_resource,
quantity=quantity,
created_turn=self.world.current_turn,
)
added = agent.add_to_inventory(resource)
if added > 0:
resources_gained.append(Resource(
type=config.output_resource,
quantity=added,
created_turn=self.world.current_turn,
))
# Secondary output (e.g., hide from hunting) - also affected by skill and storage
if config.secondary_output:
res_type = config.secondary_output.value
storage_available = self.world.get_storage_available(res_type)
base_quantity = random.randint(config.secondary_min, config.secondary_max)
quantity = max(0, int(base_quantity * skill_modifier))
quantity = min(quantity, storage_available)
if quantity > 0:
resource = Resource(
type=config.secondary_output,
quantity=quantity,
created_turn=self.world.current_turn,
)
added = agent.add_to_inventory(resource)
if added > 0:
resources_gained.append(Resource(
type=config.secondary_output,
quantity=added,
created_turn=self.world.current_turn,
))
# Record action and improve skill (modified by age learning rate)
agent.record_action(action.value)
if skill_name:
learning_modifier = agent.get_learning_modifier()
agent.skills.improve(skill_name, 0.015, learning_modifier) # Skill improves with successful use
# Build success message with details
gained_str = ", ".join(f"+{r.quantity} {r.type.value}" for r in resources_gained)
message = f"{action.value}: {gained_str}" if gained_str else f"{action.value} (nothing gained)"
return ActionResult(
action_type=action,
success=True,
energy_spent=energy_cost,
resources_gained=resources_gained,
resources_consumed=resources_consumed,
message=message,
)
def _get_skill_for_action(self, action: ActionType) -> Optional[str]:
"""Get the skill name that affects a given action."""
skill_map = {
ActionType.HUNT: "hunting",
ActionType.GATHER: "gathering",
ActionType.CHOP_WOOD: "woodcutting",
ActionType.WEAVE: "crafting",
}
return skill_map.get(action)
def _execute_trade(self, agent: Agent, decision: AIDecision) -> ActionResult:
"""Execute a trade action (buy, sell, or price adjustment). Supports multi-item trades.
Trading skill improves with successful trades and affects prices slightly.
"""
config = ACTION_CONFIG[ActionType.TRADE]
# Handle price adjustments (no energy cost)
if decision.adjust_order_id and decision.new_price is not None:
return self._execute_price_adjustment(agent, decision)
# Handle multi-item trades
if decision.trade_items:
return self._execute_multi_buy(agent, decision)
if decision.order_id:
# Buying single item from market
result = self.market.execute_buy(
buyer_id=agent.id,
order_id=decision.order_id,
quantity=decision.quantity,
buyer_money=agent.money,
)
if result.success:
# Log the trade
self.logger.log_trade(result.to_dict())
# Record sale for price history tracking
self.market._record_sale(
result.resource_type,
result.total_paid // result.quantity,
result.quantity,
self.world.current_turn,
)
# Track traded resources
res_type = result.resource_type.value
self.resource_stats["traded"][res_type] = self.resource_stats["traded"].get(res_type, 0) + result.quantity
# Deduct money from buyer
agent.money -= result.total_paid
# Add resources to buyer
resource = Resource(
type=result.resource_type,
quantity=result.quantity,
created_turn=self.world.current_turn,
)
agent.add_to_inventory(resource)
# Add money to seller and record their trade
seller = self.world.get_agent(result.seller_id)
if seller:
seller.money += result.total_paid
seller.record_trade(result.total_paid)
seller.skills.improve("trading", 0.02, seller.get_learning_modifier()) # Seller skill improves
# Age-modified energy cost for trading
energy_cost = max(1, int(abs(config.energy_cost) * agent.get_energy_cost_modifier()))
agent.spend_energy(energy_cost)
# Record buyer's trade and improve skill (with age learning modifier)
agent.record_action("trade")
agent.skills.improve("trading", 0.01, agent.get_learning_modifier()) # Buyer skill improves less
return ActionResult(
action_type=ActionType.TRADE,
success=True,
energy_spent=abs(config.energy_cost),
resources_gained=[resource],
message=f"Bought {result.quantity} {result.resource_type.value} for {result.total_paid}c",
)
else:
return ActionResult(
action_type=ActionType.TRADE,
success=False,
message=result.message,
)
elif decision.target_resource and decision.quantity > 0:
# Selling to market (listing)
if agent.has_resource(decision.target_resource, decision.quantity):
agent.remove_from_inventory(decision.target_resource, decision.quantity)
order = self.market.place_order(
seller_id=agent.id,
resource_type=decision.target_resource,
quantity=decision.quantity,
price_per_unit=decision.price,
current_turn=self.world.current_turn,
)
agent.spend_energy(abs(config.energy_cost))
agent.record_action("trade") # Track listing action
return ActionResult(
action_type=ActionType.TRADE,
success=True,
energy_spent=abs(config.energy_cost),
message=f"Listed {decision.quantity} {decision.target_resource.value} @ {decision.price}c each",
)
else:
return ActionResult(
action_type=ActionType.TRADE,
success=False,
message="Not enough resources to sell",
)
return ActionResult(
action_type=ActionType.TRADE,
success=False,
message="Invalid trade parameters",
)
def _execute_price_adjustment(self, agent: Agent, decision: AIDecision) -> ActionResult:
"""Execute a price adjustment on an existing order (no energy cost)."""
success = self.market.adjust_order_price(
order_id=decision.adjust_order_id,
seller_id=agent.id,
new_price=decision.new_price,
current_turn=self.world.current_turn,
)
if success:
return ActionResult(
action_type=ActionType.TRADE,
success=True,
energy_spent=0, # Price adjustments are free
message=f"Adjusted {decision.target_resource.value} price to {decision.new_price}c",
)
else:
return ActionResult(
action_type=ActionType.TRADE,
success=False,
message="Failed to adjust price",
)
def _execute_multi_buy(self, agent: Agent, decision: AIDecision) -> ActionResult:
"""Execute a multi-item buy trade."""
config = ACTION_CONFIG[ActionType.TRADE]
# Build list of purchases
purchases = [(item.order_id, item.quantity) for item in decision.trade_items]
# Execute all purchases
results = self.market.execute_multi_buy(
buyer_id=agent.id,
purchases=purchases,
buyer_money=agent.money,
)
# Process results
total_paid = 0
resources_gained = []
items_bought = []
for result in results:
if result.success:
self.logger.log_trade(result.to_dict())
agent.money -= result.total_paid
total_paid += result.total_paid
# Record sale for price history
self.market._record_sale(
result.resource_type,
result.total_paid // result.quantity,
result.quantity,
self.world.current_turn
)
# Track traded resources
res_type = result.resource_type.value
self.resource_stats["traded"][res_type] = self.resource_stats["traded"].get(res_type, 0) + result.quantity
resource = Resource(
type=result.resource_type,
quantity=result.quantity,
created_turn=self.world.current_turn,
)
agent.add_to_inventory(resource)
resources_gained.append(resource)
items_bought.append(f"{result.quantity} {result.resource_type.value}")
# Add money to seller
seller = self.world.get_agent(result.seller_id)
if seller:
seller.money += result.total_paid
if resources_gained:
agent.spend_energy(abs(config.energy_cost))
message = f"Bought {', '.join(items_bought)} for {total_paid}c"
return ActionResult(
action_type=ActionType.TRADE,
success=True,
energy_spent=abs(config.energy_cost),
resources_gained=resources_gained,
message=message,
)
else:
return ActionResult(
action_type=ActionType.TRADE,
success=False,
message="Failed to buy any items",
)
def set_mode(self, mode: SimulationMode) -> None:
"""Set the simulation mode."""
if mode == self.mode:
return
if mode == SimulationMode.AUTO:
self._start_auto_mode()
else:
self._stop_auto_mode()
self.mode = mode
def _start_auto_mode(self) -> None:
"""Start automatic step advancement."""
self._stop_event.clear()
def auto_step():
while not self._stop_event.is_set() and self.is_running:
self.next_step()
time.sleep(self.auto_step_interval)
self._auto_thread = threading.Thread(target=auto_step, daemon=True)
self._auto_thread.start()
def _stop_auto_mode(self) -> None:
"""Stop automatic step advancement."""
self._stop_event.set()
if self._auto_thread:
self._auto_thread.join(timeout=2.0)
self._auto_thread = None
def get_state(self) -> dict:
"""Get the full simulation state for API."""
return {
**self.world.get_state_snapshot(),
"market": self.market.get_state_snapshot(),
"mode": self.mode.value,
"is_running": self.is_running,
"recent_logs": [
log.to_dict() for log in self.turn_logs[-5:]
],
"resource_stats": self._get_resource_stats(),
}
def _get_resource_stats(self) -> dict:
"""Get comprehensive resource statistics."""
# Calculate current inventory totals
in_inventory = {}
for agent in self.world.get_living_agents():
for res in agent.inventory:
res_type = res.type.value
in_inventory[res_type] = in_inventory.get(res_type, 0) + res.quantity
# Calculate current market totals
in_market = {}
for order in self.market.get_active_orders():
res_type = order.resource_type.value
in_market[res_type] = in_market.get(res_type, 0) + order.quantity
return {
"produced": self.resource_stats["produced"].copy(),
"consumed": self.resource_stats["consumed"].copy(),
"spoiled": self.resource_stats["spoiled"].copy(),
"traded": self.resource_stats["traded"].copy(),
"in_inventory": in_inventory,
"in_market": in_market,
}
# Global engine instance
def get_engine() -> GameEngine:
"""Get the global game engine instance."""
return GameEngine()