""" Variance collection runner for Echo Probe. Automates running Echo Probe 1000x to measure variance in depth, echo types, and chain patterns for baseline characterization. """ import uuid from typing import List, Dict, Any from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn from rich.console import Console from ..core.model import NyxModel from ..probes.echo_probe import EchoProbe from nyx_substrate.database import PhoebeConnection, VarianceProbeDAO from nyx_substrate.schemas import VarianceProbeRun console = Console() class VarianceRunner: """ Automated variance collection for Echo Probe. Runs Echo Probe N times on a term, storing each result in phoebe for variance analysis. """ def __init__( self, model: NyxModel, dao: VarianceProbeDAO, max_rounds: int = 3, max_new_tokens: int = 50, temperature: float = 0.8, ): """ Initialize VarianceRunner. Args: model: Loaded NyxModel dao: VarianceProbeDAO for database storage max_rounds: Max echo rounds per probe max_new_tokens: Max tokens per generation temperature: Sampling temperature """ self.model = model self.dao = dao self.probe = EchoProbe( model=model, max_rounds=max_rounds, max_new_tokens=max_new_tokens, temperature=temperature, ) self.max_rounds = max_rounds self.max_new_tokens = max_new_tokens self.temperature = temperature def run_session( self, term: str, runs: int = 1000, show_progress: bool = True, ) -> uuid.UUID: """ Run variance collection session on a single term. Args: term: Term to probe runs: Number of runs (default: 1000) show_progress: Show progress bar Returns: session_id UUID Example: >>> runner = VarianceRunner(model, dao) >>> session_id = runner.run_session("Geworfenheit", runs=1000) >>> print(f"Session: {session_id}") """ session_id = uuid.uuid4() console.print(f"\n[bold cyan]šŸ”¬ Variance Collection Session[/bold cyan]") console.print(f"Term: [bold]{term}[/bold]") console.print(f"Runs: {runs}") console.print(f"Session ID: {session_id}\n") if show_progress: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=console, ) as progress: task = progress.add_task(f"Probing '{term}'...", total=runs) for run_number in range(1, runs + 1): # Run probe result = self.probe.probe(term) # Convert echo types to strings echo_types_str = [et.name for et in result.echo_types] # Store in phoebe self.dao.insert_run( session_id=session_id, term=term, run_number=run_number, depth=result.depth, rounds=result.rounds, echo_types=echo_types_str, chain=result.chain, model_name=self.model.model_name, temperature=self.temperature, max_rounds=self.max_rounds, max_new_tokens=self.max_new_tokens, ) progress.update(task, advance=1) else: # No progress bar for run_number in range(1, runs + 1): result = self.probe.probe(term) echo_types_str = [et.name for et in result.echo_types] self.dao.insert_run( session_id=session_id, term=term, run_number=run_number, depth=result.depth, rounds=result.rounds, echo_types=echo_types_str, chain=result.chain, model_name=self.model.model_name, temperature=self.temperature, max_rounds=self.max_rounds, max_new_tokens=self.max_new_tokens, ) console.print(f"\nāœ… [bold green]Session complete![/bold green]") console.print(f"Stored {runs} runs in phoebe") console.print(f"Session ID: [bold]{session_id}[/bold]\n") return session_id def run_batch( self, terms: List[str], runs_per_term: int = 1000, show_progress: bool = True, ) -> Dict[str, uuid.UUID]: """ Run variance collection on multiple terms. Args: terms: List of terms to probe runs_per_term: Number of runs per term show_progress: Show progress bar Returns: Dictionary mapping term -> session_id Example: >>> runner = VarianceRunner(model, dao) >>> sessions = runner.run_batch(["Geworfenheit", "Vernunft"], runs_per_term=1000) """ console.print(f"\n[bold cyan]šŸ”¬ Batch Variance Collection[/bold cyan]") console.print(f"Terms: {len(terms)}") console.print(f"Runs per term: {runs_per_term}") console.print(f"Total runs: {len(terms) * runs_per_term}\n") sessions = {} for idx, term in enumerate(terms, 1): console.print(f"[bold]Term {idx}/{len(terms)}:[/bold] {term}") session_id = self.run_session(term, runs=runs_per_term, show_progress=show_progress) sessions[term] = session_id console.print(f"\nāœ… [bold green]Batch complete![/bold green]") console.print(f"Collected variance for {len(terms)} terms") return sessions def get_session_summary(self, session_id: uuid.UUID) -> Dict[str, Any]: """ Get summary statistics for a session. Args: session_id: Session UUID Returns: Dictionary with statistics Example: >>> summary = runner.get_session_summary(session_id) >>> print(f"Average depth: {summary['avg_depth']}") """ return self.dao.get_session_stats(session_id) def display_session_stats(self, session_id: uuid.UUID) -> None: """ Display session statistics to console. Args: session_id: Session UUID """ stats = self.get_session_summary(session_id) console.print(f"\n[bold cyan]šŸ“Š Session Statistics[/bold cyan]") console.print(f"Session ID: {session_id}") console.print(f"Term: [bold]{stats['term']}[/bold]") console.print(f"Total runs: {stats['total_runs']}") console.print(f"Average depth: {stats['avg_depth']:.2f}") console.print(f"Average rounds: {stats['avg_rounds']:.2f}") console.print("\n[bold]Depth Distribution:[/bold]") dist = stats['depth_distribution'] for depth_val in ['0', '1', '2', '3']: count = dist.get(depth_val, 0) pct = (count / stats['total_runs'] * 100) if stats['total_runs'] > 0 else 0 console.print(f" Depth {depth_val}: {count:4d} ({pct:5.1f}%)") console.print("\n[bold]Most Common Echo Types:[/bold]") for echo_info in stats['most_common_echo_types'][:5]: console.print(f" {echo_info['type']:12s}: {echo_info['count']:4d}") console.print()