# Context Addition Deep-Dive
Comprehensive guide to the architecture and implementation of GuardKit's Graphiti context addition feature.
## Table of Contents

- Overview
- Architecture
- Parser System
- Episode Generation
- Metadata Strategy
- Implementation Details
- Extension Points
- Performance Considerations
- Testing Strategy
## Overview
The context addition feature allows users to add structured knowledge from markdown files to the Graphiti knowledge graph. This enables semantic search across project documentation, architecture decisions, feature specifications, and other artifacts.
### Design Goals
- Automatic Detection: Detect document type from filename and content
- Structured Extraction: Extract metadata and relationships
- Flexible Input: Support single files or bulk directory operations
- Type Safety: Validate content against expected structure
- Extensibility: Easy to add new parser types
- Error Handling: Graceful degradation with clear error messages
### Use Cases
- Project Setup: Seed initial project documentation
- Architecture Decisions: Track ADRs in knowledge graph
- Feature Planning: Make feature specs searchable
- Knowledge Management: Centralize project knowledge
- Onboarding: Help new team members discover context
## Architecture

### Component Diagram
┌─────────────────────────────────────────────────┐
│ guardkit graphiti add-context │
│ (CLI Command) │
└────────────────┬────────────────────────────────┘
│
├─ Path Resolution
├─ File Discovery (glob patterns)
└─ Batch Processing
│
┌────────────────┴────────────────┐
│ ParserRegistry │
│ - register(parser) │
│ - get_parser(type) │
│ - detect_parser(file, content) │
└────────┬────────────────────────┘
│
┌───────┴────────┐
│ Parser Chain │
└───────┬────────┘
│
┌───────────┼───────────────────────┐
│ │ │
┌─────▼─────┐ ┌──▼──────┐ ┌──────────▼──────────┐
│ ADR │ │ Feature │ │ ProjectOverview │
│ Parser │ │ Spec │ │ Parser │
└─────┬─────┘ └──┬──────┘ └──────────┬──────────┘
│ │ │
└──────────┴────────────────────┘
│
┌──────▼──────┐
│ ParseResult│
│ - success │
│ - episodes │
│ - warnings │
└──────┬──────┘
│
┌────────▼────────┐
│ GraphitiClient │
│ add_episode() │
└─────────────────┘
### Data Flow
User Input (file/directory)
│
├─> Path Resolution
│ └─> File Discovery (glob)
│
├─> For each file:
│ ├─> Read content
│ ├─> Detect parser (registry)
│ ├─> Parse content → ParseResult
│ │ ├─> Extract metadata
│ │ ├─> Generate episodes
│ │ └─> Collect warnings
│ │
│ └─> Add episodes to Graphiti
│ └─> client.add_episode()
│
└─> Summary Report
├─> Files processed
├─> Episodes added
├─> Warnings
└─> Errors
## Parser System

### Base Parser Interface
All parsers implement the BaseParser abstract base class:
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class EpisodeData:
    """Data for creating a Graphiti episode."""

    entity_id: str    # Unique identifier (e.g., "ADR-001")
    entity_type: str  # Type (e.g., "adr", "task")
    content: str      # Full text content
    group_id: str     # Graphiti group (e.g., "guardkit__project_decisions")
    metadata: dict    # Additional metadata


@dataclass
class ParseResult:
    """Result of parsing a file."""

    success: bool
    episodes: list[EpisodeData]
    warnings: list[str]


class BaseParser(ABC):
    @property
    @abstractmethod
    def parser_type(self) -> str:
        """Unique identifier for this parser."""
        pass

    @property
    @abstractmethod
    def supported_extensions(self) -> list[str]:
        """File extensions this parser handles."""
        pass

    @abstractmethod
    def can_parse(self, content: str, file_path: str) -> bool:
        """Check if this parser can handle the given content."""
        pass

    @abstractmethod
    def parse(self, content: str, file_path: str) -> ParseResult:
        """Parse content and return structured result."""
        pass
### Parser Registry
The registry manages parser registration and selection:
from pathlib import Path
from typing import Optional


class ParserRegistry:
    def __init__(self):
        self._parsers: dict[str, BaseParser] = {}
        self._extension_map: dict[str, str] = {}

    def register(self, parser: BaseParser):
        """Register a parser and its extensions."""
        self._parsers[parser.parser_type] = parser
        for ext in parser.supported_extensions:
            self._extension_map[ext.lower()] = parser.parser_type

    def get_parser(self, parser_type: str) -> Optional[BaseParser]:
        """Get parser by type name."""
        return self._parsers.get(parser_type)

    def detect_parser(self, file_path: str, content: str) -> Optional[BaseParser]:
        """Auto-detect the appropriate parser."""
        # Strategy 1: Extension-based detection
        ext = Path(file_path).suffix.lower()
        if ext in self._extension_map:
            parser_type = self._extension_map[ext]
            parser = self._parsers.get(parser_type)
            if parser and parser.can_parse(content, file_path):
                return parser

        # Strategy 2: Try all parsers via can_parse
        for parser in self._parsers.values():
            if parser.can_parse(content, file_path):
                return parser

        return None
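A minimal usage sketch (registering `ADRParser` by hand is illustrative; the parser itself is shown below):

```python
from pathlib import Path

registry = ParserRegistry()
registry.register(ADRParser())

path = "docs/architecture/ADR-001.md"
content = Path(path).read_text()

parser = registry.detect_parser(path, content)
if parser is not None:
    result = parser.parse(content, path)
```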
### Detection Strategy

Parser selection follows this priority order (see the sketch after the list):

1. Explicit `--type` flag: use the specified parser directly
2. Filename pattern: match against known patterns (`ADR-*`, `FEATURE-SPEC-*`)
3. Content analysis: call `can_parse()` on all registered parsers
4. Fallback: use the generic `project-doc` parser
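A sketch of that priority chain as a single helper (the function name and the fallback lookup are illustrative, not GuardKit's actual entry point):

```python
from typing import Optional

def select_parser(
    registry: ParserRegistry,
    parser_type: Optional[str],
    file_path: str,
    content: str,
) -> Optional[BaseParser]:
    # 1. An explicit --type flag wins outright
    if parser_type:
        return registry.get_parser(parser_type)
    # 2-3. Filename/extension match, then can_parse() across all parsers
    parser = registry.detect_parser(file_path, content)
    # 4. Fall back to the generic project-doc parser
    return parser or registry.get_parser("project-doc")
```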
Example - ADR Detection:
from pathlib import Path

class ADRParser(BaseParser):
    def can_parse(self, content: str, file_path: str) -> bool:
        # Check filename pattern
        filename = Path(file_path).name.lower()
        if filename.startswith('adr-'):
            return True

        # Check content structure (section headers, case-insensitive)
        lowered = content.lower()
        has_status = '## status' in lowered
        has_context = '## context' in lowered
        has_decision = '## decision' in lowered
        return has_status and has_context and has_decision
## Episode Generation

### Episode Structure
Each parser generates one or more episodes from a single file:
ADR Parser (1 episode):
EpisodeData(
    entity_id="ADR-001",
    entity_type="adr",
    content="[Full ADR text with all sections]",
    group_id="guardkit__project_decisions",
    metadata={
        "title": "Use Graphiti for persistent memory",
        "status": "Accepted",
        "decision_date": "2024-01-15",
        "entity_type": "adr",
        "source_file": "docs/architecture/ADR-001.md",
    },
)
Feature Spec Parser (N episodes):
# Feature overview episode
EpisodeData(
    entity_id="FEAT-AUTH",
    entity_type="feature-spec",
    content="[Feature overview with objectives, phases]",
    group_id="guardkit__feature_specs",
    metadata={...},
)

# Task episodes (one per task)
EpisodeData(
    entity_id="TASK-AUTH-001",
    entity_type="task",
    content="[Task description and acceptance criteria]",
    group_id="guardkit__feature_specs",
    metadata={
        "feature_id": "FEAT-AUTH",
        "wave": 1,
        "depends_on": [],
        ...
    },
)
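For illustration, fanning tasks out into episodes might look like this sketch (the field names on each parsed task dict are assumptions):

```python
def task_episodes(feature_id: str, tasks: list[dict], group_id: str) -> list[EpisodeData]:
    """Emit one task episode per parsed task."""
    return [
        EpisodeData(
            entity_id=task["task_id"],
            entity_type="task",
            content=task["description"],
            group_id=group_id,
            metadata={
                "feature_id": feature_id,
                "wave": task.get("wave"),
                "depends_on": task.get("depends_on", []),
            },
        )
        for task in tasks
    ]
```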
### Group ID Strategy
Group IDs organize episodes into logical collections:
System-Level Groups:
- guardkit_templates
- guardkit_patterns
- guardkit_workflows
- product_knowledge
- command_workflows
- quality_gate_phases
Project-Level Groups:
- {project}__project_overview
- {project}__project_architecture
- {project}__feature_specs
- {project}__project_decisions
- {project}__project_docs
Format: {namespace}__{category}
- System namespace: guardkit, product_knowledge, etc.
- Project namespace: {project_name} (e.g., myapp)
- Category: overview, decisions, feature_specs, etc.
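Assembling a group ID is mechanical; a helper of this shape (the name is hypothetical) keeps the format in one place:

```python
def make_group_id(namespace: str, category: str) -> str:
    """Build a group ID in the {namespace}__{category} format."""
    return f"{namespace}__{category}"

make_group_id("myapp", "project_decisions")  # "myapp__project_decisions"
```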
### Content Formatting
Episode content is formatted for optimal semantic search:
ADR Content Template:
# ADR-{number}: {title}
Status: {status}
Date: {decision_date}
## Context
{context_text}
## Decision
{decision_text}
## Consequences
{consequences_text}
Source: {file_path}
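A minimal sketch of rendering that template from extracted fields (the function name is illustrative; the dict keys mirror the ADR metadata listed under Metadata Strategy, plus an assumed `number` field):

```python
def format_adr_content(meta: dict, file_path: str) -> str:
    """Render an ADR into the search-friendly template above."""
    return (
        f"# ADR-{meta['number']}: {meta['title']}\n\n"
        f"Status: {meta['status']}\n"
        f"Date: {meta['decision_date']}\n\n"
        f"## Context\n{meta['context']}\n\n"
        f"## Decision\n{meta['decision']}\n\n"
        f"## Consequences\n{meta['consequences']}\n\n"
        f"Source: {file_path}\n"
    )
```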
Feature Spec Content Template:
# Feature: {feature_name}
Feature ID: {feature_id}
Status: {status}
Priority: {priority}
## Overview
{overview_text}
## Objectives
{objectives_list}
## Phases
{phases_list}
## Dependencies
{dependencies_list}
Source: {file_path}
## Metadata Strategy

### Standard Metadata Fields
All episodes include these standard fields:
{
  "entity_id": "unique-identifier",
  "entity_type": "adr | feature-spec | task | project-overview | project-doc",
  "source_file": "relative/path/to/file.md",
  "_metadata": {
    "source": "add_context",
    "created_at": "2024-01-30T12:00:00Z",
    "updated_at": "2024-01-30T12:00:00Z",
    "parser_type": "adr",
    "parser_version": "1.0.0"
  }
}
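A sketch of stamping the `_metadata` envelope at ingest time (the helper name is illustrative):

```python
from datetime import datetime, timezone

def build_system_metadata(parser_type: str, parser_version: str = "1.0.0") -> dict:
    """Provenance fields shared by every episode."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        "source": "add_context",
        "created_at": now,
        "updated_at": now,
        "parser_type": parser_type,
        "parser_version": parser_version,
    }
```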
### Type-Specific Metadata
ADR Metadata:
{
  "title": "Decision title",
  "status": "Accepted | Rejected | Proposed | Deprecated",
  "decision_date": "2024-01-15",
  "context": "Background...",
  "decision": "We will...",
  "consequences": "Benefits and trade-offs..."
}
Feature Spec Metadata:
{
  "feature_id": "FEAT-AUTH",
  "feature_name": "User Authentication",
  "status": "planned | in_progress | completed",
  "priority": "high | medium | low",
  "phases": ["planning", "implementation", "testing"],
  "dependencies": ["FEAT-DB"]
}
Task Metadata:
{
  "task_id": "TASK-AUTH-001",
  "task_name": "Implement JWT tokens",
  "status": "backlog | in_progress | completed",
  "priority": "high | medium | low",
  "feature_id": "FEAT-AUTH",
  "depends_on": ["TASK-AUTH-002"],
  "wave": 1,
  "implementation_mode": "task-work | direct | manual"
}
### Metadata Usage

Semantic Search:
- Status filtering: find all "Accepted" ADRs
- Priority sorting: high-priority tasks first
- Dependency tracking: tasks that depend on X
Deduplication (sketched below):
- `entity_id` uniqueness check
- `source_file` tracking prevents re-adding
- `updated_at` timestamp for version detection

Provenance:
- `source_file` tracks origin
- `parser_type` identifies extraction method
- `created_at` / `updated_at` for audit trail
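A sketch of the deduplication check (how previously ingested episodes are indexed is an assumption here):

```python
def should_skip(episode: EpisodeData, existing: dict[str, dict], force: bool) -> bool:
    """Skip an episode whose entity_id was already ingested from the same file."""
    if force:  # --force always re-adds
        return False
    prior = existing.get(episode.entity_id)
    if prior is None:
        return False
    return prior.get("source_file") == episode.metadata.get("source_file")
```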
## Implementation Details

### CLI Command Implementation
@graphiti.command("add-context")
@click.argument("path")
@click.option("--type", "parser_type", help="Force parser type")
@click.option("--force", "-f", is_flag=True, help="Overwrite existing")
@click.option("--dry-run", is_flag=True, help="Preview only")
@click.option("--pattern", default="**/*.md", help="Glob pattern")
@click.option("--verbose", "-v", is_flag=True, help="Detailed output")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def add_context(path, parser_type, force, dry_run, pattern, verbose, quiet):
    """Add context from files to Graphiti."""
    asyncio.run(_cmd_add_context(path, parser_type, force, dry_run, pattern, verbose, quiet))
async def _cmd_add_context(path, parser_type, force, dry_run, pattern, verbose, quiet):
    # 1. Validate inputs
    target_path = Path(path)
    if not target_path.exists():
        console.print(f"[red]Error: Path does not exist: {path}[/red]")
        raise SystemExit(1)

    # 2. Connect to Graphiti
    client = GraphitiClient()
    await client.initialize()

    # 3. Collect files to process
    files = []
    if target_path.is_file():
        files.append(path)
    else:
        files.extend(str(f) for f in target_path.glob(pattern) if f.is_file())

    # 4. Process each file
    # NOTE: assumes the built-in parsers have been registered with this registry
    registry = ParserRegistry()
    for file_path in files:
        content = Path(file_path).read_text()

        # Detect or use the explicitly requested parser
        parser = registry.get_parser(parser_type) if parser_type else registry.detect_parser(file_path, content)
        if not parser:
            console.print(f"[yellow]No parser for: {file_path}[/yellow]")
            continue

        # Parse the file
        result = parser.parse(content, file_path)

        # Add episodes to Graphiti
        if not dry_run:
            for episode in result.episodes:
                await client.add_episode(
                    name=episode.entity_id,
                    episode_body=episode.content,
                    group_id=episode.group_id,
                    metadata=episode.metadata,
                )

    # 5. Close client
    await client.close()
### Error Handling
File-Level Errors:
try:
    content = Path(file_path).read_text()
except Exception as e:
    errors.append(f"{file_path}: Error reading file - {e}")
    continue
Parse-Level Errors:
result = parser.parse(content, file_path)
if not result.success:
    errors.append(f"{file_path}: Parse failed")
    for warn in result.warnings:
        warnings.append(f"{file_path}: {warn}")
    continue
Episode-Level Errors:
try:
    await client.add_episode(...)
except Exception as e:
    errors.append(f"{file_path}: Error adding episode - {e}")
### Batch Processing
Files are processed sequentially to maintain order and handle errors gracefully:
for file_path in files_to_process:
    try:
        # Read → Parse → Add
        process_file(file_path)
        files_processed += 1
    except Exception as e:
        errors.append(f"{file_path}: {e}")
        continue  # Continue with next file
Benefits:
- Partial success (some files succeed even if others fail)
- Clear error reporting per file
- Graceful degradation

Trade-offs:
- Sequential processing (not parallel)
- Slower for large batches
- Could optimize with async batching (see the sketch below)
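A sketch of that async-batching direction, assuming episode additions are independent (the concurrency limit is illustrative):

```python
import asyncio

async def add_episodes_concurrently(client, episodes, limit: int = 5):
    """Add episodes concurrently, bounded by a semaphore."""
    sem = asyncio.Semaphore(limit)

    async def add_one(episode):
        async with sem:
            await client.add_episode(
                name=episode.entity_id,
                episode_body=episode.content,
                group_id=episode.group_id,
                metadata=episode.metadata,
            )

    await asyncio.gather(*(add_one(ep) for ep in episodes))
```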
## Extension Points

### Adding New Parser Types

1. Create a parser class:
from guardkit.integrations.graphiti.parsers.base import BaseParser, ParseResult, EpisodeData

class MyCustomParser(BaseParser):
    @property
    def parser_type(self) -> str:
        return "custom-type"

    @property
    def supported_extensions(self) -> list[str]:
        return [".md", ".txt"]

    def can_parse(self, content: str, file_path: str) -> bool:
        # Custom detection logic
        return "## My Custom Marker" in content

    def parse(self, content: str, file_path: str) -> ParseResult:
        # Extract metadata
        metadata = self._extract_metadata(content)

        # Create episode (project is your project namespace for group IDs)
        episode = EpisodeData(
            entity_id=metadata["id"],
            entity_type="custom-type",
            content=content,
            group_id=f"{project}__custom_docs",
            metadata=metadata,
        )
        return ParseResult(
            success=True,
            episodes=[episode],
            warnings=[],
        )
2. Register the parser:
# In guardkit/integrations/graphiti/parsers/__init__.py
from .my_custom import MyCustomParser

__all__ = [
    "BaseParser",
    "MyCustomParser",  # Add to exports
    # ...
]
3. Use it from the CLI by passing the parser type explicitly: `guardkit graphiti add-context docs/ --type custom-type`
### Customizing Metadata
Override _extract_metadata() in your parser:
def _extract_metadata(self, content: str) -> dict:
    # Custom extraction logic (extract_field_1/2 stand in for your own helpers)
    metadata = {
        "custom_field_1": extract_field_1(content),
        "custom_field_2": extract_field_2(content),
        "entity_type": self.parser_type,
    }
    return metadata
### Adding Validation Rules
Implement custom validation in parse():
def parse(self, content: str, file_path: str) -> ParseResult:
    warnings = []

    # Required sections check
    if "## Required Section" not in content:
        warnings.append("Missing required section")

    # Validation checks
    if not self._validate_structure(content):
        return ParseResult(
            success=False,
            episodes=[],
            warnings=["Invalid structure"],
        )

    # Continue with parsing...
## Performance Considerations

### File Discovery Optimization

Current: Python glob with `Path.glob(pattern)`

Optimization opportunities:
- Cache file lists for repeated operations
- Parallel file reading (asyncio)
- Skip unchanged files (checksum comparison)
### Parsing Performance

Current: Sequential processing per file

Bottlenecks:
- File I/O (disk reads)
- Regex matching in parsers
- Network calls to Graphiti
Optimizations:
- Batch episode additions (single API call)
- Parallel parsing with asyncio.gather()
- Parser result caching
### Memory Usage

Current approach:
- Load full file content into memory
- Create all episodes before adding
- Single file at a time

Memory profile:
- Small files (<100KB): minimal impact
- Large files (>1MB): could stream content
- Many files (1000+): process in batches (see the sketch below)
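A trivial batching helper along those lines (the batch size is illustrative):

```python
def batched(items: list, size: int = 100):
    """Yield successive fixed-size batches from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```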
### Graphiti API Efficiency
Current: One API call per episode
Optimization:
# Instead of:
for episode in episodes:
    await client.add_episode(...)

# Use batch operation:
await client.add_episodes_batch(episodes)
### Caching Strategy
File-level cache:
import hashlib

# Skip unchanged files (existing_episode_hash would come from a prior ingest)
file_hash = hashlib.sha256(content.encode()).hexdigest()
if existing_episode_hash == file_hash:
    skip_file()
Parser-level cache:
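A minimal sketch, assuming parse results are memoized by parser type and content hash (names are illustrative):

```python
import hashlib

_parse_cache: dict[str, ParseResult] = {}

def parse_cached(parser: BaseParser, content: str, file_path: str) -> ParseResult:
    """Memoize parse results so re-runs over unchanged content are free."""
    key = f"{parser.parser_type}:{hashlib.sha256(content.encode()).hexdigest()}"
    if key not in _parse_cache:
        _parse_cache[key] = parser.parse(content, file_path)
    return _parse_cache[key]
```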
## Testing Strategy

### Unit Tests
Parser tests:
def test_adr_parser_detects_valid_adr():
    parser = ADRParser()
    content = """
# ADR-001: Title

## Status
Accepted

## Context
...

## Decision
...
"""
    assert parser.can_parse(content, "ADR-001.md")


def test_adr_parser_extracts_metadata():
    parser = ADRParser()
    result = parser.parse(valid_adr_content, "ADR-001.md")
    assert result.success
    assert len(result.episodes) == 1
    assert result.episodes[0].metadata["status"] == "Accepted"
Registry tests:
def test_registry_detects_by_filename():
    registry = ParserRegistry()
    registry.register(ADRParser())
    parser = registry.detect_parser("ADR-001.md", content)  # content: any valid ADR body
    assert parser.parser_type == "adr"
### Integration Tests
End-to-end command tests:
async def test_add_context_single_file(tmp_path):
    # Create test file
    adr_file = tmp_path / "ADR-001.md"
    adr_file.write_text(valid_adr_content)

    # Run command
    await _cmd_add_context(str(adr_file), None, False, False, "**/*.md", False, False)

    # Verify episode in Graphiti
    results = await client.search("ADR-001", group_ids=["test__project_decisions"])
    assert len(results) > 0
## See Also
- Graphiti Add Context Guide - Command reference
- Graphiti Parsers Guide - Parser documentation
- Episode Metadata Deep-Dive - Metadata schema details
- Graphiti Integration Guide - Overall architecture