"""GOAP Planner using A* search.

The planner finds optimal action sequences to achieve goals.
It uses A* search with the goal condition as the target.
"""

import heapq
from dataclasses import dataclass, field
from typing import Optional

from .world_state import WorldState
from .goal import Goal
from .action import GOAPAction


@dataclass(order=True)
class PlanNode:
    """A node in the planning search tree.

    Only ``f_cost`` takes part in comparisons (every other field is declared
    with ``compare=False``), so nodes can be pushed straight onto a ``heapq``
    and are ordered purely by their estimated total cost.
    """

    f_cost: float  # Total cost (g + h)
    g_cost: float = field(compare=False)  # Cost so far
    state: WorldState = field(compare=False)  # World state at this node
    # Action that produced this node; None for the start node.
    action: Optional[GOAPAction] = field(compare=False, default=None)
    # Predecessor in the search tree; None for the start node.
    parent: Optional["PlanNode"] = field(compare=False, default=None)
    # Number of actions taken from the start node to reach this node.
    depth: int = field(compare=False, default=0)


@dataclass
class Plan:
    """A plan is a sequence of actions to achieve a goal."""

    goal: Goal
    actions: list[GOAPAction]
    total_cost: float
    expected_final_state: WorldState

    @property
    def first_action(self) -> Optional[GOAPAction]:
        """Get the first action to execute, or None for an empty plan."""
        return self.actions[0] if self.actions else None

    @property
    def is_empty(self) -> bool:
        """Return True when the plan contains no actions."""
        return not self.actions

    def __repr__(self) -> str:
        action_names = " -> ".join(a.name for a in self.actions)
        return f"Plan({self.goal.name}: {action_names} [cost={self.total_cost:.1f}])"


