Update configuration parameters, enhance agent personality traits, and implement statistics tracking for wealth and professions in Village Simulation. Adjusted initial agent settings, resource decay rates, and action costs to improve gameplay dynamics. Added new personality and skill systems for agents, enabling emergent professions and diverse behaviors. Integrated real-time statistics rendering for enhanced user experience.

This commit is contained in:
elit3guzhva 2026-01-19 00:25:16 +03:00
parent 396980a523
commit 1423fc0dc9
14 changed files with 1957 additions and 218 deletions

View File

@ -0,0 +1,29 @@
---
description: "Ensure Python is always run via the project's .venv virtual environment"
globs: ["**/*.py", "**/*"]
alwaysApply: true
---
# Python Environment Rule
## Context
- Applies to all Python files and commands in the VillSim project.
## Requirements
1. **Always use the virtual environment** located at `.venv/` for running Python scripts.
2. Activate with: `source .venv/bin/activate` (Unix/macOS) before running any Python commands.
3. When running Python files, use: `python` (after activation) or `.venv/bin/python` directly.
4. Install dependencies with: `.venv/bin/pip install -r requirements.txt`.
5. Never use system Python - all scripts, tests, and tasks must use the `.venv` interpreter.
## Examples
<example>
**Valid:**
- `source .venv/bin/activate && python backend/main.py`
- `.venv/bin/python tools/run_headless_analysis.py`
- `.venv/bin/pip install -r requirements.txt`
**Invalid:**
- `python backend/main.py` (using system Python)
- `pip install package` (using system pip)
</example>

View File

@ -0,0 +1,54 @@
---
description: "Always use config.json as the single source of truth for all simulation parameters"
globs: ["**/*.py", "config.json"]
alwaysApply: true
---
# Configuration Management Rule
## Context
- Applies across all code that references configuration or simulation parameters.
- The `config.json` file is the **single source of truth** for:
- Agent stats (max_energy, max_hunger, decay rates, thresholds)
- Resource properties (decay rates, hunger/thirst/energy values)
- Action costs and success rates
- World parameters (dimensions, agent count, day/night cycles)
- Market settings (discounts, price multipliers)
- Economy parameters (ratios, thresholds, markup limits)
- UI settings (auto_step_interval)
## Requirements
1. **Never hardcode values** that exist or should exist in `config.json`.
2. When adding new features, add corresponding config entries to `config.json`.
3. Load config values at runtime using the existing config mechanism:
```python
from backend.config import Config
config = Config()
# Access via: config.agent_stats, config.resources, config.actions, etc.
```
4. Keep `config.json` synchronized with any code changes.
5. Always check `config.json` first before adding magic numbers to code.
## Examples
<example>
**Valid:**
```python
from backend.config import Config
config = Config()
max_energy = config.agent_stats["max_energy"]
hunt_success = config.actions["hunt_success"]
```
**Invalid:**
```python
max_energy = 50 # Hardcoded instead of using config
hunt_success = 0.70 # Magic number
```
</example>
## Project Structure Reference
- Backend code: `backend/` (API, core simulation, domain models)
- Frontend code: `frontend/` (client, renderers)
- Tools/utilities: `tools/` (analysis scripts, config converters)
- Documentation: `docs/design/`
- Logs output: `logs/`

View File

@ -89,20 +89,34 @@ def get_market_prices():
"/control/initialize",
response_model=ControlResponse,
summary="Initialize simulation",
description="Initialize or reset the simulation with the specified parameters.",
description="Initialize or reset the simulation. Uses config.json values if no params provided.",
)
def initialize_simulation(request: InitializeRequest):
"""Initialize or reset the simulation."""
def initialize_simulation(request: InitializeRequest = None):
"""Initialize or reset the simulation.
If request is provided with specific values, use those.
Otherwise, use values from config.json.
"""
engine = get_engine()
config = WorldConfig(
width=request.world_width,
height=request.world_height,
initial_agents=request.num_agents,
)
engine.reset(config)
if request and (request.num_agents != 8 or request.world_width != 20 or request.world_height != 20):
# Custom values provided - use them
from backend.core.world import WorldConfig
config = WorldConfig(
width=request.world_width,
height=request.world_height,
initial_agents=request.num_agents,
)
engine.reset(config)
num_agents = request.num_agents
else:
# Use values from config.json
engine.reset() # This now loads from config.json automatically
num_agents = engine.world.config.initial_agents
return ControlResponse(
success=True,
message=f"Simulation initialized with {request.num_agents} agents",
message=f"Simulation initialized with {num_agents} agents",
turn=engine.world.current_turn,
mode=engine.mode.value,
)

View File

