# Python source file: 259 lines, 8.0 KiB.
"""Memory Manager - CLI tool for managing persistent memories."""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Directory holding the memory files: a "memories" folder located next to
# the directory that contains this file.
MEMORY_DIR = Path(__file__).parent.parent / "memories"
# Markdown index file stored inside the memory directory.
INDEX_FILE = MEMORY_DIR / "index.md"
|
|
|
|
|
|
def ensure_dirs():
    """Create the memory directory and seed the index file when missing."""
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
    if INDEX_FILE.exists():
        return
    # First use: start the index with only its heading.
    INDEX_FILE.write_text("# Memory Index\n\n", encoding="utf-8")
|
|
|
|
|
|
def generate_id(title: str) -> str:
    """Return the first 8 hex characters of the MD5 digest of ``title``.

    The same title always maps to the same ID.
    """
    import hashlib

    digest = hashlib.md5()
    digest.update(title.encode())
    return digest.hexdigest()[:8]
|
|
|
|
|
|
def parse_frontmatter(content: str) -> tuple[dict, str]:
    """Parse the simple ``key: value`` frontmatter of a markdown document.

    The frontmatter must start on the first line and be delimited by
    ``---`` lines. Only a flat subset of YAML is understood:

    * ``key: value`` becomes a string, with surrounding brackets, quotes
      and ``|`` characters stripped.
    * ``key: [a, b, c]`` becomes a list of strings, each item stripped of
      surrounding whitespace and quotes; ``key: []`` becomes an empty list.

    Args:
        content: Full text of the markdown document.

    Returns:
        A ``(frontmatter, body)`` tuple. When no frontmatter block is
        found, returns ``({}, content)`` with the text untouched.
    """
    match = re.match(r'^---\n(.*?)\n---\n?(.*)$', content, re.DOTALL)
    if not match:
        return {}, content

    fm = {}
    for line in match.group(1).split('\n'):
        if ':' not in line:
            continue
        # Split on the first colon only so values may contain colons
        # themselves (ISO timestamps, titles such as "Note: part 2").
        key, value = line.split(':', 1)
        key = key.strip()
        value = value.strip()
        # Detect a list BEFORE stripping brackets; stripping first makes
        # the list test unreachable and turns every list into a string.
        if value.startswith('[') and value.endswith(']'):
            items = (v.strip().strip('"\'') for v in value[1:-1].split(','))
            # Dropping empty items makes "[]" parse as an empty list.
            fm[key] = [item for item in items if item]
        else:
            fm[key] = value.strip('[]"\'|')
    return fm, match.group(2)
|
|
|
|
|
|
def create_frontmatter(data: dict, body: str) -> str:
    """Serialize ``data`` as a frontmatter block followed by ``body``.

    List values are written as ``key: [a, b]``; every other value is
    written as ``key: value``. The body is stripped of surrounding
    whitespace and omitted entirely when blank.
    """
    def render(key, value):
        # Lists use the inline bracket form; everything else is plain text.
        if isinstance(value, list):
            return f"{key}: [{', '.join(value)}]"
        return f"{key}: {value}"

    parts = ["---", *(render(key, value) for key, value in data.items()), "---\n"]
    text = body.strip()
    if text:
        parts.append(text)
    return "\n".join(parts)
|
|
|
|
|
|
def add_memory(title: str, content: str, tags: list[str] = None) -> str:
    """Store a new memory file and refresh the index.

    The ID is derived from the title. If a file for that ID already
    exists, nothing is written, a notice goes to stderr, and the existing
    ID is returned.

    Returns:
        The ID of the new (or already existing) memory.
    """
    ensure_dirs()

    tag_list = tags or []
    memory_id = generate_id(title)
    target = MEMORY_DIR / f"{memory_id}.md"

    if target.exists():
        print(f"Memory with title '{title}' already exists as {memory_id}", file=sys.stderr)
        return memory_id

    # Created and updated start out as the same timestamp.
    timestamp = datetime.now().isoformat()
    metadata = dict(
        id=memory_id,
        title=title,
        created=timestamp,
        updated=timestamp,
        tags=tag_list,
    )

    target.write_text(create_frontmatter(metadata, content), encoding="utf-8")
    update_index(memory_id, title, tag_list)

    print(f"Memory added: {memory_id}")
    return memory_id
