583 lines
21 KiB
Python
583 lines
21 KiB
Python
"""World container for the Village Simulation.
|
|
|
|
The world spawns diverse agents with varied personality traits,
|
|
skills, and starting conditions to create emergent professions
|
|
and class inequality.
|
|
|
|
Now includes age-based lifecycle with birth and death by old age.
|
|
"""
|
|
|
|
import random
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
from backend.domain.agent import Agent, Position, Profession
|
|
from backend.domain.personality import (
|
|
PersonalityTraits, Skills,
|
|
generate_random_personality, generate_random_skills
|
|
)
|
|
from backend.domain.resources import ResourceType
|
|
|
|
|
|
class TimeOfDay(Enum):
|
|
"""Current time of day in the simulation."""
|
|
DAY = "day"
|
|
NIGHT = "night"
|
|
|
|
|
|
def _get_world_config_from_file():
|
|
"""Load world configuration from config.json."""
|
|
from backend.config import get_config
|
|
return get_config().world
|
|
|
|
|
|
@dataclass
|
|
class WorldConfig:
|
|
"""Configuration for the world.
|
|
|
|
Default values are loaded from config.json via create_world_config().
|
|
These hardcoded defaults are only fallbacks.
|
|
"""
|
|
width: int = 25
|
|
height: int = 25
|
|
initial_agents: int = 25
|
|
day_steps: int = 10
|
|
night_steps: int = 1
|
|
|
|
|
|
def create_world_config() -> WorldConfig:
|
|
"""Factory function to create WorldConfig from config.json."""
|
|
cfg = _get_world_config_from_file()
|
|
return WorldConfig(
|
|
width=cfg.width,
|
|
height=cfg.height,
|
|
initial_agents=cfg.initial_agents,
|
|
day_steps=cfg.day_steps,
|
|
night_steps=cfg.night_steps,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class World:
|
|
"""Container for all entities in the simulation."""
|
|
config: WorldConfig = field(default_factory=create_world_config)
|
|
agents: list[Agent] = field(default_factory=list)
|
|
current_turn: int = 0
|
|
current_day: int = 1
|
|
step_in_day: int = 0
|
|
time_of_day: TimeOfDay = TimeOfDay.DAY
|
|
|
|
# Statistics
|
|
total_agents_spawned: int = 0
|
|
total_agents_died: int = 0
|
|
total_births: int = 0
|
|
total_deaths_by_age: int = 0
|
|
total_deaths_by_starvation: int = 0
|
|
total_deaths_by_thirst: int = 0
|
|
total_deaths_by_cold: int = 0
|
|
|
|
# Village-wide storage tracking (for storage limits)
|
|
village_storage: dict = field(default_factory=lambda: {
|
|
"meat": 0,
|
|
"berries": 0,
|
|
"water": 0,
|
|
"wood": 0,
|
|
"hide": 0,
|
|
"clothes": 0,
|
|
})
|
|
|
|
def spawn_agent(
|
|
self,
|
|
name: Optional[str] = None,
|
|
profession: Optional[Profession] = None,
|
|
position: Optional[Position] = None,
|
|
archetype: Optional[str] = None,
|
|
starting_money: Optional[int] = None,
|
|
age: Optional[int] = None,
|
|
generation: int = 0,
|
|
parent_ids: Optional[list[str]] = None,
|
|
) -> Agent:
|
|
"""Spawn a new agent in the world with unique personality.
|
|
|
|
Args:
|
|
name: Agent name (auto-generated if None)
|
|
profession: Deprecated, now derived from personality
|
|
position: Starting position (random if None)
|
|
archetype: Personality archetype ("hunter", "gatherer", "trader", etc.)
|
|
starting_money: Starting money (random with inequality if None)
|
|
age: Starting age (random within config range if None)
|
|
generation: 0 for initial spawn, 1+ for born in simulation
|
|
parent_ids: IDs of parent agents (for lineage tracking)
|
|
"""
|
|
if position is None:
|
|
position = Position(
|
|
x=random.randint(0, self.config.width - 1),
|
|
y=random.randint(0, self.config.height - 1),
|
|
)
|
|
|
|
# Get age config for age calculation
|
|
from backend.config import get_config
|
|
age_config = get_config().age
|
|
|
|
# Calculate starting age
|
|
if age is None:
|
|
age = random.randint(age_config.min_start_age, age_config.max_start_age)
|
|
|
|
# Generate unique personality and skills (skills influenced by age)
|
|
personality = generate_random_personality(archetype)
|
|
skills = generate_random_skills(personality, age=age)
|
|
|
|
# Variable starting money for class inequality
|
|
# Some agents start with more, some with less
|
|
if starting_money is None:
|
|
base_money = get_config().world.starting_money
|
|
# Random multiplier: 0.3x to 2.0x base money
|
|
# This creates natural class inequality
|
|
money_multiplier = random.uniform(0.3, 2.0)
|
|
# Traders start with more money (their capital)
|
|
if personality.trade_preference > 1.3:
|
|
money_multiplier *= 1.5
|
|
starting_money = int(base_money * money_multiplier)
|
|
|
|
agent = Agent(
|
|
name=name or f"Villager_{self.total_agents_spawned + 1}",
|
|
profession=Profession.VILLAGER, # Will be updated based on personality
|
|
position=position,
|
|
personality=personality,
|
|
skills=skills,
|
|
money=starting_money,
|
|
age=age,
|
|
birth_day=self.current_day,
|
|
generation=generation,
|
|
parent_ids=parent_ids or [],
|
|
)
|
|
|
|
self.agents.append(agent)
|
|
self.total_agents_spawned += 1
|
|
return agent
|
|
|
|
def spawn_child(self, parent: Agent) -> Optional[Agent]:
|
|
"""Spawn a new agent as a child of an existing agent.
|
|
|
|
Birth chance is controlled by village prosperity (food abundance).
|
|
Parent transfers wealth to child at birth.
|
|
|
|
Returns the new agent or None if birth conditions not met.
|
|
"""
|
|
from backend.config import get_config
|
|
age_config = get_config().age
|
|
|
|
# Check birth eligibility
|
|
if not parent.can_give_birth(self.current_day):
|
|
return None
|
|
|
|
# Calculate economy-based birth chance
|
|
# More food in village = higher birth rate
|
|
# But even in hard times, some births occur (base chance always applies)
|
|
prosperity = self.calculate_prosperity()
|
|
|
|
# Prosperity boosts birth rate: base_chance * (1 + prosperity * multiplier)
|
|
# At prosperity=0: birth_chance = base_chance
|
|
# At prosperity=1: birth_chance = base_chance * (1 + multiplier)
|
|
birth_chance = age_config.birth_base_chance * (1 + prosperity * age_config.birth_prosperity_multiplier)
|
|
birth_chance = min(0.20, birth_chance) # Cap at 20%
|
|
|
|
if random.random() > birth_chance:
|
|
return None
|
|
|
|
# Birth happens! Child spawns near parent
|
|
child_pos = Position(
|
|
x=parent.position.x + random.uniform(-1, 1),
|
|
y=parent.position.y + random.uniform(-1, 1),
|
|
)
|
|
# Clamp to world bounds
|
|
child_pos.x = max(0, min(self.config.width - 1, child_pos.x))
|
|
child_pos.y = max(0, min(self.config.height - 1, child_pos.y))
|
|
|
|
# Child inherits some personality traits from parent with mutation
|
|
child_archetype = None # Random, not determined by parent
|
|
|
|
# Wealth transfer: parent gives portion of their wealth to child
|
|
wealth_transfer = age_config.birth_wealth_transfer
|
|
child_money = int(parent.money * wealth_transfer)
|
|
parent.money -= child_money
|
|
|
|
# Ensure child has minimum viable wealth
|
|
min_child_money = int(get_config().world.starting_money * 0.3)
|
|
child_money = max(child_money, min_child_money)
|
|
|
|
# Child starts at configured age (adult)
|
|
child_age = age_config.child_start_age
|
|
|
|
child = self.spawn_agent(
|
|
name=f"Child_{self.total_agents_spawned + 1}",
|
|
position=child_pos,
|
|
archetype=child_archetype,
|
|
starting_money=child_money,
|
|
age=child_age,
|
|
generation=parent.generation + 1,
|
|
parent_ids=[parent.id],
|
|
)
|
|
|
|
# Parent also transfers some food to child
|
|
self._transfer_resources_to_child(parent, child)
|
|
|
|
# Record birth for parent
|
|
parent.record_birth(self.current_day, child.id)
|
|
|
|
self.total_births += 1
|
|
return child
|
|
|
|
def _transfer_resources_to_child(self, parent: Agent, child: Agent) -> None:
|
|
"""Transfer some resources from parent to child at birth."""
|
|
# Transfer 1 of each food type parent has (if available)
|
|
for res_type in [ResourceType.MEAT, ResourceType.BERRIES, ResourceType.WATER]:
|
|
if parent.has_resource(res_type, 1):
|
|
parent.remove_from_inventory(res_type, 1)
|
|
from backend.domain.resources import Resource
|
|
child.add_to_inventory(Resource(
|
|
type=res_type,
|
|
quantity=1,
|
|
created_turn=self.current_turn,
|
|
))
|
|
|
|
def calculate_prosperity(self) -> float:
|
|
"""Calculate village prosperity (0.0 to 1.0) based on food abundance.
|
|
|
|
Higher prosperity = more births allowed.
|
|
This creates population cycles tied to resource availability.
|
|
"""
|
|
self.update_village_storage()
|
|
from backend.config import get_config
|
|
storage_config = get_config().storage
|
|
|
|
# Calculate how full each food storage is
|
|
meat_ratio = self.village_storage.get("meat", 0) / max(1, storage_config.village_meat_limit)
|
|
berries_ratio = self.village_storage.get("berries", 0) / max(1, storage_config.village_berries_limit)
|
|
water_ratio = self.village_storage.get("water", 0) / max(1, storage_config.village_water_limit)
|
|
|
|
# Average food abundance (weighted: meat most valuable)
|
|
prosperity = (meat_ratio * 0.4 + berries_ratio * 0.3 + water_ratio * 0.3)
|
|
return min(1.0, max(0.0, prosperity))
|
|
|
|
def process_inheritance(self, dead_agent: Agent) -> dict:
|
|
"""Process inheritance when an agent dies.
|
|
|
|
Wealth and resources are distributed to living children.
|
|
If no children, wealth is distributed to random villagers (estate tax).
|
|
|
|
Returns dict with inheritance details.
|
|
"""
|
|
from backend.config import get_config
|
|
age_config = get_config().age
|
|
|
|
if not age_config.inheritance_enabled:
|
|
return {"enabled": False}
|
|
|
|
inheritance_info = {
|
|
"enabled": True,
|
|
"deceased": dead_agent.name,
|
|
"total_money": dead_agent.money,
|
|
"total_resources": sum(r.quantity for r in dead_agent.inventory),
|
|
"beneficiaries": [],
|
|
}
|
|
|
|
# Find living children
|
|
living_children = []
|
|
for child_id in dead_agent.children_ids:
|
|
child = self.get_agent(child_id)
|
|
if child and child.is_alive():
|
|
living_children.append(child)
|
|
|
|
if living_children:
|
|
# Distribute equally among children
|
|
money_per_child = dead_agent.money // len(living_children)
|
|
for child in living_children:
|
|
child.money += money_per_child
|
|
inheritance_info["beneficiaries"].append({
|
|
"name": child.name,
|
|
"money": money_per_child,
|
|
})
|
|
|
|
# Distribute resources (round-robin)
|
|
for i, resource in enumerate(dead_agent.inventory):
|
|
recipient = living_children[i % len(living_children)]
|
|
recipient.add_to_inventory(resource)
|
|
else:
|
|
# No children - distribute to random villagers (estate tax effect)
|
|
living = self.get_living_agents()
|
|
if living:
|
|
# Give money to poorest villagers
|
|
poorest = sorted(living, key=lambda a: a.money)[:3]
|
|
if poorest:
|
|
money_each = dead_agent.money // len(poorest)
|
|
for villager in poorest:
|
|
villager.money += money_each
|
|
inheritance_info["beneficiaries"].append({
|
|
"name": villager.name,
|
|
"money": money_each,
|
|
"relation": "community"
|
|
})
|
|
|
|
# Clear dead agent's inventory (already distributed or lost)
|
|
dead_agent.inventory.clear()
|
|
dead_agent.money = 0
|
|
|
|
return inheritance_info
|
|
|
|
def get_agent(self, agent_id: str) -> Optional[Agent]:
|
|
"""Get an agent by ID."""
|
|
for agent in self.agents:
|
|
if agent.id == agent_id:
|
|
return agent
|
|
return None
|
|
|
|
def remove_dead_agents(self) -> list[Agent]:
|
|
"""Remove all dead agents from the world. Returns list of removed agents.
|
|
Note: This is now handled by the engine's corpse system for visualization.
|
|
"""
|
|
dead_agents = [a for a in self.agents if not a.is_alive() and not a.is_corpse()]
|
|
# Don't actually remove here - let the engine handle corpse visualization
|
|
return dead_agents
|
|
|
|
def advance_time(self) -> bool:
|
|
"""Advance the simulation time by one step.
|
|
|
|
Returns True if a new day started (for age/birth processing).
|
|
"""
|
|
self.current_turn += 1
|
|
self.step_in_day += 1
|
|
|
|
total_steps = self.config.day_steps + self.config.night_steps
|
|
new_day = False
|
|
|
|
if self.step_in_day > total_steps:
|
|
self.step_in_day = 1
|
|
self.current_day += 1
|
|
new_day = True
|
|
|
|
# Determine time of day
|
|
if self.step_in_day <= self.config.day_steps:
|
|
self.time_of_day = TimeOfDay.DAY
|
|
else:
|
|
self.time_of_day = TimeOfDay.NIGHT
|
|
|
|
return new_day
|
|
|
|
def process_new_day(self) -> dict:
|
|
"""Process all new-day events: aging, births, sinks.
|
|
|
|
Returns a dict with events that happened.
|
|
"""
|
|
events = {
|
|
"aged_agents": [],
|
|
"births": [],
|
|
"age_deaths": [],
|
|
"storage_decay": {},
|
|
"taxes_collected": 0,
|
|
"random_events": [],
|
|
}
|
|
|
|
# Age all living agents
|
|
for agent in self.get_living_agents():
|
|
agent.age_one_day()
|
|
events["aged_agents"].append(agent.id)
|
|
|
|
# Check for births (only from living agents after aging)
|
|
for agent in self.get_living_agents():
|
|
if agent.can_give_birth(self.current_day):
|
|
child = self.spawn_child(agent)
|
|
if child:
|
|
events["births"].append({
|
|
"parent_id": agent.id,
|
|
"child_id": child.id,
|
|
"child_name": child.name,
|
|
})
|
|
|
|
return events
|
|
|
|
def update_village_storage(self) -> None:
|
|
"""Update the village-wide storage tracking."""
|
|
# Reset counts
|
|
for key in self.village_storage:
|
|
self.village_storage[key] = 0
|
|
|
|
# Count all resources in agent inventories
|
|
for agent in self.get_living_agents():
|
|
for resource in agent.inventory:
|
|
res_type = resource.type.value
|
|
if res_type in self.village_storage:
|
|
self.village_storage[res_type] += resource.quantity
|
|
|
|
def get_storage_limit(self, resource_type: str) -> int:
|
|
"""Get the storage limit for a resource type."""
|
|
from backend.config import get_config
|
|
storage_config = get_config().storage
|
|
|
|
limit_map = {
|
|
"meat": storage_config.village_meat_limit,
|
|
"berries": storage_config.village_berries_limit,
|
|
"water": storage_config.village_water_limit,
|
|
"wood": storage_config.village_wood_limit,
|
|
"hide": storage_config.village_hide_limit,
|
|
"clothes": storage_config.village_clothes_limit,
|
|
}
|
|
return limit_map.get(resource_type, 999999)
|
|
|
|
def get_storage_available(self, resource_type: str) -> int:
|
|
"""Get how much more of a resource can be stored village-wide."""
|
|
self.update_village_storage()
|
|
limit = self.get_storage_limit(resource_type)
|
|
current = self.village_storage.get(resource_type, 0)
|
|
return max(0, limit - current)
|
|
|
|
def is_storage_full(self, resource_type: str) -> bool:
|
|
"""Check if village storage for a resource type is full."""
|
|
return self.get_storage_available(resource_type) <= 0
|
|
|
|
def record_death(self, agent: Agent, reason: str) -> None:
|
|
"""Record a death and update statistics."""
|
|
self.total_agents_died += 1
|
|
if reason == "age":
|
|
self.total_deaths_by_age += 1
|
|
elif reason == "hunger":
|
|
self.total_deaths_by_starvation += 1
|
|
elif reason == "thirst":
|
|
self.total_deaths_by_thirst += 1
|
|
elif reason == "heat":
|
|
self.total_deaths_by_cold += 1
|
|
|
|
def is_night(self) -> bool:
|
|
"""Check if it's currently night."""
|
|
return self.time_of_day == TimeOfDay.NIGHT
|
|
|
|
def get_living_agents(self) -> list[Agent]:
|
|
"""Get all living agents (excludes corpses)."""
|
|
return [a for a in self.agents if a.is_alive() and not a.is_corpse()]
|
|
|
|
def get_statistics(self) -> dict:
|
|
"""Get current world statistics including wealth distribution and demographics."""
|
|
living = self.get_living_agents()
|
|
total_money = sum(a.money for a in living)
|
|
|
|
# Count emergent professions (updated based on current skills)
|
|
profession_counts = {}
|
|
for agent in living:
|
|
agent._update_profession() # Update based on current state
|
|
prof = agent.profession.value
|
|
profession_counts[prof] = profession_counts.get(prof, 0) + 1
|
|
|
|
# Age demographics
|
|
age_distribution = {"young": 0, "prime": 0, "old": 0}
|
|
ages = []
|
|
generations = {}
|
|
for agent in living:
|
|
category = agent.get_age_category()
|
|
age_distribution[category] = age_distribution.get(category, 0) + 1
|
|
ages.append(agent.age)
|
|
gen = agent.generation
|
|
generations[gen] = generations.get(gen, 0) + 1
|
|
|
|
avg_age = sum(ages) / len(ages) if ages else 0
|
|
oldest_age = max(ages) if ages else 0
|
|
youngest_age = min(ages) if ages else 0
|
|
|
|
# Calculate wealth inequality metrics
|
|
if living:
|
|
moneys = sorted([a.money for a in living])
|
|
avg_money = total_money / len(living)
|
|
median_money = moneys[len(moneys) // 2]
|
|
richest = moneys[-1] if moneys else 0
|
|
poorest = moneys[0] if moneys else 0
|
|
|
|
# Gini coefficient for inequality (0 = perfect equality, 1 = max inequality)
|
|
n = len(moneys)
|
|
if n > 1 and total_money > 0:
|
|
sum_of_diffs = sum(abs(m1 - m2) for m1 in moneys for m2 in moneys)
|
|
gini = sum_of_diffs / (2 * n * total_money)
|
|
else:
|
|
gini = 0
|
|
else:
|
|
avg_money = median_money = richest = poorest = gini = 0
|
|
|
|
# Update village storage
|
|
self.update_village_storage()
|
|
|
|
return {
|
|
"current_turn": self.current_turn,
|
|
"current_day": self.current_day,
|
|
"step_in_day": self.step_in_day,
|
|
"time_of_day": self.time_of_day.value,
|
|
"living_agents": len(living),
|
|
"total_agents_spawned": self.total_agents_spawned,
|
|
"total_agents_died": self.total_agents_died,
|
|
"total_births": self.total_births,
|
|
"total_money_in_circulation": total_money,
|
|
"professions": profession_counts,
|
|
# Wealth inequality metrics
|
|
"avg_money": round(avg_money, 1),
|
|
"median_money": median_money,
|
|
"richest_agent": richest,
|
|
"poorest_agent": poorest,
|
|
"gini_coefficient": round(gini, 3),
|
|
# Age demographics
|
|
"age_distribution": age_distribution,
|
|
"avg_age": round(avg_age, 1),
|
|
"oldest_agent": oldest_age,
|
|
"youngest_agent": youngest_age,
|
|
"generations": generations,
|
|
# Death statistics
|
|
"deaths_by_cause": {
|
|
"age": self.total_deaths_by_age,
|
|
"starvation": self.total_deaths_by_starvation,
|
|
"thirst": self.total_deaths_by_thirst,
|
|
"cold": self.total_deaths_by_cold,
|
|
},
|
|
# Village storage
|
|
"village_storage": self.village_storage.copy(),
|
|
}
|
|
|
|
def get_state_snapshot(self) -> dict:
|
|
"""Get a full snapshot of the world state for API."""
|
|
return {
|
|
"turn": self.current_turn,
|
|
"day": self.current_day,
|
|
"step_in_day": self.step_in_day,
|
|
"time_of_day": self.time_of_day.value,
|
|
"world_size": {"width": self.config.width, "height": self.config.height},
|
|
"agents": [a.to_dict() for a in self.agents],
|
|
"statistics": self.get_statistics(),
|
|
}
|
|
|
|
def initialize(self) -> None:
|
|
"""Initialize the world with diverse starting agents.
|
|
|
|
Creates a mix of agent archetypes to seed profession diversity:
|
|
- Some hunters (risk-takers who hunt)
|
|
- Some gatherers (cautious resource collectors)
|
|
- Some traders (market-focused wealth builders)
|
|
- Some generalists (balanced approach)
|
|
"""
|
|
n = self.config.initial_agents
|
|
|
|
# Distribute archetypes for diversity
|
|
# ~15% hunters, ~15% gatherers, ~15% traders, ~10% woodcutters, 45% random
|
|
archetypes = (
|
|
["hunter"] * max(1, n // 7) +
|
|
["gatherer"] * max(1, n // 7) +
|
|
["trader"] * max(1, n // 7) +
|
|
["woodcutter"] * max(1, n // 10)
|
|
)
|
|
|
|
# Fill remaining slots with random (no archetype)
|
|
while len(archetypes) < n:
|
|
archetypes.append(None)
|
|
|
|
# Shuffle to randomize positions
|
|
random.shuffle(archetypes)
|
|
|
|
for archetype in archetypes:
|
|
self.spawn_agent(archetype=archetype)
|
|
|