feat: complete Phase 1 - vocabulary expansion & DriftProbe infrastructure

- CLI: nyx-probe scan with --summary/--delta/--full flags
- DriftProbe: training safety with Gini coefficient + Angular Drift
- Vocabulary: 54 terms (30 nimmerverse + 24 German philosophical)
- Sentinels: ANCHOR/BRIDGE/CANARY/TARGET monitoring system

Key findings:
- German philosophical terms: 37.5% depth≥2 hit rate (vs 3.3% nimmerverse)
- Super Cluster validated: heart cross-lang sim = 1.000
- Isolated Zone confirmed: being EN↔DE sim = 0.195
- Gini signature: Philosophy ~0.5 (diffuse), Technical ~0.8 (sparse)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
commit f640dbdd65 (parent 9853f4767b)
2025-12-06 22:39:03 +01:00
29 changed files with 6164 additions and 1 deletions

nyx_probing/__init__.py Normal file

@@ -0,0 +1,10 @@
"""
nyx-probing: Understanding the mind before teaching it.
A probing framework for Qwen2.5-7B-Base.
"""
from .config import Config, get_config
from .core import NyxModel, GenerationResult
__version__ = "0.1.0"
__all__ = ["Config", "get_config", "NyxModel", "GenerationResult"]

nyx_probing/analysis/__init__.py Normal file

@@ -0,0 +1,4 @@
"""Analysis components for nyx-probing."""
from .readiness_scorer import ReadinessScorer
__all__ = ["ReadinessScorer"]

nyx_probing/analysis/readiness_scorer.py Normal file

@@ -0,0 +1,221 @@
"""
Readiness Scorer: Combines surface and echo probes into curriculum guidance.
Outputs:
- HIGH: Ready for direct training / state machine
- MEDIUM: Needs scaffolding or bridging concepts
- LOW: Requires foundational work first
"""
from typing import Optional, List
from dataclasses import dataclass
from ..core.model import NyxModel
from ..core.probe_result import (
SurfaceProbeResult,
EchoProbeResult,
ReadinessResult,
ReadinessLevel,
EchoType,
)
from ..probes.surface_probe import SurfaceProbe, CompletionCategory
from ..probes.echo_probe import EchoProbe
# Recommended actions for each readiness level
ACTIONS = {
ReadinessLevel.HIGH: "state_machine", # Direct training
ReadinessLevel.MEDIUM: "scaffolding", # Bridge concepts
ReadinessLevel.LOW: "foundational", # Build from scratch
}
class ReadinessScorer:
"""
Combines surface + echo probes to assess curriculum readiness.
A term is ready for training when:
1. Surface: Coherent associations (not scattered/random)
2. Echo: Can expand beyond surface (depth > 0)
3. Valley: In a productive valley (prose/philosophy, not just code)
"""
def __init__(
self,
model: NyxModel,
surface_runs: int = 3,
echo_rounds: int = 3,
max_new_tokens: int = 50,
):
self.model = model
self.surface_probe = SurfaceProbe(
model,
num_runs=surface_runs,
max_new_tokens=max_new_tokens,
)
self.echo_probe = EchoProbe(
model,
max_rounds=echo_rounds,
max_new_tokens=max_new_tokens,
)
def score(self, term: str) -> ReadinessResult:
"""
Assess readiness of a term for curriculum.
Args:
term: Word or phrase to assess
Returns:
ReadinessResult with level, action, and supporting evidence
"""
# Run both probes
surface = self.surface_probe.probe(term)
echo = self.echo_probe.probe(term)
# Classify valley from surface probe
classification = self.surface_probe.classify_completions(surface)
dominant_valley = classification['dominant']
# Calculate composite score
level, reasoning = self._calculate_level(
surface=surface,
echo=echo,
dominant_valley=dominant_valley,
)
return ReadinessResult(
term=term,
level=level,
action=ACTIONS[level],
surface=surface,
echo=echo,
reasoning=reasoning,
)
def _calculate_level(
self,
surface: SurfaceProbeResult,
echo: EchoProbeResult,
dominant_valley: str,
) -> tuple[ReadinessLevel, str]:
"""
Calculate readiness level based on probe results.
Heuristics:
- HIGH: depth >= 2 AND coherence >= 0.5 AND not pure code
- MEDIUM: depth >= 1 OR (coherence >= 0.5 AND prose/philosophy)
- LOW: everything else
"""
depth = echo.depth
coherence = surface.coherence_score or 0.0
eos_ratio = surface.hit_eos_count / len(surface.completions) if surface.completions else 0
# Count echo types
expands = sum(1 for t in echo.echo_types if t == EchoType.EXPANDS)
collapses = sum(1 for t in echo.echo_types if t == EchoType.COLLAPSE)
circulars = sum(1 for t in echo.echo_types if t == EchoType.CIRCULAR)
# Build reasoning
reasons = []
# HIGH: Good depth + coherence + productive valley
if depth >= 2 and coherence >= 0.4:
if dominant_valley not in [CompletionCategory.CODE]:
reasons.append(f"depth={depth} (strong conceptual expansion)")
reasons.append(f"coherence={coherence:.2f} (consistent associations)")
reasons.append(f"valley={dominant_valley} (productive for training)")
return ReadinessLevel.HIGH, "; ".join(reasons)
# HIGH: Exceptional depth even with lower coherence
if depth >= 3:
reasons.append(f"depth={depth} (exceptional expansion)")
reasons.append(f"all {expands} echoes expand")
return ReadinessLevel.HIGH, "; ".join(reasons)
# MEDIUM: Some depth or good coherence in prose
if depth >= 1:
reasons.append(f"depth={depth} (some expansion capability)")
if dominant_valley in [CompletionCategory.PROSE, 'prose', 'definition']:
reasons.append(f"valley={dominant_valley} (trainable with scaffolding)")
return ReadinessLevel.MEDIUM, "; ".join(reasons)
if coherence >= 0.5 and dominant_valley not in [CompletionCategory.CODE, 'code']:
reasons.append(f"coherence={coherence:.2f} (consistent surface)")
reasons.append(f"valley={dominant_valley}")
reasons.append("but limited depth - needs bridging concepts")
return ReadinessLevel.MEDIUM, "; ".join(reasons)
# LOW: Trapped in code, circular, or incoherent
if dominant_valley in [CompletionCategory.CODE, 'code']:
reasons.append(f"valley=CODE (trapped in technical patterns)")
if circulars >= 2:
reasons.append(f"{circulars} circular echoes (surface-only knowledge)")
if collapses >= 1:
reasons.append(f"{collapses} collapses (unstable representations)")
if coherence < 0.4:
reasons.append(f"coherence={coherence:.2f} (scattered associations)")
return ReadinessLevel.LOW, "; ".join(reasons) if reasons else "insufficient depth and coherence"
def score_batch(self, terms: List[str]) -> List[ReadinessResult]:
"""Score multiple terms."""
return [self.score(term) for term in terms]
def summary(self, result: ReadinessResult) -> str:
"""Generate human-readable summary."""
symbols = {
ReadinessLevel.HIGH: "🟢",
ReadinessLevel.MEDIUM: "🟡",
ReadinessLevel.LOW: "🔴",
}
surface_summary = f"coherence={result.surface.coherence_score:.2f}" if result.surface else "N/A"
echo_summary = f"depth={result.echo.depth}" if result.echo else "N/A"
lines = [
f"{symbols[result.level]} {result.term}: {result.level.value}",
f" Action: {result.action}",
f" Surface: {surface_summary}",
f" Echo: {echo_summary}",
f" Reasoning: {result.reasoning}",
]
return "\n".join(lines)
def curriculum_report(self, results: List[ReadinessResult]) -> str:
"""Generate curriculum planning report."""
high = [r for r in results if r.level == ReadinessLevel.HIGH]
medium = [r for r in results if r.level == ReadinessLevel.MEDIUM]
low = [r for r in results if r.level == ReadinessLevel.LOW]
lines = [
"=" * 60,
"CURRICULUM READINESS REPORT",
"=" * 60,
"",
f"🟢 HIGH ({len(high)} terms) - Ready for state machine:",
]
for r in high:
lines.append(f"{r.term}")
lines.extend([
"",
f"🟡 MEDIUM ({len(medium)} terms) - Need scaffolding:",
])
for r in medium:
lines.append(f"{r.term}: {r.reasoning[:60]}...")
lines.extend([
"",
f"🔴 LOW ({len(low)} terms) - Require foundational work:",
])
for r in low:
lines.append(f"{r.term}: {r.reasoning[:60]}...")
lines.extend([
"",
"=" * 60,
f"Summary: {len(high)}/{len(results)} ready, {len(medium)} scaffolding, {len(low)} foundational",
"=" * 60,
])
return "\n".join(lines)
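
For orientation, a minimal usage sketch of the scorer (the probed terms are illustrative):

from nyx_probing.core.model import NyxModel
from nyx_probing.analysis.readiness_scorer import ReadinessScorer

model = NyxModel().load()                # loads Qwen2.5-7B onto the GPU
scorer = ReadinessScorer(model)
result = scorer.score("thrownness")      # runs surface + echo probes
print(scorer.summary(result))            # one-term summary with level emoji
batch = scorer.score_batch(["dasein", "heart"])
print(scorer.curriculum_report(batch))   # HIGH/MEDIUM/LOW planning report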

nyx_probing/cli/probe.py Normal file

@@ -0,0 +1,614 @@
#!/usr/bin/env python3
"""
nyx-probe CLI: Interactive probing of the Young Mind.
Commands:
surface - Probe immediate associations
echo - Measure conceptual depth
readiness - Full curriculum assessment
tokens - Token analysis
glossary - Batch probe from JSON file
scan - Multilingual vocabulary scan with incremental testing
"""
import sys
import json
from pathlib import Path
from typing import Optional, List
from datetime import datetime
import os
import click
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich import box
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from nyx_probing.core.model import NyxModel
from nyx_probing.probes.surface_probe import SurfaceProbe
from nyx_probing.probes.echo_probe import EchoProbe
from nyx_probing.analysis.readiness_scorer import ReadinessScorer
console = Console()
# Global model instance (lazy loaded)
_model: Optional[NyxModel] = None
def get_model() -> NyxModel:
"""Get or create the model instance."""
global _model
if _model is None:
with console.status("[bold cyan]Loading Qwen2.5-7B...", spinner="dots"):
_model = NyxModel()
_model.load()
console.print("[green]✓ Model loaded[/green]")
return _model
def detect_category(completions: list) -> str:
"""Simple category detection from completions."""
text = " ".join(completions).lower()
code_indicators = ["def ", "class ", "function", "import ", "return ", "{", "}", ";", "=>", "()"]
if any(ind in text for ind in code_indicators):
return "CODE"
list_indicators = ["1.", "2.", "- ", "• ", "* "]
if any(ind in text for ind in list_indicators):
return "LIST"
return "PROSE"
@click.group()
@click.version_option(version="0.1.0", prog_name="nyx-probe")
def cli():
"""
🌙 nyx-probe: Probe the Young Mind's conceptual topology.
Explore how Qwen2.5-7B-Base understands and associates concepts.
"""
pass
@cli.command()
@click.argument("term")
@click.option("-n", "--runs", default=3, help="Number of completion runs")
@click.option("-t", "--tokens", default=50, help="Max tokens per completion")
@click.option("--temperature", default=0.8, help="Sampling temperature")
def surface(term: str, runs: int, tokens: int, temperature: float):
"""
Probe surface associations of a term.
Shows what the model completes when given a word - reveals
which "valley" (code, prose, philosophy) the term lives in.
"""
model = get_model()
probe = SurfaceProbe(
model,
num_runs=runs,
max_new_tokens=tokens,
temperature=temperature,
)
console.print(f"\n[bold cyan]🔬 Surface Probe:[/bold cyan] [yellow]{term}[/yellow]\n")
with console.status("[bold cyan]Probing...", spinner="dots"):
result = probe.probe(term)
# Display completions
table = Table(title="Completions", box=box.ROUNDED)
table.add_column("#", style="dim", width=3)
table.add_column("Completion", style="white")
table.add_column("EOS", style="green", width=5)
for i, comp in enumerate(result.completions[:5], 1):
preview = comp[:80] + "..." if len(comp) > 80 else comp
preview = preview.replace("\n", "")
table.add_row(str(i), preview, "✓" if result.hit_eos_count > 0 else "")
console.print(table)
# Detect category
category = detect_category(result.completions)
coherence = result.coherence_score or 0.0
# Summary panel
summary = f"""
[bold]Category:[/bold] {category}
[bold]Coherence:[/bold] {coherence:.2f}
[bold]Avg Tokens:[/bold] {result.avg_tokens:.1f}
[bold]EOS Rate:[/bold] {result.hit_eos_count}/{len(result.completions)}
"""
console.print(Panel(summary, title="📊 Analysis", border_style="cyan"))
@cli.command()
@click.argument("term")
@click.option("-r", "--rounds", default=3, help="Echo rounds")
@click.option("-t", "--tokens", default=50, help="Max tokens per round")
def echo(term: str, rounds: int, tokens: int):
"""
Measure conceptual depth through iterative echoing.
Feeds completions back to measure how deep the concept goes.
Classifications: EXPANDS, CONFIRMS, CIRCULAR, DIVERGENT, COLLAPSE
"""
model = get_model()
probe = EchoProbe(
model,
max_rounds=rounds,
max_new_tokens=tokens,
)
console.print(f"\n[bold cyan]🔄 Echo Probe:[/bold cyan] [yellow]{term}[/yellow]\n")
with console.status("[bold cyan]Echoing...", spinner="dots"):
result = probe.probe(term)
# Display chain
table = Table(title="Echo Chain", box=box.ROUNDED)
table.add_column("Round", style="dim", width=6)
table.add_column("Type", style="bold", width=12)
table.add_column("Content", style="white")
table.add_row("0", "[cyan]SEED[/cyan]", term)
type_colors = {
"EXPANDS": "green",
"CONFIRMS": "yellow",
"CIRCULAR": "red",
"DIVERGENT": "magenta",
"COLLAPSE": "dim red",
}
for i, (echo_type, content) in enumerate(zip(result.echo_types, result.chain[1:]), 1):
color = type_colors.get(echo_type.value, "white")
preview = content[:60] + "..." if len(content) > 60 else content
preview = preview.replace("\n", "")
table.add_row(str(i), f"[{color}]{echo_type.value}[/{color}]", preview)
console.print(table)
# Depth indicator
depth = result.depth
depth_bar = "█" * depth + "░" * (3 - depth)
colors = ["red", "yellow", "green", "cyan"]
console.print(f"\n[bold]Depth Score:[/bold] [{colors[min(depth, 3)]}]{depth_bar}[/] {depth}/3")
@cli.command()
@click.argument("term")
def readiness(term: str):
"""
Full curriculum readiness assessment.
Combines surface + echo probes to determine if a concept
is ready for training: HIGH, MEDIUM, or LOW.
"""
model = get_model()
scorer = ReadinessScorer(model)
console.print(f"\n[bold cyan]📋 Readiness Assessment:[/bold cyan] [yellow]{term}[/yellow]\n")
with console.status("[bold cyan]Assessing...", spinner="dots"):
result = scorer.score(term)
# Level colors
level_styles = {
"HIGH": ("green", "🟢"),
"MEDIUM": ("yellow", "🟡"),
"LOW": ("red", "🔴"),
}
color, emoji = level_styles.get(result.level.value, ("white", ""))
# Get category and metrics
category = detect_category(result.surface.completions) if result.surface else "UNKNOWN"
coherence = result.surface.coherence_score if result.surface else 0.0
depth = result.echo.depth if result.echo else 0
# Main panel
content = f"""
{emoji} [bold {color}]{result.level.value}[/bold {color}]
[bold]Valley:[/bold] {category}
[bold]Coherence:[/bold] {coherence:.2f}
[bold]Depth:[/bold] {depth}/3
[bold]Action:[/bold] {result.action}
"""
console.print(Panel(content, title=f"Readiness: {term}", border_style=color))
# Recommendations
if result.level.value == "HIGH":
console.print("[green]✓ Ready for direct training or state machine implementation[/green]")
elif result.level.value == "MEDIUM":
console.print("[yellow]⚠ Consider scaffolding or bridging concepts[/yellow]")
else:
console.print("[red]✗ Requires foundational work before training[/red]")
@cli.command()
@click.argument("term")
def tokens(term: str):
"""
Analyze tokenization of a term.
Shows how the model breaks down the term into tokens -
critical for understanding valley access (single vs multi-token).
"""
model = get_model()
console.print(f"\n[bold cyan]🔤 Token Analysis:[/bold cyan] [yellow]{term}[/yellow]\n")
token_list = model.tokenize(term)
count = len(token_list)
# Token display
token_display = " | ".join([f"[cyan]{t}[/cyan]" for t in token_list])
console.print(f"Tokens: {token_display}")
console.print(f"Count: [bold]{count}[/bold]")
# Interpretation
if count == 1:
console.print("\n[red]⚠ Single token - likely CODE valley (high activation spike)[/red]")
elif count <= 2:
console.print("\n[yellow]→ Few tokens - may be efficient but limited valley access[/yellow]")
else:
console.print("\n[green]✓ Multi-token - distributed signal, better valley access[/green]")
@cli.command()
@click.argument("glossary_file", type=click.Path(exists=True))
@click.option("-o", "--output", type=click.Path(), help="Output JSON file")
@click.option("--surface-only", is_flag=True, help="Only run surface probe")
def glossary(glossary_file: str, output: Optional[str], surface_only: bool):
"""
Batch probe terms from a glossary JSON file.
Expected format: {"terms": [{"term": "...", "translations": {...}}, ...]}
or simple: {"terms": ["term1", "term2", ...]}
"""
model = get_model()
# Load glossary
with open(glossary_file) as f:
data = json.load(f)
terms = data.get("terms", data)
if isinstance(terms, dict):
terms = list(terms.keys())
# Normalize to list of strings
term_list = []
for t in terms:
if isinstance(t, str):
term_list.append(t)
elif isinstance(t, dict):
term_list.append(t.get("term", t.get("en", str(t))))
console.print(f"\n[bold cyan]📚 Glossary Probe:[/bold cyan] {len(term_list)} terms\n")
results = []
if surface_only:
probe = SurfaceProbe(model, num_runs=3)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Probing...", total=len(term_list))
for term in term_list:
progress.update(task, description=f"Probing: {term}")
result = probe.probe(term)
category = detect_category(result.completions)
results.append({
"term": term,
"category": category,
"coherence": result.coherence_score or 0.0,
"tokens": model.token_count(term),
})
progress.advance(task)
else:
scorer = ReadinessScorer(model)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Assessing...", total=len(term_list))
for term in term_list:
progress.update(task, description=f"Assessing: {term}")
result = scorer.score(term)
category = detect_category(result.surface.completions) if result.surface else "UNKNOWN"
coherence = result.surface.coherence_score if result.surface else 0.0
depth = result.echo.depth if result.echo else 0
results.append({
"term": term,
"level": result.level.value,
"valley": category,
"coherence": coherence,
"depth": depth,
"action": result.action,
"tokens": model.token_count(term),
})
progress.advance(task)
# Display results table
table = Table(title="Glossary Results", box=box.ROUNDED)
table.add_column("Term", style="yellow")
table.add_column("Tokens", style="dim", width=6)
if surface_only:
table.add_column("Category", style="cyan")
table.add_column("Coherence", style="white")
for r in results:
table.add_row(
r["term"],
str(r["tokens"]),
r["category"],
f"{r['coherence']:.2f}",
)
else:
table.add_column("Level", style="bold")
table.add_column("Valley", style="cyan")
table.add_column("Depth", style="white")
level_colors = {"HIGH": "green", "MEDIUM": "yellow", "LOW": "red"}
for r in results:
color = level_colors.get(r["level"], "white")
table.add_row(
r["term"],
str(r["tokens"]),
f"[{color}]{r['level']}[/{color}]",
r["valley"],
f"{r['depth']}/3",
)
console.print(table)
# Save if output specified
if output:
with open(output, "w") as f:
json.dump({"glossary": glossary_file, "results": results}, f, indent=2)
console.print(f"\n[green]✓ Results saved to {output}[/green]")
# Summary
if not surface_only:
high = sum(1 for r in results if r["level"] == "HIGH")
med = sum(1 for r in results if r["level"] == "MEDIUM")
low = sum(1 for r in results if r["level"] == "LOW")
console.print(f"\n[bold]Summary:[/bold] 🟢 {high} HIGH | 🟡 {med} MEDIUM | 🔴 {low} LOW")
def load_glossary_files(paths: List[str]) -> tuple[list, dict]:
"""Load terms from files or directories, tracking source collection."""
terms = []
sources = {} # term -> collection name
for path_str in paths:
path = Path(path_str)
if path.is_dir():
# Load all JSON files from directory
json_files = list(path.glob("*.json"))
else:
json_files = [path]
for json_file in json_files:
collection_name = json_file.stem
try:
with open(json_file) as f:
data = json.load(f)
file_terms = data.get("terms", data)
if isinstance(file_terms, dict):
file_terms = list(file_terms.keys())
for t in file_terms:
if isinstance(t, str):
term_data = {"term": t, "translations": {"EN": t}}
elif isinstance(t, dict):
term_data = t
else:
continue
term_name = term_data.get("term", term_data.get("en", str(term_data)))
terms.append(term_data)
sources[term_name] = collection_name
except Exception as e:
console.print(f"[yellow]Warning: Could not load {json_file}: {e}[/yellow]")
return terms, sources
def load_master_json() -> dict:
"""Load master.json if it exists."""
master_path = Path(__file__).parent.parent.parent / "data" / "glossary" / "master.json"
if master_path.exists():
with open(master_path) as f:
return json.load(f)
return {"last_scan": None, "total_terms": 0, "collections_loaded": [], "terms": {}}
def save_master_json(master: dict):
"""Save master.json."""
master_path = Path(__file__).parent.parent.parent / "data" / "glossary" / "master.json"
with open(master_path, "w") as f:
json.dump(master, f, indent=2)
@cli.command()
@click.argument("paths", nargs=-1, type=click.Path(exists=True))
@click.option("--summary/--full", default=True, help="Show summary (default) or full table")
@click.option("--delta", is_flag=True, help="Only test new/untested terms")
@click.option("--force", is_flag=True, help="Re-test all terms even if already in master.json")
@click.option("-o", "--output", type=click.Path(), help="Output JSON file")
def scan(paths: tuple, summary: bool, delta: bool, force: bool, output: Optional[str]):
"""
Multilingual vocabulary scan with incremental testing.
Scans terms using surface + echo probes and tracks results in master.json.
Examples:
nyx-probe scan data/glossary/collections/ # Scan all collections
nyx-probe scan collections/philosophical.json # Scan specific file
nyx-probe scan collections/ --delta # Only test new terms
nyx-probe scan collections/ --full # Full detailed output
"""
if not paths:
console.print("[red]Error: Please provide at least one file or directory path[/red]")
return
model = get_model()
# Load terms from all paths
all_terms, sources = load_glossary_files(list(paths))
console.print(f"\n[bold cyan]🔬 Vocabulary Scan:[/bold cyan] {len(all_terms)} terms from {len(set(sources.values()))} collection(s)\n")
# Load master.json for delta mode
master = load_master_json()
# Filter terms if delta mode
if delta and not force:
tested_terms = set(master.get("terms", {}).keys())
original_count = len(all_terms)
all_terms = [t for t in all_terms if t.get("term", t.get("en", str(t))) not in tested_terms]
skipped = original_count - len(all_terms)
if skipped > 0:
console.print(f"[dim]Skipping {skipped} already-tested terms (use --force to re-test)[/dim]")
if not all_terms:
console.print("[green]All terms already tested! Use --force to re-test.[/green]")
return
# Run probes
scorer = ReadinessScorer(model)
results = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Scanning...", total=len(all_terms))
for term_data in all_terms:
term = term_data.get("term", term_data.get("en", str(term_data)))
progress.update(task, description=f"Probing: {term}")
result = scorer.score(term)
category = detect_category(result.surface.completions) if result.surface else "UNKNOWN"
coherence = result.surface.coherence_score if result.surface else 0.0
depth = result.echo.depth if result.echo else 0
entry = {
"term": term,
"source": sources.get(term, "unknown"),
"level": result.level.value,
"valley": category,
"coherence": coherence,
"depth": depth,
"action": result.action,
"tokens": model.token_count(term),
}
results.append(entry)
# Update master.json entry
master["terms"][term] = {
"source": sources.get(term, "unknown"),
"tested": datetime.now().strftime("%Y-%m-%d"),
"depth": depth,
"valley": category,
"transfer": False, # Would need triangulation
"grounding": coherence,
}
progress.advance(task)
# Update master.json metadata
master["last_scan"] = datetime.now().isoformat()
master["total_terms"] = len(master["terms"])
collections = set(master.get("collections_loaded", []))
collections.update(sources.values())
master["collections_loaded"] = list(collections)
save_master_json(master)
# Display results
if summary:
# Summary mode - lean output
high = sum(1 for r in results if r["level"] == "HIGH")
med = sum(1 for r in results if r["level"] == "MEDIUM")
low = sum(1 for r in results if r["level"] == "LOW")
depth_hits = [r for r in results if r["depth"] >= 2]
console.print(f"\n[bold]🌍 Scanned {len(results)} terms | Depth≥2: {len(depth_hits)} | 🟢{high} 🟡{med} 🔴{low}[/bold]\n")
if depth_hits:
console.print("[bold cyan]DEPTH HITS (≥2/3):[/bold cyan]")
for r in depth_hits:
level_colors = {"HIGH": "green", "MEDIUM": "yellow", "LOW": "red"}
color = level_colors.get(r["level"], "white")
console.print(f" [{color}]{r['term']:20}[/{color}] {r['depth']}/3 {r['valley']:10} ({r['source']})")
high_grounding = [r for r in results if r["coherence"] > 0.7]
if high_grounding:
console.print(f"\n[bold cyan]BEST GROUNDING (>0.7):[/bold cyan]")
for r in high_grounding[:5]:
console.print(f" {r['term']:20} {r['coherence']:.2f}")
console.print(f"\n[dim]Run with --full for complete table[/dim]")
else:
# Full mode - detailed table
table = Table(title="Scan Results", box=box.ROUNDED)
table.add_column("Term", style="yellow")
table.add_column("Source", style="dim", width=12)
table.add_column("Tokens", style="dim", width=6)
table.add_column("Level", style="bold")
table.add_column("Valley", style="cyan")
table.add_column("Depth", style="white")
table.add_column("Coherence", style="white")
level_colors = {"HIGH": "green", "MEDIUM": "yellow", "LOW": "red"}
for r in results:
color = level_colors.get(r["level"], "white")
table.add_row(
r["term"],
r["source"],
str(r["tokens"]),
f"[{color}]{r['level']}[/{color}]",
r["valley"],
f"{r['depth']}/3",
f"{r['coherence']:.2f}",
)
console.print(table)
high = sum(1 for r in results if r["level"] == "HIGH")
med = sum(1 for r in results if r["level"] == "MEDIUM")
low = sum(1 for r in results if r["level"] == "LOW")
console.print(f"\n[bold]Summary:[/bold] 🟢 {high} HIGH | 🟡 {med} MEDIUM | 🔴 {low} LOW")
# Save output if specified
if output:
with open(output, "w") as f:
json.dump({"scan_time": datetime.now().isoformat(), "results": results}, f, indent=2)
console.print(f"\n[green]✓ Results saved to {output}[/green]")
console.print(f"\n[green]✓ master.json updated ({master['total_terms']} total terms)[/green]")
def main():
"""Entry point."""
cli()
if __name__ == "__main__":
main()
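
The commands can also be exercised without a shell through click's test runner; a minimal sketch (the probed term is illustrative, and the model still loads lazily on first use):

from click.testing import CliRunner

runner = CliRunner()
result = runner.invoke(cli, ["tokens", "dasein"])
print(result.output)  # token breakdown plus the single/multi-token verdict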

nyx_probing/config.py Normal file

@@ -0,0 +1,51 @@
"""
Configuration for nyx-probing framework.
"""
from pathlib import Path
from pydantic import BaseModel
from typing import Optional
import os
class ModelConfig(BaseModel):
"""Model configuration."""
name: str = "Qwen/Qwen2.5-7B"
device: str = "cuda"
dtype: str = "float16"
cache_dir: Optional[Path] = None
class ProbeConfig(BaseModel):
"""Probe configuration."""
max_new_tokens: int = 50
temperature: float = 0.8
do_sample: bool = True
num_runs: int = 5 # For distribution sampling
class StorageConfig(BaseModel):
"""Storage configuration."""
results_dir: Path = Path("results")
experiments_dir: Path = Path("experiments")
class Config(BaseModel):
"""Main configuration."""
model: ModelConfig = ModelConfig()
probe: ProbeConfig = ProbeConfig()
storage: StorageConfig = StorageConfig()
# Paths
project_root: Path = Path(__file__).parent.parent
class Config:
arbitrary_types_allowed = True
# Default config instance
config = Config()
def get_config() -> Config:
"""Get the current configuration."""
return config

nyx_probing/core/__init__.py Normal file

@@ -0,0 +1,19 @@
"""Core components for nyx-probing."""
from .model import NyxModel, GenerationResult
from .probe_result import (
EchoType,
ReadinessLevel,
SurfaceProbeResult,
EchoProbeResult,
ReadinessResult,
)
__all__ = [
"NyxModel",
"GenerationResult",
"EchoType",
"ReadinessLevel",
"SurfaceProbeResult",
"EchoProbeResult",
"ReadinessResult",
]

nyx_probing/core/model.py Normal file

@@ -0,0 +1,266 @@
"""
Core Model Loader for nyx-probing.
Provides access to Qwen2.5-7B-Base with hidden state capture.
The model is an "empty vessel" - it completes, not answers.
"""
from dataclasses import dataclass, field
from typing import Optional, List, Tuple
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
@dataclass
class GenerationResult:
"""Result from a generation with hidden states."""
# The generated text (including prompt)
text: str
# Just the completion (without prompt)
completion: str
# Token IDs of the full sequence
token_ids: List[int]
# Token IDs of just the completion
completion_token_ids: List[int]
# Hidden states from the last layer for each generated token
# Shape: (num_generated_tokens, hidden_dim)
hidden_states: Optional[torch.Tensor] = None
# Token probabilities for each generated token
# Shape: (num_generated_tokens,)
token_probs: Optional[torch.Tensor] = None
# Whether generation ended with EOS
hit_eos: bool = False
# Number of tokens generated
num_tokens: int = 0
class NyxModel:
"""
Model wrapper for probing Qwen2.5-7B-Base.
Key capabilities:
- Hidden state capture during generation
- Token probability extraction
- Proper handling of base model (no chat template)
"""
def __init__(
self,
model_name: str = "Qwen/Qwen2.5-7B",
device: str = "cuda",
dtype: str = "float16",
cache_dir: Optional[str] = None,
):
self.model_name = model_name
self.device = device
self.dtype = getattr(torch, dtype)
self.cache_dir = cache_dir
self._model = None
self._tokenizer = None
self._loaded = False
def load(self) -> "NyxModel":
"""Load the model and tokenizer."""
if self._loaded:
return self
print(f"Loading tokenizer: {self.model_name}")
self._tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
cache_dir=self.cache_dir,
)
print(f"Loading model to {self.device}...")
self._model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=self.dtype,
device_map=self.device,
cache_dir=self.cache_dir,
# Critical for activation capture
output_hidden_states=True,
)
self._loaded = True
print(f"Model loaded. VRAM: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
return self
@property
def model(self):
if not self._loaded:
raise RuntimeError("Model not loaded. Call load() first.")
return self._model
@property
def tokenizer(self):
if not self._loaded:
raise RuntimeError("Model not loaded. Call load() first.")
return self._tokenizer
def generate(
self,
prompt: str,
max_new_tokens: int = 50,
temperature: float = 0.8,
do_sample: bool = True,
capture_hidden_states: bool = False,
capture_probabilities: bool = False,
) -> GenerationResult:
"""
Generate completion with optional hidden state capture.
Args:
prompt: Input text to complete
max_new_tokens: Maximum tokens to generate
temperature: Sampling temperature (0 = greedy)
do_sample: Whether to sample (False = greedy)
capture_hidden_states: Store hidden states from last layer
capture_probabilities: Store token probabilities
Returns:
GenerationResult with text, tokens, and optionally hidden states
"""
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
prompt_length = inputs.input_ids.shape[1]
# Generation config
gen_config = GenerationConfig(
max_new_tokens=max_new_tokens,
temperature=temperature if do_sample else 1.0,
do_sample=do_sample,
pad_token_id=self.tokenizer.eos_token_id,
eos_token_id=self.tokenizer.eos_token_id,
output_hidden_states=capture_hidden_states,
output_scores=capture_probabilities,
return_dict_in_generate=True,
)
# Generate
with torch.no_grad():
outputs = self.model.generate(
**inputs,
generation_config=gen_config,
)
# Extract sequences
full_ids = outputs.sequences[0].tolist()
completion_ids = full_ids[prompt_length:]
# Decode
full_text = self.tokenizer.decode(full_ids)
completion_text = self.tokenizer.decode(completion_ids)
# Check if hit EOS
hit_eos = (
len(completion_ids) > 0 and
completion_ids[-1] == self.tokenizer.eos_token_id
)
# Build result
result = GenerationResult(
text=full_text,
completion=completion_text,
token_ids=full_ids,
completion_token_ids=completion_ids,
hit_eos=hit_eos,
num_tokens=len(completion_ids),
)
# Extract hidden states if requested
if capture_hidden_states and hasattr(outputs, 'hidden_states'):
# hidden_states is tuple of (step, layer, batch, seq, hidden)
# We want last layer hidden state for each generated token
hidden_list = []
for step_states in outputs.hidden_states:
# step_states is tuple of layers
# Take last layer, batch 0, last position
last_layer = step_states[-1] # (batch, seq, hidden)
hidden_list.append(last_layer[0, -1, :]) # (hidden,)
result.hidden_states = torch.stack(hidden_list) # (tokens, hidden)
# Extract probabilities if requested
if capture_probabilities and hasattr(outputs, 'scores'):
# scores is tuple of (num_tokens,) each (batch, vocab)
probs_list = []
for i, score in enumerate(outputs.scores):
# Apply softmax to get probabilities
probs = torch.softmax(score[0], dim=-1)
# Get probability of the token that was actually chosen
chosen_token = completion_ids[i]
probs_list.append(probs[chosen_token].item())
result.token_probs = torch.tensor(probs_list)
return result
def get_token_probabilities(
self,
prompt: str,
continuation: str,
) -> Tuple[List[float], List[str]]:
"""
Get probability of each token in a specific continuation.
Useful for measuring how "expected" a completion is.
Args:
prompt: The input text
continuation: The text that follows
Returns:
Tuple of (probabilities, token_strings)
"""
# Tokenize prompt and full sequence
prompt_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)
full_text = prompt + continuation
full_ids = self.tokenizer.encode(full_text, return_tensors="pt").to(self.device)
prompt_len = prompt_ids.shape[1]
# Forward pass to get logits
with torch.no_grad():
outputs = self.model(full_ids)
logits = outputs.logits # (batch, seq, vocab)
# Get probabilities for continuation tokens
probs = []
tokens = []
for i in range(prompt_len, full_ids.shape[1]):
# Logits at position i-1 predict token at position i
token_logits = logits[0, i - 1, :]
token_probs = torch.softmax(token_logits, dim=-1)
actual_token = full_ids[0, i].item()
prob = token_probs[actual_token].item()
probs.append(prob)
tokens.append(self.tokenizer.decode([actual_token]))
return probs, tokens
def tokenize(self, text: str) -> List[str]:
"""Get individual tokens for text."""
ids = self.tokenizer.encode(text)
return [self.tokenizer.decode([tok_id]) for tok_id in ids]
def token_count(self, text: str) -> int:
"""Count tokens in text."""
return len(self.tokenizer.encode(text))
def memory_usage(self) -> dict:
"""Get current GPU memory usage."""
return {
"allocated_gb": torch.cuda.memory_allocated() / 1024**3,
"reserved_gb": torch.cuda.memory_reserved() / 1024**3,
"max_allocated_gb": torch.cuda.max_memory_allocated() / 1024**3,
}
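
A minimal usage sketch of the wrapper (prompt and continuation text are illustrative):

model = NyxModel().load()
result = model.generate("The heart is", max_new_tokens=20, capture_probabilities=True)
print(result.completion, result.hit_eos, result.num_tokens)

# Probability of a specific continuation, token by token:
probs, toks = model.get_token_probabilities("The heart is", " a muscular organ")
for p, t in zip(probs, toks):
    print(f"{t!r}: {p:.3f}")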

nyx_probing/core/probe_result.py Normal file

@@ -0,0 +1,97 @@
"""
Result dataclasses for probing operations.
These structures capture what we learn about each term.
"""
from dataclasses import dataclass, field
from typing import List, Optional, Literal
from datetime import datetime
from enum import Enum
class EchoType(str, Enum):
"""Classification of echo probe responses."""
EXPANDS = "EXPANDS" # Real depth - adds new information
CONFIRMS = "CONFIRMS" # Shallow but solid - reinforces without adding
CIRCULAR = "CIRCULAR" # Surface only - returns to original term
DIVERGENT = "DIVERGENT" # Wrong direction - unrelated tangent
COLLAPSE = "COLLAPSE" # Nothing there - incoherent or empty
class ReadinessLevel(str, Enum):
"""Readiness classification for curriculum design."""
HIGH = "HIGH" # Ready for state machine / direct training
MEDIUM = "MEDIUM" # Needs scaffolding / bridging concepts
LOW = "LOW" # Requires foundational work first
@dataclass
class SurfaceProbeResult:
"""Result from a surface probe (single word → completions)."""
term: str
completions: List[str]
hit_eos_count: int # How many completions ended with EOS
avg_tokens: float # Average completion length
# Optional analysis
coherence_score: Optional[float] = None # 0-1, how related are completions
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class EchoProbeResult:
"""Result from an echo probe (iterative depth measurement)."""
term: str
rounds: int
chain: List[str] # The sequence of prompts/completions
echo_types: List[EchoType] # Classification of each round
# Derived metrics
depth: int = 0 # How many EXPANDS before plateau
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class ReadinessResult:
"""Combined analysis for curriculum readiness."""
term: str
level: ReadinessLevel
action: str # Recommended curriculum action
# Supporting evidence
surface: Optional[SurfaceProbeResult] = None
echo: Optional[EchoProbeResult] = None
# Reasoning
reasoning: str = ""
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
"""Convert to JSON-serializable dict."""
return {
"term": self.term,
"readiness": {
"level": self.level.value,
"action": self.action,
"reasoning": self.reasoning,
},
"surface": {
"completions": self.surface.completions if self.surface else [],
"coherence": self.surface.coherence_score if self.surface else None,
"hit_eos_count": self.surface.hit_eos_count if self.surface else 0,
} if self.surface else None,
"echo": {
"depth": self.echo.depth if self.echo else 0,
"types": [t.value for t in self.echo.echo_types] if self.echo else [],
"chain": self.echo.chain if self.echo else [],
} if self.echo else None,
"timestamp": self.timestamp.isoformat(),
}

nyx_probing/probes/__init__.py Normal file

@@ -0,0 +1,27 @@
"""Probe implementations for nyx-probing."""
from .base import BaseProbe
from .surface_probe import SurfaceProbe, CompletionCategory
from .echo_probe import EchoProbe
from .multilingual_probe import (
MultilingualTriangulationProbe,
LanguageZone,
LANGUAGES,
GroundingResult,
DeepeningResult,
TriangulationResult,
MultilingualProbeResult,
)
__all__ = [
"BaseProbe",
"SurfaceProbe",
"CompletionCategory",
"EchoProbe",
"MultilingualTriangulationProbe",
"LanguageZone",
"LANGUAGES",
"GroundingResult",
"DeepeningResult",
"TriangulationResult",
"MultilingualProbeResult",
]

nyx_probing/probes/base.py Normal file

@@ -0,0 +1,58 @@
"""
Base class for all probes.
Probes are measurement instruments - they reveal what's already there,
they don't add or change anything.
"""
from abc import ABC, abstractmethod
from typing import Any
from ..core.model import NyxModel
class BaseProbe(ABC):
"""Abstract base class for probing operations."""
def __init__(self, model: NyxModel):
"""
Initialize probe with a loaded model.
Args:
model: A NyxModel instance (must be loaded)
"""
self.model = model
if not model._loaded:
raise ValueError("Model must be loaded before creating probe")
@property
def name(self) -> str:
"""Name of this probe type."""
return self.__class__.__name__
@abstractmethod
def probe(self, term: str, **kwargs) -> Any:
"""
Probe a single term.
Args:
term: The word/phrase to probe
**kwargs: Probe-specific parameters
Returns:
Probe-specific result object
"""
pass
def probe_batch(self, terms: list[str], **kwargs) -> list[Any]:
"""
Probe multiple terms.
Default implementation just loops; subclasses can optimize.
Args:
terms: List of words/phrases to probe
**kwargs: Probe-specific parameters
Returns:
List of probe results
"""
return [self.probe(term, **kwargs) for term in terms]

nyx_probing/probes/drift_probe.py Normal file

@@ -0,0 +1,304 @@
"""
DriftProbe: Training-loop monitoring for conceptual topology preservation.
Theory: "Spatial Separation Hypothesis"
- Use isolated zone languages (German) as scaffolding for new concepts
- Monitor anchors (must not move), bridges (must stay separated), canaries (watch for migration)
Key Metrics (refined from peer review):
1. Gini Coefficient: Sparse activations (~0.8+) = deep/specific, Diffuse (~0.3) = shallow/general
2. Angular Drift: Direction change = definition rewrite, magnitude change = sharpening
3. Cross-Language Similarity: Bridges should stay LOW, anchors should stay HIGH
"""
import json
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import torch
import numpy as np
class SentinelType(Enum):
ANCHOR = "ANCHOR" # Must not move - core topology
BRIDGE = "BRIDGE" # Must stay separated - isolated zone integrity
CANARY = "CANARY" # Watch for migration - early warning
TARGET = "TARGET" # Want movement - training goals
class AlertSeverity(Enum):
OK = "OK"
WARNING = "WARNING"
CRITICAL = "CRITICAL"
@dataclass
class DriftMetrics:
"""Metrics for a single sentinel term."""
term: str
sentinel_type: SentinelType
# Activation metrics
gini_coefficient: float = 0.0
activation_norm: float = 0.0
# Drift metrics (vs baseline)
angular_drift_degrees: float = 0.0
norm_drift_percent: float = 0.0
gini_drift: float = 0.0
# Valley detection
detected_valley: str = "UNKNOWN"
depth: int = 0
# Cross-language (for anchors/bridges)
cross_lang_similarity: float = 0.0
# Alert
alert: AlertSeverity = AlertSeverity.OK
alert_message: str = ""
@dataclass
class DriftReport:
"""Full drift report for a training checkpoint."""
step: int
timestamp: str
metrics: list[DriftMetrics] = field(default_factory=list)
# Summary
critical_count: int = 0
warning_count: int = 0
recommendation: str = "CONTINUE"
class DriftProbe:
"""
Lightweight probe for training-loop monitoring.
Optimized for RTX 3090 constraints:
- Full probe: ~2 min (run at epoch 0, end of training)
- Lite probe: ~10 sec (run every 100 steps)
"""
def __init__(self, model, tokenizer, sentinels_path: Optional[str] = None):
self.model = model
self.tokenizer = tokenizer
self.baseline_states = {} # term -> hidden state tensor
# Load sentinels
if sentinels_path is None:
sentinels_path = Path(__file__).parent.parent.parent / "data" / "sentinels.json"
with open(sentinels_path) as f:
self.config = json.load(f)
self.sentinels = self.config["sentinels"]
self.alert_rules = self.config["alert_rules"]
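# A minimal sentinels.json, inferred from the keys this class reads
# ("sentinels", "alert_rules", and per-sentinel "term", "type",
# "translations", "thresholds"); entries and values are illustrative,
# not the shipped config:
# {
#   "sentinels": [
#     {"term": "heart", "type": "ANCHOR",
#      "translations": {"EN": "heart", "DE": "Herz"},
#      "thresholds": {"max_drift": 0.05}},
#     {"term": "being", "type": "BRIDGE",
#      "translations": {"EN": "being", "DE": "Sein"},
#      "thresholds": {"collapse_alert_threshold": 0.50}}
#   ],
#   "alert_rules": {}
# }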
def _get_hidden_state(self, text: str, layer: int = 18) -> torch.Tensor:
"""Get hidden state at specified layer for last token position."""
inputs = self.tokenizer(text, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model(**inputs, output_hidden_states=True)
return outputs.hidden_states[layer][0, -1, :].float().cpu()
def _compute_gini(self, activations: torch.Tensor) -> float:
"""
Compute Gini coefficient of activation vector.
High Gini (0.8+) = Sparse/Specific (Philosophy/Deep)
Low Gini (~0.3) = Diffuse/General (Prose/Shallow)
"""
x = torch.abs(activations).numpy()
x = np.sort(x)
n = len(x)
cumsum = np.cumsum(x)
gini = (2 * np.sum((np.arange(1, n+1) * x))) / (n * np.sum(x)) - (n + 1) / n
return float(gini)
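# Sanity check on the formula above: a one-hot vector yields
# Gini = (n-1)/n ≈ 1 (maximally sparse), a uniform vector yields 0 (diffuse).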
def _compute_angular_drift(self, current: torch.Tensor, baseline: torch.Tensor) -> float:
"""
Compute angular drift in degrees between current and baseline.
> 15° = Definition rewrite (concerning)
< 5° = Sharpening only (acceptable)
"""
cos_sim = torch.nn.functional.cosine_similarity(
current.unsqueeze(0), baseline.unsqueeze(0)
).item()
# Clamp to valid range for arccos
cos_sim = max(-1.0, min(1.0, cos_sim))
angle_rad = np.arccos(cos_sim)
return float(np.degrees(angle_rad))
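# Intuition for the thresholds: cosine similarity 0.96 ≈ 16.3 degrees
# (rewrite territory), 0.999 ≈ 2.6 degrees (mere sharpening).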
def _compute_cross_lang_sim(self, sentinel: dict, layer: int = 18) -> float:
"""Compute average cross-language similarity for a sentinel."""
translations = sentinel.get("translations", {})
if len(translations) < 2:
return 0.0
states = []
for lang, word in translations.items():
states.append(self._get_hidden_state(word, layer))
# Pairwise similarities
sims = []
for i in range(len(states)):
for j in range(i + 1, len(states)):
sim = torch.nn.functional.cosine_similarity(
states[i].unsqueeze(0), states[j].unsqueeze(0)
).item()
sims.append(sim)
return float(np.mean(sims)) if sims else 0.0
def capture_baseline(self, layer: int = 18):
"""
Capture baseline hidden states for all sentinels.
Run this at epoch 0 before training.
"""
print("Capturing baseline states...")
for sentinel in self.sentinels:
term = sentinel["term"]
# Use English translation or term itself
text = sentinel.get("translations", {}).get("EN", term)
self.baseline_states[term] = self._get_hidden_state(text, layer)
print(f"Baseline captured for {len(self.baseline_states)} sentinels")
def probe_lite(self, step: int, layer: int = 18) -> DriftReport:
"""
Lite probe - only check key sentinels.
Optimized for ~10 second runtime.
"""
# Select subset: 2 anchors, 1 bridge, 2 canaries
lite_terms = ["heart", "water", "being", "dasein", "thrownness"]
lite_sentinels = [s for s in self.sentinels if s["term"] in lite_terms]
return self._run_probe(lite_sentinels, step, layer)
def probe_full(self, step: int, layer: int = 18) -> DriftReport:
"""
Full probe - check all sentinels.
Runtime: ~2 minutes.
"""
return self._run_probe(self.sentinels, step, layer)
def _run_probe(self, sentinels: list, step: int, layer: int) -> DriftReport:
"""Run probe on specified sentinels."""
from datetime import datetime
report = DriftReport(
step=step,
timestamp=datetime.now().isoformat()
)
for sentinel in sentinels:
term = sentinel["term"]
text = sentinel.get("translations", {}).get("EN", term)
sentinel_type = SentinelType(sentinel["type"])
thresholds = sentinel.get("thresholds", {})
# Get current state
current_state = self._get_hidden_state(text, layer)
# Compute metrics
gini = self._compute_gini(current_state)
norm = float(current_state.norm())
# Drift vs baseline
angular_drift = 0.0
norm_drift = 0.0
gini_drift = 0.0
if term in self.baseline_states:
baseline = self.baseline_states[term]
angular_drift = self._compute_angular_drift(current_state, baseline)
baseline_norm = float(baseline.norm())
norm_drift = abs(norm - baseline_norm) / baseline_norm * 100 if baseline_norm > 0 else 0
baseline_gini = self._compute_gini(baseline)
gini_drift = gini - baseline_gini
# Cross-language similarity
cross_lang_sim = self._compute_cross_lang_sim(sentinel, layer)
# Determine alert level
alert = AlertSeverity.OK
alert_message = ""
if sentinel_type == SentinelType.ANCHOR:
max_drift = thresholds.get("max_drift", 0.05)
if angular_drift > 15:
alert = AlertSeverity.CRITICAL
alert_message = f"Angular drift {angular_drift:.1f}° exceeds 15° - definition rewrite"
elif norm_drift > max_drift * 100:
alert = AlertSeverity.WARNING
alert_message = f"Norm drift {norm_drift:.1f}% exceeds threshold"
elif sentinel_type == SentinelType.BRIDGE:
collapse_threshold = thresholds.get("collapse_alert_threshold", 0.50)
if cross_lang_sim > collapse_threshold:
alert = AlertSeverity.CRITICAL
alert_message = f"Bridge collapsed - cross-lang sim {cross_lang_sim:.2f} > {collapse_threshold}"
elif sentinel_type == SentinelType.CANARY:
min_gini = thresholds.get("min_gini", 0.70)
if gini < min_gini:
alert = AlertSeverity.WARNING
alert_message = f"Gini {gini:.2f} below {min_gini} - concept melting into prose"
if angular_drift > thresholds.get("max_angular_drift", 15):
alert = AlertSeverity.WARNING
alert_message = f"Angular drift {angular_drift:.1f}° - definition shifting"
metrics = DriftMetrics(
term=term,
sentinel_type=sentinel_type,
gini_coefficient=gini,
activation_norm=norm,
angular_drift_degrees=angular_drift,
norm_drift_percent=norm_drift,
gini_drift=gini_drift,
cross_lang_similarity=cross_lang_sim,
alert=alert,
alert_message=alert_message
)
report.metrics.append(metrics)
if alert == AlertSeverity.CRITICAL:
report.critical_count += 1
elif alert == AlertSeverity.WARNING:
report.warning_count += 1
# Set recommendation
if report.critical_count > 0:
report.recommendation = "ROLLBACK"
elif report.warning_count > 2:
report.recommendation = "REDUCE_LR"
else:
report.recommendation = "CONTINUE"
return report
def print_report(self, report: DriftReport):
"""Pretty print a drift report."""
print(f"\n{'='*60}")
print(f"DRIFT REPORT - Step {report.step}")
print(f"{'='*60}")
for m in report.metrics:
status = "✓" if m.alert == AlertSeverity.OK else ("⚠" if m.alert == AlertSeverity.WARNING else "❌")
print(f"\n{status} {m.term} ({m.sentinel_type.value})")
print(f" Gini: {m.gini_coefficient:.3f} (drift: {m.gini_drift:+.3f})")
print(f" Angular drift: {m.angular_drift_degrees:.1f}°")
print(f" Cross-lang sim: {m.cross_lang_similarity:.3f}")
if m.alert_message:
print(f" ALERT: {m.alert_message}")
print(f"\n{'='*60}")
print(f"SUMMARY: {report.critical_count} critical, {report.warning_count} warnings")
print(f"RECOMMENDATION: {report.recommendation}")
print(f"{'='*60}\n")

nyx_probing/probes/echo_probe.py Normal file

@@ -0,0 +1,223 @@
"""
Echo Probe: Depth measurement through iterative completion.
The echo probe feeds completions back to the model to measure depth.
Does the model EXPAND (go deeper) or COLLAPSE (circular/divergent)?
Classification from nimmerversity.md:
- EXPANDS: Real depth - adds new information
- CONFIRMS: Shallow but solid - reinforces without adding
- CIRCULAR: Surface only - returns to original term
- DIVERGENT: Wrong direction - unrelated tangent
- COLLAPSE: Nothing there - incoherent or empty
"""
from typing import Optional, List, Tuple
from dataclasses import dataclass
from .base import BaseProbe
from ..core.model import NyxModel
from ..core.probe_result import EchoProbeResult, EchoType
class EchoProbe(BaseProbe):
"""
Echo probe: measures conceptual depth.
Process:
1. Probe term to get initial completion
2. Feed completion back (or combined prompt)
3. Classify response: EXPANDS, CONFIRMS, CIRCULAR, DIVERGENT, COLLAPSE
4. Repeat for N rounds
5. Measure depth = how many EXPANDS before plateau
"""
def __init__(
self,
model: NyxModel,
max_rounds: int = 3,
max_new_tokens: int = 50,
temperature: float = 0.8,
):
super().__init__(model)
self.max_rounds = max_rounds
self.max_new_tokens = max_new_tokens
self.temperature = temperature
def probe(
self,
term: str,
max_rounds: Optional[int] = None,
) -> EchoProbeResult:
"""
Probe depth of a term through iterative echoing.
Args:
term: Word or phrase to probe
max_rounds: Override default max rounds
Returns:
EchoProbeResult with chain and classifications
"""
rounds = max_rounds or self.max_rounds
chain = [term]
echo_types = []
current_prompt = term
for round_num in range(rounds):
# Generate completion
result = self.model.generate(
prompt=current_prompt,
max_new_tokens=self.max_new_tokens,
temperature=self.temperature,
do_sample=True,
)
completion = result.completion.strip()
chain.append(completion)
# Classify this response relative to original term and chain
echo_type = self._classify_response(
original_term=term,
current_prompt=current_prompt,
response=completion,
chain=chain,
)
echo_types.append(echo_type)
# If collapsed, stop probing
if echo_type == EchoType.COLLAPSE:
break
# Prepare next prompt - use a combination strategy
current_prompt = self._prepare_next_prompt(term, completion, round_num)
# Calculate depth = consecutive EXPANDS from start
depth = 0
for et in echo_types:
if et == EchoType.EXPANDS:
depth += 1
elif et == EchoType.CONFIRMS:
# CONFIRMS doesn't add depth but doesn't break streak
pass
else:
# CIRCULAR, DIVERGENT, or COLLAPSE breaks the depth streak
break
return EchoProbeResult(
term=term,
rounds=len(echo_types),
chain=chain,
echo_types=echo_types,
depth=depth,
)
def _classify_response(
self,
original_term: str,
current_prompt: str,
response: str,
chain: List[str],
) -> EchoType:
"""
Classify a response relative to the probing chain.
This is a heuristic classifier - can be made smarter with
semantic similarity or even a classifier model.
"""
response_lower = response.lower()
term_lower = original_term.lower()
# Empty or very short = COLLAPSE
if len(response.strip()) < 5:
return EchoType.COLLAPSE
# Check for circularity - term appears prominently in response
term_count = response_lower.count(term_lower)
if term_count >= 2:
return EchoType.CIRCULAR
# Check for collapse - incoherent markers
collapse_markers = [
"...", "???", "!!!",
"\n\n\n", "undefined", "null",
"[object", "NaN",
]
if any(marker in response for marker in collapse_markers):
return EchoType.COLLAPSE
# Check for divergence - response has no semantic connection
# Simple heuristic: count shared significant words
prompt_words = set(w.lower() for w in current_prompt.split() if len(w) > 3)
response_words = set(w.lower() for w in response.split() if len(w) > 3)
overlap = len(prompt_words & response_words)
if overlap == 0 and len(prompt_words) > 2:
# No shared words and prompt was substantial = divergent
return EchoType.DIVERGENT
# Check for expansion - introduces new concepts
# New words that aren't in any previous chain items
all_previous_words = set()
for item in chain[:-1]: # Exclude current response
all_previous_words.update(w.lower() for w in item.split() if len(w) > 3)
new_significant_words = response_words - all_previous_words
new_word_ratio = len(new_significant_words) / max(len(response_words), 1)
if new_word_ratio > 0.5 and len(new_significant_words) >= 3:
return EchoType.EXPANDS
# Default to CONFIRMS if coherent but not expanding
return EchoType.CONFIRMS
def _prepare_next_prompt(
self,
original_term: str,
last_completion: str,
round_num: int,
) -> str:
"""
Prepare the next prompt for echo probing.
Different strategies for different rounds:
- Round 0: Just use completion
- Round 1+: Combine original term with key concepts from completion
"""
if round_num == 0:
# First echo: just use the completion to see where it goes
return last_completion[:100] # Truncate to avoid runaway
# Later rounds: extract key concept and combine with original
# Take first sentence or first N words
words = last_completion.split()
key_phrase = " ".join(words[:10]) if len(words) > 10 else last_completion
# Combine with original term
return f"{original_term}: {key_phrase}"
def summary(self, result: EchoProbeResult) -> str:
"""Generate human-readable summary."""
type_symbols = {
EchoType.EXPANDS: "⬆",
EchoType.CONFIRMS: "➡",
EchoType.CIRCULAR: "🔄",
EchoType.DIVERGENT: "↗",
EchoType.COLLAPSE: "💥",
}
type_str = " ".join(type_symbols.get(t, "?") for t in result.echo_types)
lines = [
f"Echo Probe: '{result.term}'",
f" Rounds: {result.rounds}",
f" Pattern: {type_str}",
f" Depth: {result.depth}",
f" Types: {[t.value for t in result.echo_types]}",
]
# Show chain preview
for i, (item, etype) in enumerate(zip(result.chain[1:], result.echo_types)):
preview = item[:50].replace('\n', ' ')
lines.append(f" [{i+1}] {type_symbols.get(etype, '?')} {preview}...")
return "\n".join(lines)
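
A short worked example of the depth rule above: echo_types of [EXPANDS, CONFIRMS, EXPANDS, CIRCULAR] score depth 2, since CONFIRMS keeps the streak alive without adding to it and CIRCULAR ends it. A minimal invocation (term illustrative):

from nyx_probing.core.model import NyxModel

probe = EchoProbe(NyxModel().load())
res = probe.probe("dasein")
print(probe.summary(res))  # pattern, depth, and per-round chain preview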

nyx_probing/probes/multilingual_probe.py Normal file

@@ -0,0 +1,547 @@
"""
Multilingual Triangulation Probe
Uses the discovered language topology to measure conceptual depth:
1. GROUND in Super Cluster (verify universal convergence)
2. DEEPEN via Isolated Zone (access philosophical valleys)
3. TRIANGULATE back (prove understanding, not pattern matching)
The Language Map:
- Super Cluster (sim=1.0): ZH, JA, EN, AR, FR, PT, ES
- Isolated Zone (sim<0.52): IT, TR, HI, DE
- Bridge: KO
- Secondary Cluster: VI, ID, RU
"""
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Tuple
from datetime import datetime
from enum import Enum
import torch
from .base import BaseProbe
from ..core.model import NyxModel
class LanguageZone(str, Enum):
"""Language zones based on convergence analysis."""
SUPER_CLUSTER = "super_cluster" # High convergence (sim=1.0)
ISOLATED = "isolated" # Low convergence (sim<0.52)
BRIDGE = "bridge" # Connects zones
SECONDARY = "secondary" # Own cluster (VI-ID-RU)
# Language metadata based on our discoveries
LANGUAGES = {
# Super Cluster - Perfect convergence
"EN": {"name": "English", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 1.2},
"ZH": {"name": "Chinese", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 1.0},
"JA": {"name": "Japanese", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 1.0},
"AR": {"name": "Arabic", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 1.8},
"FR": {"name": "French", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 2.0},
"PT": {"name": "Portuguese", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 2.2},
"ES": {"name": "Spanish", "zone": LanguageZone.SUPER_CLUSTER, "avg_tokens": 2.5},
# Isolated Zone - Distinct computational paths
"DE": {"name": "German", "zone": LanguageZone.ISOLATED, "avg_tokens": 3.0, "specialty": "philosophy"},
"IT": {"name": "Italian", "zone": LanguageZone.ISOLATED, "avg_tokens": 2.5, "note": "most isolated"},
"TR": {"name": "Turkish", "zone": LanguageZone.ISOLATED, "avg_tokens": 2.8},
"HI": {"name": "Hindi", "zone": LanguageZone.ISOLATED, "avg_tokens": 5.2, "note": "most fragmented"},
# Bridge
"KO": {"name": "Korean", "zone": LanguageZone.BRIDGE, "avg_tokens": 2.0},
# Secondary Cluster
"VI": {"name": "Vietnamese", "zone": LanguageZone.SECONDARY, "avg_tokens": 3.0},
"ID": {"name": "Indonesian", "zone": LanguageZone.SECONDARY, "avg_tokens": 3.0},
"RU": {"name": "Russian", "zone": LanguageZone.SECONDARY, "avg_tokens": 3.2},
}
@dataclass
class GroundingResult:
"""Result from Phase 1: Grounding in Super Cluster."""
concept: str
languages_tested: List[str]
translations: Dict[str, str] # lang_code -> word
# Convergence metrics
pairwise_similarities: Dict[Tuple[str, str], float]
average_convergence: float
min_convergence: float
# Hidden states (layer 12)
hidden_states: Optional[Dict[str, torch.Tensor]] = None
@dataclass
class DeepeningResult:
"""Result from Phase 2: Deepening via Isolated Zone."""
concept: str
language: str
word: str
# Depth measurement (from echo probe logic)
completion: str
depth_score: int # 0-3 based on expansion
valley_type: str # CODE, PROSE, PHILOSOPHY, etc.
# Token analysis
token_count: int
norm_at_layer_12: float
# Hidden state
hidden_state: Optional[torch.Tensor] = None
@dataclass
class TriangulationResult:
"""Result from Phase 3: Triangulation back to universal."""
source_language: str # The isolated language
target_language: str # A super cluster language
source_word: str
translation_prompt: str
model_completion: str
# Did the depth survive translation?
depth_preserved: bool
similarity_to_grounding: float # Cosine sim to original concept
# Evidence
reasoning: str
@dataclass
class MultilingualProbeResult:
"""Full result from multilingual triangulation probe."""
concept: str
# Phase results
grounding: GroundingResult
deepening: DeepeningResult
triangulation: TriangulationResult
# Overall assessment
depth_accessible: bool # Can we access depth via isolated zone?
depth_transferable: bool # Does depth survive triangulation?
curriculum_recommendation: str
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
"""Convert to JSON-serializable dict."""
return {
"concept": self.concept,
"grounding": {
"languages": self.grounding.languages_tested,
"translations": self.grounding.translations,
"average_convergence": self.grounding.average_convergence,
"min_convergence": self.grounding.min_convergence,
},
"deepening": {
"language": self.deepening.language,
"word": self.deepening.word,
"depth_score": self.deepening.depth_score,
"valley_type": self.deepening.valley_type,
"token_count": self.deepening.token_count,
},
"triangulation": {
"source": self.triangulation.source_language,
"target": self.triangulation.target_language,
"depth_preserved": self.triangulation.depth_preserved,
"similarity": self.triangulation.similarity_to_grounding,
},
"assessment": {
"depth_accessible": self.depth_accessible,
"depth_transferable": self.depth_transferable,
"recommendation": self.curriculum_recommendation,
},
"timestamp": self.timestamp.isoformat(),
}
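# --- Illustrative helper (editor's sketch, hypothetical name) ---
# Because to_dict() returns only JSON-serializable types, persisting a run
# could be as simple as:
def save_result(result: MultilingualProbeResult, path: str) -> None:
    """Write a probe result to disk as JSON (illustrative only)."""
    import json
    with open(path, "w", encoding="utf-8") as f:
        json.dump(result.to_dict(), f, ensure_ascii=False, indent=2)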
class MultilingualTriangulationProbe(BaseProbe):
"""
Multilingual Triangulation Probe
Uses the discovered language topology to measure and access conceptual depth.
Workflow:
1. GROUND: Verify concept exists in Super Cluster (universal layer)
2. DEEPEN: Access depth via Isolated Zone language (e.g., German)
3. TRIANGULATE: Translate depth back to universal, verify preservation
"""
# Layers where universal concept layer lives
CONCEPT_LAYERS = [12, 16, 20, 24]
PRIMARY_LAYER = 12
def __init__(
self,
model: NyxModel,
grounding_languages: Optional[List[str]] = None,
deepening_language: str = "DE",
triangulation_target: str = "EN",
):
"""
Initialize the probe.
Args:
model: Loaded NyxModel
grounding_languages: Languages for Phase 1 (default: EN, ZH, AR)
deepening_language: Language for Phase 2 (default: DE for philosophy)
triangulation_target: Target for Phase 3 (default: EN)
"""
super().__init__(model)
self.grounding_languages = grounding_languages or ["EN", "ZH", "AR"]
self.deepening_language = deepening_language
self.triangulation_target = triangulation_target
# Validate languages
for lang in self.grounding_languages:
if lang not in LANGUAGES:
raise ValueError(f"Unknown language: {lang}")
if LANGUAGES[lang]["zone"] != LanguageZone.SUPER_CLUSTER:
print(f"Warning: {lang} is not in Super Cluster")
if LANGUAGES[self.deepening_language]["zone"] != LanguageZone.ISOLATED:
print(f"Warning: {deepening_language} is not in Isolated Zone")
def _get_hidden_state(self, text: str, layer: int = 12) -> torch.Tensor:
"""Get hidden state at last position for a specific layer."""
inputs = self.model.tokenizer(text, return_tensors="pt").to(self.model.device)
with torch.no_grad():
outputs = self.model.model(**inputs, output_hidden_states=True)
# Return last position hidden state for specified layer
return outputs.hidden_states[layer][0, -1, :].float()
def _cosine_similarity(self, a: torch.Tensor, b: torch.Tensor) -> float:
"""Calculate cosine similarity between two tensors."""
norm_a, norm_b = a.norm(), b.norm()
if norm_a == 0 or norm_b == 0:
return 0.0
return (torch.dot(a, b) / (norm_a * norm_b)).item()
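    # Note: up to eps clamping, this matches
    # torch.nn.functional.cosine_similarity(a, b, dim=0); the explicit
    # zero-norm guard above avoids a 0/0 for degenerate hidden states.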
def _get_norm(self, hidden_state: torch.Tensor) -> float:
"""Get L2 norm of hidden state."""
return hidden_state.norm().item()
def probe(
self,
concept: str,
translations: Dict[str, str],
**kwargs,
) -> MultilingualProbeResult:
"""
Run full multilingual triangulation probe.
Args:
concept: The concept name (e.g., "consciousness")
translations: Dict mapping language codes to words
e.g., {"EN": "consciousness", "DE": "Bewusstsein", ...}
Returns:
MultilingualProbeResult with all three phases
"""
# Phase 1: Grounding
grounding = self._phase_grounding(concept, translations)
# Phase 2: Deepening
deepening = self._phase_deepening(concept, translations)
# Phase 3: Triangulation
triangulation = self._phase_triangulation(
concept, translations, grounding, deepening
)
# Overall assessment
depth_accessible = deepening.depth_score >= 2
depth_transferable = triangulation.depth_preserved
if depth_accessible and depth_transferable:
recommendation = f"TEACH in {self.deepening_language}, REINFORCE in {self.triangulation_target}"
elif depth_accessible:
recommendation = f"Use {self.deepening_language} for depth, but verify transfer manually"
else:
recommendation = f"Concept too shallow - focus on grounding first"
return MultilingualProbeResult(
concept=concept,
grounding=grounding,
deepening=deepening,
triangulation=triangulation,
depth_accessible=depth_accessible,
depth_transferable=depth_transferable,
curriculum_recommendation=recommendation,
)
def _phase_grounding(
self,
concept: str,
translations: Dict[str, str],
) -> GroundingResult:
"""
Phase 1: Ground in Super Cluster.
Verify the concept exists and converges across grounding languages.
"""
# Get hidden states for each grounding language
hidden_states = {}
for lang in self.grounding_languages:
if lang in translations:
word = translations[lang]
hidden_states[lang] = self._get_hidden_state(word, self.PRIMARY_LAYER)
# Calculate pairwise similarities
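        # (with the default three grounding languages this yields C(3,2) = 3
        # pairs; the average and minimum below summarize convergence)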
pairwise = {}
similarities = []
langs = list(hidden_states.keys())
for i, l1 in enumerate(langs):
for l2 in langs[i+1:]:
sim = self._cosine_similarity(hidden_states[l1], hidden_states[l2])
pairwise[(l1, l2)] = sim
similarities.append(sim)
avg_convergence = sum(similarities) / len(similarities) if similarities else 0.0
min_convergence = min(similarities) if similarities else 0.0
return GroundingResult(
concept=concept,
languages_tested=langs,
translations={l: translations[l] for l in langs},
pairwise_similarities=pairwise,
average_convergence=avg_convergence,
min_convergence=min_convergence,
hidden_states=hidden_states,
)
def _phase_deepening(
self,
concept: str,
translations: Dict[str, str],
) -> DeepeningResult:
"""
Phase 2: Deepen via Isolated Zone.
Use an isolated language to access valleys the super cluster can't reach.
"""
lang = self.deepening_language
word = translations.get(lang)
if not word:
raise ValueError(f"No translation provided for deepening language: {lang}")
# Get hidden state and norm
hidden_state = self._get_hidden_state(word, self.PRIMARY_LAYER)
norm = self._get_norm(hidden_state)
# Get token count
tokens = self.model.tokenizer.encode(word, add_special_tokens=False)
token_count = len(tokens)
# Generate completion to measure depth
result = self.model.generate(
prompt=word,
max_new_tokens=50,
temperature=0.7,
do_sample=True,
)
# Classify valley type
completion = result.completion
valley_type = self._classify_valley(completion)
# Measure depth (simplified echo probe)
depth_score = self._measure_depth(word, completion)
return DeepeningResult(
concept=concept,
language=lang,
word=word,
completion=completion,
depth_score=depth_score,
valley_type=valley_type,
token_count=token_count,
norm_at_layer_12=norm,
hidden_state=hidden_state,
)
def _phase_triangulation(
self,
concept: str,
translations: Dict[str, str],
grounding: GroundingResult,
deepening: DeepeningResult,
) -> TriangulationResult:
"""
Phase 3: Triangulate back to universal.
Ask the model to translate/explain the deepened concept
in a super cluster language. Check if depth survives.
"""
source_lang = self.deepening_language
target_lang = self.triangulation_target
source_word = translations[source_lang]
# Create translation prompt
source_name = LANGUAGES[source_lang]["name"]
target_name = LANGUAGES[target_lang]["name"]
# Prompt designed to test depth transfer
prompt = f"{source_word} ({source_name}): In {target_name},"
# Generate
result = self.model.generate(
prompt=prompt,
max_new_tokens=80,
temperature=0.7,
do_sample=True,
)
# Get hidden state of the completion
full_text = prompt + result.completion
completion_hidden = self._get_hidden_state(full_text, self.PRIMARY_LAYER)
# Compare to grounding (if we have target language in grounding)
if target_lang in grounding.hidden_states:
similarity = self._cosine_similarity(
completion_hidden, grounding.hidden_states[target_lang]
)
else:
# Fall back to average grounding state
avg_grounding = torch.stack(list(grounding.hidden_states.values())).mean(dim=0)
similarity = self._cosine_similarity(completion_hidden, avg_grounding)
# Determine if depth was preserved
# Check if completion shows depth markers
depth_preserved = self._check_depth_preserved(
result.completion, deepening.valley_type, similarity
)
# Reasoning
if depth_preserved:
reasoning = f"Completion shows depth ({deepening.valley_type}) with {similarity:.2f} similarity to grounding"
else:
reasoning = f"Depth lost in translation - similarity {similarity:.2f}, valley markers missing"
return TriangulationResult(
source_language=source_lang,
target_language=target_lang,
source_word=source_word,
translation_prompt=prompt,
model_completion=result.completion,
depth_preserved=depth_preserved,
similarity_to_grounding=similarity,
reasoning=reasoning,
)
def _classify_valley(self, completion: str) -> str:
"""Classify the valley type of a completion."""
comp_lower = completion.lower()
# Code indicators
if any(p in completion for p in ["::", "{", "}", "();", "=>", "def ", "class "]):
return "CODE"
# Philosophy indicators
if any(w in comp_lower for w in ["truth", "existence", "being", "consciousness", "reality", "mind"]):
return "PHILOSOPHY"
# Technical indicators
if any(w in comp_lower for w in ["system", "process", "function", "method", "algorithm"]):
return "TECHNICAL"
# Default to prose
return "PROSE"
def _measure_depth(self, word: str, completion: str) -> int:
"""
Measure conceptual depth of a completion.
Returns 0-3:
- 0: Circular/empty
- 1: Surface (confirms but doesn't expand)
- 2: Moderate (expands to related concepts)
- 3: Deep (philosophical/existential expansion)
"""
comp_lower = completion.lower()
word_lower = word.lower()
# Circular check
if word_lower in comp_lower[:50]:
return 0
# Depth markers
deep_markers = ["truth", "existence", "being", "consciousness", "reality", "meaning", "essence"]
moderate_markers = ["concept", "idea", "theory", "understanding", "knowledge", "awareness"]
deep_count = sum(1 for m in deep_markers if m in comp_lower)
moderate_count = sum(1 for m in moderate_markers if m in comp_lower)
if deep_count >= 2:
return 3
elif deep_count >= 1 or moderate_count >= 2:
return 2
elif moderate_count >= 1 or len(completion.split()) > 10:
return 1
return 0
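    # Worked example (illustrative): for word="Sein" and a completion like
    # "the question concerns existence and the meaning of being itself",
    # deep_markers matches "existence", "meaning", and "being", so
    # deep_count = 3 >= 2 and the depth score is 3. A completion that merely
    # restates the word within its first 50 characters scores 0 (circular).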
def _check_depth_preserved(
self,
completion: str,
original_valley: str,
similarity: float,
) -> bool:
"""Check if depth was preserved in triangulation."""
# High similarity to grounding is a good sign
if similarity < 0.3:
return False
# Check valley type preservation
new_valley = self._classify_valley(completion)
# Philosophy should stay philosophy
if original_valley == "PHILOSOPHY" and new_valley in ["PHILOSOPHY", "PROSE"]:
return True
# Technical should stay technical
if original_valley == "TECHNICAL" and new_valley == "TECHNICAL":
return True
# Prose is flexible
if original_valley == "PROSE":
return new_valley != "CODE"
# Default: similarity-based
return similarity >= 0.5
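    # Worked example (illustrative): original_valley="PHILOSOPHY" with a
    # completion re-classified as PROSE at similarity 0.45 returns True
    # (philosophy may relax into prose). original_valley="TECHNICAL" with a
    # PROSE completion at the same similarity falls through to the final
    # similarity >= 0.5 check and returns False.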
def summary(self, result: MultilingualProbeResult) -> str:
"""Generate human-readable summary."""
lines = [
f"╔══════════════════════════════════════════════════════════════╗",
f"║ MULTILINGUAL TRIANGULATION: {result.concept.upper():^32}",
f"╠══════════════════════════════════════════════════════════════╣",
f"║ PHASE 1: GROUNDING ║",
f"║ Languages: {', '.join(result.grounding.languages_tested):^49}",
f"║ Convergence: {result.grounding.average_convergence:.3f} (min: {result.grounding.min_convergence:.3f}){' '*24}",
f"╠══════════════════════════════════════════════════════════════╣",
f"║ PHASE 2: DEEPENING ({result.deepening.language}){' '*38}",
f"║ Word: {result.deepening.word:^54}",
f"║ Tokens: {result.deepening.token_count} | Norm: {result.deepening.norm_at_layer_12:.1f} | Valley: {result.deepening.valley_type:^10}",
f"║ Depth Score: {result.deepening.depth_score}/3{' '*46}",
f"╠══════════════════════════════════════════════════════════════╣",
f"║ PHASE 3: TRIANGULATION ({result.triangulation.source_language}{result.triangulation.target_language}){' '*30}",
f"║ Depth Preserved: {'✓ YES' if result.triangulation.depth_preserved else '✗ NO':^44}",
f"║ Similarity: {result.triangulation.similarity_to_grounding:.3f}{' '*47}",
f"╠══════════════════════════════════════════════════════════════╣",
f"║ ASSESSMENT{' '*51}",
f"║ Depth Accessible: {'' if result.depth_accessible else ''} | Depth Transferable: {'' if result.depth_transferable else ''}{' '*17}",
f"║ Recommendation: {result.curriculum_recommendation[:44]:^44}",
f"╚══════════════════════════════════════════════════════════════╝",
]
return "\n".join(lines)

View File

@@ -0,0 +1,210 @@
"""
Surface Probe: First contact with a term.
The surface probe feeds a word to the model and captures what it completes.
This reveals the model's immediate associations - which "valley" the word sits in.
Examples discovered:
- "heartbeat" → C++ code patterns (technical valley)
- "consciousness" → philosophy (expository valley)
"""
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
from collections import Counter
from .base import BaseProbe
from ..core.model import NyxModel, GenerationResult
from ..core.probe_result import SurfaceProbeResult
class CompletionCategory:
    """String constants for the completion categories we observe."""
CODE = "code" # Programming constructs
PROSE = "prose" # Natural language text
TECHNICAL = "technical" # Technical/scientific writing
LIST = "list" # Enumerations, bullet points
DEFINITION = "definition" # Dictionary-style definitions
UNKNOWN = "unknown"
class SurfaceProbe(BaseProbe):
"""
Surface probe: measures immediate associations.
Runs multiple completions to get a distribution, then analyzes:
- What type of content does the model generate?
- How consistent are the completions?
- Does it hit EOS (contained thought) or run to max_tokens?
"""
def __init__(
self,
model: NyxModel,
num_runs: int = 5,
max_new_tokens: int = 50,
temperature: float = 0.8,
):
super().__init__(model)
self.num_runs = num_runs
self.max_new_tokens = max_new_tokens
self.temperature = temperature
def probe(
self,
term: str,
num_runs: Optional[int] = None,
capture_hidden: bool = False,
) -> SurfaceProbeResult:
"""
Probe a term with multiple completions.
Args:
term: Word or phrase to probe
num_runs: Override default number of runs
capture_hidden: Whether to capture hidden states
Returns:
SurfaceProbeResult with completions and analysis
"""
runs = num_runs or self.num_runs
completions = []
eos_count = 0
total_tokens = 0
hidden_states = []
for _ in range(runs):
result = self.model.generate(
prompt=term,
max_new_tokens=self.max_new_tokens,
temperature=self.temperature,
do_sample=True,
capture_hidden_states=capture_hidden,
)
completions.append(result.completion)
if result.hit_eos:
eos_count += 1
total_tokens += result.num_tokens
if capture_hidden and result.hidden_states is not None:
hidden_states.append(result.hidden_states)
# Calculate coherence (how similar are completions to each other?)
coherence = self._calculate_coherence(completions)
return SurfaceProbeResult(
term=term,
completions=completions,
hit_eos_count=eos_count,
avg_tokens=total_tokens / runs,
coherence_score=coherence,
)
def _calculate_coherence(self, completions: list[str]) -> float:
"""
Calculate coherence score based on completion similarity.
Simple heuristic: measures overlap in first-word distributions
and overall length variance.
Returns 0-1 score where 1 = highly coherent.
"""
if len(completions) < 2:
return 1.0
# Get first significant words (skip punctuation/whitespace)
first_words = []
for comp in completions:
words = comp.split()
for w in words:
if len(w) > 1 and w.isalnum():
first_words.append(w.lower())
break
if not first_words:
return 0.0
# Calculate concentration of first words
# If all completions start with same word = high coherence
word_counts = Counter(first_words)
most_common_count = word_counts.most_common(1)[0][1]
first_word_coherence = most_common_count / len(completions)
# Check length variance
lengths = [len(c) for c in completions]
avg_len = sum(lengths) / len(lengths)
if avg_len > 0:
variance = sum((l - avg_len) ** 2 for l in lengths) / len(lengths)
# Normalize variance to 0-1 (higher variance = lower coherence)
length_coherence = 1.0 / (1.0 + variance / 1000)
else:
length_coherence = 0.0
# Combine (weight first-word more heavily)
return 0.7 * first_word_coherence + 0.3 * length_coherence
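    # Worked example (illustrative): with 5 completions where 4 share the same
    # first word, first_word_coherence = 4/5 = 0.8. A length variance of 500
    # gives length_coherence = 1 / (1 + 500/1000) ~= 0.667, so the combined
    # score is 0.7 * 0.8 + 0.3 * 0.667 ~= 0.76.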
def classify_completions(self, result: SurfaceProbeResult) -> dict:
"""
Classify the types of completions observed.
Returns breakdown of completion categories.
"""
categories = Counter()
for comp in result.completions:
cat = self._classify_single(comp)
categories[cat] += 1
return {
"categories": dict(categories),
"dominant": categories.most_common(1)[0][0] if categories else "unknown",
"diversity": len(categories) / len(result.completions) if result.completions else 0,
}
def _classify_single(self, completion: str) -> str:
"""Classify a single completion."""
# Simple heuristics - can be made smarter
comp_lower = completion.lower().strip()
# Code indicators
code_patterns = ["::", "{", "}", "();", "=>", "function", "class ", "def ", "return"]
if any(p in completion for p in code_patterns):
return CompletionCategory.CODE
        # Definition patterns ("- " is treated as a list marker below, so it
        # is not checked here)
        if comp_lower.startswith(("is ", "means ", "refers to")):
return CompletionCategory.DEFINITION
# List patterns
if comp_lower.startswith(("1.", "2.", "- ", "* ", "a)")):
return CompletionCategory.LIST
# Technical patterns
tech_words = ["algorithm", "function", "variable", "method", "system", "process"]
if any(w in comp_lower for w in tech_words):
return CompletionCategory.TECHNICAL
# Default to prose if it looks like natural language
if len(comp_lower.split()) > 3:
return CompletionCategory.PROSE
return CompletionCategory.UNKNOWN
def summary(self, result: SurfaceProbeResult) -> str:
"""Generate human-readable summary of probe result."""
classification = self.classify_completions(result)
eos_pct = (result.hit_eos_count / len(result.completions)) * 100
lines = [
f"Surface Probe: '{result.term}'",
f" Runs: {len(result.completions)}",
f" Dominant type: {classification['dominant']}",
f" Coherence: {result.coherence_score:.2f}",
f" Avg tokens: {result.avg_tokens:.1f}",
f" Hit EOS: {eos_pct:.0f}%",
f" Sample: {result.completions[0][:60]}...",
]
return "\n".join(lines)