|
|
|
|
|
|
def update_index(memory_id: str, title: str, tags: list[str]):
    """Rewrite the index file as a markdown table of all memories.

    The table is rebuilt from the frontmatter of every ``*.md`` file in
    the memory directory (except the index itself), in sorted path order.
    The arguments are not consulted.
    """
    header = ["# Memory Index\n", "| ID | Title | Tags |", "|---|-------|------|"]

    rows = []
    for md_file in sorted(MEMORY_DIR.glob("*.md")):
        if md_file.name != "index.md":
            meta, _ = parse_frontmatter(md_file.read_text(encoding="utf-8"))
            tags_str = ", ".join(meta.get("tags", []))
            rows.append(f"| {meta.get('id', '')} | {meta.get('title', '')} | {tags_str} |")

    INDEX_FILE.write_text("\n".join(header + rows) + "\n", encoding="utf-8")
|
|
|
|
|
|
def get_memory(memory_id: str):
    """Print one memory's metadata and body.

    Returns:
        The parsed frontmatter dict, or ``None`` (after a stderr notice)
        when no file exists for ``memory_id``.
    """
    path = MEMORY_DIR / f"{memory_id}.md"
    if path.exists():
        meta, body = parse_frontmatter(path.read_text(encoding="utf-8"))

        print(f"# {meta.get('title', 'Untitled')}")
        print(f"ID: {memory_id}")
        print(f"Created: {meta.get('created', 'N/A')}")
        print(f"Updated: {meta.get('updated', 'N/A')}")
        print(f"Tags: {', '.join(meta.get('tags', []))}")
        print(f"\n---\n{body}")
        return meta

    print(f"Memory {memory_id} not found", file=sys.stderr)
    return None
|
|
|
|
|
|
def list_memories():
    """Print a table of all memories, most recently updated first.

    Each row shows the ID, the title, up to two tags, and the first ten
    characters of the ``updated`` value.
    """
    ensure_dirs()

    memories = [
        parse_frontmatter(md_file.read_text(encoding="utf-8"))[0]
        for md_file in MEMORY_DIR.glob("*.md")
        if md_file.name != "index.md"
    ]

    if not memories:
        print("No memories found.")
        return

    print(f"{'ID':<10} {'Title':<30} {'Tags':<20} {'Updated'}")
    print("-" * 80)

    memories.sort(key=lambda entry: entry.get('updated', ''), reverse=True)
    for entry in memories:
        tags = ", ".join(entry.get('tags', [])[:2])
        print(f"{entry.get('id', ''):<10} {entry.get('title', ''):<30} {tags:<20} {entry.get('updated', '')[:10]}")
|
|
|
|
|
|
def search_memories(query: str):
    """Print every memory whose file text contains ``query``.

    Matching is a case-insensitive substring test against the whole file,
    frontmatter included, so a query can hit the title, tags, dates or
    body.

    Args:
        query: Text to look for.
    """
    ensure_dirs()

    # Lower-case the query once rather than once per file.
    needle = query.lower()

    results = []
    for md_file in MEMORY_DIR.glob("*.md"):
        if md_file.name == "index.md":
            continue
        # Read each file once and reuse the text for both the match test
        # and the frontmatter parse.
        text = md_file.read_text(encoding="utf-8")
        if needle in text.lower():
            fm, _ = parse_frontmatter(text)
            results.append(fm)

    if not results:
        print(f"No memories found matching '{query}'")
        return

    print(f"Found {len(results)} result(s):\n")
    for m in results:
        print(f"- [{m.get('id')}] {m.get('title')} ({', '.join(m.get('tags', []))})")
