Снесарев Максим 308f738c37 [new] add goap agents
2026-01-19 20:45:35 +03:00

336 lines
10 KiB
Python

"""GOAP Planner using A* search.
The planner finds optimal action sequences to achieve goals.
It uses A* search with the goal condition as the target.
"""
import heapq
from dataclasses import dataclass, field
from typing import Optional
from .world_state import WorldState
from .goal import Goal
from .action import GOAPAction


@dataclass(order=True)
class PlanNode:
    """A node in the planning search tree."""

    f_cost: float  # Total cost (g + h)
    g_cost: float = field(compare=False)  # Cost so far
    state: WorldState = field(compare=False)
    action: Optional[GOAPAction] = field(compare=False, default=None)
    parent: Optional["PlanNode"] = field(compare=False, default=None)
    depth: int = field(compare=False, default=0)


@dataclass
class Plan:
    """A plan is a sequence of actions to achieve a goal."""

    goal: Goal
    actions: list[GOAPAction]
    total_cost: float
    expected_final_state: WorldState

    @property
    def first_action(self) -> Optional[GOAPAction]:
        """Get the first action to execute."""
        return self.actions[0] if self.actions else None

    @property
    def is_empty(self) -> bool:
        return len(self.actions) == 0

    def __repr__(self) -> str:
        action_names = " -> ".join(a.name for a in self.actions)
        return f"Plan({self.goal.name}: {action_names} [cost={self.total_cost:.1f}])"


class GOAPPlanner:
    """A* planner for GOAP.

    Finds the lowest-cost sequence of actions that transforms
    the current world state into one where the goal is satisfied.
    """

    def __init__(self, max_iterations: int = 100):
        self.max_iterations = max_iterations

    def plan(
        self,
        initial_state: WorldState,
        goal: Goal,
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find an action sequence to achieve the goal.

        Uses A* search where:
        - g(n) = accumulated action costs
        - h(n) = heuristic estimate to goal (sum of distances to the goal's
          target values; zero when the goal has no target_state)

        Returns None if no plan is found within the iteration limit.
        """
        # Check if goal is already satisfied
        if goal.satisfied(initial_state):
            return Plan(
                goal=goal,
                actions=[],
                total_cost=0.0,
                expected_final_state=initial_state,
            )

        # Priority queue for A*
        open_set: list[PlanNode] = []
        start_node = PlanNode(
            f_cost=0.0,
            g_cost=0.0,
            state=initial_state,
            action=None,
            parent=None,
            depth=0,
        )
        heapq.heappush(open_set, start_node)

        # Track visited states to avoid cycles
        # We use a simplified state hash for efficiency
        visited: set[tuple] = set()

        iterations = 0
        while open_set and iterations < self.max_iterations:
            iterations += 1

            # Get node with lowest f_cost
            current = heapq.heappop(open_set)

            # Check depth limit
            if current.depth >= goal.max_plan_depth:
                continue

            # Create state hash for cycle detection
            state_hash = self._hash_state(current.state)
            if state_hash in visited:
                continue
            visited.add(state_hash)

            # Try each action
            for action in available_actions:
                # Check if action is valid in current state
                if not action.is_valid(current.state):
                    continue

                # Apply action to get new state
                new_state = action.apply(current.state)

                # Calculate costs
                action_cost = action.get_cost(current.state)
                g_cost = current.g_cost + action_cost
                h_cost = self._heuristic(new_state, goal)
                f_cost = g_cost + h_cost

                # Create new node
                new_node = PlanNode(
                    f_cost=f_cost,
                    g_cost=g_cost,
                    state=new_state,
                    action=action,
                    parent=current,
                    depth=current.depth + 1,
                )

                # Check if goal is satisfied
                if goal.satisfied(new_state):
                    # Reconstruct and return plan
                    return self._reconstruct_plan(new_node, goal)

                # Add to open set
                heapq.heappush(open_set, new_node)

        # No plan found
        return None

    def plan_for_goals(
        self,
        initial_state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[Plan]:
        """Find the best plan among multiple goals.

        Selects the highest-priority goal that has a valid plan,
        considering both goal priority and plan cost.
        """
        # Sort goals by priority (highest first)
        sorted_goals = sorted(goals, key=lambda g: g.priority(initial_state), reverse=True)

        best_plan: Optional[Plan] = None
        best_score = float('-inf')

        for goal in sorted_goals:
            priority = goal.priority(initial_state)

            # Ignore goals with zero or negative priority
            if priority <= 0:
                continue

            # Skip much lower-priority goals once we already have a good plan;
            # goals are sorted, so everything after this point is lower still
            if best_plan and priority < best_score * 0.5:
                break

            plan = self.plan(initial_state, goal, available_actions)
            if plan:
                # Score = priority / (cost + 1)
                # Higher priority and lower cost = better
                score = priority / (plan.total_cost + 1.0)
                if score > best_score:
                    best_score = score
                    best_plan = plan

        return best_plan
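
    # Illustrative scoring example (hypothetical numbers): a priority-8 goal
    # whose cheapest plan costs 3 scores 8 / (3 + 1) = 2.0, while a priority-5
    # goal with a cost-1 plan scores 5 / (1 + 1) = 2.5, so the cheaper,
    # lower-priority goal wins the selection above.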

    def _hash_state(self, state: WorldState) -> tuple:
        """Create a hashable representation of key state values.

        We don't hash everything - just the values that matter for planning.
        """
        return (
            round(state.thirst_pct, 1),
            round(state.hunger_pct, 1),
            round(state.heat_pct, 1),
            round(state.energy_pct, 1),
            state.water_count,
            state.food_count,
            state.wood_count,
            state.money // 10,  # Bucket money
        )
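
    # Illustrative bucketing (hypothetical values): thirst_pct 53.27 hashes as
    # 53.3 and money 47 as bucket 4, so near-identical states collapse into a
    # single visited entry during the search.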

    def _heuristic(self, state: WorldState, goal: Goal) -> float:
        """Estimate cost to reach goal from state.

        For now, we use a simple heuristic based on the distance
        from current state values to goal target values.
        """
        if not goal.target_state:
            return 0.0

        h = 0.0
        for key, target in goal.target_state.items():
            if hasattr(state, key):
                current = getattr(state, key)
                if isinstance(current, (int, float)) and isinstance(target, (int, float)):
                    diff = abs(target - current)
                    h += diff
        return h
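
    # Illustrative estimate (hypothetical goal): with target_state
    # {"thirst_pct": 0.0} and state.thirst_pct == 45.0, h == 45.0;
    # missing or non-numeric keys contribute nothing.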

    def _reconstruct_plan(self, final_node: PlanNode, goal: Goal) -> Plan:
        """Reconstruct the action sequence from the final node."""
        actions = []
        node = final_node
        while node.parent is not None:
            if node.action:
                actions.append(node.action)
            node = node.parent
        actions.reverse()

        return Plan(
            goal=goal,
            actions=actions,
            total_cost=final_node.g_cost,
            expected_final_state=final_node.state,
        )


class ReactivePlanner:
    """A simpler reactive planner for immediate needs.

    Sometimes we don't need full planning - we just need to
    pick the best immediate action. This planner evaluates
    single actions against goals.
    """

    def select_best_action(
        self,
        state: WorldState,
        goals: list[Goal],
        available_actions: list[GOAPAction],
    ) -> Optional[GOAPAction]:
        """Select the single best action to take right now.

        Evaluates each valid action and scores it based on how well
        it progresses toward high-priority goals.
        """
        best_action: Optional[GOAPAction] = None
        best_score = float('-inf')

        for action in available_actions:
            if not action.is_valid(state):
                continue

            score = self._score_action(state, action, goals)
            if score > best_score:
                best_score = score
                best_action = action

        return best_action

    def _score_action(
        self,
        state: WorldState,
        action: GOAPAction,
        goals: list[Goal],
    ) -> float:
        """Score an action based on its contribution to goals."""
        # Apply action to get expected new state
        new_state = action.apply(state)
        action_cost = action.get_cost(state)

        total_score = 0.0
        for goal in goals:
            priority = goal.priority(state)
            if priority <= 0:
                continue

            # Check if this action helps with the goal
            was_satisfied = goal.satisfied(state)
            now_satisfied = goal.satisfied(new_state)

            if now_satisfied and not was_satisfied:
                # Action satisfies the goal - big bonus!
                total_score += priority * 10.0
            elif not was_satisfied:
                # Check if we made progress
                # This is a simplified check based on urgencies
                old_urgency = self._get_goal_urgency(goal, state)
                new_urgency = self._get_goal_urgency(goal, new_state)
                if new_urgency < old_urgency:
                    improvement = old_urgency - new_urgency
                    total_score += priority * improvement * 5.0

        # Subtract cost
        total_score -= action_cost
        return total_score
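
    # Illustrative scoring (hypothetical numbers): for a single priority-6 goal
    # whose urgency falls from 0.8 to 0.5, the progress bonus is
    # 6 * 0.3 * 5.0 = 9.0; after subtracting an action cost of 2.0 the final
    # score is 7.0.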

    def _get_goal_urgency(self, goal: Goal, state: WorldState) -> float:
        """Get the urgency related to a goal."""
        # Map goal types to state urgencies
        from .goal import GoalType

        urgency_map = {
            GoalType.SATISFY_THIRST: state.thirst_urgency,
            GoalType.SATISFY_HUNGER: state.hunger_urgency,
            GoalType.MAINTAIN_HEAT: state.heat_urgency,
            GoalType.RESTORE_ENERGY: state.energy_urgency,
        }
        return urgency_map.get(goal.goal_type, 0.0)
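

# ---------------------------------------------------------------------------
# Illustrative smoke test - not part of the runtime API. The stub classes
# below are hypothetical stand-ins that only mimic the WorldState / Goal /
# GOAPAction interface the planner relies on (satisfied, max_plan_depth,
# target_state, is_valid, apply, get_cost), so the planner can be exercised
# in isolation. Run as a module (python -m ...) so the relative imports
# above resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from dataclasses import replace

    @dataclass(frozen=True)
    class _StubState:
        # Only the fields _hash_state() reads, with neutral defaults.
        thirst_pct: float = 0.0
        hunger_pct: float = 0.0
        heat_pct: float = 0.0
        energy_pct: float = 100.0
        water_count: int = 0
        food_count: int = 0
        wood_count: int = 0
        money: int = 0

    class _QuenchThirst:
        name = "quench_thirst"
        max_plan_depth = 5
        target_state = {"thirst_pct": 0.0}

        def satisfied(self, state) -> bool:
            return state.thirst_pct <= 30.0

    class _StubAction:
        def __init__(self, name, cost, valid, effect):
            self.name = name
            self._cost = cost
            self._valid = valid
            self._effect = effect

        def is_valid(self, state) -> bool:
            return self._valid(state)

        def apply(self, state):
            return self._effect(state)

        def get_cost(self, state) -> float:
            return self._cost

    actions = [
        _StubAction(
            "collect_water", 2.0,
            valid=lambda s: True,
            effect=lambda s: replace(s, water_count=s.water_count + 1),
        ),
        _StubAction(
            "drink", 1.0,
            valid=lambda s: s.water_count > 0,
            effect=lambda s: replace(
                s,
                water_count=s.water_count - 1,
                thirst_pct=max(0.0, s.thirst_pct - 50.0),
            ),
        ),
    ]

    start = _StubState(thirst_pct=80.0)
    # Expected plan with these stub costs: collect_water -> drink (cost 3.0).
    print(GOAPPlanner().plan(start, _QuenchThirst(), actions))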