villsim/backend/core/market.py

444 lines
17 KiB
Python

"""Market system with Order Book for the Village Simulation.
Implements supply/demand pricing mechanics:
- Sellers can adjust prices based on market conditions
- Scarcity drives prices up, surplus drives prices down
- Historical price tracking for market signals
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from uuid import uuid4
from backend.domain.resources import ResourceType
class OrderStatus(Enum):
"""Status of a market order."""
ACTIVE = "active"
FILLED = "filled"
CANCELLED = "cancelled"
EXPIRED = "expired"
@dataclass
class PriceHistory:
"""Track price history for a resource type."""
last_sale_price: int = 0
avg_sale_price: float = 0.0
total_sold: int = 0
last_sale_turn: int = 0
demand_score: float = 0.5 # 0-1, higher = more demand
supply_score: float = 0.5 # 0-1, higher = more supply
@dataclass
class Order:
"""A market order (sell order)."""
id: str = field(default_factory=lambda: str(uuid4())[:8])
seller_id: str = ""
resource_type: ResourceType = ResourceType.BERRIES
quantity: int = 1
price_per_unit: int = 1
created_turn: int = 0
status: OrderStatus = OrderStatus.ACTIVE
# Price adjustment tracking
turns_without_sale: int = 0
original_price: int = 0
last_adjusted_turn: int = 0 # Track when price was last changed
def __post_init__(self):
if self.original_price == 0:
self.original_price = self.price_per_unit
if self.last_adjusted_turn == 0:
self.last_adjusted_turn = self.created_turn
@property
def total_price(self) -> int:
"""Get total price for all units."""
return self.quantity * self.price_per_unit
def apply_discount(self, percentage: float = 0.1) -> None:
"""Apply a discount to the price."""
reduction = max(1, int(self.price_per_unit * percentage))
self.price_per_unit = max(1, self.price_per_unit - reduction)
def adjust_price(self, new_price: int, current_turn: int) -> bool:
"""Adjust the order's price. Returns True if successful."""
if new_price < 1:
return False
self.price_per_unit = new_price
self.last_adjusted_turn = current_turn
return True
def can_raise_price(self, current_turn: int, min_turns: int = 2) -> bool:
"""Check if enough time has passed to raise the price again."""
return current_turn - self.last_adjusted_turn >= min_turns
def to_dict(self) -> dict:
"""Convert to dictionary for API serialization."""
return {
"id": self.id,
"seller_id": self.seller_id,
"resource_type": self.resource_type.value,
"quantity": self.quantity,
"price_per_unit": self.price_per_unit,
"total_price": self.total_price,
"created_turn": self.created_turn,
"status": self.status.value,
"turns_without_sale": self.turns_without_sale,
"original_price": self.original_price,
}
@dataclass
class TradeResult:
"""Result of a trade transaction."""
success: bool
order_id: str = ""
buyer_id: str = ""
seller_id: str = ""
resource_type: Optional[ResourceType] = None
quantity: int = 0
total_paid: int = 0
message: str = ""
def to_dict(self) -> dict:
return {
"success": self.success,
"order_id": self.order_id,
"buyer_id": self.buyer_id,
"seller_id": self.seller_id,
"resource_type": self.resource_type.value if self.resource_type else None,
"quantity": self.quantity,
"total_paid": self.total_paid,
"message": self.message,
}
@dataclass
class OrderBook:
"""Central market order book with supply/demand tracking.
Features:
- Track price history per resource type
- Calculate supply/demand scores
- Suggest prices based on market conditions
- Allow sellers to adjust prices dynamically
"""
orders: list[Order] = field(default_factory=list)
trade_history: list[TradeResult] = field(default_factory=list)
price_history: dict[ResourceType, PriceHistory] = field(default_factory=dict)
# Configuration
TURNS_BEFORE_DISCOUNT: int = 3
DISCOUNT_RATE: float = 0.15 # 15% discount after waiting
# Supply/demand thresholds
LOW_SUPPLY_THRESHOLD: int = 3 # Less than this = scarcity
HIGH_SUPPLY_THRESHOLD: int = 10 # More than this = surplus
DEMAND_DECAY: float = 0.95 # How fast demand score decays per turn
def __post_init__(self):
"""Initialize price history for all resource types."""
if not self.price_history:
for resource_type in ResourceType:
self.price_history[resource_type] = PriceHistory()
def place_order(
self,
seller_id: str,
resource_type: ResourceType,
quantity: int,
price_per_unit: int,
current_turn: int,
) -> Order:
"""Place a new sell order."""
order = Order(
seller_id=seller_id,
resource_type=resource_type,
quantity=quantity,
price_per_unit=price_per_unit,
created_turn=current_turn,
)
self.orders.append(order)
return order
def cancel_order(self, order_id: str, seller_id: str) -> bool:
"""Cancel an order. Returns True if successful."""
for order in self.orders:
if order.id == order_id and order.seller_id == seller_id:
if order.status == OrderStatus.ACTIVE:
order.status = OrderStatus.CANCELLED
return True
return False
def get_active_orders(self) -> list[Order]:
"""Get all active orders."""
return [o for o in self.orders if o.status == OrderStatus.ACTIVE]
def get_orders_by_type(self, resource_type: ResourceType) -> list[Order]:
"""Get all active orders for a specific resource type, sorted by price."""
orders = [
o for o in self.orders
if o.status == OrderStatus.ACTIVE and o.resource_type == resource_type
]
return sorted(orders, key=lambda o: o.price_per_unit)
def get_cheapest_order(self, resource_type: ResourceType) -> Optional[Order]:
"""Get the cheapest active order for a resource type."""
orders = self.get_orders_by_type(resource_type)
return orders[0] if orders else None
def get_orders_by_seller(self, seller_id: str) -> list[Order]:
"""Get all active orders from a specific seller."""
return [
o for o in self.orders
if o.status == OrderStatus.ACTIVE and o.seller_id == seller_id
]
def cancel_seller_orders(self, seller_id: str) -> list[Order]:
"""Cancel all orders from a seller (e.g., when they die). Returns cancelled orders."""
cancelled = []
for order in self.orders:
if order.seller_id == seller_id and order.status == OrderStatus.ACTIVE:
order.status = OrderStatus.CANCELLED
cancelled.append(order)
return cancelled
def execute_buy(
self,
buyer_id: str,
order_id: str,
quantity: int,
buyer_money: int,
) -> TradeResult:
"""Execute a buy order. Returns trade result."""
# Find the order
order = None
for o in self.orders:
if o.id == order_id and o.status == OrderStatus.ACTIVE:
order = o
break
if order is None:
return TradeResult(
success=False,
message="Order not found or no longer active",
)
# Check quantity
actual_quantity = min(quantity, order.quantity)
if actual_quantity <= 0:
return TradeResult(
success=False,
message="Invalid quantity",
)
# Check buyer has enough money
total_cost = actual_quantity * order.price_per_unit
if buyer_money < total_cost:
# Try to buy what they can afford
actual_quantity = buyer_money // order.price_per_unit
if actual_quantity <= 0:
return TradeResult(
success=False,
buyer_id=buyer_id,
message="Insufficient funds",
)
total_cost = actual_quantity * order.price_per_unit
# Execute the trade
order.quantity -= actual_quantity
if order.quantity <= 0:
order.status = OrderStatus.FILLED
result = TradeResult(
success=True,
order_id=order.id,
buyer_id=buyer_id,
seller_id=order.seller_id,
resource_type=order.resource_type,
quantity=actual_quantity,
total_paid=total_cost,
message=f"Bought {actual_quantity} {order.resource_type.value} for {total_cost} coins",
)
# Record sale for price history (we need current_turn but don't have it here)
# The turn will be passed via the _record_sale call from engine
self.trade_history.append(result)
return result
def execute_multi_buy(
self,
buyer_id: str,
purchases: list[tuple[str, int]], # List of (order_id, quantity)
buyer_money: int,
) -> list[TradeResult]:
"""Execute multiple buy orders in one action. Returns list of trade results."""
results = []
remaining_money = buyer_money
for order_id, quantity in purchases:
result = self.execute_buy(buyer_id, order_id, quantity, remaining_money)
results.append(result)
if result.success:
remaining_money -= result.total_paid
return results
def update_prices(self, current_turn: int) -> None:
"""Update order prices and supply/demand scores."""
# Update supply/demand scores
self._update_supply_demand_scores(current_turn)
# Apply automatic discounts to stale orders (keeping original behavior)
for order in self.orders:
if order.status != OrderStatus.ACTIVE:
continue
turns_waiting = current_turn - order.created_turn
if turns_waiting > 0 and turns_waiting % self.TURNS_BEFORE_DISCOUNT == 0:
order.turns_without_sale = turns_waiting
order.apply_discount(self.DISCOUNT_RATE)
def _update_supply_demand_scores(self, current_turn: int) -> None:
"""Calculate current supply and demand scores for each resource."""
for resource_type in ResourceType:
history = self.price_history.get(resource_type)
if not history:
history = PriceHistory()
self.price_history[resource_type] = history
# Calculate supply score based on available quantity
total_supply = self.get_total_supply(resource_type)
if total_supply <= self.LOW_SUPPLY_THRESHOLD:
history.supply_score = max(0.1, total_supply / self.LOW_SUPPLY_THRESHOLD * 0.5)
elif total_supply >= self.HIGH_SUPPLY_THRESHOLD:
history.supply_score = min(1.0, 0.5 + (total_supply / self.HIGH_SUPPLY_THRESHOLD) * 0.5)
else:
history.supply_score = 0.5
# Decay demand score over time
history.demand_score *= self.DEMAND_DECAY
def get_total_supply(self, resource_type: ResourceType) -> int:
"""Get total quantity available for a resource type."""
return sum(o.quantity for o in self.get_orders_by_type(resource_type))
def get_supply_demand_ratio(self, resource_type: ResourceType) -> float:
"""Get supply/demand ratio. <1 means scarcity, >1 means surplus."""
history = self.price_history.get(resource_type, PriceHistory())
demand = max(0.1, history.demand_score)
supply = max(0.1, history.supply_score)
return supply / demand
def get_suggested_price(self, resource_type: ResourceType, base_price: int) -> int:
"""Suggest a price based on supply/demand conditions.
Returns an adjusted price that accounts for market conditions:
- Scarcity (low supply, high demand) -> higher price
- Surplus (high supply, low demand) -> lower price
"""
ratio = self.get_supply_demand_ratio(resource_type)
history = self.price_history.get(resource_type, PriceHistory())
# Use average sale price as reference if available
reference_price = base_price
if history.avg_sale_price > 0:
reference_price = int((base_price + history.avg_sale_price) / 2)
# Adjust based on supply/demand
if ratio < 0.7: # Scarcity - raise price
price_multiplier = 1.0 + (0.7 - ratio) * 0.5 # Up to 35% increase
elif ratio > 1.3: # Surplus - lower price
price_multiplier = 1.0 - (ratio - 1.3) * 0.3 # Up to 30% decrease
price_multiplier = max(0.5, price_multiplier) # Floor at 50%
else:
price_multiplier = 1.0
suggested = int(reference_price * price_multiplier)
return max(1, suggested)
def adjust_order_price(self, order_id: str, seller_id: str, new_price: int, current_turn: int) -> bool:
"""Adjust the price of an existing order. Returns True if successful."""
for order in self.orders:
if order.id == order_id and order.seller_id == seller_id:
if order.status == OrderStatus.ACTIVE:
return order.adjust_price(new_price, current_turn)
return False
def _record_sale(self, resource_type: ResourceType, price: int, quantity: int, current_turn: int) -> None:
"""Record a sale for price history tracking."""
history = self.price_history.get(resource_type)
if not history:
history = PriceHistory()
self.price_history[resource_type] = history
history.last_sale_price = price
history.last_sale_turn = current_turn
# Update average sale price
old_total = history.avg_sale_price * history.total_sold
history.total_sold += quantity
history.avg_sale_price = (old_total + price * quantity) / history.total_sold
# Increase demand score when sales happen
history.demand_score = min(1.0, history.demand_score + 0.1 * quantity)
def cleanup_old_orders(self, max_age: int = 50) -> list[Order]:
"""Remove very old orders. Returns removed orders."""
# For now, we don't auto-expire orders, but this could be enabled
return []
def get_market_prices(self) -> dict[str, dict]:
"""Get current market price summary for each resource."""
prices = {}
for resource_type in ResourceType:
orders = self.get_orders_by_type(resource_type)
history = self.price_history.get(resource_type, PriceHistory())
if orders:
prices[resource_type.value] = {
"lowest_price": orders[0].price_per_unit,
"highest_price": orders[-1].price_per_unit,
"total_available": sum(o.quantity for o in orders),
"num_orders": len(orders),
"avg_sale_price": round(history.avg_sale_price, 1) if history.avg_sale_price else None,
"supply_score": round(history.supply_score, 2),
"demand_score": round(history.demand_score, 2),
}
else:
prices[resource_type.value] = {
"lowest_price": None,
"highest_price": None,
"total_available": 0,
"num_orders": 0,
"avg_sale_price": round(history.avg_sale_price, 1) if history.avg_sale_price else None,
"supply_score": round(history.supply_score, 2),
"demand_score": round(history.demand_score, 2),
}
return prices
def get_market_signal(self, resource_type: ResourceType) -> str:
"""Get a simple market signal for a resource: 'sell', 'hold', or 'buy'."""
ratio = self.get_supply_demand_ratio(resource_type)
if ratio < 0.7:
return "sell" # Good time to sell - scarcity
elif ratio > 1.3:
return "buy" # Good time to buy - surplus
return "hold"
def get_state_snapshot(self) -> dict:
"""Get market state for API."""
return {
"orders": [o.to_dict() for o in self.get_active_orders()],
"prices": self.get_market_prices(),
"recent_trades": [
t.to_dict() for t in self.trade_history[-10:]
],
}