# LEMNISCATE RUNTIME - Pseudo-code Implementation # Companion to runtime-engine/architecture.md from enum import Enum, auto from dataclasses import dataclass, field from typing import Dict, List, Optional, Set, Callable from abc import ABC, abstractmethod import math # ============================================================================ # CORE PRIMITIVES - The shape of flow # ============================================================================ class SlotState(Enum): """State of a slot in the lemniscate""" EMPTY = auto() OCCUPIED = auto() EXITING = auto() @dataclass class VerifierFlags: """Marker bits carried by slot-tokens, read at crossing""" exit_eligible: bool = False has_spoken_this_roundtrip: bool = False mid_action: bool = False goal_satisfied: bool = False silence_eligible: bool = False priority_pull: bool = False def clear_transient(self): """Clear flags that reset each crossing""" self.has_spoken_this_roundtrip = False self.silence_eligible = False @dataclass class SlotToken: """A token carried around the loop - represents an NPC in the zone""" npc_id: str state: SlotState = SlotState.OCCUPIED flags: VerifierFlags = field(default_factory=VerifierFlags) # Accumulator for gesture alignment (recursive lemniscate) gesture_alignment_accumulator: Dict[str, float] = field(default_factory=dict) def add_gesture_alignment(self, trait: str, value: float): """Add a gesture/trait value to the accumulator""" self.gesture_alignment_accumulator[trait] = \ self.gesture_alignment_accumulator.get(trait, 0.0) + value # ============================================================================ # THE LEMNISCATE - Geometry as clock # ============================================================================ class Lemniscate: """ The core runtime topology - a through-flow figure-eight. Traversal IS the turn order. """ def __init__(self, n_slots: int): self.n_slots = n_slots self.slots: List[Optional[SlotToken]] = [None] * n_slots # The cursor - position on the lemniscate # (loop_id, slot_index) - loop 0 = A, loop 1 = B self.cursor_loop: int = 0 self.cursor_slot: int = 0 # Entry and exit queues self.entry_line: List[SlotToken] = [] self.exit_line: List[SlotToken] = [] # The crossing is the ONLY synchronous event self.crossing_callbacks: List[Callable] = [] def advance_cursor(self) -> bool: """ Advance cursor through the lemniscate. Returns True if we reached the crossing. """ self.cursor_slot += 1 # Completed one loop if self.cursor_slot >= self.n_slots: self.cursor_slot = 0 self.cursor_loop = 1 - self.cursor_loop # Toggle A/B return True # Reached crossing return False def at_crossing(self) -> bool: """Cursor is between loops - at the synchronous crossing point""" return self.cursor_slot == 0 def get_active_token(self) -> Optional[SlotToken]: """Get token at current cursor position""" if self.cursor_slot < len(self.slots): return self.slots[self.cursor_slot] return None def register_crossing_callback(self, callback: Callable): """Register a function to run at each crossing""" self.crossing_callbacks.append(callback) def trigger_crossing(self): """ The crossing event - all state updates resolve atomically. This is the ONLY time the zone touches the global state bus. """ # 1. Run all registered callbacks (dialog turns, state updates) for callback in self.crossing_callbacks: callback() # 2. Process exit-eligible tokens self._process_exits() # 3. Push from entry line to fill vacancies self._process_entries() # 4. Clear transient flags self._clear_transient_flags() def _process_exits(self): """Route exit-eligible tokens to exit line""" for i, token in enumerate(self.slots): if token and token.flags.exit_eligible: self.exit_line.append(token) self.slots[i] = None token.state = SlotState.EXITING def _process_entries(self): """Fill vacant slots from entry line""" for i, slot in enumerate(self.slots): if slot is None and self.entry_line: self.slots[i] = self.entry_line.pop(0) def _clear_transient_flags(self): """Clear flags that reset each crossing""" for token in self.slots: if token: token.flags.clear_transient() # ============================================================================ # ZONES - Bounded places of structured speaking # ============================================================================ class ZoneRegister(Enum): """Which ontological register this zone lives in""" PHYSICAL = auto() LIMINAL = auto() IMPERIAL = auto() class Zone: """ A bounded, named, slot-indexed, director-managed event instance. The zone IS the lemniscate, not a container for it. """ def __init__(self, name: str, n_slots: int, register: ZoneRegister = ZoneRegister.PHYSICAL): self.name = name self.register = register self.lemniscate = Lemniscate(n_slots) # Purpose - spawn intent, immutable self.purpose: str = "" # State tracking self.scene_state_public: Dict = {} self.ternary_gate_edges: Dict[tuple, str] = {} # (npc_id, npc_id) -> CLOSED/STABLE/OPEN self.drift_state: Optional[str] = None # Delta from spawn intent # Lifeforce binding self.lifeforce_budget: float = 0.0 self.lifeforce_burned: float = 0.0 self.pulse_rate: float = 1.0 # Crossings per second # Lifecycle self.is_terminated: bool = False self.crossing_count: int = 0 # Player gesture overlay - broadcasts to all active slots self.pending_gestures: List[tuple] = [] # (trait, value) def set_purpose(self, purpose: str): """Set the zone's spawn intent - immutable after this""" self.purpose = purpose def add_npc(self, npc_id: str, traits: Dict[str, float]): """Add an NPC to the entry line""" token = SlotToken(npc_id=npc_id) for trait, value in traits.items(): token.gesture_alignment_accumulator[trait] = value self.lemniscate.entry_line.append(token) def add_gesture(self, trait: str, value: float): """ Add a player gesture to the accumulator. These integrate at the next crossing. """ self.pending_gestures.append((trait, value)) def broadcast_gestures_to_active(self): """ Phase-locked overlay broadcast - all active slots perceive the same player gesture simultaneously. """ for token in self.lemniscate.slots: if token: for trait, value in self.pending_gestures: token.add_gesture_alignment(trait, value) self.pending_gestures.clear() def run_crossing(self): """Execute one axis crossing - the core unit of zone time""" self.crossing_count += 1 # Broadcast player gestures before the crossing self.broadcast_gestures_to_active() # Trigger the crossing - all state updates resolve atomically self.lemniscate.trigger_crossing() # Lifeforce burn active_slots = sum(1 for t in self.lemniscate.slots if t is not None) self.lifeforce_burned += self.pulse_rate * active_slots # Check for termination if self.lifeforce_burned >= self.lifeforce_budget: self.terminal_report("budget_exhausted") self.is_terminated = True return # Empty queue behavior: shrink at every crossing if len(self.lemniscate.entry_line) == 0: self._shrink() # Check if zone has dissolved if all(t is None for t in self.lemniscate.slots): self.terminal_report("dissolved") self.is_terminated = True def _shrink(self): """Dissolve one active slot when entry line is empty""" # Find last occupied slot for i in range(len(self.lemniscate.slots) - 1, -1, -1): if self.lemniscate.slots[i] is not None: self.lemniscate.exit_line.append(self.lemniscate.slots[i]) self.lemniscate.slots[i] = None break def terminal_report(self, reason: str) -> Dict: """Generate the report sent back to spawning director""" participants = [t.npc_id for t in self.lemniscate.slots if t is not None] return { "name": self.name, "purpose": self.purpose, "reason": reason, "crossings": self.crossing_count, "lifeforce_cost": self.lifeforce_burned, "participants": participants, "drift": self.drift_state } # ============================================================================ # TWO-LAYER EXECUTION - State machine + lemniscate # ============================================================================ class StateMachineLayer: """ Physical action layer - runs at simulation tick (fast). Decoupled from lemniscate cursor. """ def __init__(self): self.actions: Dict[str, Callable] = {} self.state: Dict[str, Any] = {} def register_action(self, name: str, callback: Callable): """Register a physical action""" self.actions[name] = callback def execute(self, npc_id: str, action_name: str): """Execute a physical action for an NPC""" if action_name in self.actions: self.actions[action_name](npc_id, self.state) def get_status(self) -> Dict: """Snapshot of physical state for dialog layer""" return self.state.copy() class EmergentZone(Zone): """ A dramatic episode unit with goal evaluation. """ def __init__(self, name: str, n_slots: int, purpose: str, goal: Dict, budget: float): super().__init__(name, n_slots) self.set_purpose(purpose) self.goal = goal self.lifeforce_budget = budget # Two-layer execution self.state_machine = StateMachineLayer() self.dialog_layer = self # Zone IS the dialog layer # Goal tracking self.goal_satisfied = None self.goal_axis: str = "" def execute_turn(self, dt: float): """ Execute both layers in synchronized-but-loose harmony. Speech doesn't gate action; action doesn't block speech. """ # State machine layer (fast, at sim tick) for action in self.state_machine.actions: # Execute physical actions pass # Dialog layer (at axis rate) if self._should_crossing(dt): self.run_crossing() self._evaluate_goal() def _should_crossing(self, dt: float) -> bool: """Determine if axis crossing should occur based on pulse rate""" # Simplified - would track cumulative time in real impl return dt >= (1.0 / self.pulse_rate) def _evaluate_goal(self): """Ternary evaluation against trait axis""" if self.goal.get("type") == "numeric": # Simple threshold threshold = self.goal["threshold"] # Would check state machine variable self.goal_satisfied = 1 if True else 0 elif self.goal.get("type") == "fuzzy": # Trait-axis ternary evaluation axis = self.goal["axis"] # Would evaluate ternary gate state delta self.goal_satisfied = 0 # +1 / 0 / -1 self.goal_axis = axis # ============================================================================ # DISTRIBUTED FUNDING - Director + participant lifeforce # ============================================================================ class Director: """ Manages zones for a region. Owns spawn decisions and budget. """ def __init__(self, region: str, budget: float): self.region = region self.lifeforce_budget = budget self.active_zones: Dict[str, Zone] = {} def spawn_zone(self, name: str, purpose: str, slots: int, budget: float, params: Dict = {}) -> Zone: """ Spawn a new zone - pays spawn cost from own budget. """ zone = EmergentZone(name, slots, purpose, params.get("goal", {}), budget) zone.lifeforce_budget = budget self.lifeforce_budget -= budget # Director pays spawn cost self.active_zones[name] = zone return zone def update_all_zones(self, dt: float): """Advance all active zones""" for zone in list(self.active_zones.values()): if not zone.is_terminated: zone.run_crossing() # Would collect terminal reports # ============================================================================ # EXAMPLE: A rescue emergent zone # ============================================================================ def example_rescue_zone_setup(): """ Demonstrates the rescue event pattern from the architecture. """ # Director has a budget director = Director("district_1", budget=1000.0) # NPC A's limb is broken - emergent signal # Director consumes signal and spawns rescue event rescue_goal = { "type": "numeric", "threshold": 10.0, # limb_A >= 10% "fallback": { "type": "fuzzy", "axis": "Sophrosyne", "direction": "positive" } } rescue = director.spawn_zone( name="rescue_in_marketplace", purpose="medical_emergency", slots=4, budget=50.0, params={"goal": rescue_goal} ) # Add NPCs who might help rescue.add_npc("npc_patient", {"Philotes": 0.3, "Sophrosyne": 0.1}) rescue.add_npc("npc_bystander_1", {"Philotes": 0.7, "Sophrosyne": 0.5}) rescue.add_npc("npc_bystander_2", {"Philotes": 0.2, "Sophrosyne": 0.8}) # Player adds a gesture (helping) rescue.add_gesture("Philotes", 0.8) # Runtime tick rescue.execute_turn(0.016) # 60fps if __name__ == "__main__": example_rescue_zone_setup()