"""Memory Store - Hierarchical file-based memory storage with MCP support.""" import argparse import json import os import re import sys import hashlib from datetime import datetime from pathlib import Path from typing import Optional, List # === Hierarchical Memory Store === class MemoryStore: """Hierarchical file-based memory storage with categories.""" def __init__(self, memory_dir: Path): self.memory_dir = Path(memory_dir) self.memory_dir.mkdir(parents=True, exist_ok=True) def _generate_id(self, title: str) -> str: return hashlib.md5(title.encode()).hexdigest()[:8] def _parse_frontmatter(self, content: str) -> tuple[dict, str]: pattern = r'^---\n(.*?)\n---\n?(.*)$' match = re.match(pattern, content, re.DOTALL) if match: fm_text = match.group(1) body = match.group(2) fm = {} for line in fm_text.split('\n'): if ':' in line: key, value = line.split(':', 1) key = key.strip() value = value.strip().strip('[]"\'') if value.startswith('['): fm[key] = [v.strip().strip('"\'') for v in value[1:-1].split(',') if v.strip()] else: fm[key] = value return fm, body return {}, content def _create_frontmatter(self, data: dict, body: str) -> str: lines = ["---"] for key, value in sorted(data.items()): if isinstance(value, list): lines.append(f"{key}: [{', '.join(value)}]") else: lines.append(f"{key}: {value}") lines.append("---\n") if body.strip(): lines.append(body.strip()) return "\n".join(lines) def _get_category_path(self, category: str = None) -> Path: if not category or category == "/": return self.memory_dir path = self.memory_dir / category.lstrip("/") path.mkdir(parents=True, exist_ok=True) return path def _get_memory_path(self, category: str, memory_id: str) -> Path: return self._get_category_path(category) / f"{memory_id}.md" def _relpath(self, path: Path) -> str: """Get relative path from memory_dir.""" try: return str(path.relative_to(self.memory_dir)).replace("\\", "/") except ValueError: return str(path) def add(self, title: str, content: str, category: str = None, tags: List[str] = None, id: str = None) -> str: """Add memory to category.""" memory_id = id or self._generate_id(title) filepath = self._get_memory_path(category, memory_id) now = datetime.now().isoformat() data = { "id": memory_id, "title": title, "created": now, "updated": now, "tags": tags or [], "category": category or "/" } filepath.write_text(self._create_frontmatter(data, content), encoding="utf-8") return memory_id def get(self, memory_id: str, category: str = None) -> Optional[dict]: """Get memory by ID, optionally in category.""" if category: filepath = self._get_memory_path(category, memory_id) if filepath.exists(): return self._read_memory(filepath) return None # Search all categories for md_file in self.memory_dir.rglob("*.md"): fm, _ = self._parse_frontmatter(md_file.read_text(encoding="utf-8")) if fm.get("id") == memory_id: result = fm result["content"] = self._read_memory(md_file)["content"] return result return None def _read_memory(self, filepath: Path) -> dict: fm, body = self._parse_frontmatter(filepath.read_text(encoding="utf-8")) fm["content"] = body.strip() fm["category"] = self._relpath(filepath.parent) return fm def list(self, category: str = None, recursive: bool = True) -> List[dict]: """List memories in category.""" memories = [] cat_path = self._get_category_path(category) pattern = "*.md" if recursive else "*.md" for md_file in cat_path.glob(pattern): if md_file.name.startswith("_"): continue fm, _ = self._parse_frontmatter(md_file.read_text(encoding="utf-8")) fm["category"] = self._relpath(md_file.parent) memories.append(fm) return sorted(memories, key=lambda x: (x.get('category', ''), x.get('updated', '')), reverse=True) def search(self, query: str, category: str = None) -> List[dict]: """Search memories.""" results = [] query_lower = query.lower() base_path = self._get_category_path(category) for md_file in base_path.rglob("*.md"): if md_file.name.startswith("_"): continue content = md_file.read_text(encoding="utf-8").lower() if query_lower in content: fm, body = self._parse_frontmatter(md_file.read_text(encoding="utf-8")) fm["content"] = body.strip() fm["category"] = self._relpath(md_file.parent) results.append(fm) return results def update(self, memory_id: str, category: str = None, title: str = None, content: str = None, tags: List[str] = None, new_category: str = None) -> bool: """Update memory, optionally move to new category.""" old_path = self._get_memory_path(category, memory_id) if not old_path.exists(): # Try search across all for md_file in self.memory_dir.rglob("*.md"): fm, _ = self._parse_frontmatter(md_file.read_text(encoding="utf-8")) if fm.get("id") == memory_id: old_path = md_file category = self._relpath(md_file.parent) break else: return False fm, body = self._parse_frontmatter(old_path.read_text(encoding="utf-8")) if title is not None: fm['title'] = title if content is not None: body = content if tags is not None: fm['tags'] = tags target_cat = new_category if new_category is not None else category fm['updated'] = datetime.now().isoformat() fm['category'] = target_cat or "/" new_path = self._get_memory_path(target_cat, memory_id) if new_path != old_path: old_path.unlink() new_path.write_text(self._create_frontmatter(fm, body), encoding="utf-8") return True def delete(self, memory_id: str, category: str = None) -> bool: """Delete memory.""" filepath = self._get_memory_path(category, memory_id) if not filepath.exists(): # Search all for md_file in self.memory_dir.rglob("*.md"): fm, _ = self._parse_frontmatter(md_file.read_text(encoding="utf-8")) if fm.get("id") == memory_id: filepath = md_file break else: return False filepath.unlink() # Clean empty category dirs cat_dir = filepath.parent if cat_dir != self.memory_dir and not any(cat_dir.iterdir()): cat_dir.rmdir() return True def ls(self, category: str = None) -> dict: """List category structure (like ls -la).""" cat_path = self._get_category_path(category) result = { "path": self._relpath(cat_path), "categories": [], "memories": [] } for item in sorted(cat_path.iterdir()): if item.name.startswith("_"): continue if item.is_dir(): result["categories"].append(item.name) else: fm, _ = self._parse_frontmatter(item.read_text(encoding="utf-8")) result["memories"].append({ "id": fm.get("id"), "title": fm.get("title"), "tags": fm.get("tags", []) }) return result def mkdir(self, category: str) -> bool: """Create category.""" cat_path = self._get_category_path(category) if cat_path.exists(): return False cat_path.mkdir(parents=True) return True def rmdir(self, category: str) -> bool: """Remove empty category.""" cat_path = self._get_category_path(category) if cat_path == self.memory_dir or not cat_path.exists(): return False # Check empty items = list(cat_path.iterdir()) if any(not i.name.startswith("_") for i in items): return False for item in items: if item.is_file(): item.unlink() elif item.is_dir(): import shutil shutil.rmtree(item) cat_path.rmdir() return True def export(self, format: str = "json", category: str = None) -> str: """Export memories.""" memories = self.list(category=category) if format == "json": return json.dumps(memories, indent=2, ensure_ascii=False) elif format == "markdown": lines = ["# Memory Export\n"] current_cat = None for m in memories: cat = m.get("category", "/") if cat != current_cat: lines.append(f"\n## {cat}/\n") current_cat = cat lines.append(f"### {m.get('title', 'Untitled')}\n") lines.append(f"**ID:** `{m.get('id', '')}` **Tags:** {', '.join(m.get('tags', []))}\n\n") return "".join(lines) elif format == "csv": lines = ["id,title,tags,category,created,updated,content\n"] for m in memories: tags = "|".join(m.get('tags', [])) content = (m.get('content', '') or '').replace('"', '""').replace('\n', ' ') lines.append(f'"{m.get("id", "")}","{m.get("title", "")}","{tags}","{m.get("category", "")}","{m.get("created", "")}","{m.get("updated", "")}","{content}"\n') return "".join(lines) return json.dumps(memories) # === CLI Interface === def cli(): parser = argparse.ArgumentParser(description="Memory Store CLI") parser.add_argument("--path", type=Path, default=Path(__file__).parent.parent / "memories") subparsers = parser.add_subparsers(dest="command", help="Commands") # Add add_p = subparsers.add_parser("add", help="Add memory") add_p.add_argument("-t", "--title", required=True) add_p.add_argument("-c", "--content", required=True) add_p.add_argument("--tags", nargs="*", default=[]) add_p.add_argument("--category", "-C", default="/") # Get get_p = subparsers.add_parser("get", help="Get memory") get_p.add_argument("id") get_p.add_argument("--category", "-C") # List list_p = subparsers.add_parser("list", help="List memories") list_p.add_argument("--category", "-C") list_p.add_argument("--no-recursive", action="store_true") # Search search_p = subparsers.add_parser("search", help="Search") search_p.add_argument("query") search_p.add_argument("--category", "-C") # Update update_p = subparsers.add_parser("update", help="Update") update_p.add_argument("id") update_p.add_argument("--category", "-C") update_p.add_argument("-t", "--title") update_p.add_argument("-c", "--content") update_p.add_argument("--tags", nargs="*") update_p.add_argument("--move", "-M", dest="new_category") # Delete delete_p = subparsers.add_parser("delete", help="Delete") delete_p.add_argument("id") delete_p.add_argument("--category", "-C") # ls (tree) ls_p = subparsers.add_parser("ls", help="List structure") ls_p.add_argument("--category", "-C", default="/") # mkdir mkdir_p = subparsers.add_parser("mkdir", help="Create category") mkdir_p.add_argument("category") # rmdir rmdir_p = subparsers.add_parser("rmdir", help="Remove empty category") rmdir_p.add_argument("category") # Export export_p = subparsers.add_parser("export", help="Export") export_p.add_argument("--format", choices=["json", "markdown", "csv"], default="json") export_p.add_argument("-o", "--output", type=Path) export_p.add_argument("--category", "-C") args = parser.parse_args() store = MemoryStore(args.path) if args.command == "add": mid = store.add(args.title, args.content, args.category, args.tags) print(f"Added: {mid} -> {args.category}") elif args.command == "get": m = store.get(args.id, args.category) if m: print(f"# {m['title']}\n") print(f"ID: {m['id']} Category: {m.get('category', '/')} Tags: {', '.join(m.get('tags', []))}") print(f"Created: {m['created']} Updated: {m['updated']}\n") print("---") print(m['content']) else: print(f"Not found: {args.id}", file=sys.stderr) elif args.command == "list": for m in store.list(args.category, not args.no_recursive): cat = m.get('category', '/') tags = ", ".join(m.get('tags', [])[:2]) print(f"{m['id']:<10} [{cat:<15}] {m['title']:<30} [{tags}]") elif args.command == "search": for m in store.search(args.query, args.category): print(f"- [{m.get('category', '/')}] {m['id']} {m['title']} ({', '.join(m.get('tags', []))})") elif args.command == "update": ok = store.update(args.id, args.category, args.title, args.content, args.tags, args.new_category) print("Updated" if ok else "Not found") elif args.command == "delete": ok = store.delete(args.id, args.category) print("Deleted" if ok else "Not found") elif args.command == "ls": result = store.ls(args.category) print(f"📁 {result['path']}/") if result['categories']: for c in result['categories']: print(f" 📂 {c}/") for m in result['memories']: print(f" 📄 {m['id']} - {m['title']}") elif args.command == "mkdir": ok = store.mkdir(args.category) print("Created" if ok else "Already exists") elif args.command == "rmdir": ok = store.rmdir(args.category) print("Removed" if ok else "Failed (not empty or not found)") elif args.command == "export": output = store.export(args.format, args.category) if args.output: args.output.write_text(output, encoding="utf-8") print(f"Exported to {args.output}") else: print(output) else: parser.print_help() # === MCP JSON-RPC Server === def mcp_server(): """Run as MCP JSON-RPC server (stdio).""" store = MemoryStore(Path(__file__).parent.parent / "memories") for line in sys.stdin: try: request = json.loads(line.strip()) method = request.get("method", "") params = request.get("params", {}) req_id = request.get("id") result = None error = None try: if method == "add": result = store.add( params.get("title", ""), params.get("content", ""), params.get("category"), params.get("tags", []) ) elif method == "get": result = store.get(params.get("id", ""), params.get("category")) elif method == "list": result = store.list(params.get("category"), params.get("recursive", True)) elif method == "search": result = store.search(params.get("query", ""), params.get("category")) elif method == "update": result = store.update( params.get("id", ""), params.get("category"), params.get("title"), params.get("content"), params.get("tags"), params.get("new_category") ) elif method == "delete": result = store.delete(params.get("id", ""), params.get("category")) elif method == "ls": result = store.ls(params.get("category")) elif method == "mkdir": result = store.mkdir(params.get("category", "")) elif method == "rmdir": result = store.rmdir(params.get("category", "")) elif method == "export": result = store.export(params.get("format", "json"), params.get("category")) else: error = {"code": -32601, "message": f"Unknown method: {method}"} except Exception as e: error = {"code": -32603, "message": str(e)} response = {"jsonrpc": "2.0", "id": req_id} if error: response["error"] = error else: response["result"] = result print(json.dumps(response), flush=True) except json.JSONDecodeError: continue except Exception: continue if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--mcp": mcp_server() else: cli()