class GOAPPlanner:
    """A* planner for GOAP.

    Searches for the lowest-cost sequence of actions that transforms the
    current world state into one where the goal is satisfied. The search is
    bounded by ``max_iterations`` and by the goal's ``max_plan_depth``.
    """

    def __init__(self, max_iterations: int = 100):
        # Upper bound on the number of nodes popped from the open set per
        # call to plan(); pops that are skipped (depth limit reached or state
        # already visited) count against this budget too.
        self.max_iterations = max_iterations

    def plan(
        self,
        initial_state: WorldState,
        goal: Goal,
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find an action sequence to achieve the goal.

        Uses A* search where:
        - g(n) = accumulated action costs
        - h(n) = ``self._heuristic(state, goal)`` (0 when the goal has no
          ``target_state``, which makes the search behave like Dijkstra's)

        The cheapest goal-reaching node seen so far is kept as an incumbent.
        The search stops when the lowest ``f_cost`` left on the frontier can
        no longer beat the incumbent, when the frontier is empty, or when the
        iteration budget is spent; the incumbent (if any) is then returned.
        Returning on the very first goal-satisfying successor would make the
        result depend on the order of ``available_actions`` rather than on
        cost.

        NOTE(review): the result is only guaranteed cheapest if the heuristic
        never overestimates and states merged by ``_hash_state`` are truly
        interchangeable - confirm against the Goal/WorldState definitions.

        Args:
            initial_state: State to start planning from.
            goal: Goal to satisfy; provides ``satisfied(state)`` and
                ``max_plan_depth``.
            available_actions: Candidate actions; each provides
                ``is_valid(state)``, ``apply(state)`` and ``get_cost(state)``.

        Returns:
            A Plan (empty, with zero cost, if the goal already holds in
            ``initial_state``), or None if no goal state was reached within
            the iteration and depth limits.
        """
        # Check if goal is already satisfied
        if goal.satisfied(initial_state):
            return Plan(
                goal=goal,
                actions=[],
                total_cost=0.0,
                expected_final_state=initial_state,
            )

        # Priority queue for A*, ordered by PlanNode.f_cost
        open_set: list[PlanNode] = []
        heapq.heappush(
            open_set,
            PlanNode(
                f_cost=0.0,
                g_cost=0.0,
                state=initial_state,
                action=None,
                parent=None,
                depth=0,
            ),
        )

        # Track visited states to avoid cycles.
        # We use a simplified state hash for efficiency.
        visited: set[tuple] = set()

        # Cheapest goal-satisfying node found so far (the incumbent).
        best_goal_node: Optional[PlanNode] = None

        iterations = 0
        while open_set and iterations < self.max_iterations:
            iterations += 1

            # Get node with lowest f_cost
            current = heapq.heappop(open_set)

            # Everything still queued has f_cost >= current.f_cost, so once
            # that estimate cannot undercut the incumbent we are done.
            if (
                best_goal_node is not None
                and current.f_cost >= best_goal_node.g_cost
            ):
                break

            # Check depth limit: nodes at the limit are never expanded, so
            # plans contain at most goal.max_plan_depth actions.
            if current.depth >= goal.max_plan_depth:
                continue

            # Create state hash for cycle detection
            state_hash = self._hash_state(current.state)
            if state_hash in visited:
                continue
            visited.add(state_hash)

            # Try each action
            for action in available_actions:
                # Check if action is valid in current state
                if not action.is_valid(current.state):
                    continue

                # Apply action to get new state.
                # Assumes apply() returns a new state and leaves
                # current.state untouched - TODO confirm in GOAPAction.
                new_state = action.apply(current.state)

                # Calculate costs
                g_cost = current.g_cost + action.get_cost(current.state)
                f_cost = g_cost + self._heuristic(new_state, goal)

                new_node = PlanNode(
                    f_cost=f_cost,
                    g_cost=g_cost,
                    state=new_state,
                    action=action,
                    parent=current,
                    depth=current.depth + 1,
                )

                if goal.satisfied(new_state):
                    # Goal nodes are terminal: remember the cheapest one and
                    # do not queue it for further expansion.
                    if (
                        best_goal_node is None
                        or g_cost < best_goal_node.g_cost
                    ):
                        best_goal_node = new_node
                    continue

                # Add to open set
                heapq.heappush(open_set, new_node)

        if best_goal_node is None:
            # No plan found
            return None
        return self._reconstruct_plan(best_goal_node, goal)

    def plan_for_goals(
        self,
        initial_state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find the best plan among multiple goals.

        Goals are tried in descending order of ``goal.priority(initial_state)``.
        Each plan found is scored as ``priority / (total_cost + 1)`` and the
        highest-scoring plan wins (the first one on ties). Goals with
        priority <= 0 are ignored. Once a plan exists, the scan stops at the
        first goal whose priority is below half of the best score so far.

        Args:
            initial_state: State to plan from and to evaluate priorities in.
            goals: Candidate goals.
            available_actions: Actions passed through to ``plan``.

        Returns:
            The best-scoring Plan, or None if no goal produced a plan.
        """
        # Evaluate each priority once, then order goals highest first.
        # sorted() is stable, so equal-priority goals keep their input order.
        prioritized = sorted(
            ((goal.priority(initial_state), goal) for goal in goals),
            key=lambda pair: pair[0],
            reverse=True,
        )

        best_plan: Optional[Plan] = None
        best_score = float('-inf')

        for priority, goal in prioritized:
            # Goals with no positive priority are never planned for
            if priority <= 0:
                continue

            if best_plan is not None and priority < best_score * 0.5:
                # This goal is much lower priority, and so are all the
                # remaining ones (the list is sorted) - stop looking.
                break

            plan = self.plan(initial_state, goal, available_actions)
            if plan is None:
                continue

            # Score = priority / (cost + 1)
            # Higher priority and lower cost = better
            score = priority / (plan.total_cost + 1.0)
            if score > best_score:
                best_score = score
                best_plan = plan

        return best_plan

    def _hash_state(self, state: WorldState) -> tuple:
        """Create a hashable representation of key state values.

        We don't hash everything - just the values that matter for planning.
        Percentages are rounded to one decimal place and money is bucketed
        into steps of 10, so states that differ only slightly in those values
        (or only in attributes not listed here) are treated as the same state
        for cycle detection.
        """
        return (
            round(state.thirst_pct, 1),
            round(state.hunger_pct, 1),
            round(state.heat_pct, 1),
            round(state.energy_pct, 1),
            state.water_count,
            state.food_count,
            state.wood_count,
            state.money // 10,  # Bucket money
        )

    def _heuristic(self, state: WorldState, goal: Goal) -> float:
        """Estimate cost to reach goal from state.

        Sums the absolute difference between each numeric target value in
        ``goal.target_state`` and the attribute of the same name on ``state``.
        Keys with no matching attribute, or whose current or target value is
        not an int/float, contribute nothing. Returns 0.0 when the goal has
        no (or an empty) ``target_state``.
        """
        if not goal.target_state:
            return 0.0

        h = 0.0
        for key, target in goal.target_state.items():
            # A missing attribute yields None, which fails the numeric check
            # below and is skipped just like any other non-numeric value.
            current = getattr(state, key, None)
            if isinstance(current, (int, float)) and isinstance(target, (int, float)):
                h += abs(target - current)
        return h

    def _reconstruct_plan(self, final_node: PlanNode, goal: Goal) -> Plan:
        """Reconstruct the action sequence from the final node.

        Follows ``parent`` links from ``final_node`` back to the start node
        (the one without a parent), collecting each node's action, and
        returns them in execution order. The plan's total cost is the final
        node's ``g_cost`` and its expected final state is that node's state.
        """
        actions: list[GOAPAction] = []
        node = final_node

        while node.parent is not None:
            # Compare against None explicitly so an action object is never
            # dropped merely because it evaluates as falsy.
            if node.action is not None:
                actions.append(node.action)
            node = node.parent

        actions.reverse()

        return Plan(
            goal=goal,
            actions=actions,
            total_cost=final_node.g_cost,
            expected_final_state=final_node.state,
        )


class ReactivePlanner:
    """A simpler reactive planner for immediate needs.

    Sometimes we don't need full planning - we just need to pick the best
    immediate action. This planner evaluates single actions against goals.
    """

    def select_best_action(
        self,
        state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[GOAPAction]:
        """Select the single best action to take right now.

        Scores every action that is valid in ``state`` with ``_score_action``
        and returns the highest-scoring one; on ties the earliest action in
        ``available_actions`` wins. Returns None when no action is valid.
        """
        best_action: Optional[GOAPAction] = None
        best_score = float('-inf')

        for action in available_actions:
            if not action.is_valid(state):
                continue

            score = self._score_action(state, action, goals)
            if score > best_score:
                best_score = score
                best_action = action

        return best_action

    def _score_action(
        self,
        state: WorldState,
        action: GOAPAction,
        goals: list[Goal],
    ) -> float:
        """Score an action based on its contribution to goals.

        For every goal with positive priority that is not yet satisfied:
        - if the action's resulting state satisfies it, add ``priority * 10``;
        - otherwise, if the goal's urgency drops, add
          ``priority * (urgency decrease) * 5``.
        The action's cost in ``state`` is subtracted from the total.
        """
        # Apply action to get expected new state
        new_state = action.apply(state)
        action_cost = action.get_cost(state)

        total_score = 0.0

        for goal in goals:
            priority = goal.priority(state)
            if priority <= 0:
                continue

            # Goals that already hold cannot be helped any further
            if goal.satisfied(state):
                continue

            if goal.satisfied(new_state):
                # Action satisfies the goal - big bonus!
                total_score += priority * 10.0
                continue

            # Check if we made progress.
            # This is a simplified check based on urgencies.
            old_urgency = self._get_goal_urgency(goal, state)
            new_urgency = self._get_goal_urgency(goal, new_state)
            if new_urgency < old_urgency:
                total_score += priority * (old_urgency - new_urgency) * 5.0

        # Subtract cost
        total_score -= action_cost

        return total_score

    def _get_goal_urgency(self, goal: Goal, state: WorldState) -> float:
        """Get the urgency related to a goal.

        Maps the goal's ``goal_type`` to the matching urgency attribute on
        ``state``; goal types without a mapping have urgency 0.0.
        """
        # Function-level import kept from the original code.
        from .goal import GoalType

        # Map goal types to state urgencies
        urgency_map = {
            GoalType.SATISFY_THIRST: state.thirst_urgency,
            GoalType.SATISFY_HUNGER: state.hunger_urgency,
            GoalType.MAINTAIN_HEAT: state.heat_urgency,
            GoalType.RESTORE_ENERGY: state.energy_urgency,
        }

        return urgency_map.get(goal.goal_type, 0.0)