@ -1,14 +1,16 @@
"""AI decision system for agents in the Village Simulation.
Major rework to create market-driven economy:
- Agents understand that BUYING saves energy (trading is smart!)
- Wealth accumulation as a goal (money = safety buffer)
- Dynamic pricing based on supply/demand signals
- Proactive trading - buy low, sell high
- Market participation is now central to survival strategy
Major rework to create diverse, personality-driven economy:
- Each agent has unique personality traits affecting all decisions
- Emergent professions: Hunters, Gatherers, Traders, Generalists
- Class inequality through varied strategies and skills
- Traders focus on arbitrage (buy low, sell high)
- Personality affects: risk tolerance, hoarding, market participation
Key insight: An agent with money can survive without working.
The market is not a last resort - it's the optimal strategy when prices are good.
Key insight: Different personalities lead to different strategies.
Traders don't gather - they profit from others' labor.
Hunters take risks for bigger rewards.
Gatherers play it safe.
"""
import random
@ -18,6 +20,7 @@ from typing import Optional, TYPE_CHECKING
from backend.domain.agent import Agent
from backend.domain.action import ActionType, ACTION_CONFIG
from backend.domain.resources import ResourceType
from backend.domain.personality import get_trade_price_modifier
if TYPE_CHECKING:
from backend.core.market import OrderBook
@ -108,23 +111,23 @@ def _get_economy_config():
class AgentAI:
"""AI decision maker with market-driven economy behavior.
"""AI decision maker with personality-driven economy behavior.
Core philosophy: Trading is SMART, not a last resort.
Core philosophy: Each agent has a unique strategy based on personality.
The agent now understands:
1. Buying is often more efficient than gathering (saves energy!)
2. Money is power - wealth means safety and flexibility
3. Selling at good prices builds wealth
4. Adjusting prices responds to supply/demand
5. The market is a tool for survival, not just emergency trades
Personality effects:
1. wealth_desire: How aggressively to accumulate money
2. hoarding_rate: How much to keep vs. sell on market
3. risk_tolerance: Hunt (risky, high reward) vs. gather (safe)
4. market_affinity: How often to engage with market
5. trade_preference: High = trader profession (arbitrage focus)
6. price_sensitivity: How picky about deals
Economic behaviors:
- Calculate "fair value" of resources based on energy cost
- Buy when market price < energy cost to gather
- Sell when market price > production cost
- Adjust prices based on market conditions (supply/demand)
- Accumulate wealth as a safety buffer
Emergent professions:
- Traders: High trade_preference + market_affinity = buy low, sell high
- Hunters: High hunt_preference + risk_tolerance = meat production
- Gatherers: High gather_preference, low risk = safe resource collection
- Generalists: Balanced approach to all activities
"""
# Thresholds for stat management
@ -135,19 +138,16 @@ class AgentAI:
REST_ENERGY_THRESHOLD = 18 # Rest when below this if no urgent needs
WORK_ENERGY_MINIMUM = 20 # Prefer to have this much for work
# Resource stockpile targets
MIN_WATER_STOCK = 3
MIN_FOOD_STOCK = 4
MIN_WOOD_STOCK = 3
# Resource stockpile targets (modified by personality.hoarding_rate)
BASE_WATER_STOCK = 2
BASE_FOOD_STOCK = 3
BASE_WOOD_STOCK = 2
# Heat thresholds
HEAT_PROACTIVE_THRESHOLD = 0.50
# ECONOMY SETTINGS - These make agents trade more
# Base economy settings (modified by personality)
ENERGY_TO_MONEY_RATIO = 1.5 # 1 energy = ~1.5 coins in perceived value
WEALTH_DESIRE = 0.3 # How much agents want to accumulate wealth (0-1)
BUY_EFFICIENCY_THRESHOLD = 0.7 # Buy if market price < 70% of gather cost
MIN_WEALTH_TARGET = 50 # Agents want at least this much money
MAX_PRICE_MARKUP = 2.0 # Maximum markup over base price
MIN_PRICE_DISCOUNT = 0.5 # Minimum discount (50% of base price)
@ -158,18 +158,38 @@ class AgentAI:
self.day_steps = day_steps
self.current_turn = current_turn
# Personality shortcuts
self.p = agent.personality # Convenience shortcut
self.skills = agent.skills
# Load thresholds from config
config = _get_ai_config()
self.CRITICAL_THRESHOLD = config.critical_threshold
self.LOW_ENERGY_THRESHOLD = config.low_energy_threshold
# Try to load economy config
# Personality-adjusted values
# Wealth desire from personality (0.1 to 0.9)
self.WEALTH_DESIRE = self.p.wealth_desire
# Buy efficiency threshold adjusted by price sensitivity
# High sensitivity = only buy very good deals
economy = _get_economy_config()
if economy:
self.ENERGY_TO_MONEY_RATIO = getattr(economy, 'energy_to_money_ratio', self.ENERGY_TO_MONEY_RATIO)
self.WEALTH_DESIRE = getattr(economy, 'wealth_desire', self.WEALTH_DESIRE)
self.BUY_EFFICIENCY_THRESHOLD = getattr(economy, 'buy_efficiency_threshold', self.BUY_EFFICIENCY_THRESHOLD)
self.MIN_WEALTH_TARGET = getattr(economy, 'min_wealth_target', self.MIN_WEALTH_TARGET)
base_threshold = getattr(economy, 'buy_efficiency_threshold', 0.7) if economy else 0.7
self.BUY_EFFICIENCY_THRESHOLD = base_threshold / self.p.price_sensitivity
# Wealth target scaled by wealth desire
base_target = getattr(economy, 'min_wealth_target', 50) if economy else 50
self.MIN_WEALTH_TARGET = int(base_target * (0.5 + self.p.wealth_desire))
# Resource stockpile targets modified by hoarding rate
# High hoarders keep more in reserve
hoarding_mult = 0.5 + self.p.hoarding_rate # 0.6 to 1.4
self.MIN_WATER_STOCK = max(1, int(self.BASE_WATER_STOCK * hoarding_mult))
self.MIN_FOOD_STOCK = max(2, int(self.BASE_FOOD_STOCK * hoarding_mult))
self.MIN_WOOD_STOCK = max(1, int(self.BASE_WOOD_STOCK * hoarding_mult))
# Trader mode: agents with high trade preference become market-focused
self.is_trader = self.p.trade_preference > 1.3 and self.p.market_affinity > 0.5
@property
def is_evening(self) -> bool:
@ -200,12 +220,13 @@ class AgentAI:
return self.agent.money >= self.MIN_WEALTH_TARGET
def decide(self) -> AIDecision:
"""Make a decision based on survival AND economic optimization.
"""Make a decision based on survival, personality, and economic goals.
Key insight: Trading is often BETTER than gathering because:
1. Trade uses only 1 energy vs 4-8 for gathering
2. If market price < energy cost, buying is pure profit
3. Money = stored energy = safety buffer
Decision flow varies by personality:
- Traders prioritize market operations (arbitrage)
- Hunters prefer hunting when possible
- Gatherers stick to safe resource collection
- All agents prioritize survival when needed
"""
# Priority 1: Critical survival needs (immediate danger)
decision = self._check_critical_needs()
@ -217,33 +238,155 @@ class AgentAI:
if decision:
return decision
# Priority 3: Price adjustment - respond to market conditions
# Priority 3: Trader-specific behavior (high trade_preference agents)
# Traders focus on market operations when survival is secured
if self.is_trader:
decision = self._do_trader_behavior()
if decision:
return decision
# Priority 4: Price adjustment - respond to market conditions
decision = self._check_price_adjustments()
if decision:
return decision
# Priority 4: Smart shopping - buy good deals on the market!
decision = self._check_market_opportunities()
if decision:
return decision
# Priority 5: Smart shopping - buy good deals on the market!
# Frequency affected by market_affinity
if random.random() < self.p.market_affinity:
decision = self._check_market_opportunities()
if decision:
return decision
# Priority 5: Craft clothes if we have hide
# Priority 6: Craft clothes if we have hide
decision = self._check_clothes_crafting()
if decision:
return decision
# Priority 6: Energy management
# Priority 7: Energy management
decision = self._check_energy()
if decision:
return decision
# Priority 7: Economic activities (sell excess, build wealth)
decision = self._check_economic()
# Priority 8: Economic activities (sell excess, build wealth)
# Frequency affected by hoarding_rate (low hoarding = sell more)
if random.random() > self.p.hoarding_rate * 0.5:
decision = self._check_economic()
if decision:
return decision
# Priority 9: Routine survival work (gather resources we need)
return self._do_survival_work()
def _do_trader_behavior(self) -> Optional[AIDecision]:
"""Trader-specific behavior: focus on arbitrage and market operations.
Traders don't gather much - they profit from buying low and selling high.
Core trader strategy:
1. Look for arbitrage opportunities (price differences)
2. Buy underpriced goods
3. Sell at markup
4. Build wealth through trading margins
"""
# Traders need money to operate
if self.agent.money < 20:
# Low on capital - need to do some work or sell inventory
decision = self._try_to_sell(urgent=True)
if decision:
return decision
# If nothing to sell, do some quick gathering
return None # Fall through to normal behavior
# Look for arbitrage opportunities
# Find resources being sold below fair value
best_deal = None
best_margin = 0
for resource_type in [ResourceType.MEAT, ResourceType.BERRIES, ResourceType.WATER, ResourceType.WOOD]:
order = self.market.get_cheapest_order(resource_type)
if not order or order.seller_id == self.agent.id:
continue
fair_value = self._get_resource_fair_value(resource_type)
# Check if we can profit by buying and reselling
buy_price = order.price_per_unit
# Apply trading skill to get better prices
sell_modifier = get_trade_price_modifier(self.skills.trading, is_buying=False)
potential_sell_price = int(fair_value * sell_modifier * 1.1) # 10% markup target
margin = potential_sell_price - buy_price
if margin > best_margin and self.agent.money >= buy_price:
best_margin = margin
best_deal = (resource_type, order, buy_price, potential_sell_price)
# Execute arbitrage if profitable
if best_deal and best_margin >= 2: # At least 2 coins profit
resource_type, order, buy_price, sell_price = best_deal
safe_price = max(1, buy_price) # Prevent division by zero
quantity = min(3, self.agent.inventory_space(), order.quantity,
self.agent.money // safe_price)
if quantity > 0:
return AIDecision(
action=ActionType.TRADE,
target_resource=resource_type,
order_id=order.id,
quantity=quantity,
price=buy_price,
reason=f"Trader: buying {resource_type.value} @ {buy_price}c (resell @ {sell_price}c)",
)
# If holding inventory, try to sell at markup
decision = self._try_trader_sell()
if decision:
return decision
# Priority 8: Routine survival work (gather resources we need)
return self._do_survival_work()
# Adjust prices on existing orders
decision = self._check_price_adjustments()
if decision:
return decision
return None
def _try_trader_sell(self) -> Optional[AIDecision]:
"""Trader sells inventory at markup prices."""
for resource in self.agent.inventory:
if resource.type == ResourceType.CLOTHES:
continue
# Traders sell everything except minimal survival reserves
if resource.quantity <= 1:
continue
fair_value = self._get_resource_fair_value(resource.type)
# Apply trading skill for better sell prices
sell_modifier = get_trade_price_modifier(self.skills.trading, is_buying=False)
# Check market conditions
signal = self.market.get_market_signal(resource.type)
if signal == "sell": # Scarcity - high markup
price = int(fair_value * sell_modifier * 1.4)
else:
price = int(fair_value * sell_modifier * 1.15)
# Don't undercut ourselves if we have active orders
my_orders = [o for o in self.market.get_orders_by_seller(self.agent.id)
if o.resource_type == resource.type]
if my_orders:
continue # Wait for existing order to sell
quantity = resource.quantity - 1 # Keep 1 for emergencies
return AIDecision(
action=ActionType.TRADE,
target_resource=resource.type,
quantity=quantity,
price=price,
reason=f"Trader: selling {resource.type.value} @ {price}c (markup)",
)
return None
def _check_critical_needs(self) -> Optional[AIDecision]:
"""Check if any vital stat is critical and act accordingly."""
@ -372,6 +515,10 @@ class AgentAI:
if not order or order.seller_id == self.agent.id:
continue
# Skip invalid orders (price <= 0)
if order.price_per_unit <= 0:
continue
if self.agent.money < order.price_per_unit:
continue
@ -395,7 +542,8 @@ class AgentAI:
# Score this opportunity
if is_good_deal:
# Good deal - definitely consider buying
efficiency_score = fair_value / order.price_per_unit # How much we're saving
price = max(1, order.price_per_unit) # Prevent division by zero
efficiency_score = fair_value / price # How much we're saving
total_score = need_score + efficiency_score * 2
shopping_list.append((resource_type, order, total_score))
elif need_score > 0 and self._is_wealthy():
@ -415,7 +563,8 @@ class AgentAI:
return None
# Calculate how much to buy
can_afford = self.agent.money // order.price_per_unit
price = max(1, order.price_per_unit) # Prevent division by zero
can_afford = self.agent.money // price
space = self.agent.inventory_space()
want_quantity = min(2, can_afford, space, order.quantity) # Buy up to 2 at a time
@ -697,16 +846,21 @@ class AgentAI:
def _try_proactive_sell(self) -> Optional[AIDecision]:
"""Proactively sell when market conditions are good.
Sell when:
- Market signal says 'sell' (scarcity)
- We have more than minimum stock
- We could use more money (not wealthy)
Affected by personality:
- hoarding_rate: High hoarders keep more, sell less
- wealth_desire: High wealth desire = more aggressive selling
- trade_preference: High traders sell more frequently
"""
if self._is_wealthy() and self.agent.inventory_space() > 3:
# Already rich and have space, no rush to sell
return None
survival_minimums = {
# Hoarders are reluctant to sell
if random.random() < self.p.hoarding_rate * 0.7:
return None
# Survival minimums scaled by hoarding rate
base_min = {
ResourceType.WATER: 2,
ResourceType.MEAT: 1,
ResourceType.BERRIES: 2,
@ -714,6 +868,10 @@ class AgentAI:
ResourceType.HIDE: 0,
}
# High hoarders keep more
hoarding_mult = 0.5 + self.p.hoarding_rate
survival_minimums = {k: int(v * hoarding_mult) for k, v in base_min.items()}
# Look for profitable sales
best_opportunity = None
best_score = 0
@ -732,12 +890,15 @@ class AgentAI:
signal = self.market.get_market_signal(resource.type)
fair_value = self._get_resource_fair_value(resource.type)
# Apply trading skill for better sell prices
sell_modifier = get_trade_price_modifier(self.skills.trading, is_buying=False)
# Calculate optimal price
if signal == "sell": # Scarcity - we can charge more
price = int(fair_value * 1.3) # 30% markup
price = int(fair_value * 1.3 * sell_modifier)
score = 3 + excess
elif signal == "hold": # Normal market
price = fair_value
price = int(fair_value * sell_modifier)
score = 1 + excess * 0.5
else: # Surplus - price competitively
# Find cheapest competitor
@ -745,9 +906,12 @@ class AgentAI:
if cheapest and cheapest.seller_id != self.agent.id:
price = max(1, cheapest.price_per_unit - 1)
else:
price = int(fair_value * 0.8)
price = int(fair_value * 0.8 * sell_modifier)
score = 0.5 # Not a great time to sell
# Wealth desire increases sell motivation
score *= (0.7 + self.p.wealth_desire * 0.6)
if score > best_score:
best_score = score
best_opportunity = (resource.type, min(excess, 3), price) # Sell up to 3 at a time
@ -759,7 +923,7 @@ class AgentAI:
target_resource=resource_type,
quantity=quantity,
price=price,
reason=f"Market opportunity: selling {resource_type.value} @ {price}c",
reason=f"Selling {resource_type.value} @ {price}c",
)
return None
@ -811,11 +975,13 @@ class AgentAI:
return max(1, suggested)
def _do_survival_work(self) -> AIDecision:
"""Perform work based on what we need most for survival.
"""Perform work based on survival needs AND personality preferences.
KEY CHANGE: Always consider buying as an alternative!
If there's a good deal on the market, BUY instead of gathering.
This is the core economic behavior we want.
Personality effects:
- hunt_preference: Likelihood of choosing to hunt
- gather_preference: Likelihood of choosing to gather
- risk_tolerance: Affects hunt vs gather choice
- market_affinity: Likelihood of buying vs gathering
"""
stats = self.agent.stats
@ -832,18 +998,23 @@ class AgentAI:
# Helper to decide: buy or gather?
def get_resource_decision(resource_type: ResourceType, gather_action: ActionType, reason: str) -> AIDecision:
"""Decide whether to buy or gather a resource."""
# Check if buying is efficient
order = self.market.get_cheapest_order(resource_type)
if order and order.seller_id != self.agent.id and self.agent.money >= order.price_per_unit:
if self._is_good_buy(resource_type, order.price_per_unit):
return AIDecision(
action=ActionType.TRADE,
target_resource=resource_type,
order_id=order.id,
quantity=1,
price=order.price_per_unit,
reason=f"{reason} (buying @ {order.price_per_unit}c - good deal!)",
)
# Check if buying is efficient (affected by market_affinity)
if random.random() < self.p.market_affinity:
order = self.market.get_cheapest_order(resource_type)
if order and order.seller_id != self.agent.id and self.agent.money >= order.price_per_unit:
# Apply trading skill for better buy prices
buy_modifier = get_trade_price_modifier(self.skills.trading, is_buying=True)
effective_price = order.price_per_unit # Skill affects perceived value
if self._is_good_buy(resource_type, effective_price):
return AIDecision(
action=ActionType.TRADE,
target_resource=resource_type,
order_id=order.id,
quantity=1,
price=order.price_per_unit,
reason=f"{reason} (buying @ {order.price_per_unit}c)",
)
# Gather it ourselves
config = ACTION_CONFIG[gather_action]
@ -866,6 +1037,7 @@ class AgentAI:
return decision
# Priority: Stock up on wood if low (for heat)
# Affected by woodcut_preference
if wood_count < self.MIN_WOOD_STOCK and heat_urgency > 0.3:
decision = get_resource_decision(
ResourceType.WOOD,
@ -877,18 +1049,28 @@ class AgentAI:
# Priority: Stock up on food if low
if food_count < self.MIN_FOOD_STOCK:
# Decide between hunting and gathering based on conditions
# Meat is more valuable (more hunger restored), but hunting costs more energy
hunt_config = ACTION_CONFIG[ActionType.HUNT]
gather_config = ACTION_CONFIG[ActionType.GATHER]
# Prefer hunting if:
# - We have enough energy for hunt
# - AND (we have no meat OR random chance favors hunting for diversity)
# Personality-driven choice between hunting and gathering
# risk_tolerance and hunt_preference affect this choice
can_hunt = stats.energy >= abs(hunt_config.energy_cost) + 5
prefer_hunt = (meat_count == 0) or (can_hunt and random.random() < 0.4) # 40% hunt chance
if prefer_hunt and can_hunt:
# Calculate hunt probability based on personality
# High risk_tolerance + high hunt_preference = more hunting
hunt_score = self.p.hunt_preference * self.p.risk_tolerance
gather_score = self.p.gather_preference * (1.5 - self.p.risk_tolerance)
# Normalize to probability
total = hunt_score + gather_score
hunt_prob = hunt_score / total if total > 0 else 0.3
# Also prefer hunting if we have no meat
if meat_count == 0:
hunt_prob = min(0.8, hunt_prob + 0.3)
prefer_hunt = can_hunt and random.random() < hunt_prob
if prefer_hunt:
decision = get_resource_decision(
ResourceType.MEAT,
ActionType.HUNT,
@ -921,22 +1103,26 @@ class AgentAI:
if decision:
return decision
# Default: varied work based on need (with buy checks)
# Default: varied work based on need AND personality preferences
needs = []
if water_count < self.MIN_WATER_STOCK + 2:
needs.append((ResourceType.WATER, ActionType.GET_WATER, "Getting water", 2))
needs.append((ResourceType.WATER, ActionType.GET_WATER, "Getting water", 2.0))
if food_count < self.MIN_FOOD_STOCK + 2:
# Both berries and hunting are valid options
needs.append((ResourceType.BERRIES, ActionType.GATHER, "Gathering berries", 2))
# Add hunting with good weight if we have energy
# Weight by personality preferences
needs.append((ResourceType.BERRIES, ActionType.GATHER, "Gathering berries",
2.0 * self.p.gather_preference))
# Add hunting weighted by personality
hunt_config = ACTION_CONFIG[ActionType.HUNT]
if stats.energy >= abs(hunt_config.energy_cost) + 3:
needs.append((ResourceType.MEAT, ActionType.HUNT, "Hunting for meat", 2)) # Same weight as berries
hunt_weight = 2.0 * self.p.hunt_preference * self.p.risk_tolerance
needs.append((ResourceType.MEAT, ActionType.HUNT, "Hunting for meat", hunt_weight))
if wood_count < self.MIN_WOOD_STOCK + 2:
needs.append((ResourceType.WOOD, ActionType.CHOP_WOOD, "Chopping wood", 1))
needs.append((ResourceType.WOOD, ActionType.CHOP_WOOD, "Chopping wood",
1.0 * self.p.woodcut_preference))
if not needs:
# We have good reserves, maybe sell excess or rest
@ -945,26 +1131,34 @@ class AgentAI:
if decision:
return decision
# Default: maintain food supply
# Default activity based on personality
# High hunt_preference = hunt, else gather
if self.p.hunt_preference > self.p.gather_preference and stats.energy >= 10:
return AIDecision(
action=ActionType.HUNT,
target_resource=ResourceType.MEAT,
reason="Default: hunting (personality)",
)
return AIDecision(
action=ActionType.GATHER,
target_resource=ResourceType.BERRIES,
reason="Maintaining supplies",
reason="Default: gathering (personality)",
)
# For each need, check if we can buy cheaply
for resource_type, action, reason, weight in needs:
order = self.market.get_cheapest_order(resource_type)
if order and order.seller_id != self.agent.id and self.agent.money >= order.price_per_unit:
if self._is_good_buy(resource_type, order.price_per_unit):
return AIDecision(
action=ActionType.TRADE,
target_resource=resource_type,
order_id=order.id,
quantity=1,
price=order.price_per_unit,
reason=f"{reason} (buying cheap!)",
)
# For each need, check if we can buy cheaply (market_affinity affects this)
if random.random() < self.p.market_affinity:
for resource_type, action, reason, weight in needs:
order = self.market.get_cheapest_order(resource_type)
if order and order.seller_id != self.agent.id and self.agent.money >= order.price_per_unit:
if self._is_good_buy(resource_type, order.price_per_unit):
return AIDecision(
action=ActionType.TRADE,
target_resource=resource_type,
order_id=order.id,
quantity=1,
price=order.price_per_unit,
reason=f"{reason} (buying cheap!)",
)
# Weighted random selection for gathering
total_weight = sum(weight for _, _, _, weight in needs)

View File

@ -10,6 +10,7 @@ from typing import Optional
from backend.domain.agent import Agent
from backend.domain.action import ActionType, ActionResult, ACTION_CONFIG
from backend.domain.resources import Resource, ResourceType
from backend.domain.personality import get_action_skill_modifier
from backend.core.world import World, WorldConfig, TimeOfDay
from backend.core.market import OrderBook
from backend.core.ai import get_ai_decision, AIDecision
@ -59,7 +60,8 @@ class GameEngine:
self.market = OrderBook()
self.mode = SimulationMode.MANUAL
self.is_running = False
self.auto_step_interval = 1.0 # seconds
# Load auto_step_interval from config
self.auto_step_interval = get_config().auto_step_interval
self._auto_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self.turn_logs: list[TurnLog] = []
@ -86,9 +88,16 @@ class GameEngine:
self.world.initialize()
self.is_running = True
def initialize(self, num_agents: int = 8) -> None:
"""Initialize the simulation with agents."""
self.world.config.initial_agents = num_agents
def initialize(self, num_agents: Optional[int] = None) -> None:
"""Initialize the simulation with agents.
Args:
num_agents: Number of agents to spawn. If None, uses config.json value.
"""
if num_agents is not None:
self.world.config.initial_agents = num_agents
# Otherwise use the value already loaded from config.json
self.world.initialize()
# Start logging session
@ -323,7 +332,14 @@ class GameEngine:
return ActionResult(action_type=action, success=False, message="Unknown action")
def _execute_work(self, agent: Agent, action: ActionType, config) -> ActionResult:
"""Execute a work action (hunting, gathering, etc.)."""
"""Execute a work action (hunting, gathering, etc.).
Skills now affect outcomes:
- Hunting skill affects hunt success rate
- Gathering skill affects gather output
- Woodcutting skill affects wood output
- Skills improve with use
"""
# Check energy
energy_cost = abs(config.energy_cost)
if not agent.spend_energy(energy_cost):
@ -344,8 +360,19 @@ class GameEngine:
)
agent.remove_from_inventory(config.requires_resource, config.requires_quantity)
# Check success chance
if random.random() > config.success_chance:
# Get relevant skill for this action
skill_name = self._get_skill_for_action(action)
skill_value = getattr(agent.skills, skill_name, 1.0) if skill_name else 1.0
skill_modifier = get_action_skill_modifier(skill_value)
# Check success chance (modified by skill)
# Higher skill = higher effective success chance
effective_success_chance = min(0.98, config.success_chance * skill_modifier)
if random.random() > effective_success_chance:
# Record action attempt (skill still improves on failure, just less)
agent.record_action(action.value)
if skill_name:
agent.skills.improve(skill_name, 0.005) # Small improvement on failure
return ActionResult(
action_type=action,
success=False,
@ -353,11 +380,14 @@ class GameEngine:
message="Action failed",
)
# Generate output
# Generate output (modified by skill for quantity)
resources_gained = []
if config.output_resource:
quantity = random.randint(config.min_output, config.max_output)
# Skill affects output quantity
base_quantity = random.randint(config.min_output, config.max_output)
quantity = max(config.min_output, int(base_quantity * skill_modifier))
if quantity > 0:
resource = Resource(
type=config.output_resource,
@ -372,9 +402,10 @@ class GameEngine:
created_turn=self.world.current_turn,
))
# Secondary output (e.g., hide from hunting)
# Secondary output (e.g., hide from hunting) - also affected by skill
if config.secondary_output:
quantity = random.randint(config.secondary_min, config.secondary_max)
base_quantity = random.randint(config.secondary_min, config.secondary_max)
quantity = max(0, int(base_quantity * skill_modifier))
if quantity > 0:
resource = Resource(
type=config.secondary_output,
@ -389,6 +420,11 @@ class GameEngine:
created_turn=self.world.current_turn,
))
# Record action and improve skill
agent.record_action(action.value)
if skill_name:
agent.skills.improve(skill_name, 0.015) # Skill improves with successful use
# Build success message with details
gained_str = ", ".join(f"+{r.quantity} {r.type.value}" for r in resources_gained)
message = f"{action.value}: {gained_str}" if gained_str else f"{action.value} (nothing gained)"
@ -401,8 +437,21 @@ class GameEngine:
message=message,
)
def _get_skill_for_action(self, action: ActionType) -> Optional[str]:
"""Get the skill name that affects a given action."""
skill_map = {
ActionType.HUNT: "hunting",
ActionType.GATHER: "gathering",
ActionType.CHOP_WOOD: "woodcutting",
ActionType.WEAVE: "crafting",
}
return skill_map.get(action)
def _execute_trade(self, agent: Agent, decision: AIDecision) -> ActionResult:
"""Execute a trade action (buy, sell, or price adjustment). Supports multi-item trades."""
"""Execute a trade action (buy, sell, or price adjustment). Supports multi-item trades.
Trading skill improves with successful trades and affects prices slightly.
"""
config = ACTION_CONFIG[ActionType.TRADE]
# Handle price adjustments (no energy cost)
@ -445,13 +494,19 @@ class GameEngine:
)
agent.add_to_inventory(resource)
# Add money to seller
# Add money to seller and record their trade
seller = self.world.get_agent(result.seller_id)
if seller:
seller.money += result.total_paid
seller.record_trade(result.total_paid)
seller.skills.improve("trading", 0.02) # Seller skill improves
agent.spend_energy(abs(config.energy_cost))
# Record buyer's trade and improve skill
agent.record_action("trade")
agent.skills.improve("trading", 0.01) # Buyer skill improves less
return ActionResult(
action_type=ActionType.TRADE,
success=True,
@ -467,7 +522,7 @@ class GameEngine:
)
elif decision.target_resource and decision.quantity > 0:
# Selling to market
# Selling to market (listing)
if agent.has_resource(decision.target_resource, decision.quantity):
agent.remove_from_inventory(decision.target_resource, decision.quantity)
@ -480,6 +535,7 @@ class GameEngine:
)
agent.spend_energy(abs(config.energy_cost))
agent.record_action("trade") # Track listing action
return ActionResult(
action_type=ActionType.TRADE,

View File

@ -118,6 +118,12 @@ class TradeResult:
}
def _get_market_config():
"""Load market configuration from config.json."""
from backend.config import get_config
return get_config().market
@dataclass
class OrderBook:
"""Central market order book with supply/demand tracking.
@ -127,14 +133,16 @@ class OrderBook:
- Calculate supply/demand scores
- Suggest prices based on market conditions
- Allow sellers to adjust prices dynamically
Configuration is loaded from config.json.
"""
orders: list[Order] = field(default_factory=list)
trade_history: list[TradeResult] = field(default_factory=list)
price_history: dict[ResourceType, PriceHistory] = field(default_factory=dict)
# Configuration
TURNS_BEFORE_DISCOUNT: int = 3
DISCOUNT_RATE: float = 0.15 # 15% discount after waiting
# Configuration - defaults loaded from config.json in __post_init__
TURNS_BEFORE_DISCOUNT: int = 15
DISCOUNT_RATE: float = 0.12
# Supply/demand thresholds
LOW_SUPPLY_THRESHOLD: int = 3 # Less than this = scarcity
@ -142,7 +150,15 @@ class OrderBook:
DEMAND_DECAY: float = 0.95 # How fast demand score decays per turn
def __post_init__(self):
"""Initialize price history for all resource types."""
"""Initialize price history and load config values."""
# Load market config from config.json
try:
cfg = _get_market_config()
self.TURNS_BEFORE_DISCOUNT = cfg.turns_before_discount
self.DISCOUNT_RATE = cfg.discount_rate
except Exception:
pass # Use defaults if config not available
if not self.price_history:
for resource_type in ResourceType:
self.price_history[resource_type] = PriceHistory()

View File

@ -1,4 +1,9 @@
"""World container for the Village Simulation."""
"""World container for the Village Simulation.
The world spawns diverse agents with varied personality traits,
skills, and starting conditions to create emergent professions
and class inequality.
"""
import random
from dataclasses import dataclass, field
@ -6,6 +11,10 @@ from enum import Enum
from typing import Optional
from backend.domain.agent import Agent, Position, Profession
from backend.domain.personality import (
PersonalityTraits, Skills,
generate_random_personality, generate_random_skills
)
class TimeOfDay(Enum):
@ -14,20 +23,42 @@ class TimeOfDay(Enum):
NIGHT = "night"
def _get_world_config_from_file():
"""Load world configuration from config.json."""
from backend.config import get_config
return get_config().world
@dataclass
class WorldConfig:
"""Configuration for the world."""
width: int = 20
height: int = 20
initial_agents: int = 8
"""Configuration for the world.
Default values are loaded from config.json via create_world_config().
These hardcoded defaults are only fallbacks.
"""
width: int = 25
height: int = 25
initial_agents: int = 25
day_steps: int = 10
night_steps: int = 1
def create_world_config() -> WorldConfig:
"""Factory function to create WorldConfig from config.json."""
cfg = _get_world_config_from_file()
return WorldConfig(
width=cfg.width,
height=cfg.height,
initial_agents=cfg.initial_agents,
day_steps=cfg.day_steps,
night_steps=cfg.night_steps,
)
@dataclass
class World:
"""Container for all entities in the simulation."""
config: WorldConfig = field(default_factory=WorldConfig)
config: WorldConfig = field(default_factory=create_world_config)
agents: list[Agent] = field(default_factory=list)
current_turn: int = 0
current_day: int = 1
@ -43,22 +74,48 @@ class World:
name: Optional[str] = None,
profession: Optional[Profession] = None,
position: Optional[Position] = None,
archetype: Optional[str] = None,
starting_money: Optional[int] = None,
) -> Agent:
"""Spawn a new agent in the world."""
# All agents are now generic villagers - profession is not used for decisions
if profession is None:
profession = Profession.VILLAGER
"""Spawn a new agent in the world with unique personality.
Args:
name: Agent name (auto-generated if None)
profession: Deprecated, now derived from personality
position: Starting position (random if None)
archetype: Personality archetype ("hunter", "gatherer", "trader", etc.)
starting_money: Starting money (random with inequality if None)
"""
if position is None:
position = Position(
x=random.randint(0, self.config.width - 1),
y=random.randint(0, self.config.height - 1),
)
# Generate unique personality and skills
personality = generate_random_personality(archetype)
skills = generate_random_skills(personality)
# Variable starting money for class inequality
# Some agents start with more, some with less
if starting_money is None:
from backend.config import get_config
base_money = get_config().world.starting_money
# Random multiplier: 0.3x to 2.0x base money
# This creates natural class inequality
money_multiplier = random.uniform(0.3, 2.0)
# Traders start with more money (their capital)
if personality.trade_preference > 1.3:
money_multiplier *= 1.5
starting_money = int(base_money * money_multiplier)
agent = Agent(
name=name or f"Villager_{self.total_agents_spawned + 1}",
profession=profession,
profession=Profession.VILLAGER, # Will be updated based on personality
position=position,
personality=personality,
skills=skills,
money=starting_money,
)
self.agents.append(agent)
@ -106,15 +163,35 @@ class World:
return [a for a in self.agents if a.is_alive() and not a.is_corpse()]
def get_statistics(self) -> dict:
"""Get current world statistics."""
"""Get current world statistics including wealth distribution."""
living = self.get_living_agents()
total_money = sum(a.money for a in living)
# Count emergent professions (updated based on current skills)
profession_counts = {}
for agent in living:
agent._update_profession() # Update based on current state
prof = agent.profession.value
profession_counts[prof] = profession_counts.get(prof, 0) + 1
# Calculate wealth inequality metrics
if living:
moneys = sorted([a.money for a in living])
avg_money = total_money / len(living)
median_money = moneys[len(moneys) // 2]
richest = moneys[-1] if moneys else 0
poorest = moneys[0] if moneys else 0
# Gini coefficient for inequality (0 = perfect equality, 1 = max inequality)
n = len(moneys)
if n > 1 and total_money > 0:
sum_of_diffs = sum(abs(m1 - m2) for m1 in moneys for m2 in moneys)
gini = sum_of_diffs / (2 * n * total_money)
else:
gini = 0
else:
avg_money = median_money = richest = poorest = gini = 0
return {
"current_turn": self.current_turn,
"current_day": self.current_day,
@ -125,6 +202,12 @@ class World:
"total_agents_died": self.total_agents_died,
"total_money_in_circulation": total_money,
"professions": profession_counts,
# Wealth inequality metrics
"avg_money": round(avg_money, 1),
"median_money": median_money,
"richest_agent": richest,
"poorest_agent": poorest,
"gini_coefficient": round(gini, 3),
}
def get_state_snapshot(self) -> dict:
@ -140,7 +223,32 @@ class World:
}
def initialize(self) -> None:
"""Initialize the world with starting agents."""
for _ in range(self.config.initial_agents):
self.spawn_agent()
"""Initialize the world with diverse starting agents.
Creates a mix of agent archetypes to seed profession diversity:
- Some hunters (risk-takers who hunt)
- Some gatherers (cautious resource collectors)
- Some traders (market-focused wealth builders)
- Some generalists (balanced approach)
"""
n = self.config.initial_agents
# Distribute archetypes for diversity
# ~15% hunters, ~15% gatherers, ~15% traders, ~10% woodcutters, 45% random
archetypes = (
["hunter"] * max(1, n // 7) +
["gatherer"] * max(1, n // 7) +
["trader"] * max(1, n // 7) +
["woodcutter"] * max(1, n // 10)
)
# Fill remaining slots with random (no archetype)
while len(archetypes) < n:
archetypes.append(None)
# Shuffle to randomize positions
random.shuffle(archetypes)
for archetype in archetypes:
self.spawn_agent(archetype=archetype)

View File

@ -1,6 +1,8 @@
"""Agent model for the Village Simulation.
Agent stats are loaded dynamically from the global config.
Each agent now has unique personality traits and skills that create
emergent professions and behavioral diversity.
"""
import math
@ -11,6 +13,10 @@ from typing import Optional
from uuid import uuid4
from .resources import Resource, ResourceType, RESOURCE_EFFECTS
from .personality import (
PersonalityTraits, Skills, ProfessionType,
determine_profession
)
def _get_agent_stats_config():
@ -20,11 +26,12 @@ def _get_agent_stats_config():
class Profession(Enum):
"""Agent professions - kept for backwards compatibility but no longer used."""
"""Agent professions - now derived from personality and skills."""
VILLAGER = "villager"
HUNTER = "hunter"
GATHERER = "gatherer"
WOODCUTTER = "woodcutter"
TRADER = "trader"
CRAFTER = "crafter"
@ -208,15 +215,21 @@ class Agent:
"""An agent in the village simulation.
Stats, inventory slots, and starting money are loaded from config.json.
Each agent now has unique personality traits and skills that create
emergent behaviors and professions.
"""
id: str = field(default_factory=lambda: str(uuid4())[:8])
name: str = ""
profession: Profession = Profession.VILLAGER # No longer used for decision making
profession: Profession = Profession.VILLAGER # Now derived from personality/skills
position: Position = field(default_factory=Position)
stats: AgentStats = field(default_factory=create_agent_stats)
inventory: list[Resource] = field(default_factory=list)
money: int = field(default=-1) # -1 signals to use config value
# Personality and skills - create agent diversity
personality: PersonalityTraits = field(default_factory=PersonalityTraits)
skills: Skills = field(default_factory=Skills)
# Movement and action tracking
home_position: Position = field(default_factory=Position)
current_action: AgentAction = field(default_factory=AgentAction)
@ -226,6 +239,13 @@ class Agent:
death_turn: int = -1 # Turn when agent died, -1 if alive
death_reason: str = "" # Cause of death
# Statistics tracking for profession determination
actions_performed: dict = field(default_factory=lambda: {
"hunt": 0, "gather": 0, "chop_wood": 0, "trade": 0, "craft": 0
})
total_trades_completed: int = 0
total_money_earned: int = 0
# Configuration - loaded from config
INVENTORY_SLOTS: int = field(default=-1) # -1 signals to use config value
MOVE_SPEED: float = 0.8 # Grid cells per turn
@ -243,6 +263,32 @@ class Agent:
if self.INVENTORY_SLOTS == -1:
self.INVENTORY_SLOTS = config.inventory_slots
# Update profession based on personality and skills
self._update_profession()
def _update_profession(self) -> None:
"""Update profession based on personality and skills."""
prof_type = determine_profession(self.personality, self.skills)
profession_map = {
ProfessionType.HUNTER: Profession.HUNTER,
ProfessionType.GATHERER: Profession.GATHERER,
ProfessionType.WOODCUTTER: Profession.WOODCUTTER,
ProfessionType.TRADER: Profession.TRADER,
ProfessionType.GENERALIST: Profession.VILLAGER,
}
self.profession = profession_map.get(prof_type, Profession.VILLAGER)
def record_action(self, action_type: str) -> None:
"""Record an action for profession tracking."""
if action_type in self.actions_performed:
self.actions_performed[action_type] += 1
def record_trade(self, money_earned: int) -> None:
"""Record a completed trade for statistics."""
self.total_trades_completed += 1
if money_earned > 0:
self.total_money_earned += money_earned
def is_alive(self) -> bool:
"""Check if the agent is still alive."""
return (
@ -440,6 +486,9 @@ class Agent:
def to_dict(self) -> dict:
"""Convert to dictionary for API serialization."""
# Update profession before serializing
self._update_profession()
return {
"id": self.id,
"name": self.name,
@ -456,4 +505,10 @@ class Agent:
"last_action_result": self.last_action_result,
"death_turn": self.death_turn,
"death_reason": self.death_reason,
# New fields for agent diversity
"personality": self.personality.to_dict(),
"skills": self.skills.to_dict(),
"actions_performed": self.actions_performed.copy(),
"total_trades": self.total_trades_completed,
"total_money_earned": self.total_money_earned,
}

View File

@ -0,0 +1,297 @@
"""Personality and skill system for agents in the Village Simulation.
Each agent has unique personality traits that affect their behavior:
- How much they value wealth vs. survival resources
- What activities they prefer (hunting, gathering, trading)
- How willing they are to take risks
- How much they hoard vs. trade
Agents also develop skills over time based on their actions:
- Skills improve with practice
- Better skills = better outcomes
This creates emergent professions:
- Hunters: High hunting skill, prefer meat production
- Gatherers: High gathering skill, prefer berries/water/wood
- Traders: High trading skill, focus on buy low / sell high arbitrage
"""
import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ProfessionType(Enum):
"""Emergent profession types based on behavior patterns."""
HUNTER = "hunter"
GATHERER = "gatherer"
WOODCUTTER = "woodcutter"
TRADER = "trader"
GENERALIST = "generalist"
@dataclass
class PersonalityTraits:
"""Unique personality traits that affect agent behavior.
These are set at birth and don't change during the agent's life.
They create natural diversity in the population.
"""
# How much the agent values accumulating wealth (0.1 = minimal, 0.9 = greedy)
wealth_desire: float = 0.3
# How much the agent hoards resources vs trades them (0.1 = trades freely, 0.9 = hoards)
hoarding_rate: float = 0.5
# Willingness to take risks (0.1 = very cautious, 0.9 = risk-taker)
# Affects: hunting vs gathering preference, price decisions
risk_tolerance: float = 0.5
# Sensitivity to good/bad deals (0.5 = not picky, 1.5 = very price conscious)
price_sensitivity: float = 1.0
# Activity biases - how much the agent prefers each activity
# Higher values = more likely to choose this activity
# These create "profession tendencies"
hunt_preference: float = 1.0 # Preference for hunting
gather_preference: float = 1.0 # Preference for gathering
woodcut_preference: float = 1.0 # Preference for wood
trade_preference: float = 1.0 # Preference for trading/market
# How social/market-oriented the agent is
# High = frequent market visits, buys more from others
# Low = self-sufficient, prefers to produce own resources
market_affinity: float = 0.5
def to_dict(self) -> dict:
return {
"wealth_desire": round(self.wealth_desire, 2),
"hoarding_rate": round(self.hoarding_rate, 2),
"risk_tolerance": round(self.risk_tolerance, 2),
"price_sensitivity": round(self.price_sensitivity, 2),
"hunt_preference": round(self.hunt_preference, 2),
"gather_preference": round(self.gather_preference, 2),
"woodcut_preference": round(self.woodcut_preference, 2),
"trade_preference": round(self.trade_preference, 2),
"market_affinity": round(self.market_affinity, 2),
}
@dataclass
class Skills:
"""Skills that improve with practice.
Each skill affects the outcome of related actions.
Skills increase slowly through practice (use it or lose it).
"""
# Combat/hunting skill - affects hunt success rate
hunting: float = 1.0
# Foraging skill - affects gather output quantity
gathering: float = 1.0
# Woodcutting skill - affects wood output
woodcutting: float = 1.0
# Trading skill - affects prices (buy lower, sell higher)
trading: float = 1.0
# Crafting skill - affects craft quality/success
crafting: float = 1.0
# Skill improvement rate per action
IMPROVEMENT_RATE: float = 0.02
# Skill decay rate per turn (use it or lose it, gentle decay)
DECAY_RATE: float = 0.001
# Maximum skill level
MAX_SKILL: float = 2.0
# Minimum skill level
MIN_SKILL: float = 0.5
def improve(self, skill_name: str, amount: Optional[float] = None) -> None:
"""Improve a skill through practice."""
if amount is None:
amount = self.IMPROVEMENT_RATE
if hasattr(self, skill_name):
current = getattr(self, skill_name)
new_value = min(self.MAX_SKILL, current + amount)
setattr(self, skill_name, new_value)
def decay_all(self) -> None:
"""Apply gentle decay to all skills (use it or lose it)."""
for skill_name in ['hunting', 'gathering', 'woodcutting', 'trading', 'crafting']:
current = getattr(self, skill_name)
new_value = max(self.MIN_SKILL, current - self.DECAY_RATE)
setattr(self, skill_name, new_value)
def get_primary_skill(self) -> tuple[str, float]:
"""Get the agent's highest skill and its name."""
skills = {
'hunting': self.hunting,
'gathering': self.gathering,
'woodcutting': self.woodcutting,
'trading': self.trading,
'crafting': self.crafting,
}
best_skill = max(skills, key=skills.get)
return best_skill, skills[best_skill]
def to_dict(self) -> dict:
return {
"hunting": round(self.hunting, 3),
"gathering": round(self.gathering, 3),
"woodcutting": round(self.woodcutting, 3),
"trading": round(self.trading, 3),
"crafting": round(self.crafting, 3),
}
def generate_random_personality(archetype: Optional[str] = None) -> PersonalityTraits:
"""Generate random personality traits.
If archetype is specified, traits will be biased towards that profession:
- "hunter": High risk tolerance, high hunt preference
- "gatherer": Low risk tolerance, high gather preference
- "trader": High wealth desire, high market affinity, high trade preference
- "hoarder": High hoarding rate, low market affinity
- None: Fully random
Returns a PersonalityTraits instance with randomized values.
"""
# Start with base random values
traits = PersonalityTraits(
wealth_desire=random.uniform(0.1, 0.9),
hoarding_rate=random.uniform(0.2, 0.8),
risk_tolerance=random.uniform(0.2, 0.8),
price_sensitivity=random.uniform(0.6, 1.4),
hunt_preference=random.uniform(0.5, 1.5),
gather_preference=random.uniform(0.5, 1.5),
woodcut_preference=random.uniform(0.5, 1.5),
trade_preference=random.uniform(0.5, 1.5),
market_affinity=random.uniform(0.2, 0.8),
)
# Apply archetype biases
if archetype == "hunter":
traits.hunt_preference = random.uniform(1.3, 2.0)
traits.risk_tolerance = random.uniform(0.6, 0.9)
traits.gather_preference = random.uniform(0.3, 0.7)
elif archetype == "gatherer":
traits.gather_preference = random.uniform(1.3, 2.0)
traits.risk_tolerance = random.uniform(0.2, 0.5)
traits.hunt_preference = random.uniform(0.3, 0.7)
elif archetype == "trader":
traits.trade_preference = random.uniform(1.5, 2.5)
traits.market_affinity = random.uniform(0.7, 0.95)
traits.wealth_desire = random.uniform(0.6, 0.95)
traits.price_sensitivity = random.uniform(1.1, 1.6)
traits.hoarding_rate = random.uniform(0.1, 0.4) # Traders sell!
# Traders don't hunt/gather much
traits.hunt_preference = random.uniform(0.2, 0.5)
traits.gather_preference = random.uniform(0.2, 0.5)
elif archetype == "hoarder":
traits.hoarding_rate = random.uniform(0.7, 0.95)
traits.market_affinity = random.uniform(0.1, 0.4)
traits.trade_preference = random.uniform(0.3, 0.7)
elif archetype == "woodcutter":
traits.woodcut_preference = random.uniform(1.3, 2.0)
traits.gather_preference = random.uniform(0.5, 0.8)
return traits
def generate_random_skills(personality: PersonalityTraits) -> Skills:
"""Generate starting skills influenced by personality.
Agents with strong preferences start with slightly better skills
in those areas (natural talent).
"""
# Base skill level with small random variation
base = 1.0
variance = 0.15
skills = Skills(
hunting=base + random.uniform(-variance, variance) + (personality.hunt_preference - 1.0) * 0.1,
gathering=base + random.uniform(-variance, variance) + (personality.gather_preference - 1.0) * 0.1,
woodcutting=base + random.uniform(-variance, variance) + (personality.woodcut_preference - 1.0) * 0.1,
trading=base + random.uniform(-variance, variance) + (personality.trade_preference - 1.0) * 0.1,
crafting=base + random.uniform(-variance, variance),
)
# Clamp all skills to valid range
skills.hunting = max(skills.MIN_SKILL, min(skills.MAX_SKILL, skills.hunting))
skills.gathering = max(skills.MIN_SKILL, min(skills.MAX_SKILL, skills.gathering))
skills.woodcutting = max(skills.MIN_SKILL, min(skills.MAX_SKILL, skills.woodcutting))
skills.trading = max(skills.MIN_SKILL, min(skills.MAX_SKILL, skills.trading))
skills.crafting = max(skills.MIN_SKILL, min(skills.MAX_SKILL, skills.crafting))
return skills
def determine_profession(personality: PersonalityTraits, skills: Skills) -> ProfessionType:
"""Determine an agent's emergent profession based on traits and skills.
This is for display/statistics - it doesn't affect behavior directly.
The behavior is determined by the traits and skills themselves.
"""
# Calculate profession scores
scores = {
ProfessionType.HUNTER: personality.hunt_preference * skills.hunting * 1.2,
ProfessionType.GATHERER: personality.gather_preference * skills.gathering,
ProfessionType.WOODCUTTER: personality.woodcut_preference * skills.woodcutting,
ProfessionType.TRADER: personality.trade_preference * skills.trading * personality.market_affinity * 1.5,
}
# Find the best match
best_profession = max(scores, key=scores.get)
best_score = scores[best_profession]
# If no clear winner (all scores similar), they're a generalist
second_best = sorted(scores.values(), reverse=True)[1]
if best_score < second_best * 1.2:
return ProfessionType.GENERALIST
return best_profession
def get_action_skill_modifier(skill_value: float) -> float:
"""Convert skill value to action modifier.
Skill 0.5 = 0.75x effectiveness
Skill 1.0 = 1.0x effectiveness
Skill 1.5 = 1.25x effectiveness
Skill 2.0 = 1.5x effectiveness
This creates meaningful but not overpowering differences.
"""
# Linear scaling: (skill - 1.0) * 0.5 + 1.0
# So skill 0.5 -> 0.75, skill 1.0 -> 1.0, skill 2.0 -> 1.5
return max(0.5, min(1.5, 0.5 * skill_value + 0.5))
def get_trade_price_modifier(skill_value: float, is_buying: bool) -> float:
"""Get price modifier for trading based on skill.
Higher trading skill = better deals:
- When buying: lower prices (modifier < 1)
- When selling: higher prices (modifier > 1)
Skill 1.0 = no modifier
Skill 2.0 = 15% better deals
"""
modifier = (skill_value - 1.0) * 0.15
if is_buying:
return max(0.85, 1.0 - modifier) # Lower is better for buying
else:
return min(1.15, 1.0 + modifier) # Higher is better for selling

View File

@ -31,10 +31,14 @@ app.include_router(router, prefix="/api", tags=["simulation"])
@app.on_event("startup")
async def startup_event():
"""Initialize the simulation on startup."""
"""Initialize the simulation on startup with config.json values."""
from backend.config import get_config
config = get_config()
engine = get_engine()
engine.initialize(num_agents=8)
print("Village Simulation initialized with 8 agents")
# Use reset() which automatically loads config values
engine.reset()
print(f"Village Simulation initialized with {config.world.initial_agents} agents")
@app.get("/", tags=["root"])

View File

@ -5,69 +5,69 @@
"max_thirst": 100,
"max_heat": 100,
"start_energy": 50,
"start_hunger": 64,
"start_thirst": 69,
"start_hunger": 70,
"start_thirst": 75,
"start_heat": 100,
"energy_decay": 1,
"hunger_decay": 1,
"thirst_decay": 2,
"hunger_decay": 2,
"thirst_decay": 3,
"heat_decay": 3,
"critical_threshold": 0.25,
"low_energy_threshold": 15
"low_energy_threshold": 12
},
"resources": {
"meat_decay": 8,
"berries_decay": 4,
"clothes_decay": 15,
"meat_hunger": 34,
"meat_energy": 15,
"berries_hunger": 8,
"berries_thirst": 3,
"water_thirst": 56,
"fire_heat": 15
"meat_decay": 10,
"berries_decay": 6,
"clothes_decay": 20,
"meat_hunger": 35,
"meat_energy": 12,
"berries_hunger": 10,
"berries_thirst": 4,
"water_thirst": 50,
"fire_heat": 20
},
"actions": {
"sleep_energy": 60,
"rest_energy": 15,
"hunt_energy": -8,
"sleep_energy": 55,
"rest_energy": 12,
"hunt_energy": -7,
"gather_energy": -3,
"chop_wood_energy": -8,
"get_water_energy": -3,
"weave_energy": -8,
"build_fire_energy": -5,
"chop_wood_energy": -6,
"get_water_energy": -2,
"weave_energy": -6,
"build_fire_energy": -4,
"trade_energy": -1,
"hunt_success": 0.79,
"chop_wood_success": 0.95,
"hunt_meat_min": 1,
"hunt_meat_max": 4,
"hunt_success": 0.70,
"chop_wood_success": 0.90,
"hunt_meat_min": 2,
"hunt_meat_max": 5,
"hunt_hide_min": 0,
"hunt_hide_max": 1,
"gather_min": 1,
"gather_max": 3,
"hunt_hide_max": 2,
"gather_min": 2,
"gather_max": 4,
"chop_wood_min": 1,
"chop_wood_max": 2
"chop_wood_max": 3
},
"world": {
"width": 20,
"height": 20,
"initial_agents": 10,
"width": 25,
"height": 25,
"initial_agents": 25,
"day_steps": 10,
"night_steps": 1,
"inventory_slots": 10,
"starting_money": 100
"inventory_slots": 12,
"starting_money": 80
},
"market": {
"turns_before_discount": 20,
"discount_rate": 0.15,
"base_price_multiplier": 1.25
"turns_before_discount": 15,
"discount_rate": 0.12,
"base_price_multiplier": 1.3
},
"economy": {
"energy_to_money_ratio": 1.46,
"wealth_desire": 0.23,
"buy_efficiency_threshold": 0.89,
"min_wealth_target": 63,
"energy_to_money_ratio": 1.5,
"wealth_desire": 0.35,
"buy_efficiency_threshold": 0.75,
"min_wealth_target": 50,
"max_price_markup": 2.5,
"min_price_discount": 0.3
"min_price_discount": 0.4
},
"auto_step_interval": 0.2
"auto_step_interval": 0.15
}

