Files
nimmerworld.eachpath.local/runtime-engine/LEMNISCATE-RUNTIME-Pseudo-code-001.py
2026-05-04 18:52:24 +02:00

424 lines
14 KiB
Python

# LEMNISCATE RUNTIME - Pseudo-code Implementation
# Companion to runtime-engine/architecture.md
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Callable
from abc import ABC, abstractmethod
import math
# ============================================================================
# CORE PRIMITIVES - The shape of flow
# ============================================================================
class SlotState(Enum):
"""State of a slot in the lemniscate"""
EMPTY = auto()
OCCUPIED = auto()
EXITING = auto()
@dataclass
class VerifierFlags:
"""Marker bits carried by slot-tokens, read at crossing"""
exit_eligible: bool = False
has_spoken_this_roundtrip: bool = False
mid_action: bool = False
goal_satisfied: bool = False
silence_eligible: bool = False
priority_pull: bool = False
def clear_transient(self):
"""Clear flags that reset each crossing"""
self.has_spoken_this_roundtrip = False
self.silence_eligible = False
@dataclass
class SlotToken:
"""A token carried around the loop - represents an NPC in the zone"""
npc_id: str
state: SlotState = SlotState.OCCUPIED
flags: VerifierFlags = field(default_factory=VerifierFlags)
# Accumulator for gesture alignment (recursive lemniscate)
gesture_alignment_accumulator: Dict[str, float] = field(default_factory=dict)
def add_gesture_alignment(self, trait: str, value: float):
"""Add a gesture/trait value to the accumulator"""
self.gesture_alignment_accumulator[trait] = \
self.gesture_alignment_accumulator.get(trait, 0.0) + value
# ============================================================================
# THE LEMNISCATE - Geometry as clock
# ============================================================================
class Lemniscate:
"""
The core runtime topology - a through-flow figure-eight.
Traversal IS the turn order.
"""
def __init__(self, n_slots: int):
self.n_slots = n_slots
self.slots: List[Optional[SlotToken]] = [None] * n_slots
# The cursor - position on the lemniscate
# (loop_id, slot_index) - loop 0 = A, loop 1 = B
self.cursor_loop: int = 0
self.cursor_slot: int = 0
# Entry and exit queues
self.entry_line: List[SlotToken] = []
self.exit_line: List[SlotToken] = []
# The crossing is the ONLY synchronous event
self.crossing_callbacks: List[Callable] = []
def advance_cursor(self) -> bool:
"""
Advance cursor through the lemniscate.
Returns True if we reached the crossing.
"""
self.cursor_slot += 1
# Completed one loop
if self.cursor_slot >= self.n_slots:
self.cursor_slot = 0
self.cursor_loop = 1 - self.cursor_loop # Toggle A/B
return True # Reached crossing
return False
def at_crossing(self) -> bool:
"""Cursor is between loops - at the synchronous crossing point"""
return self.cursor_slot == 0
def get_active_token(self) -> Optional[SlotToken]:
"""Get token at current cursor position"""
if self.cursor_slot < len(self.slots):
return self.slots[self.cursor_slot]
return None
def register_crossing_callback(self, callback: Callable):
"""Register a function to run at each crossing"""
self.crossing_callbacks.append(callback)
def trigger_crossing(self):
"""
The crossing event - all state updates resolve atomically.
This is the ONLY time the zone touches the global state bus.
"""
# 1. Run all registered callbacks (dialog turns, state updates)
for callback in self.crossing_callbacks:
callback()
# 2. Process exit-eligible tokens
self._process_exits()
# 3. Push from entry line to fill vacancies
self._process_entries()
# 4. Clear transient flags
self._clear_transient_flags()
def _process_exits(self):
"""Route exit-eligible tokens to exit line"""
for i, token in enumerate(self.slots):
if token and token.flags.exit_eligible:
self.exit_line.append(token)
self.slots[i] = None
token.state = SlotState.EXITING
def _process_entries(self):
"""Fill vacant slots from entry line"""
for i, slot in enumerate(self.slots):
if slot is None and self.entry_line:
self.slots[i] = self.entry_line.pop(0)
def _clear_transient_flags(self):
"""Clear flags that reset each crossing"""
for token in self.slots:
if token:
token.flags.clear_transient()
# ============================================================================
# ZONES - Bounded places of structured speaking
# ============================================================================
class ZoneRegister(Enum):
"""Which ontological register this zone lives in"""
PHYSICAL = auto()
LIMINAL = auto()
IMPERIAL = auto()
class Zone:
"""
A bounded, named, slot-indexed, director-managed event instance.
The zone IS the lemniscate, not a container for it.
"""
def __init__(self, name: str, n_slots: int, register: ZoneRegister = ZoneRegister.PHYSICAL):
self.name = name
self.register = register
self.lemniscate = Lemniscate(n_slots)
# Purpose - spawn intent, immutable
self.purpose: str = ""
# State tracking
self.scene_state_public: Dict = {}
self.ternary_gate_edges: Dict[tuple, str] = {} # (npc_id, npc_id) -> CLOSED/STABLE/OPEN
self.drift_state: Optional[str] = None # Delta from spawn intent
# Lifeforce binding
self.lifeforce_budget: float = 0.0
self.lifeforce_burned: float = 0.0
self.pulse_rate: float = 1.0 # Crossings per second
# Lifecycle
self.is_terminated: bool = False
self.crossing_count: int = 0
# Player gesture overlay - broadcasts to all active slots
self.pending_gestures: List[tuple] = [] # (trait, value)
def set_purpose(self, purpose: str):
"""Set the zone's spawn intent - immutable after this"""
self.purpose = purpose
def add_npc(self, npc_id: str, traits: Dict[str, float]):
"""Add an NPC to the entry line"""
token = SlotToken(npc_id=npc_id)
for trait, value in traits.items():
token.gesture_alignment_accumulator[trait] = value
self.lemniscate.entry_line.append(token)
def add_gesture(self, trait: str, value: float):
"""
Add a player gesture to the accumulator.
These integrate at the next crossing.
"""
self.pending_gestures.append((trait, value))
def broadcast_gestures_to_active(self):
"""
Phase-locked overlay broadcast - all active slots perceive
the same player gesture simultaneously.
"""
for token in self.lemniscate.slots:
if token:
for trait, value in self.pending_gestures:
token.add_gesture_alignment(trait, value)
self.pending_gestures.clear()
def run_crossing(self):
"""Execute one axis crossing - the core unit of zone time"""
self.crossing_count += 1
# Broadcast player gestures before the crossing
self.broadcast_gestures_to_active()
# Trigger the crossing - all state updates resolve atomically
self.lemniscate.trigger_crossing()
# Lifeforce burn
active_slots = sum(1 for t in self.lemniscate.slots if t is not None)
self.lifeforce_burned += self.pulse_rate * active_slots
# Check for termination
if self.lifeforce_burned >= self.lifeforce_budget:
self.terminal_report("budget_exhausted")
self.is_terminated = True
return
# Empty queue behavior: shrink at every crossing
if len(self.lemniscate.entry_line) == 0:
self._shrink()
# Check if zone has dissolved
if all(t is None for t in self.lemniscate.slots):
self.terminal_report("dissolved")
self.is_terminated = True
def _shrink(self):
"""Dissolve one active slot when entry line is empty"""
# Find last occupied slot
for i in range(len(self.lemniscate.slots) - 1, -1, -1):
if self.lemniscate.slots[i] is not None:
self.lemniscate.exit_line.append(self.lemniscate.slots[i])
self.lemniscate.slots[i] = None
break
def terminal_report(self, reason: str) -> Dict:
"""Generate the report sent back to spawning director"""
participants = [t.npc_id for t in self.lemniscate.slots if t is not None]
return {
"name": self.name,
"purpose": self.purpose,
"reason": reason,
"crossings": self.crossing_count,
"lifeforce_cost": self.lifeforce_burned,
"participants": participants,
"drift": self.drift_state
}
# ============================================================================
# TWO-LAYER EXECUTION - State machine + lemniscate
# ============================================================================
class StateMachineLayer:
"""
Physical action layer - runs at simulation tick (fast).
Decoupled from lemniscate cursor.
"""
def __init__(self):
self.actions: Dict[str, Callable] = {}
self.state: Dict[str, Any] = {}
def register_action(self, name: str, callback: Callable):
"""Register a physical action"""
self.actions[name] = callback
def execute(self, npc_id: str, action_name: str):
"""Execute a physical action for an NPC"""
if action_name in self.actions:
self.actions[action_name](npc_id, self.state)
def get_status(self) -> Dict:
"""Snapshot of physical state for dialog layer"""
return self.state.copy()
class EmergentZone(Zone):
"""
A dramatic episode unit with goal evaluation.
"""
def __init__(self, name: str, n_slots: int, purpose: str,
goal: Dict, budget: float):
super().__init__(name, n_slots)
self.set_purpose(purpose)
self.goal = goal
self.lifeforce_budget = budget
# Two-layer execution
self.state_machine = StateMachineLayer()
self.dialog_layer = self # Zone IS the dialog layer
# Goal tracking
self.goal_satisfied = None
self.goal_axis: str = ""
def execute_turn(self, dt: float):
"""
Execute both layers in synchronized-but-loose harmony.
Speech doesn't gate action; action doesn't block speech.
"""
# State machine layer (fast, at sim tick)
for action in self.state_machine.actions:
# Execute physical actions
pass
# Dialog layer (at axis rate)
if self._should_crossing(dt):
self.run_crossing()
self._evaluate_goal()
def _should_crossing(self, dt: float) -> bool:
"""Determine if axis crossing should occur based on pulse rate"""
# Simplified - would track cumulative time in real impl
return dt >= (1.0 / self.pulse_rate)
def _evaluate_goal(self):
"""Ternary evaluation against trait axis"""
if self.goal.get("type") == "numeric":
# Simple threshold
threshold = self.goal["threshold"]
# Would check state machine variable
self.goal_satisfied = 1 if True else 0
elif self.goal.get("type") == "fuzzy":
# Trait-axis ternary evaluation
axis = self.goal["axis"]
# Would evaluate ternary gate state delta
self.goal_satisfied = 0 # +1 / 0 / -1
self.goal_axis = axis
# ============================================================================
# DISTRIBUTED FUNDING - Director + participant lifeforce
# ============================================================================
class Director:
"""
Manages zones for a region. Owns spawn decisions and budget.
"""
def __init__(self, region: str, budget: float):
self.region = region
self.lifeforce_budget = budget
self.active_zones: Dict[str, Zone] = {}
def spawn_zone(self, name: str, purpose: str, slots: int,
budget: float, params: Dict = {}) -> Zone:
"""
Spawn a new zone - pays spawn cost from own budget.
"""
zone = EmergentZone(name, slots, purpose, params.get("goal", {}), budget)
zone.lifeforce_budget = budget
self.lifeforce_budget -= budget # Director pays spawn cost
self.active_zones[name] = zone
return zone
def update_all_zones(self, dt: float):
"""Advance all active zones"""
for zone in list(self.active_zones.values()):
if not zone.is_terminated:
zone.run_crossing()
# Would collect terminal reports
# ============================================================================
# EXAMPLE: A rescue emergent zone
# ============================================================================
def example_rescue_zone_setup():
"""
Demonstrates the rescue event pattern from the architecture.
"""
# Director has a budget
director = Director("district_1", budget=1000.0)
# NPC A's limb is broken - emergent signal
# Director consumes signal and spawns rescue event
rescue_goal = {
"type": "numeric",
"threshold": 10.0, # limb_A >= 10%
"fallback": {
"type": "fuzzy",
"axis": "Sophrosyne",
"direction": "positive"
}
}
rescue = director.spawn_zone(
name="rescue_in_marketplace",
purpose="medical_emergency",
slots=4,
budget=50.0,
params={"goal": rescue_goal}
)
# Add NPCs who might help
rescue.add_npc("npc_patient", {"Philotes": 0.3, "Sophrosyne": 0.1})
rescue.add_npc("npc_bystander_1", {"Philotes": 0.7, "Sophrosyne": 0.5})
rescue.add_npc("npc_bystander_2", {"Philotes": 0.2, "Sophrosyne": 0.8})
# Player adds a gesture (helping)
rescue.add_gesture("Philotes", 0.8)
# Runtime tick
rescue.execute_turn(0.016) # 60fps
if __name__ == "__main__":
example_rescue_zone_setup()