feat: Add Oghma RAG Proxy for SkyrimNet lore injection

RAG proxy that intercepts SkyrimNet LLM requests and enriches them
with relevant Tamrielic lore from CHIM's Oghma Infinium database.

Features:
- FastAPI proxy compatible with OpenAI API
- ChromaDB semantic search for lore retrieval
- NPC profile extraction from SkyrimNet prompts
- Google Sheets ingestion for CHIM's Oghma data
- Kubernetes deployment manifests
- Debug endpoint for RAG operation monitoring

Collections ingested to iris-dev ChromaDB:
- oghma_lore: 1951 entries (scholar knowledge)
- oghma_basic: 1949 entries (commoner knowledge)
- oghma_visual: 1151 entries (Omnisight perception)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: dafit
Date:   2026-03-30 23:22:46 +02:00
Commit: 3926ab676f (parent: 62dcee5fbf)
20 changed files with 2367 additions and 0 deletions

oghma_proxy/__init__.py

@@ -0,0 +1,3 @@
"""Oghma RAG Proxy - Lore enrichment for SkyrimNet."""
__version__ = "0.1.0"

oghma_proxy/extractor.py

@@ -0,0 +1,147 @@
"""NPC Profile Extractor - Parses SkyrimNet prompts to extract NPC context."""
from __future__ import annotations
import re
import structlog
from .models import NPCProfile
logger = structlog.get_logger()
class NPCExtractor:
"""Extracts NPC profile information from SkyrimNet prompts."""
# Regex patterns for extraction
PATTERNS = {
# Character bio header
"bio_header": re.compile(
r"## (?P<name>[\w\s'-]+) Bio\s*\n"
r"- Gender: (?P<gender>\w+)\s*\n"
r"- Race: (?P<race>[\w\s]+)",
re.MULTILINE,
),
# Alternative role description
"role_intro": re.compile(
r"You are (?P<name>[^,\n]+),?\s*(?:a |an )?(?P<descriptor>[^.\n]+)",
re.IGNORECASE,
),
# Faction membership
"faction": re.compile(
r"(?:member of|belongs to|joined|part of) (?:the )?(?P<faction>[\w\s]+?)(?:\.|,|\n|$)",
re.IGNORECASE,
),
# Location mentions
"location": re.compile(
r"(?:in|at|near|from) (?P<location>Whiterun|Windhelm|Solitude|Riften|"
r"Markarth|Morthal|Dawnstar|Winterhold|Falkreath|Riverwood|Rorikstead|"
r"Ivarstead|Solstheim|Raven Rock)",
re.IGNORECASE,
),
# Profession/occupation
"occupation": re.compile(
r"(?:works as|profession:|occupation:|is a|as a) (?P<profession>[\w\s]+?)(?:\.|,|\n|$)",
re.IGNORECASE,
),
}
# Known professions for fuzzy matching
KNOWN_PROFESSIONS = {
"priest", "priestess", "mage", "wizard", "scholar", "blacksmith",
"guard", "soldier", "warrior", "thief", "merchant", "innkeeper",
"hunter", "farmer", "peasant", "noble", "jarl", "bard", "alchemist",
"healer", "assassin", "spy", "courier", "carriage driver", "fisherman",
"miller", "brewer", "smith", "armorer", "fletcher", "jeweler",
}
def extract(self, messages: list[dict]) -> NPCProfile:
"""Extract NPC profile from chat messages."""
# Combine all message content for analysis
full_text = "\n".join(
msg.get("content", "") for msg in messages if msg.get("content")
)
profile = NPCProfile()
# Try bio header first (most reliable)
if match := self.PATTERNS["bio_header"].search(full_text):
profile.name = match.group("name").strip()
profile.gender = match.group("gender").strip()
profile.race = match.group("race").strip()
logger.debug("Extracted from bio header", name=profile.name, race=profile.race)
# Fallback to role intro
elif match := self.PATTERNS["role_intro"].search(full_text):
profile.name = match.group("name").strip()
descriptor = match.group("descriptor")
# Try to parse race from descriptor
profile.race = self._extract_race_from_descriptor(descriptor)
logger.debug("Extracted from role intro", name=profile.name)
# Extract location
if match := self.PATTERNS["location"].search(full_text):
profile.location = match.group("location").strip()
# Extract factions
for match in self.PATTERNS["faction"].finditer(full_text):
faction = match.group("faction").strip()
if faction and faction not in profile.factions:
profile.factions.append(faction)
# Extract profession
if match := self.PATTERNS["occupation"].search(full_text):
profession = match.group("profession").strip().lower()
            # Validate against known professions, longest first, so that
            # e.g. "priestess" wins over its substring "priest"
            for known in sorted(self.KNOWN_PROFESSIONS, key=len, reverse=True):
                if known in profession:
                    profile.profession = known
                    break
# Compute knowledge classes
profile.compute_knowledge_classes()
logger.info(
"Extracted NPC profile",
name=profile.name,
race=profile.race,
profession=profile.profession,
factions=profile.factions,
location=profile.location,
knowledge_classes=profile.knowledge_classes,
education_level=profile.education_level.value,
)
return profile
    def _extract_race_from_descriptor(self, descriptor: str) -> str:
        """Try to extract race from a descriptor string."""
        # Map aliases to the canonical form that
        # NPCProfile.compute_knowledge_classes keys on
        race_aliases = {
            "nord": "Nord", "dunmer": "Dunmer", "dark elf": "Dunmer",
            "altmer": "Altmer", "high elf": "Altmer",
            "bosmer": "Bosmer", "wood elf": "Bosmer",
            "argonian": "Argonian", "khajiit": "Khajiit", "breton": "Breton",
            "redguard": "Redguard", "orsimer": "Orsimer", "orc": "Orc",
            "imperial": "Imperial",
        }
        descriptor_lower = descriptor.lower()
        for alias, canonical in race_aliases.items():
            if alias in descriptor_lower:
                return canonical
        return "Unknown"
def extract_conversation_context(self, messages: list[dict]) -> str:
"""Extract the current conversation topic for RAG query."""
# Get the last few user/assistant exchanges
recent_content = []
for msg in reversed(messages[-6:]):
content = msg.get("content", "")
if content and msg.get("role") in ("user", "assistant"):
# Skip very long content (likely system prompts)
if len(content) < 500:
recent_content.append(content)
if not recent_content:
return ""
        # recent_content is newest-first; keep the newest three
        # exchanges and restore chronological order for the query context
        return " ".join(reversed(recent_content[:3]))

oghma_proxy ingestion module (file path not shown)

@@ -0,0 +1,444 @@
"""Oghma Data Ingestion - Loads CHIM's Oghma lore into ChromaDB."""
from __future__ import annotations
import argparse
import io
import re
import sys
import time
from typing import Iterator
import chromadb
import httpx
import structlog
from chromadb.config import Settings
logger = structlog.get_logger()
# Google Sheet ID for CHIM's Oghma Infinium
OGHMA_SHEET_ID = "1dcfctU-iOqprwy2BOc7___4Awteczgdlv8886KalPsQ"
def discover_sheet_gids(sheet_id: str) -> dict[str, str]:
"""
Discover actual gids for all sheets by parsing the HTML page.
Returns:
Dict mapping sheet name to gid
"""
url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/htmlview"
with httpx.Client(follow_redirects=True, timeout=30.0) as client:
response = client.get(url)
response.raise_for_status()
html = response.text
sheet_gids = {}
# Pattern 1: items.push({name: "Sheet Name", ...gid: "12345"...})
# This is the format Google Sheets uses in the htmlview page
for match in re.finditer(
r'items\.push\(\{name:\s*"([^"]+)"[^}]*gid:\s*"(\d+)"',
html
):
name = match.group(1)
gid = match.group(2)
sheet_gids[name] = gid
    # Fallback: scan for the sheet-tab JSON structure directly,
    # e.g. {name: "Sheet Name", ..., gid: "12345", ...}
    if not sheet_gids:
        for match in re.finditer(
            r'\{name:\s*"([^"]+)"[^}]*gid:\s*"(\d+)"[^}]*\}',
            html,
        ):
            name = match.group(1)
            gid = match.group(2)
            if name not in sheet_gids:
                sheet_gids[name] = gid
logger.info("Discovered sheets", count=len(sheet_gids), sheets=list(sheet_gids.keys())[:10])
return sheet_gids
def fetch_sheet_csv(sheet_id: str, gid: str, sheet_name: str = "") -> str:
"""Fetch a Google Sheet as CSV."""
url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
with httpx.Client(follow_redirects=True, timeout=60.0) as client:
response = client.get(url)
if response.status_code == 400:
logger.warning("Sheet fetch failed with 400", sheet=sheet_name, gid=gid)
raise httpx.HTTPStatusError(
f"Failed to fetch sheet {sheet_name}",
request=response.request,
response=response,
)
response.raise_for_status()
return response.text
def parse_csv(csv_text: str) -> Iterator[dict]:
"""Parse CSV text into rows."""
import csv
reader = csv.DictReader(io.StringIO(csv_text))
for row in reader:
yield row
def categorize_sheet(sheet_name: str) -> str | None:
"""
Determine category for a sheet and whether to process it.
Returns category name or None if sheet should be skipped.
"""
# Normalize escaped slashes from JSON
normalized_name = sheet_name.replace("\\/", "/").replace("\\", "")
# Sheets to process and their categories
sheet_categories = {
"Figures/Gods": "figures_gods",
"Artifacts": "artifacts",
"Armor and Weapons": "armor_weapons",
"Items": "items",
"Spells": "spells",
"Creatures": "creatures",
"Groups/Lore/Books": "groups_lore",
"Dynamic Oghma": "dynamic",
"Visual Descriptions": "visual",
}
# Check direct match
if normalized_name in sheet_categories:
return sheet_categories[normalized_name]
# Check location sheets - handles both "Locations - Whiterun" and "Locations (Whiterun)"
if normalized_name.startswith("Locations"):
# Try pattern: "Locations (Whiterun)"
match = re.match(r"Locations\s*[\(\-]\s*([^\)]+)\)?", normalized_name)
if match:
hold = match.group(1).strip().lower().replace(" ", "_")
return f"locations_{hold}"
# Skip meta/reference sheets
skip_sheets = ["Project Oghma", "Knowledge Classes Reference", "Vanilla NPCS", "Template"]
if any(skip in normalized_name for skip in skip_sheets):
return None
return None
def ingest_oghma(
chromadb_host: str = "iris-dev.eachpath.local",
chromadb_port: int = 35000,
dry_run: bool = False,
) -> dict:
"""
Ingest all Oghma sheets into ChromaDB.
Returns:
Statistics about ingestion
"""
stats = {
"sheets_processed": 0,
"sheets_skipped": 0,
"lore_entries": 0,
"basic_entries": 0,
"visual_entries": 0,
"errors": [],
}
# Discover actual sheet gids
logger.info("Discovering sheet gids...")
try:
sheet_gids = discover_sheet_gids(OGHMA_SHEET_ID)
except Exception as e:
logger.error("Failed to discover sheets", error=str(e))
# Fallback to known gids (manually discovered)
sheet_gids = {
"Figures/Gods": "0",
# Add more as we discover them
}
if not sheet_gids:
logger.error("No sheets discovered!")
return stats
if not dry_run:
client = chromadb.HttpClient(
host=chromadb_host,
port=chromadb_port,
settings=Settings(anonymized_telemetry=False),
)
# Get or create collections
collection_lore = client.get_or_create_collection(
name="oghma_lore",
metadata={"description": "Full Tamrielic lore for educated NPCs"},
)
collection_basic = client.get_or_create_collection(
name="oghma_basic",
metadata={"description": "Basic Tamrielic lore for commoners"},
)
collection_visual = client.get_or_create_collection(
name="oghma_visual",
metadata={"description": "Visual descriptions for Omnisight perception"},
)
logger.info("Connected to ChromaDB", host=chromadb_host, port=chromadb_port)
else:
logger.info("DRY RUN - not connecting to ChromaDB")
collection_lore = None
collection_basic = None
collection_visual = None
for sheet_name, gid in sheet_gids.items():
category = categorize_sheet(sheet_name)
if category is None:
logger.debug("Skipping sheet", sheet=sheet_name)
stats["sheets_skipped"] += 1
continue
logger.info("Processing sheet", sheet=sheet_name, gid=gid, category=category)
# Rate limit to avoid Google blocking
time.sleep(1.0)
try:
csv_text = fetch_sheet_csv(OGHMA_SHEET_ID, gid, sheet_name)
rows = list(parse_csv(csv_text))
if not rows:
logger.warning("Empty sheet", sheet=sheet_name)
continue
# Check if this sheet has the expected columns
sample_row = rows[0]
is_visual_sheet = category == "visual"
# Visual Descriptions has different schema: baseid, name, description
if is_visual_sheet:
if "name" not in sample_row or "description" not in sample_row:
logger.warning("Visual sheet missing expected columns", sheet=sheet_name, columns=list(sample_row.keys())[:5])
continue
elif "topic" not in sample_row:
logger.warning("Sheet missing 'topic' column", sheet=sheet_name, columns=list(sample_row.keys())[:5])
continue
lore_docs = []
lore_ids = []
lore_metadatas = []
basic_docs = []
basic_ids = []
basic_metadatas = []
# Visual descriptions - universal perception (Omnisight)
visual_docs = []
visual_ids = []
visual_metadatas = []
# Track seen IDs to handle duplicates
seen_ids = set()
duplicates_skipped = 0
for row in rows:
# Handle visual descriptions separately
if is_visual_sheet:
name = row.get("name", "").strip()
description = row.get("description", "").strip()
baseid = row.get("baseid", "").strip()
# Clean up Excel scientific notation for zero (0.00E+00)
if baseid and ("E+" in baseid or "E-" in baseid):
try:
if float(baseid) == 0:
baseid = ""
except ValueError:
pass
if name and description:
# Use baseid:name for uniqueness, fall back to name only
doc_id = f"visual:{baseid}:{name}" if baseid else f"visual:{name}"
if doc_id in seen_ids:
duplicates_skipped += 1
continue
seen_ids.add(doc_id)
visual_docs.append(description)
visual_ids.append(doc_id)
visual_metadatas.append({
"name": name,
"baseid": baseid,
"category": "visual",
"sheet": sheet_name,
})
continue
topic = row.get("topic", "").strip()
if not topic:
continue
# Full lore entry
topic_desc = row.get("topic_desc", "").strip()
if topic_desc:
lore_id = f"{category}:{topic}"
if lore_id in seen_ids:
duplicates_skipped += 1
else:
seen_ids.add(lore_id)
knowledge_classes = row.get("knowledge_class", "").strip()
lore_docs.append(topic_desc)
lore_ids.append(lore_id)
lore_metadatas.append({
"topic": topic,
"category": category,
"sheet": sheet_name,
"knowledge_classes": knowledge_classes,
"tags": row.get("tags", "").strip(),
})
# Basic lore entry
topic_desc_basic = row.get("topic_desc_basic", "").strip()
if topic_desc_basic:
basic_id = f"{category}:{topic}:basic"
if basic_id in seen_ids:
duplicates_skipped += 1
else:
seen_ids.add(basic_id)
knowledge_classes_basic = row.get("knowledge_class_basic", "").strip()
basic_docs.append(topic_desc_basic)
basic_ids.append(basic_id)
basic_metadatas.append({
"topic": topic,
"category": category,
"sheet": sheet_name,
"knowledge_classes": knowledge_classes_basic,
"tags": row.get("tags", "").strip(),
})
# Batch insert to ChromaDB
if not dry_run:
if lore_docs:
collection_lore.upsert(
documents=lore_docs,
ids=lore_ids,
metadatas=lore_metadatas,
)
if basic_docs:
collection_basic.upsert(
documents=basic_docs,
ids=basic_ids,
metadatas=basic_metadatas,
)
if visual_docs:
collection_visual.upsert(
documents=visual_docs,
ids=visual_ids,
metadatas=visual_metadatas,
)
stats["sheets_processed"] += 1
stats["lore_entries"] += len(lore_docs)
stats["basic_entries"] += len(basic_docs)
stats["visual_entries"] += len(visual_docs)
if duplicates_skipped > 0:
logger.debug("Duplicates skipped", sheet=sheet_name, count=duplicates_skipped)
logger.info(
"Sheet processed",
sheet=sheet_name,
rows=len(rows),
lore_entries=len(lore_docs),
basic_entries=len(basic_docs),
visual_entries=len(visual_docs),
)
except httpx.HTTPStatusError as e:
logger.error("HTTP error fetching sheet", sheet=sheet_name, status=e.response.status_code)
stats["errors"].append({"sheet": sheet_name, "error": f"HTTP {e.response.status_code}"})
except Exception as e:
logger.error("Failed to process sheet", sheet=sheet_name, error=str(e))
stats["errors"].append({"sheet": sheet_name, "error": str(e)})
logger.info(
"Ingestion complete",
sheets_processed=stats["sheets_processed"],
sheets_skipped=stats["sheets_skipped"],
lore_entries=stats["lore_entries"],
basic_entries=stats["basic_entries"],
visual_entries=stats["visual_entries"],
errors=len(stats["errors"]),
)
return stats
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description="Ingest CHIM's Oghma Infinium lore into ChromaDB"
)
parser.add_argument(
"--host",
default="iris-dev.eachpath.local",
help="ChromaDB host",
)
parser.add_argument(
"--port",
type=int,
default=35000,
help="ChromaDB port",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Fetch and parse sheets without writing to ChromaDB",
)
args = parser.parse_args()
# Configure logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer(),
],
)
try:
stats = ingest_oghma(
chromadb_host=args.host,
chromadb_port=args.port,
dry_run=args.dry_run,
)
if stats["errors"]:
logger.warning("Ingestion completed with errors", errors=stats["errors"])
# Don't exit 1 if we processed some sheets successfully
if stats["sheets_processed"] == 0:
sys.exit(1)
except Exception as e:
logger.error("Ingestion failed", error=str(e))
sys.exit(1)
if __name__ == "__main__":
main()
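
A quick sketch of how tab names route through categorize_sheet (the tab names below are illustrative; a --dry-run invocation exercises the same fetch-and-parse path end to end without writing to ChromaDB):

assert categorize_sheet("Artifacts") == "artifacts"
assert categorize_sheet("Locations (Whiterun)") == "locations_whiterun"
assert categorize_sheet("Locations - The Reach") == "locations_the_reach"
assert categorize_sheet("Project Oghma") is None  # meta sheet, skipped
assert categorize_sheet("Shopping List") is None  # unrecognized tabs are skipped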

oghma_proxy/injector.py

@@ -0,0 +1,153 @@
"""Lore Injector - Injects retrieved lore into SkyrimNet prompts."""
from __future__ import annotations
import structlog
from .models import InjectionResult, LoreEntry, NPCProfile
logger = structlog.get_logger()
class LoreInjector:
"""Injects Oghma lore into SkyrimNet chat messages."""
DEFAULT_TEMPLATE = """
## Relevant Lore Knowledge
Based on your background as a {race} {profession} in {location}, you would know:
{lore_items}
Note: Reference this knowledge naturally when relevant to the conversation. Do not recite it.
"""
def __init__(self, template: str | None = None, position: str = "after_bio"):
"""
Initialize injector.
Args:
template: str.format-style template for the injection block (defaults to DEFAULT_TEMPLATE)
position: Where to inject - 'after_bio', 'before_conversation', 'system_suffix'
"""
self.template = template or self.DEFAULT_TEMPLATE
self.position = position
def inject(
self,
messages: list[dict],
npc_profile: NPCProfile,
lore_entries: list[LoreEntry],
query_time_ms: float,
) -> tuple[list[dict], InjectionResult]:
"""
Inject lore into chat messages.
Args:
messages: Original chat messages
npc_profile: Extracted NPC profile
lore_entries: Retrieved lore entries
query_time_ms: Time taken for retrieval
Returns:
Tuple of (modified messages, injection result)
"""
if not lore_entries:
return messages, InjectionResult(
npc_profile=npc_profile,
lore_entries=[],
injection_text="",
query_time_ms=query_time_ms,
)
# Build injection text
injection_text = self._build_injection_text(npc_profile, lore_entries)
# Clone messages to avoid modifying original
modified_messages = [dict(msg) for msg in messages]
# Find injection point
injected = False
for i, msg in enumerate(modified_messages):
if msg.get("role") == "system":
content = msg.get("content", "")
if self.position == "after_bio":
# Inject after character bio section
bio_markers = ["## Background", "## Personality", "## Speech Style"]
for marker in bio_markers:
if marker in content:
# Insert before this section
idx = content.index(marker)
modified_messages[i]["content"] = (
content[:idx] + injection_text + "\n\n" + content[idx:]
)
injected = True
break
elif self.position == "system_suffix":
# Append to end of system message
modified_messages[i]["content"] = content + "\n\n" + injection_text
injected = True
if injected:
break
# Fallback: prepend to first user message if no system message found
if not injected and self.position == "before_conversation":
for i, msg in enumerate(modified_messages):
if msg.get("role") == "user":
content = msg.get("content", "")
modified_messages[i]["content"] = (
f"[Context for the NPC you're speaking with]\n{injection_text}\n\n"
f"[Player speaks]\n{content}"
)
injected = True
break
if injected:
logger.info(
"Injected lore",
npc_name=npc_profile.name,
entries_count=len(lore_entries),
position=self.position,
)
else:
logger.warning("Could not find injection point", position=self.position)
result = InjectionResult(
npc_profile=npc_profile,
lore_entries=lore_entries,
injection_text=injection_text if injected else "",
query_time_ms=query_time_ms,
)
return modified_messages, result
def _build_injection_text(
self,
npc_profile: NPCProfile,
lore_entries: list[LoreEntry],
) -> str:
"""Build the injection text block."""
# Build lore items list
lore_items = []
for entry in lore_entries:
# Truncate very long entries
content = entry.content
if len(content) > 300:
content = content[:297] + "..."
lore_items.append(f"- **{entry.topic}**: {content}")
lore_items_text = "\n".join(lore_items)
# Fill template
injection_text = self.template.format(
race=npc_profile.race or "person",
profession=npc_profile.profession or "citizen",
location=npc_profile.location or "Skyrim",
lore_items=lore_items_text,
name=npc_profile.name,
)
return injection_text.strip()
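
A minimal sketch of the after_bio injection path (the profile, lore entry, and system prompt are invented for illustration; imports assume the package layout):

from oghma_proxy.injector import LoreInjector
from oghma_proxy.models import LoreEntry, NPCProfile

profile = NPCProfile(name="Danica", race="Imperial", profession="priest", location="Whiterun")
entries = [
    LoreEntry(
        topic="Kynareth",
        content="Goddess of air, wind, and the heavens...",
        category="figures_gods",
        score=0.72,
    )
]
messages = [{
    "role": "system",
    "content": "## Danica Bio\n- Race: Imperial\n\n## Personality\nDevout and patient.",
}]

new_messages, result = LoreInjector(position="after_bio").inject(
    messages, profile, entries, query_time_ms=12.4
)
# The "## Relevant Lore Knowledge" block now sits just before "## Personality".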

oghma_proxy/main.py

@@ -0,0 +1,291 @@
"""Oghma RAG Proxy - Main FastAPI Application."""
from __future__ import annotations
import os
import time
from contextlib import asynccontextmanager
from typing import Any
import httpx
import structlog
import yaml
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from .extractor import NPCExtractor
from .injector import LoreInjector
from .models import ChatCompletionRequest
from .retriever import OghmaRetriever
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger()
def load_config(config_path: str = "config.yaml") -> dict:
"""Load configuration from YAML file."""
# Try local config first, then default
for path in ["config.local.yaml", config_path]:
if os.path.exists(path):
with open(path) as f:
config = yaml.safe_load(f)
logger.info("Loaded config", path=path)
return config
return {}
# Global instances
config = load_config()
extractor = NPCExtractor()
retriever: OghmaRetriever | None = None
injector: LoreInjector | None = None
http_client: httpx.AsyncClient | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan - setup and teardown."""
global retriever, injector, http_client
# Initialize components
chroma_config = config.get("chromadb", {})
retriever = OghmaRetriever(
host=chroma_config.get("host", "iris-dev.eachpath.local"),
port=chroma_config.get("port", 35000),
collection_lore=chroma_config.get("collection_lore", "oghma_lore"),
collection_basic=chroma_config.get("collection_basic", "oghma_basic"),
max_results=config.get("retrieval", {}).get("max_results", 5),
min_score=config.get("retrieval", {}).get("min_score", 0.55),
)
injection_config = config.get("injection", {})
injector = LoreInjector(
template=injection_config.get("template"),
position=injection_config.get("position", "after_bio"),
)
upstream_config = config.get("upstream", {})
http_client = httpx.AsyncClient(
timeout=httpx.Timeout(
connect=10.0,
read=upstream_config.get("timeout", 120.0),
write=30.0,
pool=10.0,
),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
)
logger.info(
"Oghma RAG Proxy started",
upstream_url=upstream_config.get("url", ""),
chromadb_host=chroma_config.get("host"),
)
yield
# Cleanup
if http_client:
await http_client.aclose()
logger.info("Oghma RAG Proxy stopped")
app = FastAPI(
title="Oghma RAG Proxy",
description="RAG Proxy for SkyrimNet - Injects Tamrielic lore into NPC conversations",
version="0.1.0",
lifespan=lifespan,
)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
chromadb_healthy = retriever.health_check() if retriever else False
return {
"status": "healthy" if chromadb_healthy else "degraded",
"components": {
"proxy": "healthy",
"chromadb": "healthy" if chromadb_healthy else "unhealthy",
},
}
# Debug: track recent RAG operations
_recent_rag_ops = []
@app.get("/stats")
async def get_stats():
"""Get proxy statistics."""
return {
"version": "0.1.0",
"injection_enabled": config.get("injection", {}).get("enabled", True),
"upstream_url": config.get("upstream", {}).get("url", ""),
}
@app.get("/debug/rag")
async def debug_rag():
"""Debug endpoint to see recent RAG operations."""
return {"recent_operations": _recent_rag_ops[-20:]}
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
"""
Proxy chat completions with RAG enrichment.
This endpoint intercepts OpenRouter-compatible requests,
enriches them with relevant Tamrielic lore, and forwards
to the upstream LLM.
"""
start_time = time.perf_counter()
# Parse request body
body = await request.json()
messages = body.get("messages", [])
stream = body.get("stream", False)
# Extract NPC profile from messages
npc_profile = extractor.extract(messages)
# Get conversation context for RAG query
context = extractor.extract_conversation_context(messages)
# Retrieve relevant lore
lore_entries = []
query_time_ms = 0.0
if (
retriever
and injector
and config.get("injection", {}).get("enabled", True)
and context
):
lore_entries, query_time_ms = retriever.retrieve(context, npc_profile)
# Inject lore into messages
if lore_entries:
messages, injection_result = injector.inject(
messages,
npc_profile,
lore_entries,
query_time_ms,
)
body["messages"] = messages
# Track for debug endpoint
_recent_rag_ops.append({
"npc": npc_profile.name,
"race": npc_profile.race,
"query": context[:100] if context else "",
"lore_found": len(lore_entries),
"topics": [e.topic for e in lore_entries[:3]],
"time_ms": round(query_time_ms, 2),
})
if len(_recent_rag_ops) > 50:
_recent_rag_ops.pop(0)
# Forward to upstream
upstream_config = config.get("upstream", {})
upstream_url = upstream_config.get("url", "https://openrouter.ai/api/v1")
api_key = upstream_config.get("api_key", os.environ.get("OPENROUTER_API_KEY", ""))
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Copy relevant headers from original request
for header in ["HTTP-Referer", "X-Title"]:
if value := request.headers.get(header):
headers[header] = value
try:
if stream:
# Streaming response
return StreamingResponse(
stream_upstream(f"{upstream_url}/chat/completions", headers, body),
media_type="text/event-stream",
)
else:
# Regular response
response = await http_client.post(
f"{upstream_url}/chat/completions",
json=body,
headers=headers,
)
response.raise_for_status()
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
"Request completed",
npc_name=npc_profile.name,
lore_entries=len(lore_entries),
rag_time_ms=round(query_time_ms, 2),
total_time_ms=round(total_time, 2),
)
return response.json()
except httpx.HTTPError as e:
logger.error("Upstream request failed", error=str(e))
raise HTTPException(status_code=502, detail=f"Upstream error: {e}")
async def stream_upstream(url: str, headers: dict, body: dict):
"""Stream response from upstream."""
async with http_client.stream("POST", url, json=body, headers=headers) as response:
async for chunk in response.aiter_bytes():
yield chunk
@app.post("/v1/completions")
async def completions(request: Request):
"""Legacy completions endpoint - passthrough."""
body = await request.json()
upstream_config = config.get("upstream", {})
upstream_url = upstream_config.get("url", "https://openrouter.ai/api/v1")
api_key = upstream_config.get("api_key", os.environ.get("OPENROUTER_API_KEY", ""))
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
response = await http_client.post(
f"{upstream_url}/completions",
json=body,
headers=headers,
)
return response.json()
def main():
"""Run the proxy server."""
import uvicorn
proxy_config = config.get("proxy", {})
uvicorn.run(
"oghma_proxy.main:app",
host=proxy_config.get("host", "0.0.0.0"),
port=proxy_config.get("port", 8100),
workers=proxy_config.get("workers", 1),
log_level="info",
)
if __name__ == "__main__":
main()
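
A hypothetical smoke test against a locally running proxy (assumes the default port 8100 and configured upstream credentials; the model name is a placeholder and is forwarded upstream as-is):

import httpx

resp = httpx.post(
    "http://localhost:8100/v1/chat/completions",
    json={
        "model": "placeholder-model",
        "messages": [
            {"role": "system", "content": "## Lydia Bio\n- Gender: Female\n- Race: Nord"},
            {"role": "user", "content": "Tell me about Whiterun."},
        ],
    },
    timeout=120.0,
)
print(resp.json())

# Then inspect what the proxy retrieved and injected:
print(httpx.get("http://localhost:8100/debug/rag").json())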

oghma_proxy/models.py

@@ -0,0 +1,169 @@
"""Data models for Oghma RAG Proxy."""
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class EducationLevel(str, Enum):
"""NPC education level determines lore depth."""
SCHOLAR = "scholar" # Full lore access
COMMONER = "commoner" # Basic summaries only
class NPCProfile(BaseModel):
"""Extracted NPC profile from SkyrimNet prompts."""
name: str = "Unknown"
race: str = "Unknown"
gender: str = "Unknown"
profession: str | None = None
factions: list[str] = Field(default_factory=list)
location: str | None = None
traits: list[str] = Field(default_factory=list)
# Computed
knowledge_classes: list[str] = Field(default_factory=list)
education_level: EducationLevel = EducationLevel.COMMONER
def compute_knowledge_classes(self) -> None:
"""Compute knowledge classes from profile attributes."""
classes = set()
# Race-based knowledge
race_map = {
"nord": ["nord"],
"dunmer": ["darkelf", "dunmer"],
"altmer": ["highelf", "altmer"],
"bosmer": ["woodelf", "bosmer"],
"argonian": ["argonian"],
"khajiit": ["khajiit"],
"breton": ["breton"],
"redguard": ["redguard"],
"orsimer": ["orc", "orsimer"],
"orc": ["orc", "orsimer"],
"imperial": ["imperial"],
}
race_lower = self.race.lower()
if race_lower in race_map:
classes.update(race_map[race_lower])
# Profession-based knowledge
profession_map = {
"priest": ["priest"],
"mage": ["mage", "scholar"],
"wizard": ["mage", "scholar"],
"scholar": ["scholar"],
"blacksmith": ["blacksmith"],
"guard": ["guard", "warrior"],
"soldier": ["warrior", "guard"],
"warrior": ["warrior"],
"thief": ["thief"],
"merchant": ["merchant"],
"innkeeper": ["innkeeper"],
"hunter": ["hunter"],
"farmer": ["peasant"],
"peasant": ["peasant"],
"noble": ["noble"],
"jarl": ["noble"],
"bard": ["bard"],
"alchemist": ["alchemist"],
}
if self.profession:
prof_lower = self.profession.lower()
if prof_lower in profession_map:
classes.update(profession_map[prof_lower])
# Location-based knowledge
location_map = {
"whiterun": ["whiterun"],
"windhelm": ["eastmarch"],
"solitude": ["haafingar"],
"riften": ["rift"],
"markarth": ["reach"],
"morthal": ["hjaalmarch"],
"dawnstar": ["pale"],
"winterhold": ["winterhold"],
"falkreath": ["falkreath"],
"solstheim": ["solstheim"],
}
if self.location:
loc_lower = self.location.lower()
if loc_lower in location_map:
classes.update(location_map[loc_lower])
# Faction-based knowledge
faction_map = {
"companions": ["companions"],
"college of winterhold": ["college", "mage"],
"college": ["college", "mage"],
"thieves guild": ["thieves"],
"dark brotherhood": ["darkbrotherhood"],
"stormcloaks": ["stormcloak"],
"stormcloak": ["stormcloak"],
"imperial legion": ["imperial"],
"legion": ["imperial"],
"thalmor": ["thalmor"],
"dawnguard": ["dawnguard"],
"volkihar": ["vampire", "volkihar"],
}
for faction in self.factions:
faction_lower = faction.lower()
if faction_lower in faction_map:
classes.update(faction_map[faction_lower])
        # Sorted for deterministic output (set order is arbitrary)
        self.knowledge_classes = sorted(classes)
# Determine education level
        educated_professions = {"mage", "wizard", "scholar", "priest", "priestess", "noble", "bard"}
educated_factions = {"college of winterhold", "thalmor", "college"}
if self.profession and self.profession.lower() in educated_professions:
self.education_level = EducationLevel.SCHOLAR
elif any(f.lower() in educated_factions for f in self.factions):
self.education_level = EducationLevel.SCHOLAR
else:
self.education_level = EducationLevel.COMMONER
class LoreEntry(BaseModel):
"""A retrieved lore entry from Oghma."""
topic: str
content: str
category: str
score: float
knowledge_classes: list[str] = Field(default_factory=list)
class ChatMessage(BaseModel):
"""OpenRouter-compatible chat message."""
role: str
content: str
name: str | None = None
class ChatCompletionRequest(BaseModel):
"""OpenRouter-compatible chat completion request."""
model: str
messages: list[ChatMessage]
temperature: float | None = None
max_tokens: int | None = None
stream: bool = False
# Allow additional fields to pass through
model_config = {"extra": "allow"}
class InjectionResult(BaseModel):
"""Result of lore injection."""
npc_profile: NPCProfile
lore_entries: list[LoreEntry]
injection_text: str
query_time_ms: float
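
A short sketch of the knowledge-class computation (the profile values are invented for illustration):

from oghma_proxy.models import EducationLevel, NPCProfile

profile = NPCProfile(
    race="Dunmer",
    profession="mage",
    location="Windhelm",
    factions=["College of Winterhold"],
)
profile.compute_knowledge_classes()
# knowledge_classes now holds: college, darkelf, dunmer, eastmarch, mage, scholar
assert profile.education_level is EducationLevel.SCHOLAR  # "mage" is an educated profession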

oghma_proxy/retriever.py

@@ -0,0 +1,173 @@
"""Oghma Lore Retriever - Queries ChromaDB for relevant Tamrielic lore."""
from __future__ import annotations
import time
from functools import lru_cache
from typing import TYPE_CHECKING
import chromadb
import structlog
from chromadb.config import Settings
from .models import EducationLevel, LoreEntry, NPCProfile
if TYPE_CHECKING:
from chromadb import Collection
logger = structlog.get_logger()
class OghmaRetriever:
"""Retrieves relevant lore from Oghma ChromaDB collections."""
def __init__(
self,
host: str = "iris-dev.eachpath.local",
port: int = 35000,
collection_lore: str = "oghma_lore",
collection_basic: str = "oghma_basic",
max_results: int = 5,
min_score: float = 0.55,
):
self.host = host
self.port = port
self.collection_lore_name = collection_lore
self.collection_basic_name = collection_basic
self.max_results = max_results
self.min_score = min_score
self._client: chromadb.HttpClient | None = None
self._collection_lore: Collection | None = None
self._collection_basic: Collection | None = None
def _get_client(self) -> chromadb.HttpClient:
"""Get or create ChromaDB client."""
if self._client is None:
self._client = chromadb.HttpClient(
host=self.host,
port=self.port,
settings=Settings(anonymized_telemetry=False),
)
logger.info("Connected to ChromaDB", host=self.host, port=self.port)
return self._client
def _get_collection(self, education_level: EducationLevel) -> Collection:
"""Get the appropriate collection based on education level."""
client = self._get_client()
if education_level == EducationLevel.SCHOLAR:
if self._collection_lore is None:
self._collection_lore = client.get_collection(self.collection_lore_name)
return self._collection_lore
else:
if self._collection_basic is None:
self._collection_basic = client.get_collection(self.collection_basic_name)
return self._collection_basic
def retrieve(
self,
query: str,
npc_profile: NPCProfile,
) -> tuple[list[LoreEntry], float]:
"""
Retrieve relevant lore entries for an NPC.
Args:
query: Conversation context to search for
npc_profile: NPC profile for knowledge filtering
Returns:
Tuple of (lore entries, query time in ms)
"""
if not query.strip():
return [], 0.0
start_time = time.perf_counter()
try:
collection = self._get_collection(npc_profile.education_level)
# Build metadata filter for knowledge classes
# NOTE: Currently disabled because CHIM's Oghma data doesn't have
# knowledge_class populated consistently. Enable when data is enriched.
where_filter = None
# TODO: Re-enable when knowledge_class data is available
# if npc_profile.knowledge_classes:
# if len(npc_profile.knowledge_classes) == 1:
# where_filter = {"knowledge_classes": {"$contains": npc_profile.knowledge_classes[0]}}
# else:
# where_filter = {
# "$or": [
# {"knowledge_classes": {"$contains": kc}}
# for kc in npc_profile.knowledge_classes
# ]
# }
# Query ChromaDB
results = collection.query(
query_texts=[query],
n_results=self.max_results,
where=where_filter,
include=["documents", "metadatas", "distances"],
)
# Parse results
entries = []
if results and results["documents"] and results["documents"][0]:
for i, doc in enumerate(results["documents"][0]):
metadata = results["metadatas"][0][i] if results["metadatas"] else {}
distance = results["distances"][0][i] if results["distances"] else 1.0
# Convert distance to similarity score (ChromaDB uses L2 distance)
# Lower distance = higher similarity
score = 1.0 / (1.0 + distance)
                    if score >= self.min_score:
                        raw_classes = metadata.get("knowledge_classes", "")
                        entries.append(
                            LoreEntry(
                                topic=metadata.get("topic", "Unknown"),
                                content=doc,
                                category=metadata.get("category", "Unknown"),
                                score=score,
                                # Filter empties so "" does not become [""]
                                knowledge_classes=[
                                    kc.strip()
                                    for kc in raw_classes.split(",")
                                    if kc.strip()
                                ],
                            )
                        )
query_time = (time.perf_counter() - start_time) * 1000
logger.info(
"Retrieved lore entries",
query_preview=query[:100],
npc_name=npc_profile.name,
education=npc_profile.education_level.value,
entries_found=len(entries),
query_time_ms=round(query_time, 2),
)
return entries, query_time
except Exception as e:
logger.error("Failed to retrieve lore", error=str(e))
query_time = (time.perf_counter() - start_time) * 1000
return [], query_time
def health_check(self) -> bool:
"""Check if ChromaDB is reachable."""
try:
client = self._get_client()
client.heartbeat()
return True
except Exception as e:
logger.error("ChromaDB health check failed", error=str(e))
return False
# Cached retriever instance
@lru_cache(maxsize=1)
def get_retriever(
host: str = "iris-dev.eachpath.local",
port: int = 35000,
) -> OghmaRetriever:
"""Get cached retriever instance."""
return OghmaRetriever(host=host, port=port)
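
A minimal retrieval sketch, assuming a reachable ChromaDB that already holds the ingested collections (the host and query here are illustrative):

from oghma_proxy.models import NPCProfile
from oghma_proxy.retriever import OghmaRetriever

retriever = OghmaRetriever(host="localhost", port=35000)
profile = NPCProfile(name="Urag", profession="scholar")
profile.compute_knowledge_classes()  # scholar -> EducationLevel.SCHOLAR -> oghma_lore

entries, elapsed_ms = retriever.retrieve("the founding of the College of Winterhold", profile)
for entry in entries:
    print(f"{entry.score:.2f} [{entry.category}] {entry.topic}")
# score = 1 / (1 + distance), so the default min_score of 0.55
# admits results with L2 distance below roughly 0.82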