View File

@ -8,11 +8,12 @@ from frontend.renderer.map_renderer import MapRenderer
from frontend.renderer.agent_renderer import AgentRenderer
from frontend.renderer.ui_renderer import UIRenderer
from frontend.renderer.settings_renderer import SettingsRenderer
from frontend.renderer.stats_renderer import StatsRenderer
# Window configuration
WINDOW_WIDTH = 1000
WINDOW_HEIGHT = 700
WINDOW_WIDTH = 1200
WINDOW_HEIGHT = 800
WINDOW_TITLE = "Village Economy Simulation"
FPS = 30
@ -55,11 +56,13 @@ class VillageSimulationApp:
self.agent_renderer = AgentRenderer(self.screen, self.map_renderer, self.font)
self.ui_renderer = UIRenderer(self.screen, self.font)
self.settings_renderer = SettingsRenderer(self.screen)
self.stats_renderer = StatsRenderer(self.screen)
# State
self.state: SimulationState | None = None
self.running = True
self.hovered_agent: dict | None = None
self._last_turn: int = -1 # Track turn changes for stats update
# Polling interval (ms)
self.last_poll_time = 0
@ -116,6 +119,10 @@ class VillageSimulationApp:
if event.type == pygame.QUIT:
self.running = False
# Let stats panel handle events first if visible
if self.stats_renderer.handle_event(event):
continue
# Let settings panel handle events first if visible
if self.settings_renderer.handle_event(event):
continue
@ -129,36 +136,46 @@ class VillageSimulationApp:
def _handle_keydown(self, event: pygame.event.Event) -> None:
"""Handle keyboard input."""
if event.key == pygame.K_ESCAPE:
if self.settings_renderer.visible:
if self.stats_renderer.visible:
self.stats_renderer.toggle()
elif self.settings_renderer.visible:
self.settings_renderer.toggle()
else:
self.running = False
elif event.key == pygame.K_SPACE:
# Advance one turn
if self.client.connected and not self.settings_renderer.visible:
if self.client.connected and not self.settings_renderer.visible and not self.stats_renderer.visible:
if self.client.advance_turn():
# Immediately fetch new state
self.state = self.client.get_state()
elif event.key == pygame.K_r:
# Reset simulation
if self.client.connected and not self.settings_renderer.visible:
if self.client.connected and not self.settings_renderer.visible and not self.stats_renderer.visible:
if self.client.initialize():
self.state = self.client.get_state()
self.stats_renderer.clear_history()
self._last_turn = -1
elif event.key == pygame.K_m:
# Toggle mode
if self.client.connected and self.state and not self.settings_renderer.visible:
if self.client.connected and self.state and not self.settings_renderer.visible and not self.stats_renderer.visible:
new_mode = "auto" if self.state.mode == "manual" else "manual"
if self.client.set_mode(new_mode):
self.state = self.client.get_state()
elif event.key == pygame.K_g:
# Toggle statistics/graphs panel
if not self.settings_renderer.visible:
self.stats_renderer.toggle()
elif event.key == pygame.K_s:
# Toggle settings panel
if not self.settings_renderer.visible:
self._load_config()
self.settings_renderer.toggle()
if not self.stats_renderer.visible:
if not self.settings_renderer.visible:
self._load_config()
self.settings_renderer.toggle()
def _handle_mouse_motion(self, event: pygame.event.Event) -> None:
"""Handle mouse motion for agent hover detection."""
@ -218,6 +235,11 @@ class VillageSimulationApp:
)
self.state = new_state
# Update stats history when turn changes
if new_state.turn != self._last_turn:
self.stats_renderer.update_history(new_state)
self._last_turn = new_state.turn
def draw(self) -> None:
"""Draw all elements."""
# Clear screen
@ -245,9 +267,13 @@ class VillageSimulationApp:
# Draw settings panel if visible
self.settings_renderer.draw()
# Draw settings hint
if not self.settings_renderer.visible:
hint = pygame.font.Font(None, 18).render("Press S for Settings", True, (100, 100, 120))
# Draw stats panel if visible
self.stats_renderer.draw(self.state)
# Draw hints at bottom
if not self.settings_renderer.visible and not self.stats_renderer.visible:
hint_font = pygame.font.Font(None, 18)
hint = hint_font.render("S: Settings | G: Statistics & Graphs", True, (100, 100, 120))
self.screen.blit(hint, (5, self.screen.get_height() - 20))
# Update display
@ -270,7 +296,8 @@ class VillageSimulationApp:
print(" R - Reset simulation")
print(" M - Toggle auto/manual mode")
print(" S - Open settings")
print(" ESC - Close settings / Quit")
print(" G - Open statistics & graphs")
print(" ESC - Close panel / Quit")
print()
while self.running:

