"""Memory Manager - CLI tool for managing persistent memories.""" import argparse import json import os import re import sys from datetime import datetime from pathlib import Path from typing import Optional MEMORY_DIR = Path(__file__).parent.parent / "memories" INDEX_FILE = MEMORY_DIR / "index.md" def ensure_dirs(): """Ensure memory directory exists.""" MEMORY_DIR.mkdir(parents=True, exist_ok=True) if not INDEX_FILE.exists(): INDEX_FILE.write_text("# Memory Index\n\n", encoding="utf-8") def generate_id(title: str) -> str: """Generate a short ID from title.""" import hashlib return hashlib.md5(title.encode()).hexdigest()[:8] def parse_frontmatter(content: str) -> tuple[dict, str]: """Parse YAML frontmatter from markdown content.""" pattern = r'^---\n(.*?)\n---\n?(.*)$' match = re.match(pattern, content, re.DOTALL) if match: frontmatter_text = match.group(1) body = match.group(2) fm = {} for line in frontmatter_text.split('\n'): if ':' in line: key, value = line.split(':', 1) key = key.strip() value = value.strip().strip('[]"\'|') if value.startswith('['): fm[key] = [v.strip().strip('"\'') for v in value[1:-1].split(',')] else: fm[key] = value return fm, body return {}, content def create_frontmatter(data: dict, body: str) -> str: """Create markdown with frontmatter.""" lines = ["---"] for key, value in data.items(): if isinstance(value, list): lines.append(f"{key}: [{', '.join(value)}]") else: lines.append(f"{key}: {value}") lines.append("---\n") if body.strip(): lines.append(body.strip()) return "\n".join(lines) def add_memory(title: str, content: str, tags: list[str] = None) -> str: """Add a new memory.""" ensure_dirs() memory_id = generate_id(title) filepath = MEMORY_DIR / f"{memory_id}.md" if filepath.exists(): print(f"Memory with title '{title}' already exists as {memory_id}", file=sys.stderr) return memory_id now = datetime.now().isoformat() data = { "id": memory_id, "title": title, "created": now, "updated": now, "tags": tags or [] } md_content = create_frontmatter(data, content) filepath.write_text(md_content, encoding="utf-8") update_index(memory_id, title, tags or []) print(f"Memory added: {memory_id}") return memory_id def update_index(memory_id: str, title: str, tags: list[str]): """Update the index file.""" lines = ["# Memory Index\n", "| ID | Title | Tags |", "|---|-------|------|"] for md_file in sorted(MEMORY_DIR.glob("*.md")): if md_file.name == "index.md": continue fm, _ = parse_frontmatter(md_file.read_text(encoding="utf-8")) tags_str = ", ".join(fm.get("tags", [])) lines.append(f"| {fm.get('id', '')} | {fm.get('title', '')} | {tags_str} |") INDEX_FILE.write_text("\n".join(lines) + "\n", encoding="utf-8") def get_memory(memory_id: str): """Get a specific memory.""" filepath = MEMORY_DIR / f"{memory_id}.md" if not filepath.exists(): print(f"Memory {memory_id} not found", file=sys.stderr) return None content = filepath.read_text(encoding="utf-8") fm, body = parse_frontmatter(content) print(f"# {fm.get('title', 'Untitled')}") print(f"ID: {memory_id}") print(f"Created: {fm.get('created', 'N/A')}") print(f"Updated: {fm.get('updated', 'N/A')}") print(f"Tags: {', '.join(fm.get('tags', []))}") print(f"\n---\n{body}") return fm def list_memories(): """List all memories.""" ensure_dirs() memories = [] for md_file in MEMORY_DIR.glob("*.md"): if md_file.name == "index.md": continue fm, _ = parse_frontmatter(md_file.read_text(encoding="utf-8")) memories.append(fm) if not memories: print("No memories found.") return print(f"{'ID':<10} {'Title':<30} {'Tags':<20} {'Updated'}") print("-" * 80) for m in sorted(memories, key=lambda x: x.get('updated', ''), reverse=True): tags = ", ".join(m.get('tags', [])[:2]) print(f"{m.get('id', ''):<10} {m.get('title', ''):<30} {tags:<20} {m.get('updated', '')[:10]}") def search_memories(query: str): """Search memories by keyword.""" ensure_dirs() results = [] for md_file in MEMORY_DIR.glob("*.md"): if md_file.name == "index.md": continue content = md_file.read_text(encoding="utf-8").lower() if query.lower() in content: fm, body = parse_frontmatter(md_file.read_text(encoding="utf-8")) results.append(fm) if not results: print(f"No memories found matching '{query}'") return print(f"Found {len(results)} result(s):\n") for m in results: print(f"- [{m.get('id')}] {m.get('title')} ({', '.join(m.get('tags', []))})") def update_memory(memory_id: str, title: Optional[str] = None, content: Optional[str] = None, tags: list[str] = None): """Update an existing memory.""" filepath = MEMORY_DIR / f"{memory_id}.md" if not filepath.exists(): print(f"Memory {memory_id} not found", file=sys.stderr) return fm, body = parse_frontmatter(filepath.read_text(encoding="utf-8")) if title: fm['title'] = title if content: body = content if tags is not None: fm['tags'] = tags fm['updated'] = datetime.now().isoformat() filepath.write_text(create_frontmatter(fm, body), encoding="utf-8") update_index(memory_id, fm.get('title', ''), fm.get('tags', [])) print(f"Memory updated: {memory_id}") def delete_memory(memory_id: str): """Delete a memory.""" filepath = MEMORY_DIR / f"{memory_id}.md" if not filepath.exists(): print(f"Memory {memory_id} not found", file=sys.stderr) return filepath.unlink() update_index("", "", []) print(f"Memory deleted: {memory_id}") def main(): parser = argparse.ArgumentParser(description="Memory Manager CLI") subparsers = parser.add_subparsers(dest="command", help="Commands") # Add command add_parser = subparsers.add_parser("add", help="Add a new memory") add_parser.add_argument("--title", "-t", required=True, help="Memory title") add_parser.add_argument("--content", "-c", required=True, help="Memory content") add_parser.add_argument("--tags", nargs="*", default=[], help="Tags") # Get command get_parser = subparsers.add_parser("get", help="Get a memory") get_parser.add_argument("id", help="Memory ID") # List command subparsers.add_parser("list", help="List all memories") # Search command search_parser = subparsers.add_parser("search", help="Search memories") search_parser.add_argument("query", help="Search query") # Update command update_parser = subparsers.add_parser("update", help="Update a memory") update_parser.add_argument("id", help="Memory ID") update_parser.add_argument("--title", "-t", help="New title") update_parser.add_argument("--content", "-c", help="New content") update_parser.add_argument("--tags", nargs="*", help="New tags") # Delete command delete_parser = subparsers.add_parser("delete", help="Delete a memory") delete_parser.add_argument("id", help="Memory ID") args = parser.parse_args() if args.command == "add": add_memory(args.title, args.content, args.tags) elif args.command == "get": get_memory(args.id) elif args.command == "list": list_memories() elif args.command == "search": search_memories(args.query) elif args.command == "update": update_memory(args.id, args.title, args.content, args.tags) elif args.command == "delete": delete_memory(args.id) else: parser.print_help() if __name__ == "__main__": main()