|
|
|
|
|
|
def update_memory(memory_id: str, title: Optional[str] = None, content: Optional[str] = None, tags: Optional[list[str]] = None):
    """Update an existing memory's title, content and/or tags.

    The memory keeps its ID and filename even when the title changes.
    The ``updated`` timestamp is always refreshed and the index rebuilt.

    Args:
        memory_id: ID of the memory to modify.
        title: New title; ``None`` or an empty string leaves it unchanged.
        content: New body; ``None`` leaves it unchanged, while an empty
            string clears the body.
        tags: New tag list; ``None`` leaves the tags unchanged, while an
            empty list clears them.

    Returns:
        ``None``. A notice is printed to stderr when the memory is missing.
    """
    filepath = MEMORY_DIR / f"{memory_id}.md"
    if not filepath.exists():
        print(f"Memory {memory_id} not found", file=sys.stderr)
        return

    fm, body = parse_frontmatter(filepath.read_text(encoding="utf-8"))

    # A blank title is ignored so a memory never ends up untitled.
    if title:
        fm['title'] = title
    # Compare against None, as for tags, so that an explicit empty string
    # clears the body instead of being silently dropped.
    if content is not None:
        body = content
    if tags is not None:
        fm['tags'] = tags

    fm['updated'] = datetime.now().isoformat()

    filepath.write_text(create_frontmatter(fm, body), encoding="utf-8")
    update_index(memory_id, fm.get('title', ''), fm.get('tags', []))

    print(f"Memory updated: {memory_id}")
|
|
|
|
|
|
def delete_memory(memory_id: str):
    """Remove a memory file and rebuild the index.

    Prints a notice to stderr and does nothing else when no file exists
    for ``memory_id``.
    """
    target = MEMORY_DIR / f"{memory_id}.md"
    if target.exists():
        target.unlink()
        # Placeholder arguments: the index is regenerated from the files.
        update_index("", "", [])
        print(f"Memory deleted: {memory_id}")
    else:
        print(f"Memory {memory_id} not found", file=sys.stderr)
|
|
|
|
|
|
def main():
    """Parse the command line and run the requested subcommand.

    Prints the help text when no subcommand is given.
    """
    parser = argparse.ArgumentParser(description="Memory Manager CLI")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # add
    add_cmd = subparsers.add_parser("add", help="Add a new memory")
    add_cmd.add_argument("--title", "-t", required=True, help="Memory title")
    add_cmd.add_argument("--content", "-c", required=True, help="Memory content")
    add_cmd.add_argument("--tags", nargs="*", default=[], help="Tags")

    # get
    get_cmd = subparsers.add_parser("get", help="Get a memory")
    get_cmd.add_argument("id", help="Memory ID")

    # list
    subparsers.add_parser("list", help="List all memories")

    # search
    search_cmd = subparsers.add_parser("search", help="Search memories")
    search_cmd.add_argument("query", help="Search query")

    # update
    update_cmd = subparsers.add_parser("update", help="Update a memory")
    update_cmd.add_argument("id", help="Memory ID")
    update_cmd.add_argument("--title", "-t", help="New title")
    update_cmd.add_argument("--content", "-c", help="New content")
    update_cmd.add_argument("--tags", nargs="*", help="New tags")

    # delete
    delete_cmd = subparsers.add_parser("delete", help="Delete a memory")
    delete_cmd.add_argument("id", help="Memory ID")

    args = parser.parse_args()

    # Map each subcommand to a thunk so only the selected one touches the
    # attributes that exist for that subcommand.
    handlers = {
        "add": lambda: add_memory(args.title, args.content, args.tags),
        "get": lambda: get_memory(args.id),
        "list": lambda: list_memories(),
        "search": lambda: search_memories(args.query),
        "update": lambda: update_memory(args.id, args.title, args.content, args.tags),
        "delete": lambda: delete_memory(args.id),
    }

    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler()
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|