View File

@ -0,0 +1,770 @@
"""Real-time statistics and charts renderer for the Village Simulation.
Uses matplotlib to render charts to pygame surfaces for a seamless visualization experience.
"""
import io
from dataclasses import dataclass, field
from collections import deque
from typing import TYPE_CHECKING, Optional
import pygame
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend for pygame integration
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from matplotlib.figure import Figure
import numpy as np
if TYPE_CHECKING:
from frontend.client import SimulationState
# Color scheme - dark cyberpunk inspired
class ChartColors:
"""Color palette for charts - dark theme with neon accents."""
BG = '#1a1d26'
PANEL = '#252a38'
GRID = '#2f3545'
TEXT = '#e0e0e8'
TEXT_DIM = '#7a7e8c'
# Neon accents for data series
CYAN = '#00d4ff'
MAGENTA = '#ff0099'
LIME = '#39ff14'
ORANGE = '#ff6600'
PURPLE = '#9d4edd'
YELLOW = '#ffcc00'
TEAL = '#00ffa3'
PINK = '#ff1493'
# Series colors for different resources/categories
SERIES = [CYAN, MAGENTA, LIME, ORANGE, PURPLE, YELLOW, TEAL, PINK]
class UIColors:
"""Color palette for pygame UI elements."""
BG = (26, 29, 38)
PANEL_BG = (37, 42, 56)
PANEL_BORDER = (70, 80, 100)
TEXT_PRIMARY = (224, 224, 232)
TEXT_SECONDARY = (122, 126, 140)
TEXT_HIGHLIGHT = (0, 212, 255)
TAB_ACTIVE = (0, 212, 255)
TAB_INACTIVE = (55, 60, 75)
TAB_HOVER = (75, 85, 110)
@dataclass
class HistoryData:
"""Stores historical simulation data for charting."""
max_history: int = 200
# Time series data
turns: deque = field(default_factory=lambda: deque(maxlen=200))
population: deque = field(default_factory=lambda: deque(maxlen=200))
deaths_cumulative: deque = field(default_factory=lambda: deque(maxlen=200))
# Money/Wealth data
total_money: deque = field(default_factory=lambda: deque(maxlen=200))
avg_wealth: deque = field(default_factory=lambda: deque(maxlen=200))
gini_coefficient: deque = field(default_factory=lambda: deque(maxlen=200))
# Price history per resource
prices: dict = field(default_factory=dict) # resource -> deque of prices
# Trade statistics
trade_volume: deque = field(default_factory=lambda: deque(maxlen=200))
# Profession counts over time
professions: dict = field(default_factory=dict) # profession -> deque of counts
def clear(self) -> None:
"""Clear all history data."""
self.turns.clear()
self.population.clear()
self.deaths_cumulative.clear()
self.total_money.clear()
self.avg_wealth.clear()
self.gini_coefficient.clear()
self.prices.clear()
self.trade_volume.clear()
self.professions.clear()
def update(self, state: "SimulationState") -> None:
"""Update history with new state data."""
turn = state.turn
# Avoid duplicate entries for the same turn
if self.turns and self.turns[-1] == turn:
return
self.turns.append(turn)
# Population
living = len([a for a in state.agents if a.get("is_alive", False)])
self.population.append(living)
self.deaths_cumulative.append(state.statistics.get("total_agents_died", 0))
# Wealth data
stats = state.statistics
self.total_money.append(stats.get("total_money_in_circulation", 0))
self.avg_wealth.append(stats.get("avg_money", 0))
self.gini_coefficient.append(stats.get("gini_coefficient", 0))
# Price history from market
for resource, data in state.market_prices.items():
if resource not in self.prices:
self.prices[resource] = deque(maxlen=self.max_history)
# Track lowest price (current market rate)
lowest = data.get("lowest_price")
avg = data.get("avg_sale_price")
# Use lowest price if available, else avg sale price
price = lowest if lowest is not None else avg
self.prices[resource].append(price)
# Trade volume (from recent trades in market orders)
trades = len(state.market_orders) # Active orders as proxy
self.trade_volume.append(trades)
# Profession distribution
professions = stats.get("professions", {})
for prof, count in professions.items():
if prof not in self.professions:
self.professions[prof] = deque(maxlen=self.max_history)
self.professions[prof].append(count)
# Pad missing professions with 0
for prof in self.professions:
if prof not in professions:
self.professions[prof].append(0)
class ChartRenderer:
"""Renders matplotlib charts to pygame surfaces."""
def __init__(self, width: int, height: int):
self.width = width
self.height = height
self.dpi = 100
# Configure matplotlib style
plt.style.use('dark_background')
plt.rcParams.update({
'figure.facecolor': ChartColors.BG,
'axes.facecolor': ChartColors.PANEL,
'axes.edgecolor': ChartColors.GRID,
'axes.labelcolor': ChartColors.TEXT,
'text.color': ChartColors.TEXT,
'xtick.color': ChartColors.TEXT_DIM,
'ytick.color': ChartColors.TEXT_DIM,
'grid.color': ChartColors.GRID,
'grid.alpha': 0.3,
'legend.facecolor': ChartColors.PANEL,
'legend.edgecolor': ChartColors.GRID,
'font.size': 9,
'axes.titlesize': 11,
'axes.titleweight': 'bold',
})
def _fig_to_surface(self, fig: Figure) -> pygame.Surface:
"""Convert a matplotlib figure to a pygame surface."""
buf = io.BytesIO()
fig.savefig(buf, format='png', dpi=self.dpi,
facecolor=ChartColors.BG, edgecolor='none',
bbox_inches='tight', pad_inches=0.1)
buf.seek(0)
surface = pygame.image.load(buf, 'png')
buf.close()
plt.close(fig)
return surface
def render_price_history(self, history: HistoryData, width: int, height: int) -> pygame.Surface:
"""Render price history chart for all resources."""
fig, ax = plt.subplots(figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
turns = list(history.turns) if history.turns else [0]
has_data = False
for i, (resource, prices) in enumerate(history.prices.items()):
if prices and any(p is not None for p in prices):
color = ChartColors.SERIES[i % len(ChartColors.SERIES)]
# Filter out None values
valid_prices = [p if p is not None else 0 for p in prices]
# Align with turns
min_len = min(len(turns), len(valid_prices))
ax.plot(list(turns)[-min_len:], valid_prices[-min_len:],
color=color, linewidth=1.5, label=resource.capitalize(), alpha=0.9)
has_data = True
ax.set_title('Market Prices', color=ChartColors.CYAN)
ax.set_xlabel('Turn')
ax.set_ylabel('Price (coins)')
ax.grid(True, alpha=0.2)
if has_data:
ax.legend(loc='upper left', fontsize=8, framealpha=0.8)
ax.set_ylim(bottom=0)
ax.yaxis.set_major_locator(mticker.MaxNLocator(integer=True))
fig.tight_layout()
return self._fig_to_surface(fig)
def render_population(self, history: HistoryData, width: int, height: int) -> pygame.Surface:
"""Render population over time chart."""
fig, ax = plt.subplots(figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
turns = list(history.turns) if history.turns else [0]
population = list(history.population) if history.population else [0]
deaths = list(history.deaths_cumulative) if history.deaths_cumulative else [0]
min_len = min(len(turns), len(population))
# Population line
ax.fill_between(turns[-min_len:], population[-min_len:],
alpha=0.3, color=ChartColors.CYAN)
ax.plot(turns[-min_len:], population[-min_len:],
color=ChartColors.CYAN, linewidth=2, label='Living')
# Deaths line
if deaths:
ax.plot(turns[-min_len:], deaths[-min_len:],
color=ChartColors.MAGENTA, linewidth=1.5, linestyle='--',
label='Total Deaths', alpha=0.8)
ax.set_title('Population Over Time', color=ChartColors.LIME)
ax.set_xlabel('Turn')
ax.set_ylabel('Count')
ax.grid(True, alpha=0.2)
ax.legend(loc='upper right', fontsize=8)
ax.set_ylim(bottom=0)
ax.yaxis.set_major_locator(mticker.MaxNLocator(integer=True))
fig.tight_layout()
return self._fig_to_surface(fig)
def render_wealth_distribution(self, state: "SimulationState", width: int, height: int) -> pygame.Surface:
"""Render current wealth distribution as a bar chart."""
fig, ax = plt.subplots(figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
# Get agent wealth data
agents = [a for a in state.agents if a.get("is_alive", False)]
if not agents:
ax.text(0.5, 0.5, 'No living agents', ha='center', va='center',
color=ChartColors.TEXT_DIM, fontsize=12)
ax.set_title('Wealth Distribution', color=ChartColors.ORANGE)
fig.tight_layout()
return self._fig_to_surface(fig)
# Sort by wealth
agents_sorted = sorted(agents, key=lambda a: a.get("money", 0), reverse=True)
names = [a.get("name", "?")[:8] for a in agents_sorted]
wealth = [a.get("money", 0) for a in agents_sorted]
# Create gradient colors based on wealth ranking
colors = []
for i in range(len(agents_sorted)):
ratio = i / max(1, len(agents_sorted) - 1)
# Gradient from cyan (rich) to magenta (poor)
r = int(0 + ratio * 255)
g = int(212 - ratio * 212)
b = int(255 - ratio * 102)
colors.append(f'#{r:02x}{g:02x}{b:02x}')
bars = ax.barh(range(len(agents_sorted)), wealth, color=colors, alpha=0.85)
ax.set_yticks(range(len(agents_sorted)))
ax.set_yticklabels(names, fontsize=7)
ax.invert_yaxis() # Rich at top
# Add value labels
for bar, val in zip(bars, wealth):
ax.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
f'{val}', va='center', fontsize=7, color=ChartColors.TEXT_DIM)
ax.set_title('Wealth Distribution', color=ChartColors.ORANGE)
ax.set_xlabel('Coins')
ax.grid(True, alpha=0.2, axis='x')
fig.tight_layout()
return self._fig_to_surface(fig)
def render_wealth_over_time(self, history: HistoryData, width: int, height: int) -> pygame.Surface:
"""Render wealth metrics over time (total money, avg, gini)."""
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(width/self.dpi, height/self.dpi),
dpi=self.dpi, height_ratios=[2, 1])
turns = list(history.turns) if history.turns else [0]
total = list(history.total_money) if history.total_money else [0]
avg = list(history.avg_wealth) if history.avg_wealth else [0]
gini = list(history.gini_coefficient) if history.gini_coefficient else [0]
min_len = min(len(turns), len(total), len(avg))
# Total and average wealth
ax1.plot(turns[-min_len:], total[-min_len:],
color=ChartColors.CYAN, linewidth=2, label='Total Money')
ax1.fill_between(turns[-min_len:], total[-min_len:],
alpha=0.2, color=ChartColors.CYAN)
ax1_twin = ax1.twinx()
ax1_twin.plot(turns[-min_len:], avg[-min_len:],
color=ChartColors.LIME, linewidth=1.5, linestyle='--', label='Avg Wealth')
ax1_twin.set_ylabel('Avg Wealth', color=ChartColors.LIME)
ax1_twin.tick_params(axis='y', labelcolor=ChartColors.LIME)
ax1.set_title('Money in Circulation', color=ChartColors.YELLOW)
ax1.set_ylabel('Total Money', color=ChartColors.CYAN)
ax1.tick_params(axis='y', labelcolor=ChartColors.CYAN)
ax1.grid(True, alpha=0.2)
ax1.set_ylim(bottom=0)
# Gini coefficient (inequality)
min_len_gini = min(len(turns), len(gini))
ax2.fill_between(turns[-min_len_gini:], gini[-min_len_gini:],
alpha=0.4, color=ChartColors.MAGENTA)
ax2.plot(turns[-min_len_gini:], gini[-min_len_gini:],
color=ChartColors.MAGENTA, linewidth=1.5)
ax2.set_xlabel('Turn')
ax2.set_ylabel('Gini')
ax2.set_title('Inequality Index', color=ChartColors.MAGENTA, fontsize=9)
ax2.set_ylim(0, 1)
ax2.grid(True, alpha=0.2)
# Add reference lines for gini
ax2.axhline(y=0.4, color=ChartColors.YELLOW, linestyle=':', alpha=0.5, linewidth=1)
ax2.text(turns[-1] if turns else 0, 0.42, 'Moderate', fontsize=7,
color=ChartColors.YELLOW, alpha=0.7)
fig.tight_layout()
return self._fig_to_surface(fig)
def render_professions(self, state: "SimulationState", history: HistoryData,
width: int, height: int) -> pygame.Surface:
"""Render profession distribution as pie chart and area chart."""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
# Current profession pie chart
professions = state.statistics.get("professions", {})
if professions:
labels = list(professions.keys())
sizes = list(professions.values())
colors = [ChartColors.SERIES[i % len(ChartColors.SERIES)] for i in range(len(labels))]
wedges, texts, autotexts = ax1.pie(
sizes, labels=labels, colors=colors, autopct='%1.0f%%',
startangle=90, pctdistance=0.75,
textprops={'fontsize': 8, 'color': ChartColors.TEXT}
)
for autotext in autotexts:
autotext.set_color(ChartColors.BG)
autotext.set_fontweight('bold')
ax1.set_title('Current Distribution', color=ChartColors.PURPLE, fontsize=10)
else:
ax1.text(0.5, 0.5, 'No data', ha='center', va='center', color=ChartColors.TEXT_DIM)
ax1.set_title('Current Distribution', color=ChartColors.PURPLE)
# Profession history as stacked area
turns = list(history.turns) if history.turns else [0]
if history.professions and turns:
profs_list = list(history.professions.keys())
data = []
for prof in profs_list:
prof_data = list(history.professions[prof])
# Pad to match turns length
while len(prof_data) < len(turns):
prof_data.insert(0, 0)
data.append(prof_data[-len(turns):])
colors = [ChartColors.SERIES[i % len(ChartColors.SERIES)] for i in range(len(profs_list))]
ax2.stackplot(turns, *data, labels=profs_list, colors=colors, alpha=0.8)
ax2.legend(loc='upper left', fontsize=7, framealpha=0.8)
ax2.set_xlabel('Turn')
ax2.set_ylabel('Count')
ax2.set_title('Over Time', color=ChartColors.PURPLE, fontsize=10)
ax2.grid(True, alpha=0.2)
fig.tight_layout()
return self._fig_to_surface(fig)
def render_market_activity(self, state: "SimulationState", history: HistoryData,
width: int, height: int) -> pygame.Surface:
"""Render market activity - orders by resource, supply/demand."""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
# Current market orders by resource type
prices = state.market_prices
resources = []
quantities = []
colors = []
for i, (resource, data) in enumerate(prices.items()):
qty = data.get("total_available", 0)
if qty > 0:
resources.append(resource.capitalize())
quantities.append(qty)
colors.append(ChartColors.SERIES[i % len(ChartColors.SERIES)])
if resources:
bars = ax1.bar(resources, quantities, color=colors, alpha=0.85)
ax1.set_ylabel('Available')
for bar, val in zip(bars, quantities):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.3,
str(val), ha='center', fontsize=8, color=ChartColors.TEXT)
else:
ax1.text(0.5, 0.5, 'No orders', ha='center', va='center', color=ChartColors.TEXT_DIM)
ax1.set_title('Market Supply', color=ChartColors.TEAL, fontsize=10)
ax1.tick_params(axis='x', rotation=45, labelsize=7)
ax1.grid(True, alpha=0.2, axis='y')
# Supply/Demand scores
resources_sd = []
supply_scores = []
demand_scores = []
for resource, data in prices.items():
resources_sd.append(resource[:6])
supply_scores.append(data.get("supply_score", 0.5))
demand_scores.append(data.get("demand_score", 0.5))
if resources_sd:
x = np.arange(len(resources_sd))
width_bar = 0.35
ax2.bar(x - width_bar/2, supply_scores, width_bar, label='Supply',
color=ChartColors.CYAN, alpha=0.8)
ax2.bar(x + width_bar/2, demand_scores, width_bar, label='Demand',
color=ChartColors.MAGENTA, alpha=0.8)
ax2.set_xticks(x)
ax2.set_xticklabels(resources_sd, fontsize=7, rotation=45)
ax2.set_ylabel('Score')
ax2.legend(fontsize=7)
ax2.set_ylim(0, 1.2)
ax2.set_title('Supply/Demand', color=ChartColors.TEAL, fontsize=10)
ax2.grid(True, alpha=0.2, axis='y')
fig.tight_layout()
return self._fig_to_surface(fig)
def render_agent_stats(self, state: "SimulationState", width: int, height: int) -> pygame.Surface:
"""Render aggregate agent statistics - energy, hunger, thirst distributions."""
fig, axes = plt.subplots(2, 2, figsize=(width/self.dpi, height/self.dpi), dpi=self.dpi)
agents = [a for a in state.agents if a.get("is_alive", False)]
if not agents:
for ax in axes.flat:
ax.text(0.5, 0.5, 'No agents', ha='center', va='center', color=ChartColors.TEXT_DIM)
fig.suptitle('Agent Statistics', color=ChartColors.CYAN)
fig.tight_layout()
return self._fig_to_surface(fig)
# Extract stats
energies = [a.get("stats", {}).get("energy", 0) for a in agents]
hungers = [a.get("stats", {}).get("hunger", 0) for a in agents]
thirsts = [a.get("stats", {}).get("thirst", 0) for a in agents]
heats = [a.get("stats", {}).get("heat", 0) for a in agents]
max_energy = agents[0].get("stats", {}).get("max_energy", 100)
max_hunger = agents[0].get("stats", {}).get("max_hunger", 100)
max_thirst = agents[0].get("stats", {}).get("max_thirst", 100)
max_heat = agents[0].get("stats", {}).get("max_heat", 100)
stats_data = [
(energies, max_energy, 'Energy', ChartColors.LIME),
(hungers, max_hunger, 'Hunger', ChartColors.ORANGE),
(thirsts, max_thirst, 'Thirst', ChartColors.CYAN),
(heats, max_heat, 'Heat', ChartColors.MAGENTA),
]
for ax, (values, max_val, name, color) in zip(axes.flat, stats_data):
# Histogram
bins = np.linspace(0, max_val, 11)
ax.hist(values, bins=bins, color=color, alpha=0.7, edgecolor=ChartColors.PANEL)
# Mean line
mean_val = np.mean(values)
ax.axvline(x=mean_val, color=ChartColors.TEXT, linestyle='--',
linewidth=1.5, label=f'Avg: {mean_val:.0f}')
# Critical threshold
critical = max_val * 0.25
ax.axvline(x=critical, color=ChartColors.MAGENTA, linestyle=':',
linewidth=1, alpha=0.7)
ax.set_title(name, color=color, fontsize=9)
ax.set_xlim(0, max_val)
ax.legend(fontsize=7, loc='upper right')
ax.grid(True, alpha=0.2)
fig.suptitle('Agent Statistics Distribution', color=ChartColors.CYAN, fontsize=11)
fig.tight_layout()
return self._fig_to_surface(fig)
class StatsRenderer:
"""Main statistics panel with tabs and charts."""
TABS = [
("Prices", "price_history"),
("Wealth", "wealth"),
("Population", "population"),
("Professions", "professions"),
("Market", "market"),
("Agent Stats", "agent_stats"),
]
def __init__(self, screen: pygame.Surface):
self.screen = screen
self.visible = False
self.font = pygame.font.Font(None, 24)
self.small_font = pygame.font.Font(None, 18)
self.title_font = pygame.font.Font(None, 32)
self.current_tab = 0
self.tab_hovered = -1
# History data
self.history = HistoryData()
# Chart renderer
self.chart_renderer: Optional[ChartRenderer] = None
# Cached chart surfaces
self._chart_cache: dict[str, pygame.Surface] = {}
self._cache_turn: int = -1
# Layout
self._calculate_layout()
def _calculate_layout(self) -> None:
"""Calculate panel layout based on screen size."""
screen_w, screen_h = self.screen.get_size()
# Panel takes most of the screen with some margin
margin = 30
self.panel_rect = pygame.Rect(
margin, margin,
screen_w - margin * 2,
screen_h - margin * 2
)
# Tab bar
self.tab_height = 40
self.tab_rect = pygame.Rect(
self.panel_rect.x,
self.panel_rect.y,
self.panel_rect.width,
self.tab_height
)
# Chart area
self.chart_rect = pygame.Rect(
self.panel_rect.x + 10,
self.panel_rect.y + self.tab_height + 10,
self.panel_rect.width - 20,
self.panel_rect.height - self.tab_height - 20
)
# Initialize chart renderer with chart area size
self.chart_renderer = ChartRenderer(
self.chart_rect.width,
self.chart_rect.height
)
# Calculate tab widths
self.tab_width = self.panel_rect.width // len(self.TABS)
def toggle(self) -> None:
"""Toggle visibility of the stats panel."""
self.visible = not self.visible
if self.visible:
self._invalidate_cache()
def update_history(self, state: "SimulationState") -> None:
"""Update history data with new state."""
if state:
self.history.update(state)
def clear_history(self) -> None:
"""Clear all history data (e.g., on simulation reset)."""
self.history.clear()
self._invalidate_cache()
def _invalidate_cache(self) -> None:
"""Invalidate chart cache to force re-render."""
self._chart_cache.clear()
self._cache_turn = -1
def handle_event(self, event: pygame.event.Event) -> bool:
"""Handle input events. Returns True if event was consumed."""
if not self.visible:
return False
if event.type == pygame.MOUSEMOTION:
self._handle_mouse_motion(event.pos)
return True
elif event.type == pygame.MOUSEBUTTONDOWN:
if self._handle_click(event.pos):
return True
# Consume clicks when visible
return True
elif event.type == pygame.KEYDOWN:
if event.key == pygame.K_ESCAPE:
self.toggle()
return True
elif event.key == pygame.K_LEFT:
self.current_tab = (self.current_tab - 1) % len(self.TABS)
self._invalidate_cache()
return True
elif event.key == pygame.K_RIGHT:
self.current_tab = (self.current_tab + 1) % len(self.TABS)
self._invalidate_cache()
return True
return False
def _handle_mouse_motion(self, pos: tuple[int, int]) -> None:
"""Handle mouse motion for tab hover effects."""
self.tab_hovered = -1
if self.tab_rect.collidepoint(pos):
rel_x = pos[0] - self.tab_rect.x
tab_idx = rel_x // self.tab_width
if 0 <= tab_idx < len(self.TABS):
self.tab_hovered = tab_idx
def _handle_click(self, pos: tuple[int, int]) -> bool:
"""Handle mouse click. Returns True if click was on a tab."""
if self.tab_rect.collidepoint(pos):
rel_x = pos[0] - self.tab_rect.x
tab_idx = rel_x // self.tab_width
if 0 <= tab_idx < len(self.TABS) and tab_idx != self.current_tab:
self.current_tab = tab_idx
self._invalidate_cache()
return True
return False
def _render_chart(self, state: "SimulationState") -> pygame.Surface:
"""Render the current tab's chart."""
tab_name, tab_key = self.TABS[self.current_tab]
# Check cache
current_turn = state.turn if state else 0
if tab_key in self._chart_cache and self._cache_turn == current_turn:
return self._chart_cache[tab_key]
# Render chart based on current tab
width = self.chart_rect.width
height = self.chart_rect.height
if tab_key == "price_history":
surface = self.chart_renderer.render_price_history(self.history, width, height)
elif tab_key == "wealth":
# Split into two charts
half_height = height // 2
dist_surface = self.chart_renderer.render_wealth_distribution(state, width, half_height)
time_surface = self.chart_renderer.render_wealth_over_time(self.history, width, half_height)
surface = pygame.Surface((width, height))
surface.fill(UIColors.BG)
surface.blit(dist_surface, (0, 0))
surface.blit(time_surface, (0, half_height))
elif tab_key == "population":
surface = self.chart_renderer.render_population(self.history, width, height)
elif tab_key == "professions":
surface = self.chart_renderer.render_professions(state, self.history, width, height)
elif tab_key == "market":
surface = self.chart_renderer.render_market_activity(state, self.history, width, height)
elif tab_key == "agent_stats":
surface = self.chart_renderer.render_agent_stats(state, width, height)
else:
# Fallback empty surface
surface = pygame.Surface((width, height))
surface.fill(UIColors.BG)
# Cache the result
self._chart_cache[tab_key] = surface
self._cache_turn = current_turn
return surface
def draw(self, state: "SimulationState") -> None:
"""Draw the statistics panel."""
if not self.visible:
return
# Dim background
overlay = pygame.Surface(self.screen.get_size(), pygame.SRCALPHA)
overlay.fill((0, 0, 0, 220))
self.screen.blit(overlay, (0, 0))
# Panel background
pygame.draw.rect(self.screen, UIColors.PANEL_BG, self.panel_rect, border_radius=12)
pygame.draw.rect(self.screen, UIColors.PANEL_BORDER, self.panel_rect, 2, border_radius=12)
# Draw tabs
self._draw_tabs()
# Draw chart
if state:
chart_surface = self._render_chart(state)
self.screen.blit(chart_surface, self.chart_rect.topleft)
# Draw close hint
hint = self.small_font.render("Press G or ESC to close | ← → to switch tabs",
True, UIColors.TEXT_SECONDARY)
hint_rect = hint.get_rect(centerx=self.panel_rect.centerx,
y=self.panel_rect.bottom - 25)
self.screen.blit(hint, hint_rect)
def _draw_tabs(self) -> None:
"""Draw the tab bar."""
for i, (tab_name, _) in enumerate(self.TABS):
tab_x = self.tab_rect.x + i * self.tab_width
tab_rect = pygame.Rect(tab_x, self.tab_rect.y, self.tab_width, self.tab_height)
# Tab background
if i == self.current_tab:
color = UIColors.TAB_ACTIVE
elif i == self.tab_hovered:
color = UIColors.TAB_HOVER
else:
color = UIColors.TAB_INACTIVE
# Draw tab with rounded top corners
tab_surface = pygame.Surface((self.tab_width, self.tab_height), pygame.SRCALPHA)
pygame.draw.rect(tab_surface, color, (0, 0, self.tab_width, self.tab_height),
border_top_left_radius=8, border_top_right_radius=8)
if i == self.current_tab:
# Active tab - solid color
tab_surface.set_alpha(255)
else:
tab_surface.set_alpha(180)
self.screen.blit(tab_surface, (tab_x, self.tab_rect.y))
# Tab text
text_color = UIColors.BG if i == self.current_tab else UIColors.TEXT_PRIMARY
text = self.small_font.render(tab_name, True, text_color)
text_rect = text.get_rect(center=tab_rect.center)
self.screen.blit(text, text_rect)
# Tab border
if i != self.current_tab:
pygame.draw.line(self.screen, UIColors.PANEL_BORDER,
(tab_x + self.tab_width - 1, self.tab_rect.y + 5),
(tab_x + self.tab_width - 1, self.tab_rect.y + self.tab_height - 5))

