"""GOAP Planner using A* search.
|
|
|
|
The planner finds optimal action sequences to achieve goals.
|
|
It uses A* search with the goal condition as the target.
|
|
"""
|
|
|
|
import heapq
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
from .world_state import WorldState
|
|
from .goal import Goal
|
|
from .action import GOAPAction
|
|
|
|
|
|
@dataclass(order=True)
|
|
class PlanNode:
|
|
"""A node in the planning search tree."""
    f_cost: float  # Total cost (g + h)
    g_cost: float = field(compare=False)  # Cost so far
    state: WorldState = field(compare=False)
    action: Optional[GOAPAction] = field(compare=False, default=None)
    parent: Optional["PlanNode"] = field(compare=False, default=None)
    depth: int = field(compare=False, default=0)


@dataclass
class Plan:
    """A plan is a sequence of actions to achieve a goal."""

    goal: Goal
    actions: list[GOAPAction]
    total_cost: float
    expected_final_state: WorldState

    @property
    def first_action(self) -> Optional[GOAPAction]:
        """Get the first action to execute."""
        return self.actions[0] if self.actions else None

    @property
    def is_empty(self) -> bool:
        return len(self.actions) == 0

    def __repr__(self) -> str:
        action_names = " -> ".join(a.name for a in self.actions)
        return f"Plan({self.goal.name}: {action_names} [cost={self.total_cost:.1f}])"


class GOAPPlanner:
    """A* planner for GOAP.

    Finds the lowest-cost sequence of actions that transforms
    the current world state into one where the goal is satisfied.
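
    Illustrative usage (a sketch; the WorldState, Goal and GOAPAction
    instances are assumed to be constructed elsewhere in this package):

        planner = GOAPPlanner(max_iterations=200)
        plan = planner.plan(current_state, drink_goal, actions)
        if plan is not None and not plan.is_empty:
            next_action = plan.first_action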
    """

    def __init__(self, max_iterations: int = 100):
        self.max_iterations = max_iterations

    def plan(
        self,
        initial_state: WorldState,
        goal: Goal,
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find an action sequence to achieve the goal.

        Uses A* search where:
        - g(n) = accumulated action costs
        - h(n) = heuristic estimate of the remaining cost, based on the
          distance from the current state values to the goal's target
          values (see _heuristic)

        Returns None if no plan is found within the iteration limit.
        """
        # Check if goal is already satisfied
        if goal.satisfied(initial_state):
            return Plan(
                goal=goal,
                actions=[],
                total_cost=0.0,
                expected_final_state=initial_state,
            )

        # Priority queue for A*
        open_set: list[PlanNode] = []
        start_node = PlanNode(
            f_cost=0.0,
            g_cost=0.0,
            state=initial_state,
            action=None,
            parent=None,
            depth=0,
        )
        heapq.heappush(open_set, start_node)

        # Track visited states to avoid cycles
        # We use a simplified state hash for efficiency
        visited: set[tuple] = set()

        iterations = 0

        while open_set and iterations < self.max_iterations:
            iterations += 1

            # Get node with lowest f_cost
            current = heapq.heappop(open_set)

            # Check depth limit
            if current.depth >= goal.max_plan_depth:
                continue

            # Create state hash for cycle detection
            state_hash = self._hash_state(current.state)
            if state_hash in visited:
                continue
            visited.add(state_hash)

            # Try each action
            for action in available_actions:
                # Check if action is valid in current state
                if not action.is_valid(current.state):
                    continue

                # Apply action to get new state
                new_state = action.apply(current.state)

                # Calculate costs
                action_cost = action.get_cost(current.state)
                g_cost = current.g_cost + action_cost
                h_cost = self._heuristic(new_state, goal)
                f_cost = g_cost + h_cost

                # Create new node
                new_node = PlanNode(
                    f_cost=f_cost,
                    g_cost=g_cost,
                    state=new_state,
                    action=action,
                    parent=current,
                    depth=current.depth + 1,
                )

                # Check if goal is satisfied
                if goal.satisfied(new_state):
                    # Reconstruct and return plan
                    return self._reconstruct_plan(new_node, goal)

                # Add to open set
                heapq.heappush(open_set, new_node)

        # No plan found
        return None

    def plan_for_goals(
        self,
        initial_state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find the best plan among multiple goals.

        Selects the highest-priority goal that has a valid plan,
        considering both goal priority and plan cost.
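
        Each candidate plan is scored as priority / (total_cost + 1.0), so a
        priority-8 goal with a cost-3 plan scores 8 / (3 + 1) = 2.0; higher
        priority and lower cost both raise the score.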
        """
        # Sort goals by priority (highest first)
        sorted_goals = sorted(goals, key=lambda g: g.priority(initial_state), reverse=True)

        best_plan: Optional[Plan] = None
        best_score = float('-inf')

        for goal in sorted_goals:
            priority = goal.priority(initial_state)

            # Skip inactive goals
            if priority <= 0:
                continue

            # Stop once the remaining goals are much lower priority than the
            # best-scoring plan found so far
            if best_plan and priority < best_score * 0.5:
                break

            plan = self.plan(initial_state, goal, available_actions)

            if plan:
                # Score = priority / (cost + 1)
                # Higher priority and lower cost = better
                score = priority / (plan.total_cost + 1.0)

                if score > best_score:
                    best_score = score
                    best_plan = plan

        return best_plan

    def _hash_state(self, state: WorldState) -> tuple:
        """Create a hashable representation of key state values.

        We don't hash everything - just the values that matter for planning.
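
        Values are coarsened (percentages rounded to one decimal place, money
        bucketed by tens, e.g. 57 // 10 == 5) so that nearly identical states
        count as revisits during cycle detection.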
        """
        return (
            round(state.thirst_pct, 1),
            round(state.hunger_pct, 1),
            round(state.heat_pct, 1),
            round(state.energy_pct, 1),
            state.water_count,
            state.food_count,
            state.wood_count,
            state.money // 10,  # Bucket money
        )

    def _heuristic(self, state: WorldState, goal: Goal) -> float:
        """Estimate cost to reach goal from state.

        For now, we use a simple heuristic based on the distance
        from current state values to goal target values.
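
        For example, if the goal targets water_count=3 and the state has
        water_count=1, that key contributes abs(3 - 1) = 2 to the estimate.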
        """
        if not goal.target_state:
            return 0.0

        h = 0.0
        for key, target in goal.target_state.items():
            if hasattr(state, key):
                current = getattr(state, key)
                if isinstance(current, (int, float)) and isinstance(target, (int, float)):
                    diff = abs(target - current)
                    h += diff

        return h

    def _reconstruct_plan(self, final_node: PlanNode, goal: Goal) -> Plan:
        """Reconstruct the action sequence from the final node."""
        actions = []
        node = final_node

        while node.parent is not None:
            if node.action:
                actions.append(node.action)
            node = node.parent

        actions.reverse()

        return Plan(
            goal=goal,
            actions=actions,
            total_cost=final_node.g_cost,
            expected_final_state=final_node.state,
        )


class ReactivePlanner:
    """A simpler reactive planner for immediate needs.

    Sometimes we don't need full planning - we just need to
    pick the best immediate action. This planner evaluates
    single actions against goals.
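
    Illustrative usage (a sketch; `state`, `goals` and `actions` are assumed
    to come from the surrounding simulation):

        planner = ReactivePlanner()
        action = planner.select_best_action(state, goals, actions)
        if action is not None:
            ...  # execute the chosen action this tick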
    """

    def select_best_action(
        self,
        state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[GOAPAction]:
        """Select the single best action to take right now.

        Evaluates each valid action and scores it based on how well
        it progresses toward high-priority goals.
        """
        best_action: Optional[GOAPAction] = None
        best_score = float('-inf')

        for action in available_actions:
            if not action.is_valid(state):
                continue

            score = self._score_action(state, action, goals)

            if score > best_score:
                best_score = score
                best_action = action

        return best_action

    def _score_action(
        self,
        state: WorldState,
        action: GOAPAction,
        goals: list[Goal],
    ) -> float:
        """Score an action based on its contribution to goals."""
        # Apply action to get expected new state
        new_state = action.apply(state)
        action_cost = action.get_cost(state)

        total_score = 0.0

        for goal in goals:
            priority = goal.priority(state)
            if priority <= 0:
                continue

            # Check if this action helps with the goal
            was_satisfied = goal.satisfied(state)
            now_satisfied = goal.satisfied(new_state)

            if now_satisfied and not was_satisfied:
                # Action satisfies the goal - big bonus!
                total_score += priority * 10.0
            elif not was_satisfied:
                # Check if we made progress
                # This is a simplified check based on urgencies
                old_urgency = self._get_goal_urgency(goal, state)
                new_urgency = self._get_goal_urgency(goal, new_state)

                if new_urgency < old_urgency:
                    improvement = old_urgency - new_urgency
                    total_score += priority * improvement * 5.0

        # Subtract cost
        total_score -= action_cost

        return total_score

    def _get_goal_urgency(self, goal: Goal, state: WorldState) -> float:
        """Get the world-state urgency associated with a goal's type."""
        # Map goal types to state urgencies
        from .goal import GoalType

        urgency_map = {
            GoalType.SATISFY_THIRST: state.thirst_urgency,
            GoalType.SATISFY_HUNGER: state.hunger_urgency,
            GoalType.MAINTAIN_HEAT: state.heat_urgency,
            GoalType.RESTORE_ENERGY: state.energy_urgency,
        }

        return urgency_map.get(goal.goal_type, 0.0)