From 73e47121192cb6b7d8a43f79a1d13de29436dfee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=BD=D0=B5=D1=81=D0=B0=D1=80=D0=B5=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC?= Date: Fri, 30 Jan 2026 18:28:43 +0300 Subject: [PATCH] [upd] Implement BDI reasoning system and enhance AI decision-making with persistent beliefs and desires. Introduced Redis state storage for improved simulation state management and added asynchronous logging for performance optimization. Updated configuration parameters to support new features. --- backend/config.py | 74 +++- backend/core/ai.py | 163 ++++++++- backend/core/bdi/__init__.py | 50 +++ backend/core/bdi/bdi_agent.py | 627 ++++++++++++++++++++++++++++++++++ backend/core/bdi/belief.py | 401 ++++++++++++++++++++++ backend/core/bdi/desire.py | 347 +++++++++++++++++++ backend/core/bdi/intention.py | 289 ++++++++++++++++ backend/core/logger.py | 333 +++++++++++++++--- backend/core/storage.py | 450 ++++++++++++++++++++++++ config.json | 25 +- 10 files changed, 2689 insertions(+), 70 deletions(-) create mode 100644 backend/core/bdi/__init__.py create mode 100644 backend/core/bdi/bdi_agent.py create mode 100644 backend/core/bdi/belief.py create mode 100644 backend/core/bdi/desire.py create mode 100644 backend/core/bdi/intention.py create mode 100644 backend/core/storage.py diff --git a/backend/config.py b/backend/config.py index 665f2ae..0d1024a 100644 --- a/backend/config.py +++ b/backend/config.py @@ -133,7 +133,7 @@ class EconomyConfig: @dataclass class AIConfig: - """Configuration for AI decision-making system (GOAP-based).""" + """Configuration for AI decision-making system.""" # Maximum A* iterations for GOAP planner goap_max_iterations: int = 50 @@ -143,6 +143,53 @@ class AIConfig: # Fall back to reactive planning if GOAP fails to find a plan reactive_fallback: bool = True + # Use BDI (Belief-Desire-Intention) instead of pure GOAP + # BDI adds persistent beliefs, long-term desires, and plan commitment + use_bdi: bool = False + + +@dataclass +class BDIConfig: + """Configuration for BDI (Belief-Desire-Intention) reasoning system. + + BDI extends GOAP with: + - Persistent beliefs (memory of past events) + - Long-term desires (personality-driven motivations) + - Committed intentions (plan persistence) + """ + # Timeslicing: how often agents run full deliberation + # 1 = every turn, 3 = every 3rd turn (staggered by agent ID) + thinking_interval: int = 1 + + # Maximum consecutive action failures before replanning + max_consecutive_failures: int = 2 + + # Priority multiplier needed to switch from current intention + # 1.5 = new goal must be 50% higher priority to cause a switch + priority_switch_threshold: float = 1.5 + + # Memory system settings + memory_max_events: int = 50 # Max events to remember + memory_decay_rate: float = 0.1 # How fast memories fade + + +@dataclass +class RedisConfig: + """Configuration for optional Redis state storage. 
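+
+    A minimal config.json sketch for enabling the store (values are
+    illustrative; the keys match the fields defined below):
+
+        "redis": {"enabled": true, "host": "localhost", "port": 6379}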
+ + Redis enables: + - Persistent state across restarts + - Decoupled UI polling (web clients read independently) + - Distributed access (multiple simulation instances) + """ + enabled: bool = False + host: str = "localhost" + port: int = 6379 + db: int = 0 + password: Optional[str] = None + prefix: str = "villsim:" + ttl_seconds: int = 3600 # 1 hour default TTL + @dataclass class AgeConfig: @@ -252,6 +299,7 @@ class PerformanceConfig: # Logging control logging_enabled: bool = False # Enable file logging (disable for speed) detailed_logging: bool = False # Enable verbose per-agent logging + async_logging: bool = True # Use non-blocking background logging log_flush_interval: int = 50 # Flush logs every N turns (not every turn) # Memory management @@ -260,6 +308,9 @@ class PerformanceConfig: # Statistics calculation frequency stats_update_interval: int = 10 # Update expensive stats every N turns + # State storage + state_storage_enabled: bool = True # Enable state snapshotting + @dataclass class SimulationConfig: @@ -272,6 +323,8 @@ class SimulationConfig: market: MarketConfig = field(default_factory=MarketConfig) economy: EconomyConfig = field(default_factory=EconomyConfig) ai: AIConfig = field(default_factory=AIConfig) + bdi: BDIConfig = field(default_factory=BDIConfig) + redis: RedisConfig = field(default_factory=RedisConfig) age: AgeConfig = field(default_factory=AgeConfig) storage: StorageConfig = field(default_factory=StorageConfig) sinks: SinksConfig = field(default_factory=SinksConfig) @@ -281,9 +334,10 @@ class SimulationConfig: def to_dict(self) -> dict: """Convert to dictionary.""" - return { + result = { "performance": asdict(self.performance), "ai": asdict(self.ai), + "bdi": asdict(self.bdi), "agent_stats": asdict(self.agent_stats), "resources": asdict(self.resources), "actions": asdict(self.actions), @@ -295,13 +349,24 @@ class SimulationConfig: "sinks": asdict(self.sinks), "auto_step_interval": self.auto_step_interval, } + # Handle redis separately due to Optional field + redis_dict = asdict(self.redis) + result["redis"] = redis_dict + return result @classmethod def from_dict(cls, data: dict) -> "SimulationConfig": """Create from dictionary.""" + # Handle redis config specially due to Optional password + redis_data = data.get("redis", {}) + if redis_data.get("password") is None: + redis_data["password"] = None + return cls( performance=PerformanceConfig(**data.get("performance", {})), ai=AIConfig(**data.get("ai", {})), + bdi=BDIConfig(**data.get("bdi", {})), + redis=RedisConfig(**redis_data), agent_stats=AgentStatsConfig(**data.get("agent_stats", {})), resources=ResourceConfig(**data.get("resources", {})), actions=ActionConfig(**data.get("actions", {})), @@ -407,3 +472,8 @@ def _reset_all_caches() -> None: except ImportError: pass + try: + from backend.core.storage import reset_state_store + reset_state_store() + except ImportError: + pass diff --git a/backend/core/ai.py b/backend/core/ai.py index d0deadd..08667be 100644 --- a/backend/core/ai.py +++ b/backend/core/ai.py @@ -1,13 +1,14 @@ """AI decision system for agents in the Village Simulation. -This module provides shared AI classes and utilities used by the GOAP -(Goal-Oriented Action Planning) system. +This module provides the main entry point for AI decisions, supporting +both GOAP (Goal-Oriented Action Planning) and BDI (Belief-Desire-Intention) +reasoning systems. 
-GOAP Benefits: -- Agents plan multi-step sequences to achieve goals -- Goals are dynamically prioritized based on state -- More emergent and adaptive behavior -- Easier to extend with new goals and actions +AI System Options: +- GOAP: Fast, reactive planning with goal prioritization +- BDI: Persistent beliefs, long-term desires, plan commitment + +Configure via config.json "ai.use_bdi" (default: false for backward compatibility) Major features: - Each agent has unique personality traits affecting all decisions @@ -51,6 +52,10 @@ class AIDecision: # For price adjustments adjust_order_id: Optional[str] = None new_price: Optional[int] = None + # GOAP/BDI fields + goal_name: str = "" + plan_length: int = 0 + bdi_info: dict = field(default_factory=dict) def to_dict(self) -> dict: return { @@ -71,6 +76,9 @@ class AIDecision: ], "adjust_order_id": self.adjust_order_id, "new_price": self.new_price, + "goal_name": self.goal_name, + "plan_length": self.plan_length, + "bdi_info": self.bdi_info, } @@ -101,6 +109,7 @@ def get_energy_cost(resource_type: ResourceType) -> int: # Cached config values to avoid repeated lookups _cached_ai_config = None _cached_economy_config = None +_cached_use_bdi = None def _get_ai_config(): @@ -121,11 +130,30 @@ def _get_economy_config(): return _cached_economy_config +def _should_use_bdi() -> bool: + """Check if BDI should be used (cached).""" + global _cached_use_bdi + if _cached_use_bdi is None: + from backend.config import get_config + config = get_config() + ai_config = getattr(config, 'ai', None) + _cached_use_bdi = getattr(ai_config, 'use_bdi', False) if ai_config else False + return _cached_use_bdi + + def reset_ai_config_cache(): """Reset the cached config values (call after config reload).""" - global _cached_ai_config, _cached_economy_config + global _cached_ai_config, _cached_economy_config, _cached_use_bdi _cached_ai_config = None _cached_economy_config = None + _cached_use_bdi = None + + # Also reset BDI state if it was being used + try: + from backend.core.bdi import reset_bdi_state + reset_bdi_state() + except ImportError: + pass def get_ai_decision( @@ -136,8 +164,10 @@ def get_ai_decision( current_turn: int = 0, is_night: bool = False, ) -> AIDecision: - """Get an AI decision for an agent using GOAP (Goal-Oriented Action Planning). - + """Get an AI decision for an agent. + + Uses either GOAP or BDI based on config setting "ai.use_bdi". 
+ Args: agent: The agent to make a decision for market: The market order book @@ -149,8 +179,38 @@ def get_ai_decision( Returns: AIDecision with the chosen action and parameters """ + if _should_use_bdi(): + return _get_bdi_decision( + agent=agent, + market=market, + step_in_day=step_in_day, + day_steps=day_steps, + current_turn=current_turn, + is_night=is_night, + ) + else: + return _get_goap_decision( + agent=agent, + market=market, + step_in_day=step_in_day, + day_steps=day_steps, + current_turn=current_turn, + is_night=is_night, + ) + + +def _get_goap_decision( + agent: Agent, + market: "OrderBook", + step_in_day: int, + day_steps: int, + current_turn: int, + is_night: bool, +) -> AIDecision: + """Get an AI decision using GOAP (Goal-Oriented Action Planning).""" from backend.core.goap.goap_ai import get_goap_decision - return get_goap_decision( + + goap_decision = get_goap_decision( agent=agent, market=market, step_in_day=step_in_day, @@ -158,3 +218,84 @@ def get_ai_decision( current_turn=current_turn, is_night=is_night, ) + + # Convert GOAP AIDecision to our AIDecision (they should be compatible) + return AIDecision( + action=goap_decision.action, + target_resource=goap_decision.target_resource, + order_id=goap_decision.order_id, + quantity=goap_decision.quantity, + price=goap_decision.price, + reason=goap_decision.reason, + trade_items=[ + TradeItem( + order_id=t.order_id, + resource_type=t.resource_type, + quantity=t.quantity, + price_per_unit=t.price_per_unit, + ) + for t in goap_decision.trade_items + ], + adjust_order_id=goap_decision.adjust_order_id, + new_price=goap_decision.new_price, + goal_name=goap_decision.goal_name, + plan_length=goap_decision.plan_length, + ) + + +def _get_bdi_decision( + agent: Agent, + market: "OrderBook", + step_in_day: int, + day_steps: int, + current_turn: int, + is_night: bool, +) -> AIDecision: + """Get an AI decision using BDI (Belief-Desire-Intention).""" + from backend.core.bdi import get_bdi_decision + + bdi_decision = get_bdi_decision( + agent=agent, + market=market, + step_in_day=step_in_day, + day_steps=day_steps, + current_turn=current_turn, + is_night=is_night, + ) + + # Convert BDI AIDecision to our AIDecision + return AIDecision( + action=bdi_decision.action, + target_resource=bdi_decision.target_resource, + order_id=bdi_decision.order_id, + quantity=bdi_decision.quantity, + price=bdi_decision.price, + reason=bdi_decision.reason, + trade_items=[ + TradeItem( + order_id=t.order_id, + resource_type=t.resource_type, + quantity=t.quantity, + price_per_unit=t.price_per_unit, + ) + for t in bdi_decision.trade_items + ], + adjust_order_id=bdi_decision.adjust_order_id, + new_price=bdi_decision.new_price, + goal_name=bdi_decision.goal_name, + plan_length=bdi_decision.plan_length, + bdi_info=bdi_decision.bdi_info, + ) + + +def on_agent_death(agent_id: str) -> None: + """Clean up AI state when an agent dies. + + Call this from the engine when an agent is removed. + """ + if _should_use_bdi(): + try: + from backend.core.bdi import remove_agent_bdi_state + remove_agent_bdi_state(agent_id) + except ImportError: + pass diff --git a/backend/core/bdi/__init__.py b/backend/core/bdi/__init__.py new file mode 100644 index 0000000..5214d8c --- /dev/null +++ b/backend/core/bdi/__init__.py @@ -0,0 +1,50 @@ +"""BDI (Belief-Desire-Intention) module for agent AI. 
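+
+A minimal usage sketch (agent and market are supplied by the engine; the
+turn number is illustrative):
+
+    from backend.core.bdi import get_bdi_decision
+    decision = get_bdi_decision(agent, market, current_turn=42)
+    print(decision.reason)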
+ +This module provides a BDI architecture that wraps the existing GOAP planner, +enabling: +- Persistent beliefs (memory of past events) +- Long-term desires (personality-driven motivations) +- Committed intentions (plan persistence) + +Main entry points: +- get_bdi_decision(): Get an AI decision using BDI reasoning +- reset_bdi_state(): Reset all agent BDI state (on simulation reset) +""" + +from backend.core.bdi.belief import BeliefBase, MemoryEvent +from backend.core.bdi.desire import Desire, DesireType, DesireManager +from backend.core.bdi.intention import ( + Intention, + IntentionManager, + CommitmentStrategy, +) +from backend.core.bdi.bdi_agent import ( + BDIAgentAI, + AIDecision, + TradeItem, + get_bdi_decision, + reset_bdi_state, + remove_agent_bdi_state, +) + +__all__ = [ + # Belief system + "BeliefBase", + "MemoryEvent", + # Desire system + "Desire", + "DesireType", + "DesireManager", + # Intention system + "Intention", + "IntentionManager", + "CommitmentStrategy", + # BDI Agent + "BDIAgentAI", + "AIDecision", + "TradeItem", + # Entry points + "get_bdi_decision", + "reset_bdi_state", + "remove_agent_bdi_state", +] diff --git a/backend/core/bdi/bdi_agent.py b/backend/core/bdi/bdi_agent.py new file mode 100644 index 0000000..df52bd1 --- /dev/null +++ b/backend/core/bdi/bdi_agent.py @@ -0,0 +1,627 @@ +"""BDI Agent AI that wraps GOAP planning with BDI reasoning. + +This module provides the main BDI-based AI decision maker that: +1. Maintains persistent beliefs about the world +2. Manages desires based on personality +3. Commits to intentions (plans) and executes them +4. Uses GOAP planning to generate action sequences + +Performance optimizations: +- Timeslicing: full BDI cycle only runs periodically +- Plan persistence: reuses plans across turns +- Cached belief updates: skips unchanged data +""" + +from dataclasses import dataclass, field +from typing import Optional, TYPE_CHECKING + +from backend.domain.action import ActionType +from backend.domain.resources import ResourceType +from backend.domain.personality import get_trade_price_modifier + +from backend.core.bdi.belief import BeliefBase +from backend.core.bdi.desire import DesireManager +from backend.core.bdi.intention import IntentionManager + +from backend.core.goap.planner import GOAPPlanner, ReactivePlanner +from backend.core.goap.goals import get_all_goals +from backend.core.goap.actions import get_all_actions + +if TYPE_CHECKING: + from backend.domain.agent import Agent + from backend.core.market import OrderBook + from backend.core.goap.goal import Goal + from backend.core.goap.action import GOAPAction + from backend.core.goap.planner import Plan + + +@dataclass +class TradeItem: + """A single item to buy/sell in a trade.""" + order_id: str + resource_type: ResourceType + quantity: int + price_per_unit: int + + +@dataclass +class AIDecision: + """A decision made by the AI for an agent.""" + action: ActionType + target_resource: Optional[ResourceType] = None + order_id: Optional[str] = None + quantity: int = 1 + price: int = 0 + reason: str = "" + trade_items: list[TradeItem] = field(default_factory=list) + adjust_order_id: Optional[str] = None + new_price: Optional[int] = None + + # GOAP/BDI-specific fields + goal_name: str = "" + plan_length: int = 0 + bdi_info: dict = field(default_factory=dict) + + def to_dict(self) -> dict: + return { + "action": self.action.value, + "target_resource": self.target_resource.value if self.target_resource else None, + "order_id": self.order_id, + "quantity": self.quantity, + "price": 
self.price, + "reason": self.reason, + "trade_items": [ + { + "order_id": t.order_id, + "resource_type": t.resource_type.value, + "quantity": t.quantity, + "price_per_unit": t.price_per_unit, + } + for t in self.trade_items + ], + "adjust_order_id": self.adjust_order_id, + "new_price": self.new_price, + "goal_name": self.goal_name, + "plan_length": self.plan_length, + "bdi_info": self.bdi_info, + } + + +class BDIAgentAI: + """BDI-based AI decision maker that wraps GOAP planning. + + The BDI cycle: + 1. Update beliefs from sensors (agent state, market) + 2. Update desires based on beliefs and personality + 3. Check if current intention should continue + 4. If needed, generate new plan via GOAP + 5. Execute next action from intention + + Performance features: + - Timeslicing: full deliberation only every N turns + - Plan persistence: reuse plans across turns + - Reactive fallback: simple decisions when not deliberating + """ + + # Class-level cache for planners (shared across instances) + _planner_cache: Optional[GOAPPlanner] = None + _reactive_cache: Optional[ReactivePlanner] = None + _goals_cache: Optional[list] = None + _actions_cache: Optional[list] = None + + def __init__( + self, + agent: "Agent", + market: "OrderBook", + step_in_day: int = 1, + day_steps: int = 10, + current_turn: int = 0, + is_night: bool = False, + # Persistent BDI state (passed in for continuity) + beliefs: Optional[BeliefBase] = None, + desires: Optional[DesireManager] = None, + intentions: Optional[IntentionManager] = None, + ): + self.agent = agent + self.market = market + self.step_in_day = step_in_day + self.day_steps = day_steps + self.current_turn = current_turn + self.is_night = is_night + + # Initialize or use existing BDI components + self.beliefs = beliefs or BeliefBase() + self.desires = desires or DesireManager(agent.personality) + self.intentions = intentions or IntentionManager.from_personality(agent.personality) + + # Update beliefs from current state + self.beliefs.update_from_sensors( + agent=agent, + market=market, + step_in_day=step_in_day, + day_steps=day_steps, + current_turn=current_turn, + is_night=is_night, + ) + + # Update desires from beliefs + self.desires.update_from_beliefs(self.beliefs) + + # Get cached planners and goals/actions + self.planner = self._get_planner() + self.reactive_planner = self._get_reactive_planner() + self.goals = self._get_goals() + self.actions = self._get_actions() + + # Personality shortcuts + self.p = agent.personality + self.skills = agent.skills + + @classmethod + def _get_planner(cls) -> GOAPPlanner: + """Get cached GOAP planner.""" + if cls._planner_cache is None: + from backend.config import get_config + config = get_config() + ai_config = config.ai + cls._planner_cache = GOAPPlanner( + max_iterations=ai_config.goap_max_iterations, + ) + return cls._planner_cache + + @classmethod + def _get_reactive_planner(cls) -> ReactivePlanner: + """Get cached reactive planner.""" + if cls._reactive_cache is None: + cls._reactive_cache = ReactivePlanner() + return cls._reactive_cache + + @classmethod + def _get_goals(cls) -> list: + """Get cached goals list.""" + if cls._goals_cache is None: + cls._goals_cache = get_all_goals() + return cls._goals_cache + + @classmethod + def _get_actions(cls) -> list: + """Get cached actions list.""" + if cls._actions_cache is None: + cls._actions_cache = get_all_actions() + return cls._actions_cache + + @classmethod + def reset_caches(cls) -> None: + """Reset all caches (call after config reload).""" + cls._planner_cache = None + 
cls._reactive_cache = None + cls._goals_cache = None + cls._actions_cache = None + + def should_deliberate(self) -> bool: + """Check if this agent should run full BDI deliberation this turn. + + Timeslicing: not every agent deliberates every turn. + Agents are staggered based on their ID hash. + """ + from backend.config import get_config + config = get_config() + + # Get thinking interval from config (default to 1 = every turn) + bdi_config = getattr(config, 'bdi', None) + thinking_interval = getattr(bdi_config, 'thinking_interval', 1) if bdi_config else 1 + + if thinking_interval <= 1: + return True # Deliberate every turn + + # Stagger agents across turns + agent_hash = hash(self.agent.id) % thinking_interval + return (self.current_turn % thinking_interval) == agent_hash + + def decide(self) -> AIDecision: + """Make a decision using BDI reasoning with GOAP planning. + + Decision flow: + 1. Night time: mandatory sleep + 2. Check if should deliberate (timeslicing) + 3. If deliberating: run full BDI cycle + 4. If not: continue current intention or reactive fallback + """ + # Night time - mandatory sleep + if self.is_night: + return AIDecision( + action=ActionType.SLEEP, + reason="Night time: sleeping", + goal_name="Sleep", + bdi_info={"mode": "night"}, + ) + + # Check if we should run full deliberation + if self.should_deliberate(): + return self._deliberate() + else: + return self._continue_or_react() + + def _deliberate(self) -> AIDecision: + """Run full BDI deliberation cycle.""" + # Filter goals by desires + filtered_goals = self.desires.filter_goals_by_desire(self.goals, self.beliefs) + + # Check if we should reconsider current intention + should_replan = self.intentions.should_reconsider( + beliefs=self.beliefs, + desire_manager=self.desires, + available_goals=filtered_goals, + ) + + if not should_replan and self.intentions.has_intention(): + # Continue with current intention + action = self.intentions.get_next_action() + if action: + return self._convert_to_decision( + goap_action=action, + goal=self.intentions.current_intention.goal, + plan=self.intentions.current_intention.plan, + mode="continue", + ) + + # Need to plan for a goal + world_state = self.beliefs.to_world_state() + + plan = self.planner.plan_for_goals( + initial_state=world_state, + goals=filtered_goals, + available_actions=self.actions, + ) + + if plan and not plan.is_empty: + # Commit to new intention + self.intentions.commit_to_plan( + goal=plan.goal, + plan=plan, + current_turn=self.current_turn, + ) + + goap_action = plan.first_action + return self._convert_to_decision( + goap_action=goap_action, + goal=plan.goal, + plan=plan, + mode="new_plan", + ) + + # Fallback to reactive planning + return self._reactive_fallback() + + def _continue_or_react(self) -> AIDecision: + """Continue current intention or use reactive fallback (no deliberation).""" + if self.intentions.has_intention(): + action = self.intentions.get_next_action() + if action: + return self._convert_to_decision( + goap_action=action, + goal=self.intentions.current_intention.goal, + plan=self.intentions.current_intention.plan, + mode="timeslice_continue", + ) + + # No intention, use reactive fallback + return self._reactive_fallback() + + def _reactive_fallback(self) -> AIDecision: + """Use reactive planning when no intention exists.""" + world_state = self.beliefs.to_world_state() + + best_action = self.reactive_planner.select_best_action( + state=world_state, + goals=self.goals, + available_actions=self.actions, + ) + + if best_action: + return 
self._convert_to_decision( + goap_action=best_action, + goal=None, + plan=None, + mode="reactive", + ) + + # Ultimate fallback - rest + return AIDecision( + action=ActionType.REST, + reason="No valid action found, resting", + bdi_info={"mode": "fallback"}, + ) + + def _convert_to_decision( + self, + goap_action: "GOAPAction", + goal: Optional["Goal"], + plan: Optional["Plan"], + mode: str = "deliberate", + ) -> AIDecision: + """Convert a GOAP action to an AIDecision with proper parameters.""" + action_type = goap_action.action_type + target_resource = goap_action.target_resource + + # Build reason string + if goal: + reason = f"{goal.name}: {goap_action.name}" + else: + reason = f"Reactive: {goap_action.name}" + + # BDI debug info + bdi_info = { + "mode": mode, + "dominant_desire": self.desires.dominant_desire.value if self.desires.dominant_desire else None, + "commitment": self.intentions.commitment_strategy.value, + "has_intention": self.intentions.has_intention(), + } + + # Handle different action types + if action_type == ActionType.CONSUME: + return AIDecision( + action=action_type, + target_resource=target_resource, + reason=reason, + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + elif action_type == ActionType.TRADE: + return self._create_trade_decision(goap_action, goal, plan, reason, bdi_info) + + elif action_type in [ActionType.HUNT, ActionType.GATHER, ActionType.CHOP_WOOD, + ActionType.GET_WATER, ActionType.WEAVE]: + return AIDecision( + action=action_type, + target_resource=target_resource, + reason=reason, + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + elif action_type == ActionType.BUILD_FIRE: + return AIDecision( + action=action_type, + target_resource=ResourceType.WOOD, + reason=reason, + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + elif action_type in [ActionType.REST, ActionType.SLEEP]: + return AIDecision( + action=action_type, + reason=reason, + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + # Default case + return AIDecision( + action=action_type, + target_resource=target_resource, + reason=reason, + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + def _create_trade_decision( + self, + goap_action: "GOAPAction", + goal: Optional["Goal"], + plan: Optional["Plan"], + reason: str, + bdi_info: dict, + ) -> AIDecision: + """Create a trade decision with actual market parameters.""" + target_resource = goap_action.target_resource + action_name = goap_action.name.lower() + + if "buy" in action_name: + # Find the best order to buy from + order = self.market.get_cheapest_order(target_resource) + + if order and order.seller_id != self.agent.id: + # Check trust for this seller + trust = self.beliefs.get_trade_trust(order.seller_id) + + # Skip distrusted sellers if we're picky + if trust < -0.5 and self.p.price_sensitivity > 1.2: + # Try next cheapest? 
For now, fall back to gathering + return self._create_gather_fallback(target_resource, reason, goal, plan, bdi_info) + + # Calculate quantity to buy + can_afford = self.agent.money // max(1, order.price_per_unit) + space = self.agent.inventory_space() + quantity = min(2, can_afford, space, order.quantity) + + if quantity > 0: + return AIDecision( + action=ActionType.TRADE, + target_resource=target_resource, + order_id=order.id, + quantity=quantity, + price=order.price_per_unit, + reason=f"{reason} @ {order.price_per_unit}c", + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + # Can't buy - fallback to gathering + return self._create_gather_fallback(target_resource, reason, goal, plan, bdi_info) + + elif "sell" in action_name: + # Create a sell order + quantity_available = self.agent.get_resource_count(target_resource) + + # Calculate minimum to keep + min_keep = self._get_min_keep(target_resource) + quantity_to_sell = min(3, quantity_available - min_keep) + + if quantity_to_sell > 0: + price = self._calculate_sell_price(target_resource) + + return AIDecision( + action=ActionType.TRADE, + target_resource=target_resource, + quantity=quantity_to_sell, + price=price, + reason=f"{reason} @ {price}c", + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + # Invalid trade action - rest + return AIDecision( + action=ActionType.REST, + reason="Trade not possible", + bdi_info=bdi_info, + ) + + def _create_gather_fallback( + self, + resource_type: ResourceType, + reason: str, + goal: Optional["Goal"], + plan: Optional["Plan"], + bdi_info: dict, + ) -> AIDecision: + """Create a gather action as fallback when buying isn't possible.""" + action_map = { + ResourceType.WATER: ActionType.GET_WATER, + ResourceType.BERRIES: ActionType.GATHER, + ResourceType.MEAT: ActionType.HUNT, + ResourceType.WOOD: ActionType.CHOP_WOOD, + } + + action = action_map.get(resource_type, ActionType.GATHER) + + return AIDecision( + action=action, + target_resource=resource_type, + reason=f"{reason} (gathering instead)", + goal_name=goal.name if goal else "", + plan_length=len(plan.actions) if plan else 0, + bdi_info=bdi_info, + ) + + def _get_min_keep(self, resource_type: ResourceType) -> int: + """Get minimum quantity to keep for survival.""" + # Adjusted by hoarding rate from desires + hoarding_mult = 0.5 + self.p.hoarding_rate + + base_min = { + ResourceType.WATER: 2, + ResourceType.MEAT: 1, + ResourceType.BERRIES: 2, + ResourceType.WOOD: 1, + ResourceType.HIDE: 0, + } + + return int(base_min.get(resource_type, 1) * hoarding_mult) + + def _calculate_sell_price(self, resource_type: ResourceType) -> int: + """Calculate sell price based on fair value and market conditions.""" + from backend.core.ai import get_energy_cost + from backend.config import get_config + + config = get_config() + economy = getattr(config, 'economy', None) + energy_to_money_ratio = getattr(economy, 'energy_to_money_ratio', 150) if economy else 150 + min_price = getattr(economy, 'min_price', 100) if economy else 100 + + energy_cost = get_energy_cost(resource_type) + fair_value = max(min_price, int(energy_cost * energy_to_money_ratio)) + + # Apply trading skill + sell_modifier = get_trade_price_modifier(self.skills.trading, is_buying=False) + + # Get market signal + signal = self.market.get_market_signal(resource_type) + + if signal == "sell": # Scarcity + price = int(fair_value * 1.3 * sell_modifier) + elif signal == "hold": + price 
= int(fair_value * sell_modifier) + else: # Surplus + cheapest = self.market.get_cheapest_order(resource_type) + if cheapest and cheapest.seller_id != self.agent.id: + price = max(min_price, cheapest.price_per_unit - 1) + else: + price = int(fair_value * 0.8 * sell_modifier) + + return max(min_price, price) + + def record_action_result(self, success: bool, action_type: str) -> None: + """Record the result of an action for learning and intention tracking.""" + # Update intention + self.intentions.advance_intention(success) + + # Update beliefs/memory + if success: + self.beliefs.record_successful_action(action_type) + else: + self.beliefs.record_failed_action(action_type) + + +# Persistent BDI state storage for agents +_agent_bdi_state: dict[str, tuple[BeliefBase, DesireManager, IntentionManager]] = {} + + +def get_bdi_decision( + agent: "Agent", + market: "OrderBook", + step_in_day: int = 1, + day_steps: int = 10, + current_turn: int = 0, + is_night: bool = False, +) -> AIDecision: + """Get a BDI-based AI decision for an agent. + + This is the main entry point for the BDI AI system. + It maintains persistent BDI state for each agent. + """ + # Get or create persistent BDI state + if agent.id not in _agent_bdi_state: + beliefs = BeliefBase() + desires = DesireManager(agent.personality) + intentions = IntentionManager.from_personality(agent.personality) + _agent_bdi_state[agent.id] = (beliefs, desires, intentions) + else: + beliefs, desires, intentions = _agent_bdi_state[agent.id] + + # Create AI instance with persistent state + ai = BDIAgentAI( + agent=agent, + market=market, + step_in_day=step_in_day, + day_steps=day_steps, + current_turn=current_turn, + is_night=is_night, + beliefs=beliefs, + desires=desires, + intentions=intentions, + ) + + return ai.decide() + + +def reset_bdi_state() -> None: + """Reset all BDI state (call on simulation reset).""" + global _agent_bdi_state + _agent_bdi_state.clear() + BDIAgentAI.reset_caches() + + +def remove_agent_bdi_state(agent_id: str) -> None: + """Remove BDI state for a specific agent (call on agent death).""" + _agent_bdi_state.pop(agent_id, None) diff --git a/backend/core/bdi/belief.py b/backend/core/bdi/belief.py new file mode 100644 index 0000000..695b085 --- /dev/null +++ b/backend/core/bdi/belief.py @@ -0,0 +1,401 @@ +"""Belief System for BDI agents. + +The BeliefBase maintains persistent state about the world from the agent's +perspective, including: +- Current sensory data (vitals, inventory, market) +- Memory of past events (failed trades, good hunting spots) +- Dirty flags for efficient updates + +This replaces the transient WorldState creation with a persistent belief system. +""" + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Optional +from collections import deque + +if TYPE_CHECKING: + from backend.domain.agent import Agent + from backend.core.market import OrderBook + + +@dataclass +class MemoryEvent: + """A remembered event that may influence future decisions.""" + event_type: str # "trade_failed", "hunt_failed", "good_deal", etc. + turn: int + data: dict = field(default_factory=dict) + relevance: float = 1.0 # Decays over time + + +@dataclass +class BeliefBase: + """Persistent belief system for a BDI agent. + + Maintains both current perceptions and memories of past events. + Uses dirty flags to avoid unnecessary recomputation. 
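+
+    A short usage sketch (agent and market come from the simulation; the
+    turn number is illustrative):
+
+        beliefs = BeliefBase()
+        beliefs.update_from_sensors(agent, market, current_turn=10)
+        if beliefs.has_critical_need():
+            need = beliefs.get_most_urgent_need()  # e.g. "thirst"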
+ """ + + # Current perception state (cached WorldState fields) + thirst_pct: float = 1.0 + hunger_pct: float = 1.0 + heat_pct: float = 1.0 + energy_pct: float = 1.0 + + # Resource counts + water_count: int = 0 + food_count: int = 0 + meat_count: int = 0 + berries_count: int = 0 + wood_count: int = 0 + hide_count: int = 0 + + # Inventory state + has_clothes: bool = False + inventory_space: int = 0 + inventory_full: bool = False + + # Economic state + money: int = 0 + is_wealthy: bool = False + + # Market beliefs (what we believe about market availability) + can_buy_water: bool = False + can_buy_food: bool = False + can_buy_meat: bool = False + can_buy_berries: bool = False + can_buy_wood: bool = False + water_market_price: int = 0 + food_market_price: int = 0 + wood_market_price: int = 0 + + # Time beliefs + is_night: bool = False + is_evening: bool = False + step_in_day: int = 0 + day_steps: int = 10 + current_turn: int = 0 + + # Personality (cached from agent, rarely changes) + wealth_desire: float = 0.5 + hoarding_rate: float = 0.5 + risk_tolerance: float = 0.5 + market_affinity: float = 0.5 + is_trader: bool = False + gather_preference: float = 1.0 + hunt_preference: float = 1.0 + trade_preference: float = 1.0 + + # Skills + hunting_skill: float = 1.0 + gathering_skill: float = 1.0 + trading_skill: float = 1.0 + + # Thresholds (from config) + critical_threshold: float = 0.25 + low_threshold: float = 0.45 + + # Calculated urgencies + thirst_urgency: float = 0.0 + hunger_urgency: float = 0.0 + heat_urgency: float = 0.0 + energy_urgency: float = 0.0 + + # === BDI Extensions === + + # Memory system - stores past events that influence decisions + memories: deque = field(default_factory=lambda: deque(maxlen=50)) + + # Track failed actions for this resource type (turn -> count) + failed_hunts: int = 0 + failed_gathers: int = 0 + failed_trades: int = 0 + + # Track successful trades with agents (agent_id -> positive count) + trusted_traders: dict = field(default_factory=dict) + # Track failed trades with agents (agent_id -> negative count) + distrusted_traders: dict = field(default_factory=dict) + + # Dirty flags for optimization + _vitals_dirty: bool = True + _inventory_dirty: bool = True + _market_dirty: bool = True + _last_update_turn: int = -1 + + def update_from_sensors( + self, + agent: "Agent", + market: "OrderBook", + step_in_day: int = 1, + day_steps: int = 10, + current_turn: int = 0, + is_night: bool = False, + ) -> None: + """Update beliefs from current agent and market state. + + Uses dirty flags to skip unchanged data. 
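+
+        Worked example of the urgency math in _calculate_urgencies(): with
+        critical_threshold=0.25 and low_threshold=0.45, thirst_pct=0.35
+        yields thirst_urgency = 1.0 - (0.35 - 0.25) / (0.45 - 0.25) = 0.5.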
+ """ + from backend.domain.resources import ResourceType + from backend.config import get_config + + self.current_turn = current_turn + self.step_in_day = step_in_day + self.day_steps = day_steps + self.is_night = is_night + self.is_evening = step_in_day >= day_steps - 2 + + # Always update vitals (they change every turn) + stats = agent.stats + self.thirst_pct = stats.thirst / stats.MAX_THIRST + self.hunger_pct = stats.hunger / stats.MAX_HUNGER + self.heat_pct = stats.heat / stats.MAX_HEAT + self.energy_pct = stats.energy / stats.MAX_ENERGY + + # Update inventory + self.water_count = agent.get_resource_count(ResourceType.WATER) + self.meat_count = agent.get_resource_count(ResourceType.MEAT) + self.berries_count = agent.get_resource_count(ResourceType.BERRIES) + self.wood_count = agent.get_resource_count(ResourceType.WOOD) + self.hide_count = agent.get_resource_count(ResourceType.HIDE) + self.food_count = self.meat_count + self.berries_count + + self.has_clothes = agent.has_clothes() + self.inventory_space = agent.inventory_space() + self.inventory_full = agent.inventory_full() + self.money = agent.money + + # Update personality (cached, rarely changes) + self.wealth_desire = agent.personality.wealth_desire + self.hoarding_rate = agent.personality.hoarding_rate + self.risk_tolerance = agent.personality.risk_tolerance + self.market_affinity = agent.personality.market_affinity + self.gather_preference = agent.personality.gather_preference + self.hunt_preference = agent.personality.hunt_preference + self.trade_preference = agent.personality.trade_preference + + # Skills + self.hunting_skill = agent.skills.hunting + self.gathering_skill = agent.skills.gathering + self.trading_skill = agent.skills.trading + + # Market availability + def get_market_info(resource_type: ResourceType) -> tuple[bool, int]: + order = market.get_cheapest_order(resource_type) + if order and order.seller_id != agent.id and agent.money >= order.price_per_unit: + return True, order.price_per_unit + return False, 0 + + self.can_buy_water, self.water_market_price = get_market_info(ResourceType.WATER) + self.can_buy_meat, meat_price = get_market_info(ResourceType.MEAT) + self.can_buy_berries, berries_price = get_market_info(ResourceType.BERRIES) + self.can_buy_wood, self.wood_market_price = get_market_info(ResourceType.WOOD) + + self.can_buy_food = self.can_buy_meat or self.can_buy_berries + food_price = min( + meat_price if self.can_buy_meat else float('inf'), + berries_price if self.can_buy_berries else float('inf') + ) + self.food_market_price = int(food_price) if food_price != float('inf') else 0 + + # Wealth calculation + config = get_config() + economy_config = getattr(config, 'economy', None) + min_wealth_target = getattr(economy_config, 'min_wealth_target', 5000) if economy_config else 5000 + wealth_target = int(min_wealth_target * (0.5 + self.wealth_desire)) + self.is_wealthy = self.money >= wealth_target + + # Trader check + self.is_trader = self.trade_preference > 1.3 and self.market_affinity > 0.5 + + # Config thresholds + agent_config = config.agent_stats + self.critical_threshold = agent_config.critical_threshold + self.low_threshold = 0.45 + + # Calculate urgencies + self._calculate_urgencies() + + # Decay old memories + self._decay_memories() + + self._last_update_turn = current_turn + + def _calculate_urgencies(self) -> None: + """Calculate urgency values for each vital stat.""" + def calc_urgency(pct: float, critical: float, low: float) -> float: + if pct >= low: + return 0.0 + elif pct >= critical: + 
return 1.0 - (pct - critical) / (low - critical) + else: + return 1.0 + (critical - pct) / critical * 2.0 + + self.thirst_urgency = calc_urgency(self.thirst_pct, self.critical_threshold, self.low_threshold) + self.hunger_urgency = calc_urgency(self.hunger_pct, self.critical_threshold, self.low_threshold) + self.heat_urgency = calc_urgency(self.heat_pct, self.critical_threshold, self.low_threshold) + + if self.energy_pct < 0.25: + self.energy_urgency = 2.0 + elif self.energy_pct < 0.40: + self.energy_urgency = 1.0 + else: + self.energy_urgency = 0.0 + + def _decay_memories(self) -> None: + """Decay memory relevance over time.""" + for memory in self.memories: + age = self.current_turn - memory.turn + # Memories decay exponentially with age + memory.relevance = max(0.1, 1.0 / (1.0 + age * 0.1)) + + def add_memory(self, event_type: str, data: dict = None) -> None: + """Add a new memory event.""" + self.memories.append(MemoryEvent( + event_type=event_type, + turn=self.current_turn, + data=data or {}, + relevance=1.0, + )) + + def record_failed_action(self, action_type: str) -> None: + """Record a failed action for learning.""" + if action_type == "hunt": + self.failed_hunts += 1 + self.add_memory("hunt_failed") + elif action_type == "gather": + self.failed_gathers += 1 + self.add_memory("gather_failed") + elif action_type == "trade": + self.failed_trades += 1 + self.add_memory("trade_failed") + + def record_successful_action(self, action_type: str) -> None: + """Record a successful action, reducing failure counts.""" + if action_type == "hunt": + self.failed_hunts = max(0, self.failed_hunts - 1) + elif action_type == "gather": + self.failed_gathers = max(0, self.failed_gathers - 1) + elif action_type == "trade": + self.failed_trades = max(0, self.failed_trades - 1) + + def record_trade_partner(self, partner_id: str, success: bool) -> None: + """Track trade relationship with another agent.""" + if success: + self.trusted_traders[partner_id] = self.trusted_traders.get(partner_id, 0) + 1 + # Reduce distrust if they've been bad before + if partner_id in self.distrusted_traders: + self.distrusted_traders[partner_id] = max(0, self.distrusted_traders[partner_id] - 1) + else: + self.distrusted_traders[partner_id] = self.distrusted_traders.get(partner_id, 0) + 1 + + def get_trade_trust(self, partner_id: str) -> float: + """Get trust level for a trade partner (-1 to 1).""" + trust = self.trusted_traders.get(partner_id, 0) + distrust = self.distrusted_traders.get(partner_id, 0) + total = trust + distrust + if total == 0: + return 0.0 # Unknown partner + return (trust - distrust) / total + + def has_critical_need(self) -> bool: + """Check if any vital stat is critical (requires immediate attention).""" + return ( + self.thirst_urgency >= 2.0 or + self.hunger_urgency >= 2.0 or + self.heat_urgency >= 2.0 or + self.energy_urgency >= 2.0 + ) + + def get_most_urgent_need(self) -> Optional[str]: + """Get the most urgent vital need, if any.""" + urgencies = { + "thirst": self.thirst_urgency, + "hunger": self.hunger_urgency, + "heat": self.heat_urgency, + "energy": self.energy_urgency, + } + max_urgency = max(urgencies.values()) + if max_urgency < 0.5: + return None + return max(urgencies, key=urgencies.get) + + def to_world_state(self): + """Convert beliefs to a WorldState for GOAP planner compatibility.""" + from backend.core.goap.world_state import WorldState + + return WorldState( + thirst_pct=self.thirst_pct, + hunger_pct=self.hunger_pct, + heat_pct=self.heat_pct, + energy_pct=self.energy_pct, + 
water_count=self.water_count, + food_count=self.food_count, + meat_count=self.meat_count, + berries_count=self.berries_count, + wood_count=self.wood_count, + hide_count=self.hide_count, + has_clothes=self.has_clothes, + inventory_space=self.inventory_space, + inventory_full=self.inventory_full, + money=self.money, + is_wealthy=self.is_wealthy, + can_buy_water=self.can_buy_water, + can_buy_food=self.can_buy_food, + can_buy_meat=self.can_buy_meat, + can_buy_berries=self.can_buy_berries, + can_buy_wood=self.can_buy_wood, + water_market_price=self.water_market_price, + food_market_price=self.food_market_price, + wood_market_price=self.wood_market_price, + is_night=self.is_night, + is_evening=self.is_evening, + step_in_day=self.step_in_day, + day_steps=self.day_steps, + wealth_desire=self.wealth_desire, + hoarding_rate=self.hoarding_rate, + risk_tolerance=self.risk_tolerance, + market_affinity=self.market_affinity, + is_trader=self.is_trader, + gather_preference=self.gather_preference, + hunt_preference=self.hunt_preference, + trade_preference=self.trade_preference, + hunting_skill=self.hunting_skill, + gathering_skill=self.gathering_skill, + trading_skill=self.trading_skill, + critical_threshold=self.critical_threshold, + low_threshold=self.low_threshold, + ) + + def to_dict(self) -> dict: + """Convert to dictionary for debugging/logging.""" + return { + "vitals": { + "thirst": round(self.thirst_pct, 2), + "hunger": round(self.hunger_pct, 2), + "heat": round(self.heat_pct, 2), + "energy": round(self.energy_pct, 2), + }, + "urgencies": { + "thirst": round(self.thirst_urgency, 2), + "hunger": round(self.hunger_urgency, 2), + "heat": round(self.heat_urgency, 2), + "energy": round(self.energy_urgency, 2), + }, + "inventory": { + "water": self.water_count, + "meat": self.meat_count, + "berries": self.berries_count, + "wood": self.wood_count, + "hide": self.hide_count, + "space": self.inventory_space, + }, + "economy": { + "money": self.money, + "is_wealthy": self.is_wealthy, + }, + "memory": { + "failed_hunts": self.failed_hunts, + "failed_gathers": self.failed_gathers, + "failed_trades": self.failed_trades, + "memory_count": len(self.memories), + }, + } diff --git a/backend/core/bdi/desire.py b/backend/core/bdi/desire.py new file mode 100644 index 0000000..6175f30 --- /dev/null +++ b/backend/core/bdi/desire.py @@ -0,0 +1,347 @@ +"""Desire System for BDI agents. + +Desires represent high-level, long-term motivations that persist across turns. +Unlike GOAP goals (which are immediate targets), desires are personality-driven +and influence which goals get activated. 
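+
+Worked example of the intensity formula (see Desire.effective_intensity):
+a desire with base_intensity=1.0, personality_weight=1.2 and
+satisfaction=0.25 has intensity 1.0 * 1.2 * (0.5 + 0.75) = 1.5, before
+any boredom decay from lack of progress.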
+ +Key concepts: +- Desires are weighted by personality traits +- Desires can be satisfied temporarily but return +- Desires influence goal selection, not direct actions +""" + +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from backend.core.bdi.belief import BeliefBase + from backend.domain.personality import PersonalityTraits + from backend.core.goap.goal import Goal + + +class DesireType(Enum): + """Types of high-level desires.""" + SURVIVAL = "survival" # Stay alive (thirst, hunger, heat, energy) + ACCUMULATE_WEALTH = "wealth" # Build up money reserves + STOCK_RESOURCES = "stock" # Hoard resources for security + MASTER_PROFESSION = "mastery" # Improve skills in preferred activity + SOCIAL_STANDING = "social" # Gain reputation through trade + COMFORT = "comfort" # Maintain clothes, warmth, rest + + +@dataclass +class Desire: + """A high-level motivation that influences goal selection. + + Desires have: + - A base intensity (how strongly the agent wants this) + - A satisfaction level (0-1, how fulfilled is this desire currently) + - A personality weight (how much this agent's personality cares) + """ + desire_type: DesireType + base_intensity: float = 1.0 # Base importance (0-2) + satisfaction: float = 0.5 # Current fulfillment (0-1) + personality_weight: float = 1.0 # Multiplier from personality + + # Persistence tracking + turns_pursued: int = 0 # How long we've been chasing this + turns_since_progress: int = 0 # Turns since we made progress + + @property + def effective_intensity(self) -> float: + """Calculate current desire intensity considering satisfaction.""" + # Desire is stronger when unsatisfied + unsatisfied = 1.0 - self.satisfaction + # Apply personality weight + intensity = self.base_intensity * self.personality_weight * (0.5 + unsatisfied) + # Reduce intensity if pursued too long without progress (boredom) + if self.turns_since_progress > 10: + boredom_factor = max(0.3, 1.0 - (self.turns_since_progress - 10) * 0.05) + intensity *= boredom_factor + return intensity + + def update_satisfaction(self, new_satisfaction: float) -> None: + """Update satisfaction level and track progress.""" + if new_satisfaction > self.satisfaction: + self.turns_since_progress = 0 # Made progress! + else: + self.turns_since_progress += 1 + self.satisfaction = max(0.0, min(1.0, new_satisfaction)) + self.turns_pursued += 1 + + def reset_pursuit(self) -> None: + """Reset pursuit tracking (when switching to different desire).""" + self.turns_pursued = 0 + self.turns_since_progress = 0 + + +class DesireManager: + """Manages an agent's desires and converts them to active goals. + + The DesireManager: + 1. Maintains a set of desires weighted by personality + 2. Updates desire satisfaction based on beliefs + 3. Selects which desires should drive current goals + 4. 
Filters/prioritizes GOAP goals based on active desires + """ + + def __init__(self, personality: "PersonalityTraits"): + """Initialize desires based on personality.""" + self.desires: dict[DesireType, Desire] = {} + self._init_desires(personality) + + # Track which desire is currently dominant + self.dominant_desire: Optional[DesireType] = None + self.stubbornness: float = 0.5 # How reluctant to switch desires + + def _init_desires(self, personality: "PersonalityTraits") -> None: + """Initialize desires with personality-based weights.""" + # Survival - everyone has this, but risk-averse agents weight it higher + self.desires[DesireType.SURVIVAL] = Desire( + desire_type=DesireType.SURVIVAL, + base_intensity=2.0, # Always high base + personality_weight=1.5 - personality.risk_tolerance * 0.5, + ) + + # Accumulate Wealth - driven by wealth_desire + self.desires[DesireType.ACCUMULATE_WEALTH] = Desire( + desire_type=DesireType.ACCUMULATE_WEALTH, + base_intensity=1.0, + personality_weight=0.5 + personality.wealth_desire, + ) + + # Stock Resources - driven by hoarding_rate + self.desires[DesireType.STOCK_RESOURCES] = Desire( + desire_type=DesireType.STOCK_RESOURCES, + base_intensity=0.8, + personality_weight=0.5 + personality.hoarding_rate, + ) + + # Master Profession - driven by strongest preference + max_pref = max( + personality.hunt_preference, + personality.gather_preference, + personality.trade_preference, + ) + self.desires[DesireType.MASTER_PROFESSION] = Desire( + desire_type=DesireType.MASTER_PROFESSION, + base_intensity=0.6, + personality_weight=max_pref * 0.5, + ) + + # Social Standing - driven by market_affinity and trade_preference + self.desires[DesireType.SOCIAL_STANDING] = Desire( + desire_type=DesireType.SOCIAL_STANDING, + base_intensity=0.5, + personality_weight=personality.market_affinity * personality.trade_preference * 0.5, + ) + + # Comfort - moderate for everyone + self.desires[DesireType.COMFORT] = Desire( + desire_type=DesireType.COMFORT, + base_intensity=0.7, + personality_weight=1.0 - personality.risk_tolerance * 0.3, + ) + + # Calculate stubbornness from personality + # High hoarding + low risk tolerance = stubborn + self.stubbornness = (personality.hoarding_rate + (1.0 - personality.risk_tolerance)) / 2 + + def update_from_beliefs(self, beliefs: "BeliefBase") -> None: + """Update desire satisfaction based on current beliefs.""" + # Survival satisfaction based on vitals + min_vital = min( + beliefs.thirst_pct, + beliefs.hunger_pct, + beliefs.heat_pct, + beliefs.energy_pct, + ) + self.desires[DesireType.SURVIVAL].update_satisfaction(min_vital) + + # Wealth satisfaction based on money relative to wealthy threshold + if beliefs.is_wealthy: + self.desires[DesireType.ACCUMULATE_WEALTH].update_satisfaction(0.8) + else: + # Scale satisfaction by money (rough approximation) + wealth_sat = min(1.0, beliefs.money / 10000) + self.desires[DesireType.ACCUMULATE_WEALTH].update_satisfaction(wealth_sat) + + # Stock satisfaction based on inventory + total_resources = ( + beliefs.water_count + + beliefs.food_count + + beliefs.wood_count + ) + stock_sat = min(1.0, total_resources / 15) # 15 resources = satisfied + self.desires[DesireType.STOCK_RESOURCES].update_satisfaction(stock_sat) + + # Mastery satisfaction based on skill levels + max_skill = max( + beliefs.hunting_skill, + beliefs.gathering_skill, + beliefs.trading_skill, + ) + mastery_sat = (max_skill - 1.0) / 1.0 # 0 at skill 1.0, 1 at skill 2.0 + self.desires[DesireType.MASTER_PROFESSION].update_satisfaction(max(0, mastery_sat)) + 
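+        # Note: the trade_success heuristic below maps failed_trades 0..5
+        # onto satisfaction 1.0..0.0, so five recent failures zero it out.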
+
+        # Social satisfaction - harder to measure, use trade success
+        trade_success = max(0, 5 - beliefs.failed_trades) / 5
+        self.desires[DesireType.SOCIAL_STANDING].update_satisfaction(trade_success)
+
+        # Comfort satisfaction
+        comfort_factors = [
+            beliefs.energy_pct,
+            beliefs.heat_pct,
+            1.0 if beliefs.has_clothes else 0.5,
+        ]
+        comfort_sat = sum(comfort_factors) / len(comfort_factors)
+        self.desires[DesireType.COMFORT].update_satisfaction(comfort_sat)
+
+    def get_active_desires(self) -> list[Desire]:
+        """Get desires sorted by effective intensity (highest first)."""
+        return sorted(
+            self.desires.values(),
+            key=lambda d: d.effective_intensity,
+            reverse=True,
+        )
+
+    def should_switch_desire(self, new_dominant: DesireType) -> bool:
+        """Determine if we should switch dominant desire.
+
+        Stubborn agents require a larger intensity difference to switch.
+        """
+        if self.dominant_desire is None:
+            return True
+
+        if new_dominant == self.dominant_desire:
+            return False
+
+        current = self.desires[self.dominant_desire]
+        new = self.desires[new_dominant]
+
+        # Calculate required intensity difference based on stubbornness
+        # Stubbornness 0.5 = need 25% more intensity to switch
+        # Stubbornness 1.0 = need 50% more intensity to switch
+        required_difference = 1.0 + self.stubbornness * 0.5
+
+        return new.effective_intensity > current.effective_intensity * required_difference
+
+    def update_dominant_desire(self) -> DesireType:
+        """Update and return the currently dominant desire."""
+        active = self.get_active_desires()
+        if not active:
+            return DesireType.SURVIVAL
+
+        top_desire = active[0].desire_type
+
+        if self.should_switch_desire(top_desire):
+            # Reset pursuit on old desire if switching
+            if self.dominant_desire and self.dominant_desire != top_desire:
+                self.desires[self.dominant_desire].reset_pursuit()
+            self.dominant_desire = top_desire
+
+        return self.dominant_desire
+
+    def filter_goals_by_desire(
+        self,
+        goals: list["Goal"],
+        beliefs: "BeliefBase",
+    ) -> list["Goal"]:
+        """Filter and re-prioritize goals based on active desires.
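+
+        For example (per _get_goal_desire_modifier below): with dominant
+        desire ACCUMULATE_WEALTH at effective intensity 1.5, a "sell
+        excess" goal is scaled by 1.3 + 1.5 * 0.2 = 1.6.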
+ + This is where desires influence GOAP goal selection: + - Goals aligned with dominant desire get priority boost + - Goals conflicting with desires get reduced priority + """ + # Update dominant desire + dominant = self.update_dominant_desire() + + # Get world state for goal priority calculation + world_state = beliefs.to_world_state() + + # Calculate goal priorities with desire modifiers + goal_priorities: list[tuple["Goal", float]] = [] + + for goal in goals: + base_priority = goal.get_priority(world_state) + + # Apply desire-based modifiers + modifier = self._get_goal_desire_modifier(goal.name, dominant, beliefs) + adjusted_priority = base_priority * modifier + + goal_priorities.append((goal, adjusted_priority)) + + # Sort by adjusted priority + goal_priorities.sort(key=lambda x: x[1], reverse=True) + + # Return goals (the priorities will be recalculated by planner, + # but we've filtered/ordered them by our preferences) + return [g for g, _ in goal_priorities] + + def _get_goal_desire_modifier( + self, + goal_name: str, + dominant_desire: DesireType, + beliefs: "BeliefBase", + ) -> float: + """Get priority modifier for a goal based on dominant desire.""" + goal_lower = goal_name.lower() + + # Map goals to desires they serve + goal_desire_map = { + # Survival goals + "satisfy thirst": DesireType.SURVIVAL, + "satisfy hunger": DesireType.SURVIVAL, + "maintain heat": DesireType.SURVIVAL, + "restore energy": DesireType.SURVIVAL, + # Wealth goals + "build wealth": DesireType.ACCUMULATE_WEALTH, + "sell excess": DesireType.ACCUMULATE_WEALTH, + "find deals": DesireType.ACCUMULATE_WEALTH, + "trader arbitrage": DesireType.ACCUMULATE_WEALTH, + # Stock goals + "stock water": DesireType.STOCK_RESOURCES, + "stock food": DesireType.STOCK_RESOURCES, + "stock wood": DesireType.STOCK_RESOURCES, + # Comfort goals + "get clothes": DesireType.COMFORT, + "sleep": DesireType.COMFORT, + } + + # Find which desire this goal serves + goal_desire = None + for gn, desire in goal_desire_map.items(): + if gn in goal_lower: + goal_desire = desire + break + + if goal_desire is None: + return 1.0 # No modifier for unknown goals + + # Boost if aligned with dominant desire + if goal_desire == dominant_desire: + return 1.3 + self.desires[dominant_desire].effective_intensity * 0.2 + + # Survival always gets a baseline boost if critical + if goal_desire == DesireType.SURVIVAL and beliefs.has_critical_need(): + return 2.0 # Critical needs override other desires + + # Slight reduction for non-aligned goals + return 0.9 + + def to_dict(self) -> dict: + """Convert to dictionary for debugging/logging.""" + return { + "dominant_desire": self.dominant_desire.value if self.dominant_desire else None, + "stubbornness": round(self.stubbornness, 2), + "desires": { + d.desire_type.value: { + "intensity": round(d.effective_intensity, 2), + "satisfaction": round(d.satisfaction, 2), + "personality_weight": round(d.personality_weight, 2), + "turns_pursued": d.turns_pursued, + } + for d in self.desires.values() + }, + } diff --git a/backend/core/bdi/intention.py b/backend/core/bdi/intention.py new file mode 100644 index 0000000..6051394 --- /dev/null +++ b/backend/core/bdi/intention.py @@ -0,0 +1,289 @@ +"""Intention System for BDI agents. + +Intentions represent committed plans that the agent is executing. +Unlike desires (motivations) or goals (targets), intentions are +concrete action sequences the agent has decided to pursue. 
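+
+Worked example of the personality mapping (see IntentionManager.from_personality):
+hoarding_rate=0.9, risk_tolerance=0.2 and market_affinity=0.3 give a
+commitment score of 0.9*0.4 + 0.8*0.4 + 0.7*0.2 = 0.82, selecting STUBBORN.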
+ +Key concepts: +- Intention persistence: agents stick to plans unless interrupted +- Commitment strategies: different levels of plan commitment +- Plan monitoring: detecting when a plan becomes invalid +""" + +from dataclasses import dataclass +from enum import Enum +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from backend.core.bdi.belief import BeliefBase + from backend.core.bdi.desire import DesireManager + from backend.core.goap.goal import Goal + from backend.core.goap.action import GOAPAction + from backend.core.goap.planner import Plan + + +class CommitmentStrategy(Enum): + """How strongly an agent commits to their current intention.""" + REACTIVE = "reactive" # Replan every turn (no commitment) + CAUTIOUS = "cautious" # Replan if priorities shift significantly + DETERMINED = "determined" # Stick to plan unless it becomes impossible + STUBBORN = "stubborn" # Only abandon for critical interrupts + + +@dataclass +class Intention: + """A committed plan that the agent is executing. + + Tracks the goal, plan, and execution progress. + """ + goal: "Goal" # The goal we're pursuing + plan: "Plan" # The plan to achieve it + start_turn: int # When we committed + actions_completed: int = 0 # How many actions done + last_action_success: bool = True # Did last action succeed? + consecutive_failures: int = 0 # Failures in a row + + @property + def current_action(self) -> Optional["GOAPAction"]: + """Get the next action to execute.""" + if self.plan.is_empty: + return None + remaining = self.plan.actions[self.actions_completed:] + return remaining[0] if remaining else None + + @property + def is_complete(self) -> bool: + """Check if the plan has been fully executed.""" + return self.actions_completed >= len(self.plan.actions) + + @property + def remaining_actions(self) -> int: + """Number of actions left in the plan.""" + return max(0, len(self.plan.actions) - self.actions_completed) + + def advance(self, success: bool) -> None: + """Mark current action as executed and advance.""" + self.last_action_success = success + if success: + self.actions_completed += 1 + self.consecutive_failures = 0 + else: + self.consecutive_failures += 1 + + +class IntentionManager: + """Manages the agent's current intention and commitment. + + Responsibilities: + - Maintain the current intention (goal + plan) + - Decide when to continue vs. 
replan + - Handle plan failure and recovery + """ + + def __init__(self, commitment_strategy: CommitmentStrategy = CommitmentStrategy.CAUTIOUS): + self.current_intention: Optional[Intention] = None + self.commitment_strategy = commitment_strategy + + # Tracking + self.intentions_completed: int = 0 + self.intentions_abandoned: int = 0 + self.total_actions_executed: int = 0 + + # Thresholds for reconsideration + self.max_consecutive_failures: int = 2 + self.priority_switch_threshold: float = 1.5 # New goal must be 1.5x priority + + @classmethod + def from_personality(cls, personality) -> "IntentionManager": + """Create an IntentionManager with commitment strategy based on personality.""" + # Derive commitment from personality traits + # High hoarding + low risk tolerance = stubborn + commitment_score = ( + personality.hoarding_rate * 0.4 + + (1.0 - personality.risk_tolerance) * 0.4 + + (1.0 - personality.market_affinity) * 0.2 + ) + + if commitment_score > 0.7: + strategy = CommitmentStrategy.STUBBORN + elif commitment_score > 0.5: + strategy = CommitmentStrategy.DETERMINED + elif commitment_score > 0.3: + strategy = CommitmentStrategy.CAUTIOUS + else: + strategy = CommitmentStrategy.REACTIVE + + return cls(commitment_strategy=strategy) + + def has_intention(self) -> bool: + """Check if we have an active intention.""" + return ( + self.current_intention is not None and + not self.current_intention.is_complete + ) + + def get_next_action(self) -> Optional["GOAPAction"]: + """Get the next action from current intention.""" + if not self.has_intention(): + return None + return self.current_intention.current_action + + def should_reconsider( + self, + beliefs: "BeliefBase", + desire_manager: "DesireManager", + available_goals: list["Goal"], + ) -> bool: + """Determine if we should reconsider our current intention. + + This implements the commitment strategy logic. 
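As a quick worked example of the scoring in `from_personality` above (trait values invented; the 0.4/0.4/0.2 weights and 0.7/0.5/0.3 cut-offs are the ones in the patch):

    # Hypothetical agent: hoards heavily, avoids risk, rarely trades.
    hoarding_rate, risk_tolerance, market_affinity = 0.9, 0.2, 0.3

    score = (hoarding_rate * 0.4
             + (1.0 - risk_tolerance) * 0.4
             + (1.0 - market_affinity) * 0.2)
    print(round(score, 2))  # 0.82 -> above 0.7, so CommitmentStrategy.STUBBORN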
+ """ + # No intention = definitely need to plan + if not self.has_intention(): + return True + + intention = self.current_intention + + # Check for critical interrupts (always reconsider) + if beliefs.has_critical_need(): + # But only if current intention isn't already addressing it + urgent_need = beliefs.get_most_urgent_need() + if not self._intention_addresses_need(intention, urgent_need): + return True + + # Check for too many failures + if intention.consecutive_failures >= self.max_consecutive_failures: + return True + + # Strategy-specific logic + if self.commitment_strategy == CommitmentStrategy.REACTIVE: + return True # Always replan + + elif self.commitment_strategy == CommitmentStrategy.CAUTIOUS: + # Reconsider if a significantly better goal is available + return self._better_goal_available( + beliefs, desire_manager, available_goals, + threshold=self.priority_switch_threshold + ) + + elif self.commitment_strategy == CommitmentStrategy.DETERMINED: + # Only reconsider for much better goals or if plan is failing + return ( + intention.consecutive_failures > 0 or + self._better_goal_available( + beliefs, desire_manager, available_goals, + threshold=2.0 # Need 2x priority to switch + ) + ) + + elif self.commitment_strategy == CommitmentStrategy.STUBBORN: + # Only abandon for critical needs or impossible plans + return ( + beliefs.has_critical_need() or + intention.consecutive_failures >= self.max_consecutive_failures + ) + + return False + + def _intention_addresses_need(self, intention: Intention, need: str) -> bool: + """Check if current intention addresses a vital need.""" + if not intention or not need: + return False + + goal_name = intention.goal.name.lower() + need_map = { + "thirst": "thirst", + "hunger": "hunger", + "heat": "heat", + "energy": "energy", + } + + return need_map.get(need, "") in goal_name + + def _better_goal_available( + self, + beliefs: "BeliefBase", + desire_manager: "DesireManager", + available_goals: list["Goal"], + threshold: float, + ) -> bool: + """Check if there's a significantly better goal available.""" + if not self.has_intention(): + return True + + current_goal = self.current_intention.goal + world_state = beliefs.to_world_state() + current_priority = current_goal.get_priority(world_state) + + # Get desire-filtered goals + filtered_goals = desire_manager.filter_goals_by_desire(available_goals, beliefs) + + for goal in filtered_goals: + if goal.name == current_goal.name: + continue + + goal_priority = goal.get_priority(world_state) + if goal_priority > current_priority * threshold: + return True + + return False + + def commit_to_plan(self, goal: "Goal", plan: "Plan", current_turn: int) -> None: + """Commit to a new intention.""" + # Track abandoned intention + if self.has_intention(): + self.intentions_abandoned += 1 + + self.current_intention = Intention( + goal=goal, + plan=plan, + start_turn=current_turn, + ) + + def advance_intention(self, success: bool) -> None: + """Record action execution and advance the intention.""" + if not self.has_intention(): + return + + self.current_intention.advance(success) + self.total_actions_executed += 1 + + # Check if intention is complete + if self.current_intention.is_complete: + self.intentions_completed += 1 + self.current_intention = None + + def abandon_intention(self, reason: str = "unknown") -> None: + """Abandon the current intention.""" + if self.has_intention(): + self.intentions_abandoned += 1 + self.current_intention = None + + def get_plan_progress(self) -> dict: + """Get current plan 
execution progress.""" + if not self.has_intention(): + return {"has_intention": False} + + intention = self.current_intention + return { + "has_intention": True, + "goal": intention.goal.name, + "actions_completed": intention.actions_completed, + "remaining_actions": intention.remaining_actions, + "consecutive_failures": intention.consecutive_failures, + "turns_active": 0, # Would need current_turn to calculate + } + + def to_dict(self) -> dict: + """Convert to dictionary for debugging/logging.""" + return { + "commitment_strategy": self.commitment_strategy.value, + "has_intention": self.has_intention(), + "current_goal": self.current_intention.goal.name if self.has_intention() else None, + "plan_progress": self.get_plan_progress(), + "stats": { + "intentions_completed": self.intentions_completed, + "intentions_abandoned": self.intentions_abandoned, + "total_actions": self.total_actions_executed, + }, + } diff --git a/backend/core/logger.py b/backend/core/logger.py index 07dbd34..e8245c3 100644 --- a/backend/core/logger.py +++ b/backend/core/logger.py @@ -1,10 +1,17 @@ """Simulation logger for detailed step-by-step logging. -Performance-optimized: logging can be disabled or reduced via config. +Performance-optimized with non-blocking I/O: +- Logging can be disabled or reduced via config +- File writes happen in a background thread (producer-consumer pattern) +- Agent lookups use O(1) dict instead of O(n) list search +- No in-memory accumulation of all entries """ import json import logging +import threading +import queue +import atexit from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path @@ -60,12 +67,150 @@ class TurnLogEntry: } +class AsyncLogWriter: + """Background thread that handles file I/O asynchronously. + + Uses a producer-consumer pattern to decouple log generation + from file writing, preventing I/O from blocking the simulation. 
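A minimal usage sketch of this producer-consumer flow; the path and payload are invented, the method names match the class below.

    # All calls below only enqueue work; the daemon thread does the I/O.
    writer = AsyncLogWriter(max_queue_size=1000)
    writer.start()
    writer.open_file("logs/sim_demo.jsonl", file_id="json")
    writer.write("json", '{"type": "turn", "data": {}}\n')  # returns immediately
    writer.flush("json")   # flush happens off-thread
    writer.stop()          # drains the queue and closes files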
+ """ + + def __init__(self, max_queue_size: int = 1000): + self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size) + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._files: dict[str, TextIO] = {} + self._lock = threading.Lock() + + def start(self) -> None: + """Start the background writer thread.""" + if self._thread is not None and self._thread.is_alive(): + return + + self._stop_event.clear() + self._thread = threading.Thread(target=self._writer_loop, daemon=True) + self._thread.start() + + def stop(self, timeout: float = 2.0) -> None: + """Stop the background writer thread and flush remaining items.""" + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=timeout) + self._thread = None + + # Process any remaining items in the queue + self._drain_queue() + + # Close all files + with self._lock: + for f in self._files.values(): + try: + f.close() + except Exception: + pass + self._files.clear() + + def _writer_loop(self) -> None: + """Main loop for the background writer thread.""" + while not self._stop_event.is_set(): + try: + # Wait for items with timeout to allow stop checks + item = self._queue.get(timeout=0.1) + self._process_item(item) + self._queue.task_done() + except queue.Empty: + continue + except Exception as e: + # Log errors but don't crash the thread + logging.getLogger("simulation").warning(f"Async log writer error: {e}") + + def _process_item(self, item: dict) -> None: + """Process a single log item.""" + action = item.get("action") + + if action == "open": + self._open_file(item["path"], item["file_id"]) + elif action == "write": + self._write_to_file(item["file_id"], item["data"]) + elif action == "flush": + self._flush_file(item.get("file_id")) + elif action == "close": + self._close_file(item["file_id"]) + + def _open_file(self, path: str, file_id: str) -> None: + """Open a file for writing.""" + with self._lock: + if file_id not in self._files: + self._files[file_id] = open(path, "w", encoding="utf-8") + + def _write_to_file(self, file_id: str, data: str) -> None: + """Write data to a file.""" + with self._lock: + f = self._files.get(file_id) + if f: + f.write(data) + + def _flush_file(self, file_id: Optional[str] = None) -> None: + """Flush file(s) to disk.""" + with self._lock: + if file_id: + f = self._files.get(file_id) + if f: + f.flush() + else: + for f in self._files.values(): + f.flush() + + def _close_file(self, file_id: str) -> None: + """Close a file.""" + with self._lock: + f = self._files.pop(file_id, None) + if f: + f.close() + + def _drain_queue(self) -> None: + """Process all remaining items in the queue.""" + while True: + try: + item = self._queue.get_nowait() + self._process_item(item) + self._queue.task_done() + except queue.Empty: + break + + def enqueue(self, item: dict) -> bool: + """Add an item to the write queue. + + Returns False if queue is full (item dropped). 
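One caveat here: `enqueue` reports drops via its return value, but the `open_file`/`write`/`flush`/`close_file` wrappers below discard it, so the `_items_dropped` counter exposed later by `SimulationLogger.get_stats()` never increments as written. A caller that needs real drop accounting could count drops itself; a sketch:

    # Hypothetical drop accounting on top of enqueue().
    dropped = 0
    line = '{"type": "turn", "data": {}}\n'
    if not writer.enqueue({"action": "write", "file_id": "json", "data": line}):
        dropped += 1   # queue was full; this entry was silently discarded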
+ """ + try: + self._queue.put_nowait(item) + return True + except queue.Full: + return False + + def open_file(self, path: str, file_id: str) -> None: + """Queue a file open operation.""" + self.enqueue({"action": "open", "path": path, "file_id": file_id}) + + def write(self, file_id: str, data: str) -> None: + """Queue a write operation.""" + self.enqueue({"action": "write", "file_id": file_id, "data": data}) + + def flush(self, file_id: Optional[str] = None) -> None: + """Queue a flush operation.""" + self.enqueue({"action": "flush", "file_id": file_id}) + + def close_file(self, file_id: str) -> None: + """Queue a file close operation.""" + self.enqueue({"action": "close", "file_id": file_id}) + + class SimulationLogger: """Logger that dumps detailed simulation data to files. Performance optimized: - Logging can be disabled entirely via config - - File flushing is batched (not every turn) + - File writes happen in background thread (non-blocking) - Agent lookups use O(1) dict instead of O(n) list search - No in-memory accumulation of all entries """ @@ -79,8 +224,18 @@ class SimulationLogger: self.logging_enabled = perf_config.logging_enabled self.detailed_logging = perf_config.detailed_logging self.flush_interval = perf_config.log_flush_interval + + # Get async logging config (default to True if available) + self.async_logging = getattr(perf_config, 'async_logging', True) - # File handles (only created if logging enabled) + # Async writer (only created if logging enabled) + self._async_writer: Optional[AsyncLogWriter] = None + + # File IDs for async writing + self._json_file_id: Optional[str] = None + self._summary_file_id: Optional[str] = None + + # Fallback: synchronous file handles (used if async disabled) self._json_file: Optional[TextIO] = None self._summary_file: Optional[TextIO] = None @@ -95,6 +250,10 @@ class SimulationLogger: # Turn counter for flush batching self._turns_since_flush = 0 + + # Stats + self._items_queued = 0 + self._items_dropped = 0 def start_session(self, config: dict) -> None: """Start a new logging session.""" @@ -103,19 +262,39 @@ class SimulationLogger: self.log_dir.mkdir(exist_ok=True) - # Create session-specific log file + # Create session-specific log file paths timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - session_file = self.log_dir / f"sim_{timestamp}.jsonl" - summary_file = self.log_dir / f"sim_{timestamp}_summary.txt" - - self._json_file = open(session_file, "w") - self._summary_file = open(summary_file, "w") - - # Write config as first line - self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n") - - self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n") - self._summary_file.write("=" * 60 + "\n\n") + json_path = self.log_dir / f"sim_{timestamp}.jsonl" + summary_path = self.log_dir / f"sim_{timestamp}_summary.txt" + + if self.async_logging: + # Use async writer + self._async_writer = AsyncLogWriter() + self._async_writer.start() + + self._json_file_id = f"json_{timestamp}" + self._summary_file_id = f"summary_{timestamp}" + + self._async_writer.open_file(str(json_path), self._json_file_id) + self._async_writer.open_file(str(summary_path), self._summary_file_id) + + # Write initial data + self._async_writer.write( + self._json_file_id, + json.dumps({"type": "config", "data": config}) + "\n" + ) + self._async_writer.write( + self._summary_file_id, + f"Simulation Session Started: {datetime.now()}\n" + "=" * 60 + "\n\n" + ) + else: + # Use synchronous file handles + self._json_file = open(json_path, 
"w") + self._summary_file = open(summary_path, "w") + + self._json_file.write(json.dumps({"type": "config", "data": config}) + "\n") + self._summary_file.write(f"Simulation Session Started: {datetime.now()}\n") + self._summary_file.write("=" * 60 + "\n\n") if self.detailed_logging: # Set up file handler for detailed logs @@ -263,36 +442,43 @@ class SimulationLogger: if not self.logging_enabled or self._current_entry is None: return - # Write to JSON lines file (without flush every time) - if self._json_file: - self._json_file.write( - json.dumps({"type": "turn", "data": self._current_entry.to_dict()}) + "\n" - ) + entry = self._current_entry + + # Prepare data + json_data = json.dumps({"type": "turn", "data": entry.to_dict()}) + "\n" + + summary_lines = [f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} ({entry.time_of_day})\n"] + + if self.detailed_logging: + for agent in entry.agent_entries: + action = agent.decision.get("action", "?") + result = "+" if agent.action_result.get("success", False) else "-" + summary_lines.append( + f" [{agent.agent_name}] {action} {result} | " + f"E:{agent.stats_after.get('energy', '?')} " + f"H:{agent.stats_after.get('hunger', '?')} " + f"T:{agent.stats_after.get('thirst', '?')} " + f"${agent.money_after}\n" + ) - # Write summary (without flush every time) - if self._summary_file: - entry = self._current_entry - self._summary_file.write( - f"Turn {entry.turn} | Day {entry.day} Step {entry.step_in_day} ({entry.time_of_day})\n" - ) + if entry.deaths: + for death in entry.deaths: + summary_lines.append(f" X {death['name']} died: {death['cause']}\n") - if self.detailed_logging: - for agent in entry.agent_entries: - action = agent.decision.get("action", "?") - result = "✓" if agent.action_result.get("success", False) else "✗" - self._summary_file.write( - f" [{agent.agent_name}] {action} {result} | " - f"E:{agent.stats_after.get('energy', '?')} " - f"H:{agent.stats_after.get('hunger', '?')} " - f"T:{agent.stats_after.get('thirst', '?')} " - f"${agent.money_after}\n" - ) - - if entry.deaths: - for death in entry.deaths: - self._summary_file.write(f" 💀 {death['name']} died: {death['cause']}\n") - - self._summary_file.write("\n") + summary_lines.append("\n") + summary_data = "".join(summary_lines) + + if self.async_logging and self._async_writer: + # Non-blocking write + self._async_writer.write(self._json_file_id, json_data) + self._async_writer.write(self._summary_file_id, summary_data) + self._items_queued += 2 + else: + # Synchronous write + if self._json_file: + self._json_file.write(json_data) + if self._summary_file: + self._summary_file.write(summary_data) # Batched flush - only flush every N turns self._turns_since_flush += 1 @@ -306,20 +492,41 @@ class SimulationLogger: def _flush_files(self) -> None: """Flush file buffers to disk.""" - if self._json_file: - self._json_file.flush() - if self._summary_file: - self._summary_file.flush() + if self.async_logging and self._async_writer: + self._async_writer.flush() + else: + if self._json_file: + self._json_file.flush() + if self._summary_file: + self._summary_file.flush() def close(self) -> None: """Close log files.""" - if self._json_file: - self._json_file.close() - self._json_file = None - if self._summary_file: - self._summary_file.write(f"\nSession ended: {datetime.now()}\n") - self._summary_file.close() - self._summary_file = None + if self.async_logging and self._async_writer: + # Write final message before closing + if self._summary_file_id: + self._async_writer.write( + 
self._summary_file_id, + f"\nSession ended: {datetime.now()}\n" + ) + + # Close files + if self._json_file_id: + self._async_writer.close_file(self._json_file_id) + if self._summary_file_id: + self._async_writer.close_file(self._summary_file_id) + + # Stop the writer thread + self._async_writer.stop() + self._async_writer = None + else: + if self._json_file: + self._json_file.close() + self._json_file = None + if self._summary_file: + self._summary_file.write(f"\nSession ended: {datetime.now()}\n") + self._summary_file.close() + self._summary_file = None def get_entries(self) -> list[TurnLogEntry]: """Get all logged entries. @@ -327,6 +534,15 @@ class SimulationLogger: Note: Returns empty list when logging optimized (entries not kept in memory). """ return [] + + def get_stats(self) -> dict: + """Get logging statistics.""" + return { + "logging_enabled": self.logging_enabled, + "async_logging": self.async_logging, + "items_queued": self._items_queued, + "items_dropped": self._items_dropped, + } # Global logger instance @@ -348,3 +564,12 @@ def reset_simulation_logger() -> SimulationLogger: _logger.close() _logger = SimulationLogger() return _logger + + +# Ensure logger is closed on exit +@atexit.register +def _cleanup_logger(): + global _logger + if _logger: + _logger.close() + _logger = None diff --git a/backend/core/storage.py b/backend/core/storage.py new file mode 100644 index 0000000..954000e --- /dev/null +++ b/backend/core/storage.py @@ -0,0 +1,450 @@ +"""State storage abstraction for simulation state. + +Provides a unified interface for storing and retrieving simulation state, +with implementations for: +- In-memory storage (default, fast but not persistent) +- Redis storage (optional, enables decoupled UI polling and persistence) + +This allows the simulation loop to snapshot state without blocking, +and enables external systems (like a web UI) to poll state independently. +""" + +import json +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional, Dict + + +@dataclass +class StateSnapshot: + """A point-in-time snapshot of simulation state.""" + turn: int + timestamp: float + data: dict + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps({ + "turn": self.turn, + "timestamp": self.timestamp, + "data": self.data, + }) + + @classmethod + def from_json(cls, json_str: str) -> "StateSnapshot": + """Create from JSON string.""" + obj = json.loads(json_str) + return cls( + turn=obj["turn"], + timestamp=obj["timestamp"], + data=obj["data"], + ) + + +class StateStore(ABC): + """Abstract interface for state storage. + + Implementations should be thread-safe for concurrent read/write. + """ + + @abstractmethod + def save_state(self, key: str, snapshot: StateSnapshot) -> bool: + """Save a state snapshot. + + Args: + key: Unique key for this state (e.g., "world", "market", "agent_123") + snapshot: The state snapshot to save + + Returns: + True if saved successfully + """ + pass + + @abstractmethod + def get_state(self, key: str) -> Optional[StateSnapshot]: + """Get the latest state snapshot for a key. + + Args: + key: The state key to retrieve + + Returns: + The snapshot if found, None otherwise + """ + pass + + @abstractmethod + def get_all_states(self, prefix: str = "") -> Dict[str, StateSnapshot]: + """Get all states matching a prefix. 
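A quick round-trip check for the `StateSnapshot` serialization above (values invented):

    # to_json()/from_json() should be lossless for JSON-serializable data.
    import time

    snap = StateSnapshot(turn=42, timestamp=time.time(), data={"population": 7})
    restored = StateSnapshot.from_json(snap.to_json())
    assert restored.turn == 42 and restored.data == {"population": 7}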
+ + Args: + prefix: Key prefix to filter by (empty = all) + + Returns: + Dict mapping keys to snapshots + """ + pass + + @abstractmethod + def delete_state(self, key: str) -> bool: + """Delete a state snapshot. + + Args: + key: The state key to delete + + Returns: + True if deleted (or didn't exist) + """ + pass + + @abstractmethod + def clear_all(self) -> None: + """Clear all stored states.""" + pass + + @abstractmethod + def is_healthy(self) -> bool: + """Check if the store is healthy and accessible.""" + pass + + +class MemoryStateStore(StateStore): + """In-memory state storage (default implementation). + + Fast but not persistent across restarts. + Thread-safe using a simple lock. + """ + + def __init__(self, max_entries: int = 1000): + """Initialize memory store. + + Args: + max_entries: Maximum number of entries to keep (LRU eviction) + """ + import threading + self._data: Dict[str, StateSnapshot] = {} + self._lock = threading.Lock() + self._max_entries = max_entries + self._access_order: list[str] = [] # For LRU tracking + + def save_state(self, key: str, snapshot: StateSnapshot) -> bool: + with self._lock: + # LRU eviction if at capacity + if len(self._data) >= self._max_entries and key not in self._data: + oldest = self._access_order.pop(0) if self._access_order else None + if oldest: + self._data.pop(oldest, None) + + self._data[key] = snapshot + + # Update access order + if key in self._access_order: + self._access_order.remove(key) + self._access_order.append(key) + + return True + + def get_state(self, key: str) -> Optional[StateSnapshot]: + with self._lock: + snapshot = self._data.get(key) + if snapshot and key in self._access_order: + # Update access order for LRU + self._access_order.remove(key) + self._access_order.append(key) + return snapshot + + def get_all_states(self, prefix: str = "") -> Dict[str, StateSnapshot]: + with self._lock: + if not prefix: + return dict(self._data) + return {k: v for k, v in self._data.items() if k.startswith(prefix)} + + def delete_state(self, key: str) -> bool: + with self._lock: + self._data.pop(key, None) + if key in self._access_order: + self._access_order.remove(key) + return True + + def clear_all(self) -> None: + with self._lock: + self._data.clear() + self._access_order.clear() + + def is_healthy(self) -> bool: + return True + + +class RedisStateStore(StateStore): + """Redis-backed state storage. + + Enables: + - Persistent state across restarts + - Decoupled UI polling (web clients can read state independently) + - Distributed access (multiple simulation instances) + + Requires redis-py: pip install redis + """ + + def __init__( + self, + host: str = "localhost", + port: int = 6379, + db: int = 0, + password: Optional[str] = None, + prefix: str = "villsim:", + ttl_seconds: int = 3600, # 1 hour default TTL + ): + """Initialize Redis store. 
+ + Args: + host: Redis server host + port: Redis server port + db: Redis database number + password: Redis password (if required) + prefix: Key prefix for all keys (for namespacing) + ttl_seconds: Time-to-live for entries (0 = no expiry) + """ + self._prefix = prefix + self._ttl = ttl_seconds + self._client = None + self._connection_params = { + "host": host, + "port": port, + "db": db, + "password": password, + "decode_responses": True, + } + self._connect() + + def _connect(self) -> None: + """Establish connection to Redis.""" + try: + import redis + self._client = redis.Redis(**self._connection_params) + # Test connection + self._client.ping() + except ImportError: + raise ImportError( + "Redis support requires the 'redis' package. " + "Install with: pip install redis" + ) + except Exception as e: + self._client = None + raise ConnectionError(f"Failed to connect to Redis: {e}") + + def _make_key(self, key: str) -> str: + """Create full Redis key with prefix.""" + return f"{self._prefix}{key}" + + def save_state(self, key: str, snapshot: StateSnapshot) -> bool: + if not self._client: + return False + + try: + full_key = self._make_key(key) + data = snapshot.to_json() + + if self._ttl > 0: + self._client.setex(full_key, self._ttl, data) + else: + self._client.set(full_key, data) + + return True + except Exception: + return False + + def get_state(self, key: str) -> Optional[StateSnapshot]: + if not self._client: + return None + + try: + full_key = self._make_key(key) + data = self._client.get(full_key) + + if data: + return StateSnapshot.from_json(data) + return None + except Exception: + return None + + def get_all_states(self, prefix: str = "") -> Dict[str, StateSnapshot]: + if not self._client: + return {} + + try: + pattern = self._make_key(prefix + "*") + keys = self._client.keys(pattern) + + result = {} + for full_key in keys: + # Remove our prefix to get the original key + key = full_key[len(self._prefix):] + data = self._client.get(full_key) + if data: + result[key] = StateSnapshot.from_json(data) + + return result + except Exception: + return {} + + def delete_state(self, key: str) -> bool: + if not self._client: + return False + + try: + full_key = self._make_key(key) + self._client.delete(full_key) + return True + except Exception: + return False + + def clear_all(self) -> None: + if not self._client: + return + + try: + pattern = self._make_key("*") + keys = self._client.keys(pattern) + if keys: + self._client.delete(*keys) + except Exception: + pass + + def is_healthy(self) -> bool: + if not self._client: + return False + + try: + self._client.ping() + return True + except Exception: + return False + + def publish_state_update(self, channel: str, key: str) -> None: + """Publish a state update notification (for real-time subscribers). + + This can be used for WebSocket-style updates where clients + subscribe to state changes. + """ + if not self._client: + return + + try: + self._client.publish( + f"{self._prefix}updates:{channel}", + json.dumps({"key": key, "timestamp": time.time()}) + ) + except Exception: + pass + + +class StubStateStore(StateStore): + """No-op state store for when storage is disabled. + + All operations succeed but don't actually store anything. 
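Because all backends share the `StateStore` interface, a consumer such as a web UI can poll without caring which one `get_state_store()` (defined further below) resolved to. A sketch, with the key name taken from `save_simulation_state`:

    # Works identically for the memory, Redis, or stub store.
    import time

    store = get_state_store()
    snap = store.get_state("simulation:current")
    if snap is not None:
        print(f"turn {snap.turn}, snapshot {time.time() - snap.timestamp:.1f}s old")
    else:
        print("no snapshot yet (or storage is stubbed out)")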
+ """ + + def save_state(self, key: str, snapshot: StateSnapshot) -> bool: + return True + + def get_state(self, key: str) -> Optional[StateSnapshot]: + return None + + def get_all_states(self, prefix: str = "") -> Dict[str, StateSnapshot]: + return {} + + def delete_state(self, key: str) -> bool: + return True + + def clear_all(self) -> None: + pass + + def is_healthy(self) -> bool: + return True + + +# Global state store instance +_state_store: Optional[StateStore] = None + + +def get_state_store() -> StateStore: + """Get the global state store instance. + + Creates a store based on config: + - If Redis is configured and available, uses Redis + - Otherwise falls back to in-memory storage + """ + global _state_store + + if _state_store is None: + _state_store = _create_state_store() + + return _state_store + + +def _create_state_store() -> StateStore: + """Create the appropriate state store based on config.""" + from backend.config import get_config + + config = get_config() + + # Check for Redis config + redis_config = getattr(config, 'redis', None) + + if redis_config and getattr(redis_config, 'enabled', False): + try: + store = RedisStateStore( + host=getattr(redis_config, 'host', 'localhost'), + port=getattr(redis_config, 'port', 6379), + db=getattr(redis_config, 'db', 0), + password=getattr(redis_config, 'password', None), + prefix=getattr(redis_config, 'prefix', 'villsim:'), + ttl_seconds=getattr(redis_config, 'ttl_seconds', 3600), + ) + if store.is_healthy(): + return store + except Exception: + # Fall through to memory store + pass + + # Check if storage is disabled + perf_config = getattr(config, 'performance', None) + if perf_config and not getattr(perf_config, 'state_storage_enabled', True): + return StubStateStore() + + # Default to memory store + return MemoryStateStore() + + +def reset_state_store() -> None: + """Reset the global state store.""" + global _state_store + if _state_store: + _state_store.clear_all() + _state_store = None + + +def save_simulation_state(turn: int, state_data: dict) -> bool: + """Convenience function to save simulation state. + + Args: + turn: Current simulation turn + state_data: Full state dict (world, market, agents, etc.) 
+ + Returns: + True if saved successfully + """ + store = get_state_store() + snapshot = StateSnapshot( + turn=turn, + timestamp=time.time(), + data=state_data, + ) + return store.save_state("simulation:current", snapshot) + + +def get_simulation_state() -> Optional[StateSnapshot]: + """Convenience function to get current simulation state.""" + store = get_state_store() + return store.get_state("simulation:current") diff --git a/config.json b/config.json index f313cfa..0444b8a 100644 --- a/config.json +++ b/config.json @@ -2,14 +2,33 @@ "performance": { "logging_enabled": false, "detailed_logging": false, + "async_logging": true, "log_flush_interval": 50, "max_turn_logs": 100, - "stats_update_interval": 10 + "stats_update_interval": 10, + "state_storage_enabled": true }, "ai": { "goap_max_iterations": 30, "goap_max_plan_depth": 2, - "reactive_fallback": true + "reactive_fallback": true, + "use_bdi": true + }, + "bdi": { + "thinking_interval": 1, + "max_consecutive_failures": 2, + "priority_switch_threshold": 1.5, + "memory_max_events": 50, + "memory_decay_rate": 0.1 + }, + "redis": { + "enabled": false, + "host": "localhost", + "port": 6379, + "db": 0, + "password": null, + "prefix": "villsim:", + "ttl_seconds": 3600 }, "agent_stats": { "max_energy": 50, @@ -134,4 +153,4 @@ "min_price_discount": 0.4 }, "auto_step_interval": 0.15 -} +} \ No newline at end of file
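Finally, a hedged end-to-end sketch tying the new pieces together. Note that `get_state_store()` reads the global config via `backend.config.get_config()`, so the flag assignments below illustrate the knobs this patch adds rather than live rewiring:

    from backend.config import SimulationConfig
    from backend.core.storage import save_simulation_state, get_simulation_state

    cfg = SimulationConfig()
    cfg.ai.use_bdi = True                  # BDI layer on top of GOAP
    cfg.performance.async_logging = True   # background log writer
    cfg.redis.enabled = False              # default: in-memory state store

    # With Redis disabled, this round-trips through MemoryStateStore:
    save_simulation_state(turn=1, state_data={"agents": [], "market": {}})
    snap = get_simulation_state()
    assert snap is not None and snap.turn == 1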