View File

@ -64,6 +64,13 @@ class SimulationStats:
# Time tracking
actions_by_time_of_day: dict = field(default_factory=lambda: {"day": defaultdict(int), "night": defaultdict(int)})
# Profession and wealth tracking (new)
professions_over_time: list = field(default_factory=list)
gini_over_time: list = field(default_factory=list)
richest_agent_money: list = field(default_factory=list)
poorest_agent_money: list = field(default_factory=list)
final_agent_stats: list = field(default_factory=list) # List of agent dicts at end
def run_headless_simulation(num_steps: int = 1000, num_agents: int = 10) -> tuple[str, SimulationStats]:
"""
@ -110,16 +117,42 @@ def run_headless_simulation(num_steps: int = 1000, num_agents: int = 10) -> tupl
time_of_day = engine.world.time_of_day.value
# Track living agents
living = len(engine.world.get_living_agents())
living_agents = engine.world.get_living_agents()
living = len(living_agents)
stats.living_agents_over_time.append(living)
# Track money
total_money = sum(a.money for a in engine.world.agents)
stats.money_circulation_over_time.append(total_money)
if living > 0:
# Track money and wealth inequality
if living_agents:
moneys = sorted([a.money for a in living_agents])
total_money = sum(moneys)
stats.money_circulation_over_time.append(total_money)
stats.avg_agent_money_over_time.append(total_money / living)
stats.richest_agent_money.append(moneys[-1])
stats.poorest_agent_money.append(moneys[0])
# Track Gini coefficient
n = len(moneys)
if n > 1 and total_money > 0:
sum_of_diffs = sum(abs(m1 - m2) for m1 in moneys for m2 in moneys)
gini = sum_of_diffs / (2 * n * total_money)
else:
gini = 0
stats.gini_over_time.append(gini)
# Track profession distribution
professions = {}
for agent in living_agents:
agent._update_profession()
prof = agent.profession.value
professions[prof] = professions.get(prof, 0) + 1
stats.professions_over_time.append(professions)
else:
stats.money_circulation_over_time.append(0)
stats.avg_agent_money_over_time.append(0)
stats.richest_agent_money.append(0)
stats.poorest_agent_money.append(0)
stats.gini_over_time.append(0)
stats.professions_over_time.append({})
# Process agent actions
for action_data in turn_log.agent_actions:
@ -206,6 +239,25 @@ def run_headless_simulation(num_steps: int = 1000, num_agents: int = 10) -> tupl
print("\n")
# Collect final agent statistics
for agent in engine.world.get_living_agents():
agent._update_profession()
stats.final_agent_stats.append({
"name": agent.name,
"profession": agent.profession.value,
"money": agent.money,
"trades": agent.total_trades_completed,
"money_earned": agent.total_money_earned,
"skills": agent.skills.to_dict(),
"personality": {
"wealth_desire": round(agent.personality.wealth_desire, 2),
"trade_preference": round(agent.personality.trade_preference, 2),
"hoarding_rate": round(agent.personality.hoarding_rate, 2),
"market_affinity": round(agent.personality.market_affinity, 2),
},
"actions": agent.actions_performed.copy(),
})
# Close logger
engine.logger.close()
@ -420,6 +472,69 @@ def generate_text_report(stats: SimulationStats) -> str:
for action, count in sorted(stats.actions_by_time_of_day["night"].items(), key=lambda x: -x[1])[:5]:
lines.append(f" - {action}: {count:,}")
# Profession Distribution (new)
lines.append("\n\n👤 PROFESSION DISTRIBUTION")
lines.append("-" * 40)
if stats.professions_over_time:
final_profs = stats.professions_over_time[-1]
if final_profs:
total_agents = sum(final_profs.values())
lines.append(f" {'Profession':<15} {'Count':>8} {'Percentage':>12}")
lines.append(f" {'-'*15} {'-'*8} {'-'*12}")
for prof, count in sorted(final_profs.items(), key=lambda x: -x[1]):
pct = count / total_agents * 100 if total_agents > 0 else 0
lines.append(f" {prof:<15} {count:>8} {pct:>10.1f}%")
else:
lines.append(" No agents remaining")
else:
lines.append(" No profession data")
# Wealth Inequality (new)
lines.append("\n\n💎 WEALTH INEQUALITY")
lines.append("-" * 40)
if stats.gini_over_time:
final_gini = stats.gini_over_time[-1]
avg_gini = sum(stats.gini_over_time) / len(stats.gini_over_time)
max_gini = max(stats.gini_over_time)
lines.append(f" Final Gini Coefficient: {final_gini:.3f}")
lines.append(f" Average Gini: {avg_gini:.3f}")
lines.append(f" Peak Gini: {max_gini:.3f}")
lines.append("")
lines.append(f" (0 = perfect equality, 1 = maximum inequality)")
if stats.richest_agent_money and stats.poorest_agent_money:
lines.append("")
lines.append(f" Richest agent at end: {stats.richest_agent_money[-1]}¢")
lines.append(f" Poorest agent at end: {stats.poorest_agent_money[-1]}¢")
wealth_ratio = stats.richest_agent_money[-1] / max(1, stats.poorest_agent_money[-1])
lines.append(f" Wealth ratio (rich/poor): {wealth_ratio:.1f}x")
else:
lines.append(" No wealth data")
# Top Agents by Wealth (new)
lines.append("\n\n🏆 TOP AGENTS BY WEALTH")
lines.append("-" * 40)
if stats.final_agent_stats:
sorted_agents = sorted(stats.final_agent_stats, key=lambda x: -x["money"])
lines.append(f" {'Name':<15} {'Prof':<12} {'Money':>8} {'Trades':>8}")
lines.append(f" {'-'*15} {'-'*12} {'-'*8} {'-'*8}")
for agent in sorted_agents[:10]:
lines.append(f" {agent['name']:<15} {agent['profession']:<12} {agent['money']:>7}¢ {agent['trades']:>8}")
# Skill leaders
lines.append("\n 📈 Highest Skills:")
skill_leaders = {}
for agent in stats.final_agent_stats:
for skill, value in agent["skills"].items():
if skill not in skill_leaders or value > skill_leaders[skill][1]:
skill_leaders[skill] = (agent["name"], value)
for skill, (name, value) in sorted(skill_leaders.items()):
lines.append(f" {skill}: {name} ({value:.2f})")
else:
lines.append(" No agent data")
lines.append("\n" + "=" * 70)
return "\n".join(lines)