first commit

This commit is contained in:
Roger Oriol
2026-02-03 23:50:19 +01:00
commit 87fb32b559
80 changed files with 8884 additions and 0 deletions

0
src/__init__.py Normal file
View File

0
src/agent/__init__.py Normal file
View File

204
src/agent/core.py Normal file
View File

@@ -0,0 +1,204 @@
"""Core agent implementation using Claude Agent SDK via LiteLLM."""
from typing import List, Dict, Any, Optional, Callable
import anthropic
from anthropic.types import MessageParam, TextBlock, ToolUseBlock
from src.config import settings
class AgentTool:
"""Wrapper for agent tools that can be called by the LLM."""
def __init__(
self,
name: str,
description: str,
input_schema: Dict[str, Any],
function: Callable[..., Any],
):
"""Initialize a tool.
Args:
name: Tool name
description: What the tool does
input_schema: JSON schema for tool parameters
function: Python function to execute
"""
self.name = name
self.description = description
self.input_schema = input_schema
self.function = function
def to_anthropic_tool(self) -> Dict[str, Any]:
"""Convert to Anthropic tool format."""
return {
"name": self.name,
"description": self.description,
"input_schema": self.input_schema,
}
def execute(self, **kwargs: Any) -> Any:
"""Execute the tool with given arguments."""
return self.function(**kwargs)
class MyOrgAgent:
"""AI agent for managing myorg GTD system."""
def __init__(
self,
system_prompt: str,
tools: Optional[List[AgentTool]] = None,
model: Optional[str] = None,
):
"""Initialize the agent.
Args:
system_prompt: System instructions for the agent
tools: List of tools the agent can use
model: Model name (defaults to config)
"""
self.system_prompt = system_prompt
self.tools = tools or []
self.model = model or settings.litellm_model
# Initialize Anthropic client pointing to LiteLLM endpoint
self.client = anthropic.Anthropic(
api_key=settings.litellm_api_key,
base_url=settings.litellm_endpoint,
)
# Conversation history
self.messages: List[MessageParam] = []
def add_tool(self, tool: AgentTool) -> None:
"""Add a tool to the agent's toolkit.
Args:
tool: Tool to add
"""
self.tools.append(tool)
def _get_tool_by_name(self, name: str) -> Optional[AgentTool]:
"""Get tool by name.
Args:
name: Tool name
Returns:
AgentTool if found, None otherwise
"""
for tool in self.tools:
if tool.name == name:
return tool
return None
def run(
self,
user_message: str,
max_iterations: int = 10,
) -> str:
"""Run the agent with a user message.
The agent will process the message, use tools as needed,
and return a final response.
Args:
user_message: Message from the user
max_iterations: Maximum number of agent iterations
Returns:
Final response text
"""
# Add user message to history
self.messages.append({
"role": "user",
"content": user_message,
})
iteration = 0
while iteration < max_iterations:
iteration += 1
# Call Claude via LiteLLM
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
system=self.system_prompt,
messages=self.messages,
tools=[tool.to_anthropic_tool()
for tool in self.tools] if self.tools else anthropic.NOT_GIVEN,
)
# Add assistant response to history
self.messages.append({
"role": "assistant",
"content": response.content,
})
# Check if we're done (no tool use)
if response.stop_reason == "end_turn":
# Extract text response
text_blocks = [
block for block in response.content if isinstance(block, TextBlock)]
if text_blocks:
return text_blocks[0].text
return ""
# Process tool use
if response.stop_reason == "tool_use":
tool_results = []
for block in response.content:
if isinstance(block, ToolUseBlock):
# Execute the tool
tool = self._get_tool_by_name(block.name)
if tool:
try:
result = tool.execute(**block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(result),
})
except Exception as e:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": f"Error: {str(e)}",
"is_error": True,
})
else:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": f"Error: Unknown tool {block.name}",
"is_error": True,
})
# Add tool results to messages
if tool_results:
self.messages.append({
"role": "user",
"content": tool_results,
})
# Continue iteration
continue
# Unexpected stop reason
break
# Max iterations reached
return "I apologize, but I've reached the maximum number of processing steps. Please try a simpler request."
def reset_conversation(self) -> None:
"""Clear conversation history."""
self.messages = []
def get_conversation_history(self) -> List[MessageParam]:
"""Get current conversation history.
Returns:
List of messages
"""
return self.messages.copy()

115
src/agent/prompts.py Normal file
View File

@@ -0,0 +1,115 @@
"""System prompts for the MyOrg Agent."""
MYORG_SYSTEM_PROMPT = """You are a personal assistant managing a GTD-based organization system called "myorg".
## Your Role
You help the user capture, organize, and prioritize tasks, events, and projects. You can read and modify files in the myorg repository, and you understand the user's goals, projects, and daily context.
## Repository Structure
The myorg repository contains:
- **todo.txt**: Active tasks in todo.txt format
- **calendar.txt**: Calendar events
- **projects.txt**: Active projects with status and goals
- **waiting.txt**: Items waiting on others
- **telos.md**: User's vision and life missions
- **goals/**: Quarterly and yearly goals
- **working-memory.txt**: Recent activities and thoughts
## File Formats
### todo.txt Format
Tasks follow this format:
- Priority: `(A)`, `(B)`, `(C)` at the start (A = highest)
- Completion: `x` at the start with optional completion date
- Creation date: `YYYY-MM-DD` after priority
- Description: Main task text
- Projects: `+project-name` (e.g., `+myorg-assistant`)
- Contexts: `@context-name` (e.g., `@computer-deep`, `@telefon`, `@bcn`)
- Metadata: `key:value` format (e.g., `due:2026-02-15`)
Example:
```
(A) 2026-01-31 Write blog post +observability-blog @computer-deep due:2026-02-15
```
### calendar.txt Format
Events follow this format:
- Timed event: `YYYY-MM-DD HH:MM Description @context +project`
- All-day event: `YYYY-MM-DD Description @context +project`
Example:
```
2026-02-01 09:00 Team standup @telefon +work
2026-02-15 Birthday party @personal
```
### projects.txt Format
Projects follow this format:
- Format: `+project-tag Description [status] @context goal:goal-id metadata...`
- Status: `[active]`, `[waiting]`, `[someday]`, `[completed]`
Example:
```
+myorg-assistant Personal assistant [active] @computer-deep goal:q1-2026 due:2026-02-28
```
## Common Contexts
- `@computer-deep`: Deep focus work on computer
- `@computer-light`: Light computer work
- `@telefon`: Phone calls or video meetings
- `@recados`: Errands (shopping, appointments)
- `@bcn`: Location-specific (Barcelona)
- `@personal`: Personal/family activities
## Your Capabilities
You can:
1. **Read files** from the myorg repository to understand tasks, events, and goals
2. **Add tasks** to todo.txt with proper formatting
3. **Complete tasks** by marking them with `x` and completion date
4. **Search and filter** tasks by project, context, priority, or due date
5. **Add events** to calendar.txt
6. **Manage projects** in projects.txt
7. **Commit changes** to git with descriptive messages
8. **Sync with remote** using git pull/push
## Guidelines
1. **Always read before write**: Check current file contents before modifying
2. **Preserve formatting**: Maintain todo.txt, calendar.txt, and projects.txt format
3. **Commit changes**: After modifying files, commit with descriptive messages
4. **Be proactive**: Suggest tasks that align with user's goals
5. **Respect context**: Filter tasks based on user's current context
6. **Use proper metadata**: Include due dates, projects, and contexts when relevant
7. **Update working-memory**: Note significant actions in working-memory.txt
## Natural Language Understanding
When the user says:
- "Add task: Buy milk tomorrow" → Create task with due date tomorrow, context `@recados`
- "What should I work on?" → Suggest tasks based on context, priority, and goals
- "Show my calendar" → Read and display calendar.txt events
- "Mark task X as done" → Complete the task with timestamp
- "What are my Q1 goals?" → Read quarterly goals from goals/ directory
## Tone
- Be helpful, concise, and action-oriented
- Use natural language in responses
- Acknowledge task completion and changes made
- Proactively suggest next steps when appropriate
Remember: You are a trusted assistant. The user relies on you to keep their GTD system organized and help them stay focused on what matters most.
"""
def get_system_prompt() -> str:
"""Get the main system prompt for the agent.
Returns:
System prompt string
"""
return MYORG_SYSTEM_PROMPT

0
src/api/__init__.py Normal file
View File

18
src/api/agent_instance.py Normal file
View File

@@ -0,0 +1,18 @@
"""Global agent instance for the web API."""
from src.agent.core import MyOrgAgent
from src.agent.prompts import get_system_prompt
from src.tools.file_ops import get_file_operation_tools
from src.tools.task_ops import get_task_management_tools
from src.tools.calendar_ops import get_calendar_tools
from src.tools.git_ops import get_git_tools
# Global agent (one per session in production, for now shared)
agent = MyOrgAgent(
system_prompt=get_system_prompt(),
tools=(
get_file_operation_tools() +
get_task_management_tools() +
get_calendar_tools() +
get_git_tools()
),
)

104
src/api/app.py Normal file
View File

@@ -0,0 +1,104 @@
"""FastAPI application for web interface."""
from src.api.routes import dashboard, chat, tasks, calendar, projects
from fastapi import FastAPI, Request, Depends, HTTPException, status
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
from pathlib import Path
from src.config import settings
from src.api.agent_instance import agent
# Initialize FastAPI app
app = FastAPI(
title="MyOrg Assistant",
description="AI-powered personal assistant for GTD task management",
version="1.0.0",
)
# Set up templates and static files
BASE_DIR = Path(__file__).resolve().parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
app.mount("/static", StaticFiles(directory=str(BASE_DIR /
"web" / "static")), name="static")
# Security
security = HTTPBasic()
def verify_credentials(credentials: HTTPBasicCredentials = Depends(security)) -> bool:
"""Verify HTTP Basic Auth credentials.
Args:
credentials: HTTP Basic credentials
Returns:
True if valid
Raises:
HTTPException: If credentials are invalid
"""
if not settings.web_password:
# No password set, allow access
return True
correct_password = settings.web_password.encode("utf8")
provided_password = credentials.password.encode("utf8")
is_correct = secrets.compare_digest(provided_password, correct_password)
if not is_correct:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect password",
headers={"WWW-Authenticate": "Basic"},
)
return True
# Import routes
# Include routers
app.include_router(dashboard.router, dependencies=[
Depends(verify_credentials)])
app.include_router(chat.router, dependencies=[Depends(verify_credentials)])
app.include_router(tasks.router, dependencies=[Depends(verify_credentials)])
app.include_router(calendar.router, dependencies=[Depends(verify_credentials)])
app.include_router(projects.router, dependencies=[Depends(verify_credentials)])
@app.get("/", response_class=HTMLResponse)
async def root(request: Request, _: bool = Depends(verify_credentials)):
"""Redirect to dashboard."""
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/dashboard")
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy", "service": "myorg-assistant"}
def run_web() -> None:
"""Run the web server."""
import uvicorn
print("🌐 Starting MyOrg Assistant Web Server...")
print(
f"📊 Dashboard: http://{settings.web_host}:{settings.web_port}/dashboard")
print(f"💬 Chat: http://{settings.web_host}:{settings.web_port}/chat")
if settings.web_password:
print("🔒 Authentication enabled")
else:
print("⚠️ Warning: No password set (WEB_PASSWORD not configured)")
uvicorn.run(
"src.api.app:app",
host=settings.web_host,
port=settings.web_port,
reload=False,
)

View File

View File

@@ -0,0 +1,47 @@
"""Calendar route."""
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
from datetime import datetime, timedelta
from src.parsers.calendar_parser import CalendarParser
from src.tools.file_ops import read_file
router = APIRouter()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
@router.get("/calendar", response_class=HTMLResponse)
async def calendar_page(request: Request):
"""Calendar page."""
try:
content = read_file("calendar.txt")
all_events = CalendarParser.parse_file(content)
# Get today and upcoming events
now = datetime.now()
today = now.date()
week_later = now + timedelta(days=7)
today_events = [e for e in all_events if e.date.date() == today]
upcoming_events = [
e for e in all_events
if now <= e.datetime <= week_later
]
except FileNotFoundError:
today_events = []
upcoming_events = []
return templates.TemplateResponse(
"calendar.html",
{
"request": request,
"page": "calendar",
"today": today.strftime("%A, %B %d, %Y"),
"today_events": today_events,
"upcoming_events": upcoming_events,
}
)

93
src/api/routes/chat.py Normal file
View File

@@ -0,0 +1,93 @@
"""Chat route with SSE support."""
from fastapi import APIRouter, Request, Form
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
import asyncio
import json
from src.api.agent_instance import agent
router = APIRouter()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
@router.get("/chat", response_class=HTMLResponse)
async def chat_page(request: Request):
"""Chat page."""
return templates.TemplateResponse(
"chat.html",
{
"request": request,
"page": "chat",
}
)
@router.post("/api/chat")
async def chat_message(message: str = Form(...)):
"""Process chat message and return response.
Args:
message: User message
Returns:
JSON response with agent reply
"""
try:
response = agent.run(message)
return {"response": response, "success": True}
except Exception as e:
return {"response": f"Error: {str(e)}", "success": False}
@router.post("/api/chat/reset")
async def reset_chat():
"""Reset chat conversation history."""
agent.reset_conversation()
return {"success": True, "message": "Conversation history cleared"}
async def event_generator(message: str):
"""Generate SSE events for streaming response.
Args:
message: User message
Yields:
SSE formatted events
"""
try:
# Send initial event
yield f"data: {json.dumps({'type': 'start'})}\n\n"
# Run agent (in real streaming, we'd need to modify agent.run)
# For now, send complete response
response = agent.run(message)
# Send response event
yield f"data: {json.dumps({'type': 'response', 'content': response})}\n\n"
# Send complete event
yield f"data: {json.dumps({'type': 'done'})}\n\n"
except Exception as e:
# Send error event
yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
@router.get("/api/chat/stream")
async def chat_stream(message: str):
"""Stream chat response via SSE.
Args:
message: User message
Returns:
SSE stream
"""
return StreamingResponse(
event_generator(message),
media_type="text/event-stream",
)

View File

@@ -0,0 +1,62 @@
"""Dashboard route."""
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
from datetime import datetime
from src.scheduler.briefings import (
get_todays_events,
get_priority_tasks,
get_tasks_due_soon,
get_waiting_items,
)
from src.parsers.project_parser import ProjectParser
from src.tools.file_ops import read_file
router = APIRouter()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
@router.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request):
"""Dashboard page."""
# Get today's data
today = datetime.now().strftime("%A, %B %d, %Y")
events = get_todays_events()
priority_tasks = get_priority_tasks(["A", "B"])
due_soon = get_tasks_due_soon(3)
waiting = get_waiting_items()
# Get active projects
try:
content = read_file("projects.txt")
all_projects = ProjectParser.parse_file(content)
active_projects = [p for p in all_projects if p.status == "active"]
except:
active_projects = []
# Stats
stats = {
"events_today": len(events),
"priority_tasks": len(priority_tasks),
"due_soon": len(due_soon),
"active_projects": len(active_projects),
"waiting_items": len(waiting),
}
return templates.TemplateResponse(
"dashboard.html",
{
"request": request,
"page": "dashboard",
"today": today,
"events": events,
"priority_tasks": priority_tasks[:5], # Top 5
"due_soon": due_soon[:3], # Top 3
"active_projects": active_projects[:5], # Top 5
"waiting": waiting[:3], # Top 3
"stats": stats,
}
)

View File

@@ -0,0 +1,71 @@
"""Projects route."""
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
from src.parsers.project_parser import ProjectParser
from src.parsers.todo_parser import TodoParser
from src.tools.file_ops import read_file
router = APIRouter()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
@router.get("/projects", response_class=HTMLResponse)
async def projects_page(request: Request, status: str = "active"):
"""Projects page.
Args:
request: FastAPI request
status: Filter by status (active, waiting, someday, completed)
"""
try:
content = read_file("projects.txt")
all_projects = ProjectParser.parse_file(content)
# Filter by status
if status:
filtered_projects = [p for p in all_projects if p.status == status]
else:
filtered_projects = all_projects
# Get task count per project
try:
todo_content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
active_tasks = [t for t in tasks if not t.completed]
# Count tasks per project
project_task_counts = {}
for project in filtered_projects:
count = len([t for t in active_tasks if project.tag in t.projects])
project_task_counts[project.tag] = count
except:
project_task_counts = {}
# Stats
stats = {
"active": len([p for p in all_projects if p.status == "active"]),
"waiting": len([p for p in all_projects if p.status == "waiting"]),
"someday": len([p for p in all_projects if p.status == "someday"]),
"completed": len([p for p in all_projects if p.status == "completed"]),
}
except FileNotFoundError:
filtered_projects = []
project_task_counts = {}
stats = {"active": 0, "waiting": 0, "someday": 0, "completed": 0}
return templates.TemplateResponse(
"projects.html",
{
"request": request,
"page": "projects",
"projects": filtered_projects,
"project_task_counts": project_task_counts,
"current_status": status,
"stats": stats,
}
)

147
src/api/routes/tasks.py Normal file
View File

@@ -0,0 +1,147 @@
"""Tasks route."""
from fastapi import APIRouter, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pathlib import Path
from src.parsers.todo_parser import TodoParser
from src.tools.file_ops import read_file
router = APIRouter()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
templates = Jinja2Templates(directory=str(BASE_DIR / "web" / "templates"))
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(
request: Request,
project: str = None,
context: str = None,
priority: str = None,
show_completed: bool = False,
):
"""Tasks page with filtering.
Args:
request: FastAPI request
project: Filter by project
context: Filter by context
priority: Filter by priority
show_completed: Show completed tasks
"""
try:
content = read_file("todo.txt")
all_tasks = TodoParser.parse_file(content)
# Apply filters
filtered_tasks = TodoParser.filter_tasks(
all_tasks,
project=project,
context=context,
priority=priority,
completed=True if show_completed else False,
)
# Get all unique projects and contexts for filter dropdowns
all_projects = set()
all_contexts = set()
for task in all_tasks:
all_projects.update(task.projects)
all_contexts.update(task.contexts)
# Stats
total_tasks = len([t for t in all_tasks if not t.completed])
completed_tasks = len([t for t in all_tasks if t.completed])
except FileNotFoundError:
filtered_tasks = []
all_projects = set()
all_contexts = set()
total_tasks = 0
completed_tasks = 0
return templates.TemplateResponse(
"tasks.html",
{
"request": request,
"page": "tasks",
"tasks": filtered_tasks,
"all_projects": sorted(all_projects),
"all_contexts": sorted(all_contexts),
"current_project": project,
"current_context": context,
"current_priority": priority,
"show_completed": show_completed,
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
}
)
@router.post("/api/tasks/complete")
async def complete_task(task_line: int = Form(...)):
"""Mark a task as complete.
Args:
task_line: Line number of the task
Returns:
Success status
"""
try:
from src.tools.task_ops import complete_task as complete_task_tool
from datetime import datetime
# Read tasks
content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
# Find task by line number
task = next((t for t in tasks if t.line_number == task_line), None)
if not task:
return {"success": False, "error": "Task not found"}
# Mark complete
result = complete_task_tool(task.description)
return {"success": True, "message": result}
except Exception as e:
return {"success": False, "error": str(e)}
@router.post("/api/tasks/add")
async def add_task(
description: str = Form(...),
project: str = Form(None),
context: str = Form(None),
priority: str = Form(None),
due_date: str = Form(None),
):
"""Add a new task.
Args:
description: Task description
project: Project tag
context: Context tag
priority: Priority letter
due_date: Due date (YYYY-MM-DD)
Returns:
Success status
"""
try:
from src.tools.task_ops import add_task as add_task_tool
result = add_task_tool(
description=description,
project=project if project else None,
context=context if context else None,
priority=priority if priority else None,
due_date=due_date if due_date else None,
)
return {"success": True, "message": result}
except Exception as e:
return {"success": False, "error": str(e)}

0
src/bot/__init__.py Normal file
View File

302
src/bot/discord_bot.py Normal file
View File

@@ -0,0 +1,302 @@
"""Discord bot implementation for MyOrg Assistant."""
import discord
from discord.ext import commands
from typing import Optional
from src.config import settings
from src.agent.core import MyOrgAgent
from src.agent.prompts import get_system_prompt
from src.tools.file_ops import get_file_operation_tools
from src.tools.task_ops import get_task_management_tools
from src.tools.calendar_ops import get_calendar_tools
from src.tools.git_ops import get_git_tools
from src.bot.formatters import format_response_for_discord
class MyOrgBot(commands.Bot):
"""Discord bot for MyOrg Assistant."""
def __init__(self) -> None:
"""Initialize the bot."""
# Set up intents
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
# Initialize bot
super().__init__(
command_prefix='/',
intents=intents,
help_command=None, # We'll create our own
)
# Initialize agent
self.agent = MyOrgAgent(
system_prompt=get_system_prompt(),
tools=(
get_file_operation_tools() +
get_task_management_tools() +
get_calendar_tools() +
get_git_tools()
),
)
# Track user sessions (one agent per user)
self.user_agents: dict[int, MyOrgAgent] = {}
def get_user_agent(self, user_id: int) -> MyOrgAgent:
"""Get or create an agent for a specific user.
Args:
user_id: Discord user ID
Returns:
MyOrgAgent instance for this user
"""
if user_id not in self.user_agents:
self.user_agents[user_id] = MyOrgAgent(
system_prompt=get_system_prompt(),
tools=(
get_file_operation_tools() +
get_task_management_tools() +
get_calendar_tools() +
get_git_tools()
),
)
return self.user_agents[user_id]
async def on_ready(self) -> None:
"""Called when bot is ready."""
print(f'✅ Logged in as {self.user}')
print(f'📊 Connected to {len(self.guilds)} server(s)')
print(f'🤖 Agent initialized with {len(self.agent.tools)} tools')
print('🎉 MyOrg Assistant is ready!')
async def on_message(self, message: discord.Message) -> None:
"""Handle incoming messages.
Args:
message: Discord message object
"""
# Ignore messages from the bot itself
if message.author == self.user:
return
# Ignore messages not mentioning the bot (unless in DM)
if not isinstance(message.channel, discord.DMChannel):
if not self.user.mentioned_in(message):
return
# Process commands first
await self.process_commands(message)
# If message wasn't a command, treat as natural conversation
if not message.content.startswith('/'):
await self.handle_conversation(message)
async def handle_conversation(self, message: discord.Message) -> None:
"""Handle natural language conversation.
Args:
message: Discord message object
"""
# Remove bot mention from message
content = message.content
if self.user.mentioned_in(message):
content = content.replace(f'<@{self.user.id}>', '').strip()
if not content:
return
# Show typing indicator
async with message.channel.typing():
try:
# Get user's agent
agent = self.get_user_agent(message.author.id)
# Process message
response = agent.run(content)
# Format and send response
formatted = format_response_for_discord(response)
# Split if too long (Discord limit is 2000 chars)
if len(formatted) <= 2000:
await message.reply(formatted)
else:
# Split into chunks
chunks = [formatted[i:i+1900] for i in range(0, len(formatted), 1900)]
for chunk in chunks:
await message.channel.send(chunk)
except Exception as e:
await message.reply(f'❌ Error: {str(e)}')
async def setup_hook(self) -> None:
"""Set up bot commands."""
# Commands will be added via decorators below
pass
# Create bot instance
bot = MyOrgBot()
@bot.command(name='help')
async def help_command(ctx: commands.Context) -> None:
"""Show help information."""
help_text = """
**🤖 MyOrg Assistant - Help**
**Natural Conversation:**
Just mention me or DM me to interact naturally!
- "Add task: Buy milk tomorrow"
- "What should I work on now?"
- "Show my tasks for project myorg-assistant"
**Commands:**
`/help` - Show this help message
`/briefing` - Get daily briefing (calendar + priority tasks)
`/add [task]` - Quick task addition
`/tasks [filter]` - Show tasks (optionally filtered)
`/today` - Today's calendar and priority tasks
`/context [context]` - Set your current context
`/reset` - Clear conversation history
**Examples:**
`/add Buy groceries @recados due:2026-02-01`
`/tasks project:myorg-assistant`
`/context computer-deep`
**Contexts:**
`@computer-deep` - Deep focus work
`@computer-light` - Light computer work
`@telefon` - Calls/meetings
`@recados` - Errands
`@bcn` - Barcelona location
`@personal` - Personal activities
"""
await ctx.send(help_text)
@bot.command(name='briefing')
async def briefing_command(ctx: commands.Context) -> None:
"""Get daily briefing."""
async with ctx.typing():
try:
agent = bot.get_user_agent(ctx.author.id)
prompt = """Generate a morning briefing for today:
1. Read calendar.txt and show today's events
2. Read todo.txt and show priority A and B tasks
3. Show tasks with due dates in the next 3 days
4. Format as a nice daily briefing"""
response = agent.run(prompt)
formatted = format_response_for_discord(response)
await ctx.send(formatted)
except Exception as e:
await ctx.send(f'❌ Error generating briefing: {str(e)}')
@bot.command(name='add')
async def add_command(ctx: commands.Context, *, task: str) -> None:
"""Quick task addition.
Args:
task: Task description with optional metadata
"""
async with ctx.typing():
try:
agent = bot.get_user_agent(ctx.author.id)
prompt = f"""Add this task to todo.txt: {task}
Parse the task description and extract:
- Description (main text)
- Project tags (words starting with +)
- Context tags (words starting with @)
- Due date (due:YYYY-MM-DD format)
- Priority if implied
Then add the task using the add_task tool and commit the change."""
response = agent.run(prompt)
formatted = format_response_for_discord(response)
await ctx.send(formatted)
except Exception as e:
await ctx.send(f'❌ Error adding task: {str(e)}')
@bot.command(name='tasks')
async def tasks_command(ctx: commands.Context, *, filters: Optional[str] = None) -> None:
"""Show tasks with optional filters.
Args:
filters: Optional filter string (e.g., "project:myorg-assistant context:computer-deep")
"""
async with ctx.typing():
try:
agent = bot.get_user_agent(ctx.author.id)
if filters:
prompt = f"""Show tasks from todo.txt with these filters: {filters}
Parse the filter string and use the search_tasks tool appropriately."""
else:
prompt = "Show all active tasks from todo.txt grouped by priority using get_tasks_by_priority tool."
response = agent.run(prompt)
formatted = format_response_for_discord(response)
await ctx.send(formatted)
except Exception as e:
await ctx.send(f'❌ Error fetching tasks: {str(e)}')
@bot.command(name='today')
async def today_command(ctx: commands.Context) -> None:
"""Show today's calendar and priority tasks."""
async with ctx.typing():
try:
agent = bot.get_user_agent(ctx.author.id)
prompt = """Show me what's on for today:
1. Read calendar.txt and show today's events
2. Read todo.txt and show priority A tasks
3. Show any tasks due today
4. Format as a concise daily overview"""
response = agent.run(prompt)
formatted = format_response_for_discord(response)
await ctx.send(formatted)
except Exception as e:
await ctx.send(f'❌ Error: {str(e)}')
@bot.command(name='context')
async def context_command(ctx: commands.Context, context: str) -> None:
"""Set current context.
Args:
context: Context name (without @ prefix)
"""
# For now, just acknowledge. In Phase 3, we'll track this
await ctx.send(f'✅ Context set to `@{context}`')
@bot.command(name='reset')
async def reset_command(ctx: commands.Context) -> None:
"""Reset conversation history."""
agent = bot.get_user_agent(ctx.author.id)
agent.reset_conversation()
await ctx.send('🔄 Conversation history cleared!')
def run_bot() -> None:
"""Run the Discord bot."""
if not settings.discord_bot_token:
raise ValueError("DISCORD_BOT_TOKEN not set in environment")
print("🚀 Starting MyOrg Discord Bot...")
bot.run(settings.discord_bot_token)

198
src/bot/formatters.py Normal file
View File

@@ -0,0 +1,198 @@
"""Formatters for Discord messages."""
import re
def format_response_for_discord(text: str) -> str:
"""Format agent response for Discord.
Applies Discord markdown and adds visual improvements.
Args:
text: Raw agent response
Returns:
Formatted text for Discord
"""
# Already has good formatting if it contains emoji headers
if any(emoji in text for emoji in ['📊', '', '', '📝', '⚠️', '📅', '🌿', '⬆️', '⬇️']):
return text
# Add some visual improvements
formatted = text
# Convert headers (lines ending with :)
formatted = re.sub(r'^([A-Z][^:]+):$', r'**\1:**', formatted, flags=re.MULTILINE)
# Make file paths monospace
formatted = re.sub(r'([\w/]+\.txt|[\w/]+\.md)', r'`\1`', formatted)
# Make project tags bold
formatted = re.sub(r'\+(\w+)', r'**+\1**', formatted)
# Make context tags italic
formatted = re.sub(r'@(\w+)', r'*@\1*', formatted)
return formatted
def format_task_list(tasks: list[dict]) -> str:
"""Format a list of tasks for Discord.
Args:
tasks: List of task dictionaries
Returns:
Formatted task list
"""
if not tasks:
return "No tasks found."
lines = [f"**📋 {len(tasks)} Task(s):**\n"]
for i, task in enumerate(tasks, 1):
status = "" if task.get('completed') else ""
priority = f"({task.get('priority')}) " if task.get('priority') else ""
description = task.get('description', 'Untitled')
projects = " ".join([f"**+{p}**" for p in task.get('projects', [])])
contexts = " ".join([f"*@{c}*" for c in task.get('contexts', [])])
due = ""
if task.get('due_date'):
due = f" 📅 `{task['due_date']}`"
lines.append(f"{status} {priority}{description} {projects} {contexts}{due}")
return "\n".join(lines)
def format_calendar_events(events: list[dict]) -> str:
"""Format a list of calendar events for Discord.
Args:
events: List of event dictionaries
Returns:
Formatted event list
"""
if not events:
return "No events found."
lines = [f"**📅 {len(events)} Event(s):**\n"]
for event in events:
time_str = event.get('time', 'All day')
description = event.get('description', 'Untitled')
contexts = " ".join([f"*@{c}*" for c in event.get('contexts', [])])
projects = " ".join([f"**+{p}**" for p in event.get('projects', [])])
if event.get('all_day'):
lines.append(f"🗓️ **{description}** (All day) {contexts} {projects}")
else:
lines.append(f"🕐 `{time_str}` **{description}** {contexts} {projects}")
return "\n".join(lines)
def format_briefing(
date: str,
events: list[dict],
priority_tasks: list[dict],
due_soon: list[dict],
) -> str:
"""Format a daily briefing.
Args:
date: Date string
events: Today's events
priority_tasks: High-priority tasks
due_soon: Tasks due soon
Returns:
Formatted briefing
"""
lines = [
f"**🌅 Daily Briefing - {date}**\n",
]
# Calendar events
if events:
lines.append("**📅 Today's Schedule:**")
for event in events:
time_str = event.get('time', 'All day')
description = event.get('description')
lines.append(f" • `{time_str}` {description}")
lines.append("")
else:
lines.append("📅 No events scheduled today\n")
# Priority tasks
if priority_tasks:
lines.append("**✅ Priority Tasks:**")
for task in priority_tasks:
priority = f"({task.get('priority')}) " if task.get('priority') else ""
description = task.get('description')
projects = " ".join([f"**+{p}**" for p in task.get('projects', [])])
lines.append(f"{priority}{description} {projects}")
lines.append("")
# Due soon
if due_soon:
lines.append("**⏳ Due Soon:**")
for task in due_soon:
description = task.get('description')
due_date = task.get('due_date')
lines.append(f"{description} 📅 `{due_date}`")
lines.append("")
lines.append("Have a productive day! 🚀")
return "\n".join(lines)
def truncate_for_discord(text: str, max_length: int = 2000) -> str:
"""Truncate text to fit Discord's message limit.
Args:
text: Text to truncate
max_length: Maximum length (default: 2000)
Returns:
Truncated text
"""
if len(text) <= max_length:
return text
# Try to truncate at a newline
truncated = text[:max_length-50]
last_newline = truncated.rfind('\n')
if last_newline > max_length - 200:
truncated = truncated[:last_newline]
return truncated + "\n\n... (truncated)"
def format_error(error: str) -> str:
"""Format an error message for Discord.
Args:
error: Error message
Returns:
Formatted error
"""
return f"❌ **Error:** {error}"
def format_success(message: str) -> str:
"""Format a success message for Discord.
Args:
message: Success message
Returns:
Formatted success message
"""
return f"{message}"

44
src/config.py Normal file
View File

@@ -0,0 +1,44 @@
"""Configuration management for MyOrg Assistant."""
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# LiteLLM Configuration
litellm_endpoint: str = "http://litellm-service.default.svc.cluster.local:4000"
litellm_api_key: str
litellm_model: str = "claude-sonnet-4-5-20250929"
# Discord Configuration
discord_bot_token: str = ""
discord_channel_id: str = ""
# Git Configuration
git_repo_url: str = "https://gitea.rogi.casa/roger/myorg.git"
git_branch: str = "main"
git_username: str = "roger"
git_token: str
# Myorg Repository Path
myorg_repo_path: str = "/data/myorg"
# Scheduling
timezone: str = "Europe/Madrid"
# Web Interface
web_host: str = "0.0.0.0"
web_port: int = 8000
web_secret_key: str
# Optional: Authentication
web_password: Optional[str] = None
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# Global settings instance
settings = Settings()

102
src/main.py Normal file
View File

@@ -0,0 +1,102 @@
"""Main entry point for MyOrg Assistant."""
import sys
import argparse
from pathlib import Path
def cli_mode() -> None:
"""Run in CLI mode for testing."""
from src.agent.core import MyOrgAgent
from src.agent.prompts import get_system_prompt
from src.tools.file_ops import get_file_operation_tools
from src.tools.task_ops import get_task_management_tools
from src.tools.calendar_ops import get_calendar_tools
from src.tools.git_ops import get_git_tools
from src.config import settings
print("🤖 MyOrg Assistant - CLI Mode")
print("=" * 50)
print("Type 'exit' or 'quit' to stop")
print("Type 'reset' to clear conversation history")
print("=" * 50)
print()
# Initialize agent with all tools
agent = MyOrgAgent(
system_prompt=get_system_prompt(),
tools=(
get_file_operation_tools() +
get_task_management_tools() +
get_calendar_tools() +
get_git_tools()
),
)
print(f"✅ Agent initialized with {len(agent.tools)} tools")
print(f"📁 Working with repository: {settings.myorg_repo_path}\n")
while True:
try:
# Get user input
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ['exit', 'quit']:
print("\n👋 Goodbye!")
break
if user_input.lower() == 'reset':
agent.reset_conversation()
print("🔄 Conversation history cleared\n")
continue
# Run agent
print("\nAssistant: ", end="", flush=True)
response = agent.run(user_input)
print(response)
print()
except KeyboardInterrupt:
print("\n\n👋 Goodbye!")
break
except Exception as e:
print(f"\n❌ Error: {str(e)}\n")
def bot_mode() -> None:
"""Run in Discord bot mode."""
from src.bot.discord_bot import run_bot
run_bot()
def web_mode() -> None:
"""Run in web server mode."""
from src.api.app import run_web
run_web()
def main() -> None:
"""Main entry point."""
parser = argparse.ArgumentParser(description="MyOrg Personal Assistant")
parser.add_argument(
"mode",
choices=["cli", "bot", "web"],
default="cli",
nargs="?",
help="Run mode: cli (default), bot (Discord), or web (FastAPI server)",
)
args = parser.parse_args()
if args.mode == "cli":
cli_mode()
elif args.mode == "bot":
bot_mode()
elif args.mode == "web":
web_mode()
if __name__ == "__main__":
main()

0
src/parsers/__init__.py Normal file
View File

View File

@@ -0,0 +1,309 @@
"""Parser for calendar.txt format used in myorg GTD system.
Calendar.txt format:
- One event per line
- Format: YYYY-MM-DD HH:MM Description @context +project tags...
- All-day events: YYYY-MM-DD Description
- Supports contexts and project tags
Example events:
2026-02-01 09:00 Team standup @telefon +work
2026-02-15 Birthday party @personal
"""
import re
from datetime import datetime, time as time_type
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field, asdict
@dataclass
class Event:
"""Represents a single calendar event."""
raw_line: str
line_number: int
date: datetime
time: Optional[time_type] = None
description: str = ""
contexts: List[str] = field(default_factory=list)
projects: List[str] = field(default_factory=list)
tags: Dict[str, str] = field(default_factory=dict)
all_day: bool = False
def __post_init__(self) -> None:
"""Post-init to ensure all_day is always a bool."""
# Ensure all_day is a proper boolean
if self.all_day is None:
self.all_day = False
else:
self.all_day = bool(self.all_day)
@property
def datetime(self) -> datetime:
"""Get full datetime of the event."""
if self.time:
return datetime.combine(self.date.date(), self.time)
return self.date
def to_dict(self) -> Dict[str, Any]:
"""Convert event to dictionary representation."""
return {
"raw_line": self.raw_line,
"line_number": self.line_number,
"date": self.date.strftime("%Y-%m-%d"),
"time": self.time.strftime("%H:%M") if self.time else None,
"datetime": self.datetime.isoformat(),
"description": self.description,
"contexts": self.contexts,
"projects": self.projects,
"tags": self.tags,
"all_day": self.all_day,
}
class CalendarParser:
"""Parser for calendar.txt format files."""
# Regular expressions for parsing
DATE_TIME_RE = re.compile(r'^(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2})\s+')
DATE_ONLY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2})\s+')
PROJECT_RE = re.compile(r'\+(\S+)')
CONTEXT_RE = re.compile(r'@(\S+)')
TAG_RE = re.compile(r'(\w+):(\S+)')
@staticmethod
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse a date string in YYYY-MM-DD format."""
try:
return datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
return None
@staticmethod
def parse_time(time_str: str) -> Optional[time_type]:
"""Parse a time string in HH:MM format."""
try:
return datetime.strptime(time_str, "%H:%M").time()
except ValueError:
return None
@classmethod
def parse_line(cls, line: str, line_number: int = 0) -> Optional[Event]:
"""Parse a single line from calendar.txt.
Args:
line: The line to parse
line_number: Line number in the file (for reference)
Returns:
Event object or None if line is empty or a comment
"""
# Skip empty lines and comments
line = line.strip()
if not line or line.startswith('#'):
return None
# Try to parse date and time
datetime_match = cls.DATE_TIME_RE.match(line)
date_match = cls.DATE_ONLY_RE.match(line)
if datetime_match:
# Event with specific time
date_str, time_str = datetime_match.groups()
event_date = cls.parse_date(date_str)
event_time = cls.parse_time(time_str)
if not event_date or not event_time:
return None
remaining = line[datetime_match.end():]
event = Event(
raw_line=line,
line_number=line_number,
date=event_date,
time=event_time,
all_day=False
)
elif date_match:
# All-day event
date_str = date_match.group(1)
event_date = cls.parse_date(date_str)
if not event_date:
return None
remaining = line[date_match.end():]
event = Event(
raw_line=line,
line_number=line_number,
date=event_date,
time=None,
all_day=True
)
else:
# Invalid format
return None
# Extract contexts
event.contexts = cls.CONTEXT_RE.findall(remaining)
# Extract projects
event.projects = cls.PROJECT_RE.findall(remaining)
# Extract tags
for match in cls.TAG_RE.finditer(remaining):
key, value = match.groups()
# Avoid treating contexts and projects as tags
if not remaining[match.start()-1:match.start()] in ['@', '+']:
event.tags[key] = value
# Remove contexts, projects, and tags to get clean description
description = remaining
for context in event.contexts:
description = description.replace(f'@{context}', '')
for project in event.projects:
description = description.replace(f'+{project}', '')
for key, value in event.tags.items():
description = description.replace(f'{key}:{value}', '')
event.description = ' '.join(description.split())
return event
@classmethod
def parse_file(cls, file_content: str) -> List[Event]:
"""Parse entire calendar.txt file content.
Args:
file_content: Content of the calendar.txt file
Returns:
List of Event objects sorted by datetime
"""
events = []
for line_number, line in enumerate(file_content.split('\n'), start=1):
event = cls.parse_line(line, line_number)
if event:
events.append(event)
# Sort events by datetime
events.sort(key=lambda e: e.datetime)
return events
@staticmethod
def format_event(
date: datetime,
description: str,
time: Optional[time_type] = None,
contexts: Optional[List[str]] = None,
projects: Optional[List[str]] = None,
tags: Optional[Dict[str, str]] = None,
) -> str:
"""Format an event according to calendar.txt format.
Args:
date: Event date
description: Event description
time: Event time (None for all-day)
contexts: List of context tags
projects: List of project tags
tags: Dictionary of tag key-value pairs
Returns:
Formatted calendar.txt line
"""
parts = [date.strftime('%Y-%m-%d')]
# Add time if specified
if time:
parts.append(time.strftime('%H:%M'))
# Add description
parts.append(description)
# Add contexts
if contexts:
parts.extend([f'@{context}' for context in contexts])
# Add projects
if projects:
parts.extend([f'+{project}' for project in projects])
# Add tags
if tags:
parts.extend([f'{key}:{value}' for key, value in tags.items()])
return ' '.join(parts)
@staticmethod
def filter_events(
events: List[Event],
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
context: Optional[str] = None,
project: Optional[str] = None,
all_day: Optional[bool] = None,
) -> List[Event]:
"""Filter events based on criteria.
Args:
events: List of events to filter
start_date: Filter events on or after this date
end_date: Filter events on or before this date
context: Filter by context tag
project: Filter by project tag
all_day: Filter by all-day status
Returns:
Filtered list of events
"""
filtered = events
if start_date:
filtered = [e for e in filtered if e.datetime >= start_date]
if end_date:
filtered = [e for e in filtered if e.datetime <= end_date]
if context:
filtered = [e for e in filtered if context in e.contexts]
if project:
filtered = [e for e in filtered if project in e.projects]
if all_day is not None:
filtered = [e for e in filtered if e.all_day == all_day]
return filtered
@staticmethod
def get_today_events(events: List[Event]) -> List[Event]:
"""Get events for today.
Args:
events: List of all events
Returns:
List of today's events
"""
today = datetime.now().date()
return [e for e in events if e.date.date() == today]
@staticmethod
def get_upcoming_events(events: List[Event], days: int = 7) -> List[Event]:
"""Get events in the next N days.
Args:
events: List of all events
days: Number of days to look ahead
Returns:
List of upcoming events
"""
now = datetime.now()
end_date = datetime.now().replace(hour=23, minute=59, second=59)
from datetime import timedelta
end_date = end_date + timedelta(days=days)
return [e for e in events if now <= e.datetime <= end_date]

View File

@@ -0,0 +1,239 @@
"""Parser for projects.txt format used in myorg GTD system.
Projects.txt format:
- One project per line
- Format: +project-tag Description [status] [metadata...]
- Status: active, waiting, someday, completed
- Can include contexts, goals, and other metadata
Example projects:
+myorg-assistant MyOrg Personal Assistant [active] goal:q1-2026
+observability-blog Observability blog post [active] @computer-deep
+home-renovation Kitchen renovation [waiting] due:2026-03-15
"""
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
@dataclass
class Project:
"""Represents a single project."""
raw_line: str
line_number: int
tag: str
description: str
status: str = "active"
contexts: List[str] = field(default_factory=list)
goals: List[str] = field(default_factory=list)
metadata: Dict[str, str] = field(default_factory=dict)
@property
def due_date(self) -> Optional[datetime]:
"""Extract due date from metadata."""
if "due" in self.metadata:
try:
return datetime.strptime(self.metadata["due"], "%Y-%m-%d")
except ValueError:
return None
return None
def to_dict(self) -> Dict[str, Any]:
"""Convert project to dictionary representation."""
return {
"raw_line": self.raw_line,
"line_number": self.line_number,
"tag": self.tag,
"description": self.description,
"status": self.status,
"contexts": self.contexts,
"goals": self.goals,
"metadata": self.metadata,
"due_date": self.due_date.isoformat() if self.due_date else None,
}
class ProjectParser:
"""Parser for projects.txt format files."""
# Regular expressions for parsing
PROJECT_TAG_RE = re.compile(r'^\+(\S+)\s+')
STATUS_RE = re.compile(r'\[(active|waiting|someday|completed)\]', re.IGNORECASE)
CONTEXT_RE = re.compile(r'@(\S+)')
GOAL_RE = re.compile(r'goal:(\S+)')
METADATA_RE = re.compile(r'(\w+):(\S+)')
@classmethod
def parse_line(cls, line: str, line_number: int = 0) -> Optional[Project]:
"""Parse a single line from projects.txt.
Args:
line: The line to parse
line_number: Line number in the file (for reference)
Returns:
Project object or None if line is empty or a comment
"""
# Skip empty lines and comments
line = line.strip()
if not line or line.startswith('#'):
return None
# Extract project tag
tag_match = cls.PROJECT_TAG_RE.match(line)
if not tag_match:
return None
tag = tag_match.group(1)
remaining = line[tag_match.end():]
# Extract status
status = "active" # Default status
status_match = cls.STATUS_RE.search(remaining)
if status_match:
status = status_match.group(1).lower()
# Remove status from remaining text
remaining = remaining[:status_match.start()] + remaining[status_match.end():]
# Extract contexts
contexts = cls.CONTEXT_RE.findall(remaining)
# Extract goals
goals = cls.GOAL_RE.findall(remaining)
# Extract other metadata
metadata: Dict[str, str] = {}
for match in cls.METADATA_RE.finditer(remaining):
key, value = match.groups()
# Skip if it's a goal (already extracted)
if key != 'goal':
metadata[key] = value
# Remove contexts, goals, and metadata to get clean description
description = remaining
for context in contexts:
description = description.replace(f'@{context}', '')
for goal in goals:
description = description.replace(f'goal:{goal}', '')
for key, value in metadata.items():
description = description.replace(f'{key}:{value}', '')
description = ' '.join(description.split())
return Project(
raw_line=line,
line_number=line_number,
tag=tag,
description=description,
status=status,
contexts=contexts,
goals=goals,
metadata=metadata,
)
@classmethod
def parse_file(cls, file_content: str) -> List[Project]:
"""Parse entire projects.txt file content.
Args:
file_content: Content of the projects.txt file
Returns:
List of Project objects
"""
projects = []
for line_number, line in enumerate(file_content.split('\n'), start=1):
project = cls.parse_line(line, line_number)
if project:
projects.append(project)
return projects
@staticmethod
def format_project(
tag: str,
description: str,
status: str = "active",
contexts: Optional[List[str]] = None,
goals: Optional[List[str]] = None,
metadata: Optional[Dict[str, str]] = None,
) -> str:
"""Format a project according to projects.txt format.
Args:
tag: Project tag (without + prefix)
description: Project description
status: Project status (active, waiting, someday, completed)
contexts: List of context tags
goals: List of goal references
metadata: Dictionary of metadata key-value pairs
Returns:
Formatted projects.txt line
"""
parts = [f'+{tag}', description, f'[{status}]']
# Add contexts
if contexts:
parts.extend([f'@{context}' for context in contexts])
# Add goals
if goals:
parts.extend([f'goal:{goal}' for goal in goals])
# Add metadata
if metadata:
parts.extend([f'{key}:{value}' for key, value in metadata.items()])
return ' '.join(parts)
@staticmethod
def filter_projects(
projects: List[Project],
status: Optional[str] = None,
context: Optional[str] = None,
goal: Optional[str] = None,
has_due_date: Optional[bool] = None,
) -> List[Project]:
"""Filter projects based on criteria.
Args:
projects: List of projects to filter
status: Filter by status (active, waiting, someday, completed)
context: Filter by context tag
goal: Filter by goal reference
has_due_date: Filter by presence of due date
Returns:
Filtered list of projects
"""
filtered = projects
if status:
filtered = [p for p in filtered if p.status == status.lower()]
if context:
filtered = [p for p in filtered if context in p.contexts]
if goal:
filtered = [p for p in filtered if goal in p.goals]
if has_due_date is not None:
if has_due_date:
filtered = [p for p in filtered if p.due_date is not None]
else:
filtered = [p for p in filtered if p.due_date is None]
return filtered
@staticmethod
def get_active_projects(projects: List[Project]) -> List[Project]:
"""Get all active projects.
Args:
projects: List of all projects
Returns:
List of active projects
"""
return [p for p in projects if p.status == "active"]

270
src/parsers/todo_parser.py Normal file
View File

@@ -0,0 +1,270 @@
"""Parser for todo.txt format used in myorg GTD system.
Todo.txt format follows these conventions:
- Priority: (A), (B), (C) at the start of the line
- Completion: x at the start (with optional completion date)
- Dates: YYYY-MM-DD format
- Projects: +project-name
- Contexts: @context-name
- Metadata: key:value format
Example task:
(A) 2026-01-31 Write blog post +observability-blog @computer-deep due:2026-02-15
"""
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
@dataclass
class Task:
"""Represents a single task from todo.txt."""
raw_line: str
line_number: int
completed: bool = False
completion_date: Optional[datetime] = None
priority: Optional[str] = None
creation_date: Optional[datetime] = None
description: str = ""
projects: List[str] = field(default_factory=list)
contexts: List[str] = field(default_factory=list)
metadata: Dict[str, str] = field(default_factory=dict)
def __post_init__(self) -> None:
"""Post-init to ensure completed is always a bool."""
# Ensure completed is a proper boolean
if self.completed is None:
self.completed = False
else:
self.completed = bool(self.completed)
@property
def due_date(self) -> Optional[datetime]:
"""Extract due date from metadata."""
if "due" in self.metadata:
try:
return datetime.strptime(self.metadata["due"], "%Y-%m-%d")
except ValueError:
return None
return None
def to_dict(self) -> Dict[str, Any]:
"""Convert task to dictionary representation."""
return {
"raw_line": self.raw_line,
"line_number": self.line_number,
"completed": self.completed,
"completion_date": self.completion_date.isoformat() if self.completion_date else None,
"priority": self.priority,
"creation_date": self.creation_date.isoformat() if self.creation_date else None,
"description": self.description,
"projects": self.projects,
"contexts": self.contexts,
"metadata": self.metadata,
"due_date": self.due_date.isoformat() if self.due_date else None,
}
class TodoParser:
"""Parser for todo.txt format files."""
# Regular expressions for parsing
PRIORITY_RE = re.compile(r'^\(([A-Z])\)\s+')
DATE_RE = re.compile(r'^\d{4}-\d{2}-\d{2}')
PROJECT_RE = re.compile(r'\+(\S+)')
CONTEXT_RE = re.compile(r'@(\S+)')
METADATA_RE = re.compile(r'(\w+):(\S+)')
COMPLETION_RE = re.compile(r'^x\s+(?:(\d{4}-\d{2}-\d{2})\s+)?')
@staticmethod
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse a date string in YYYY-MM-DD format."""
try:
return datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
return None
@classmethod
def parse_line(cls, line: str, line_number: int = 0) -> Optional[Task]:
"""Parse a single line from todo.txt.
Args:
line: The line to parse
line_number: Line number in the file (for reference)
Returns:
Task object or None if line is empty or a comment
"""
# Skip empty lines and comments
line = line.strip()
if not line or line.startswith('#'):
return None
task = Task(raw_line=line, line_number=line_number)
remaining = line
# Check for completion
completion_match = cls.COMPLETION_RE.match(remaining)
if completion_match:
task.completed = True
if completion_match.group(1):
task.completion_date = cls.parse_date(completion_match.group(1))
remaining = remaining[completion_match.end():]
# Check for priority
priority_match = cls.PRIORITY_RE.match(remaining)
if priority_match:
task.priority = priority_match.group(1)
remaining = remaining[priority_match.end():]
# Check for creation date (only at the beginning after priority)
date_match = cls.DATE_RE.match(remaining)
if date_match:
date_str = date_match.group()
task.creation_date = cls.parse_date(date_str)
remaining = remaining[len(date_str):].strip()
# Extract projects
task.projects = cls.PROJECT_RE.findall(remaining)
# Extract contexts
task.contexts = cls.CONTEXT_RE.findall(remaining)
# Extract metadata key:value pairs
for match in cls.METADATA_RE.finditer(remaining):
key, value = match.groups()
task.metadata[key] = value
# Remove projects, contexts, and metadata to get clean description
description = remaining
for project in task.projects:
description = description.replace(f'+{project}', '')
for context in task.contexts:
description = description.replace(f'@{context}', '')
for key, value in task.metadata.items():
description = description.replace(f'{key}:{value}', '')
task.description = ' '.join(description.split())
return task
@classmethod
def parse_file(cls, file_content: str) -> List[Task]:
"""Parse entire todo.txt file content.
Args:
file_content: Content of the todo.txt file
Returns:
List of Task objects
"""
tasks = []
for line_number, line in enumerate(file_content.split('\n'), start=1):
task = cls.parse_line(line, line_number)
if task:
tasks.append(task)
return tasks
@staticmethod
def format_task(
description: str,
priority: Optional[str] = None,
creation_date: Optional[datetime] = None,
projects: Optional[List[str]] = None,
contexts: Optional[List[str]] = None,
metadata: Optional[Dict[str, str]] = None,
completed: bool = False,
completion_date: Optional[datetime] = None,
) -> str:
"""Format a task according to todo.txt format.
Args:
description: Task description
priority: Priority letter (A-Z)
creation_date: Date task was created
projects: List of project tags
contexts: List of context tags
metadata: Dictionary of metadata key-value pairs
completed: Whether task is completed
completion_date: Date task was completed
Returns:
Formatted todo.txt line
"""
parts = []
# Completion marker
if completed:
parts.append('x')
if completion_date:
parts.append(completion_date.strftime('%Y-%m-%d'))
# Priority
if priority and not completed: # Priority not shown for completed tasks
parts.append(f'({priority})')
# Creation date
if creation_date:
parts.append(creation_date.strftime('%Y-%m-%d'))
# Description
parts.append(description)
# Projects
if projects:
parts.extend([f'+{project}' for project in projects])
# Contexts
if contexts:
parts.extend([f'@{context}' for context in contexts])
# Metadata
if metadata:
parts.extend([f'{key}:{value}' for key, value in metadata.items()])
return ' '.join(parts)
@staticmethod
def filter_tasks(
tasks: List[Task],
completed: Optional[bool] = None,
priority: Optional[str] = None,
project: Optional[str] = None,
context: Optional[str] = None,
has_due_date: Optional[bool] = None,
) -> List[Task]:
"""Filter tasks based on criteria.
Args:
tasks: List of tasks to filter
completed: Filter by completion status
priority: Filter by priority letter
project: Filter by project tag
context: Filter by context tag
has_due_date: Filter by presence of due date
Returns:
Filtered list of tasks
"""
filtered = tasks
if completed is not None:
filtered = [t for t in filtered if t.completed == completed]
if priority is not None:
filtered = [t for t in filtered if t.priority == priority]
if project is not None:
filtered = [t for t in filtered if project in t.projects]
if context is not None:
filtered = [t for t in filtered if context in t.contexts]
if has_due_date is not None:
if has_due_date:
filtered = [t for t in filtered if t.due_date is not None]
else:
filtered = [t for t in filtered if t.due_date is None]
return filtered

View File

374
src/scheduler/briefings.py Normal file
View File

@@ -0,0 +1,374 @@
"""Briefing generators for scheduled messages."""
from datetime import datetime, timedelta
from typing import List, Dict, Any
from src.parsers.todo_parser import TodoParser, Task
from src.parsers.calendar_parser import CalendarParser, Event
from src.tools.file_ops import read_file
def get_todays_date() -> str:
"""Get today's date formatted.
Returns:
Formatted date string
"""
now = datetime.now()
return now.strftime("%A, %B %d, %Y")
def get_todays_events() -> List[Event]:
"""Get today's calendar events.
Returns:
List of today's events
"""
try:
content = read_file("calendar.txt")
events = CalendarParser.parse_file(content)
today = datetime.now().date()
return [e for e in events if e.date.date() == today]
except FileNotFoundError:
return []
def get_upcoming_events(days: int = 1) -> List[Event]:
"""Get events in the next N days.
Args:
days: Number of days to look ahead
Returns:
List of upcoming events
"""
try:
content = read_file("calendar.txt")
events = CalendarParser.parse_file(content)
now = datetime.now()
end_date = now + timedelta(days=days)
return [e for e in events if now <= e.datetime <= end_date]
except FileNotFoundError:
return []
def get_priority_tasks(priorities: List[str] = ["A", "B"]) -> List[Task]:
"""Get tasks with specific priorities.
Args:
priorities: List of priority letters to include
Returns:
List of priority tasks
"""
try:
content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
active_tasks = [t for t in tasks if not t.completed]
return [t for t in active_tasks if t.priority in priorities]
except FileNotFoundError:
return []
def get_tasks_due_soon(days: int = 3) -> List[Task]:
"""Get tasks due within N days.
Args:
days: Number of days to look ahead
Returns:
List of tasks due soon
"""
try:
content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
active_tasks = [t for t in tasks if not t.completed]
now = datetime.now()
end_date = now + timedelta(days=days)
due_soon = []
for task in active_tasks:
if task.due_date and now <= task.due_date <= end_date:
due_soon.append(task)
# Sort by due date
due_soon.sort(key=lambda t: t.due_date or datetime.max)
return due_soon
except FileNotFoundError:
return []
def get_waiting_items() -> List[str]:
"""Get items from waiting.txt.
Returns:
List of waiting items (as strings)
"""
try:
content = read_file("waiting.txt")
lines = [line.strip() for line in content.split('\n') if line.strip() and not line.startswith('#')]
return lines
except FileNotFoundError:
return []
def get_completed_today() -> List[Task]:
"""Get tasks completed today.
Returns:
List of tasks completed today
"""
try:
content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
today = datetime.now().date()
completed_today = []
for task in tasks:
if task.completed and task.completion_date:
if task.completion_date.date() == today:
completed_today.append(task)
return completed_today
except FileNotFoundError:
return []
def generate_morning_briefing() -> str:
"""Generate morning briefing message.
Returns:
Formatted briefing message
"""
date_str = get_todays_date()
lines = [
f"**🌅 Good Morning! - {date_str}**\n",
]
# Today's events
events = get_todays_events()
if events:
lines.append("**📅 Today's Schedule:**")
for event in events:
time_str = event.time.strftime("%H:%M") if event.time else "All day"
contexts = " ".join([f"*@{c}*" for c in event.contexts])
projects = " ".join([f"**+{p}**" for p in event.projects])
lines.append(f" • `{time_str}` {event.description} {contexts} {projects}")
lines.append("")
else:
lines.append("📅 No events scheduled today\n")
# Priority tasks
priority_tasks = get_priority_tasks(["A", "B"])
if priority_tasks:
lines.append("**✅ Priority Tasks:**")
for task in priority_tasks[:5]: # Top 5
priority_str = f"({task.priority}) " if task.priority else ""
projects = " ".join([f"**+{p}**" for p in task.projects])
contexts = " ".join([f"*@{c}*" for c in task.contexts])
lines.append(f"{priority_str}{task.description} {projects} {contexts}")
if len(priority_tasks) > 5:
lines.append(f" ... and {len(priority_tasks) - 5} more priority tasks")
lines.append("")
# Due soon
due_soon = get_tasks_due_soon(3)
if due_soon:
lines.append("**⏳ Due Soon:**")
for task in due_soon[:3]: # Top 3
due_date = task.due_date.strftime("%Y-%m-%d") if task.due_date else "?"
days_until = (task.due_date - datetime.now()).days if task.due_date else 0
if days_until == 0:
urgency = "📍 **TODAY**"
elif days_until == 1:
urgency = "⚠️ Tomorrow"
else:
urgency = f"📅 {days_until} days"
lines.append(f"{task.description} - {urgency} (`{due_date}`)")
lines.append("")
# Waiting items
waiting = get_waiting_items()
if waiting:
lines.append("**⏸️ Waiting On:**")
for item in waiting[:3]: # Top 3
lines.append(f"{item}")
if len(waiting) > 3:
lines.append(f" ... and {len(waiting) - 3} more items")
lines.append("")
lines.append("Have a productive day! 🚀")
return "\n".join(lines)
def generate_evening_summary() -> str:
"""Generate evening summary message.
Returns:
Formatted summary message
"""
date_str = get_todays_date()
lines = [
f"**🌆 Evening Summary - {date_str}**\n",
]
# Completed today
completed = get_completed_today()
if completed:
lines.append(f"**✅ Completed Today ({len(completed)} tasks):**")
for task in completed[:5]: # Top 5
projects = " ".join([f"**+{p}**" for p in task.projects])
lines.append(f"{task.description} {projects}")
if len(completed) > 5:
lines.append(f" ... and {len(completed) - 5} more tasks")
lines.append("")
else:
lines.append("No tasks marked as complete today\n")
# Tomorrow's events
tomorrow_events = get_upcoming_events(1)
tomorrow_date = (datetime.now() + timedelta(days=1)).date()
tomorrow_events = [e for e in tomorrow_events if e.date.date() == tomorrow_date]
if tomorrow_events:
lines.append("**📅 Tomorrow's Schedule:**")
for event in tomorrow_events:
time_str = event.time.strftime("%H:%M") if event.time else "All day"
lines.append(f" • `{time_str}` {event.description}")
lines.append("")
else:
lines.append("📅 No events scheduled for tomorrow\n")
# Tasks to prepare
due_tomorrow = get_tasks_due_soon(1)
tomorrow_due = [t for t in due_tomorrow if t.due_date and t.due_date.date() == tomorrow_date]
if tomorrow_due:
lines.append("**📋 Tasks Due Tomorrow:**")
for task in tomorrow_due:
priority_str = f"({task.priority}) " if task.priority else ""
lines.append(f"{priority_str}{task.description}")
lines.append("")
# Priority tasks for tomorrow
priority_tasks = get_priority_tasks(["A"])
if priority_tasks:
lines.append("**⭐ Top Priorities for Tomorrow:**")
for task in priority_tasks[:3]:
projects = " ".join([f"**+{p}**" for p in task.projects])
lines.append(f"{task.description} {projects}")
lines.append("")
lines.append("**💭 Reflection Prompts:**")
lines.append(" • What went well today?")
lines.append(" • What could be improved?")
lines.append(" • Any blockers or concerns?")
lines.append("")
lines.append("Rest well! 😴")
return "\n".join(lines)
def check_deadlines() -> Dict[str, List[Task]]:
"""Check for upcoming deadlines.
Returns:
Dictionary with deadline categories
"""
try:
content = read_file("todo.txt")
tasks = TodoParser.parse_file(content)
active_tasks = [t for t in tasks if not t.completed and t.due_date]
now = datetime.now()
categories = {
"overdue": [],
"today": [],
"tomorrow": [],
"week": [],
}
for task in active_tasks:
if not task.due_date:
continue
days_until = (task.due_date - now).days
if days_until < 0:
categories["overdue"].append(task)
elif days_until == 0:
categories["today"].append(task)
elif days_until == 1:
categories["tomorrow"].append(task)
elif days_until <= 7:
categories["week"].append(task)
return categories
except FileNotFoundError:
return {"overdue": [], "today": [], "tomorrow": [], "week": []}
def generate_deadline_warnings() -> str:
"""Generate deadline warning message.
Returns:
Formatted warning message (empty string if no warnings)
"""
deadlines = check_deadlines()
# Only generate message if there are warnings
if not any(deadlines.values()):
return ""
lines = ["**⏰ Deadline Warnings**\n"]
# Overdue
if deadlines["overdue"]:
lines.append(f"**🔴 OVERDUE ({len(deadlines['overdue'])}):**")
for task in deadlines["overdue"][:5]:
due_date = task.due_date.strftime("%Y-%m-%d") if task.due_date else "?"
days_overdue = (datetime.now() - task.due_date).days if task.due_date else 0
priority_str = f"({task.priority}) " if task.priority else ""
lines.append(f"{priority_str}{task.description} - {days_overdue} days overdue (`{due_date}`)")
lines.append("")
# Today
if deadlines["today"]:
lines.append(f"**📍 DUE TODAY ({len(deadlines['today'])}):**")
for task in deadlines["today"]:
priority_str = f"({task.priority}) " if task.priority else ""
lines.append(f"{priority_str}{task.description}")
lines.append("")
# Tomorrow
if deadlines["tomorrow"]:
lines.append(f"**⚠️ DUE TOMORROW ({len(deadlines['tomorrow'])}):**")
for task in deadlines["tomorrow"]:
priority_str = f"({task.priority}) " if task.priority else ""
lines.append(f"{priority_str}{task.description}")
lines.append("")
# This week
if deadlines["week"]:
lines.append(f"**📅 Due This Week ({len(deadlines['week'])}):**")
for task in deadlines["week"][:3]:
due_date = task.due_date.strftime("%Y-%m-%d") if task.due_date else "?"
days_until = (task.due_date - datetime.now()).days if task.due_date else 0
lines.append(f"{task.description} - {days_until} days (`{due_date}`)")
if len(deadlines["week"]) > 3:
lines.append(f" ... and {len(deadlines['week']) - 3} more")
lines.append("")
return "\n".join(lines)

171
src/scheduler/jobs.py Normal file
View File

@@ -0,0 +1,171 @@
"""Scheduled job definitions and runners."""
import asyncio
import discord
from typing import Optional
from src.config import settings
from src.scheduler.briefings import (
generate_morning_briefing,
generate_evening_summary,
generate_deadline_warnings,
)
async def send_discord_message(message: str, channel_id: Optional[str] = None) -> None:
"""Send a message to Discord channel.
Args:
message: Message to send
channel_id: Optional channel ID (uses default from settings if not provided)
"""
if not channel_id:
channel_id = settings.discord_channel_id
# Create Discord client
intents = discord.Intents.default()
client = discord.Client(intents=intents)
@client.event
async def on_ready() -> None:
"""Send message when client is ready."""
try:
channel = client.get_channel(int(channel_id))
if not channel:
print(f"❌ Channel {channel_id} not found")
await client.close()
return
# Split message if too long
if len(message) <= 2000:
await channel.send(message)
else:
# Split into chunks
chunks = [message[i:i+1900] for i in range(0, len(message), 1900)]
for chunk in chunks:
await channel.send(chunk)
print(f"✅ Message sent to channel {channel_id}")
except Exception as e:
print(f"❌ Error sending message: {e}")
finally:
await client.close()
# Run client
await client.start(settings.discord_bot_token)
def run_morning_briefing() -> None:
"""Run morning briefing job."""
print("🌅 Generating morning briefing...")
try:
briefing = generate_morning_briefing()
# Send to Discord
asyncio.run(send_discord_message(briefing))
print("✅ Morning briefing sent")
except Exception as e:
print(f"❌ Error generating morning briefing: {e}")
def run_evening_summary() -> None:
"""Run evening summary job."""
print("🌆 Generating evening summary...")
try:
summary = generate_evening_summary()
# Send to Discord
asyncio.run(send_discord_message(summary))
print("✅ Evening summary sent")
except Exception as e:
print(f"❌ Error generating evening summary: {e}")
def run_deadline_checker() -> None:
"""Run deadline checker job."""
print("⏰ Checking deadlines...")
try:
warnings = generate_deadline_warnings()
# Only send if there are warnings
if warnings:
asyncio.run(send_discord_message(warnings))
print("✅ Deadline warnings sent")
else:
print("✅ No deadline warnings needed")
except Exception as e:
print(f"❌ Error checking deadlines: {e}")
def run_git_sync() -> None:
"""Run git sync job."""
print("🔄 Syncing git repository...")
try:
from src.tools.git_ops import git_pull, git_push
# Pull latest changes
pull_result = git_pull()
print(f"Pull: {pull_result}")
# Push any local commits
push_result = git_push()
print(f"Push: {push_result}")
print("✅ Git sync complete")
except Exception as e:
print(f"❌ Error syncing git: {e}")
def run_waiting_followup() -> None:
"""Run waiting list follow-up job."""
print("⏸️ Checking waiting list...")
try:
from src.scheduler.briefings import get_waiting_items
waiting = get_waiting_items()
if waiting:
message = f"**⏸️ Waiting List Follow-up**\n\n"
message += f"You have {len(waiting)} item(s) in your waiting list:\n\n"
for item in waiting:
message += f"{item}\n"
message += "\nAny of these ready to move forward?"
asyncio.run(send_discord_message(message))
print("✅ Waiting list follow-up sent")
else:
print("✅ No items in waiting list")
except Exception as e:
print(f"❌ Error checking waiting list: {e}")
# Job registry for easy lookup
JOBS = {
"morning-briefing": run_morning_briefing,
"evening-summary": run_evening_summary,
"deadline-checker": run_deadline_checker,
"git-sync": run_git_sync,
"waiting-followup": run_waiting_followup,
}
def run_job(job_name: str) -> None:
"""Run a scheduled job by name.
Args:
job_name: Name of the job to run
"""
if job_name not in JOBS:
print(f"❌ Unknown job: {job_name}")
print(f"Available jobs: {', '.join(JOBS.keys())}")
return
print(f"▶️ Running job: {job_name}")
JOBS[job_name]()

0
src/tools/__init__.py Normal file
View File

226
src/tools/calendar_ops.py Normal file
View File

@@ -0,0 +1,226 @@
"""Calendar operation tools for the agent."""
from datetime import datetime, timedelta
from typing import List, Optional
from src.parsers.calendar_parser import CalendarParser, Event
from src.tools.file_ops import read_file, write_file
from src.agent.core import AgentTool
def get_calendar_events(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> str:
"""Get calendar events within a date range.
Args:
start_date: Start date in YYYY-MM-DD format (defaults to today)
end_date: End date in YYYY-MM-DD format (defaults to 7 days from start)
Returns:
Formatted list of events
"""
try:
content = read_file("calendar.txt")
events = CalendarParser.parse_file(content)
# Parse dates
if start_date:
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
else:
start_dt = datetime.now()
if end_date:
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
# Set to end of day
end_dt = end_dt.replace(hour=23, minute=59, second=59)
else:
end_dt = start_dt + timedelta(days=7)
# Filter events
filtered = [e for e in events if start_dt <= e.datetime <= end_dt]
if not filtered:
return f"No events found between {start_dt.strftime('%Y-%m-%d')} and {end_dt.strftime('%Y-%m-%d')}"
# Format results
result_lines = [f"Found {len(filtered)} event(s):\n"]
current_date = None
for event in filtered:
event_date = event.date.strftime("%Y-%m-%d")
# Add date header if changed
if event_date != current_date:
result_lines.append(f"\n**{event_date}:**")
current_date = event_date
# Format time
if event.time:
time_str = event.time.strftime("%H:%M")
else:
time_str = "All day"
# Format contexts and projects
contexts_str = " ".join([f"@{c}" for c in event.contexts])
projects_str = " ".join([f"+{p}" for p in event.projects])
result_lines.append(
f"{time_str} - {event.description} {contexts_str} {projects_str}"
)
return "\n".join(result_lines)
except FileNotFoundError:
return "calendar.txt not found"
except Exception as e:
return f"Error reading calendar: {str(e)}"
def get_today_events() -> str:
"""Get today's calendar events.
Returns:
Formatted list of today's events
"""
today = datetime.now().strftime("%Y-%m-%d")
return get_calendar_events(start_date=today, end_date=today)
def add_calendar_event(
date: str,
description: str,
time: Optional[str] = None,
context: Optional[str] = None,
project: Optional[str] = None,
) -> str:
"""Add a new event to calendar.txt.
Args:
date: Event date in YYYY-MM-DD format
description: Event description
time: Event time in HH:MM format (None for all-day)
context: Context tag (without @ prefix)
project: Project tag (without + prefix)
Returns:
Success message
"""
try:
# Read current calendar
try:
content = read_file("calendar.txt")
except FileNotFoundError:
content = "# Calendar\n\n"
# Parse date
event_date = datetime.strptime(date, "%Y-%m-%d")
# Parse time if provided
event_time = None
if time:
try:
event_time = datetime.strptime(time, "%H:%M").time()
except ValueError:
return f"Error: Invalid time format. Use HH:MM (e.g., 14:30)"
# Format event
contexts = [context] if context else None
projects = [project] if project else None
formatted_event = CalendarParser.format_event(
date=event_date,
description=description,
time=event_time,
contexts=contexts,
projects=projects,
)
# Append to file
new_content = content.rstrip() + "\n" + formatted_event + "\n"
write_file("calendar.txt", new_content)
return f"✅ Added event to calendar.txt:\n{formatted_event}"
except ValueError as e:
return f"Error: Invalid date format. Use YYYY-MM-DD (e.g., 2026-02-15)"
except Exception as e:
return f"Error adding event: {str(e)}"
# Create AgentTool instances
GET_CALENDAR_EVENTS_TOOL = AgentTool(
name="get_calendar_events",
description="Get calendar events within a date range. Use this to check what's scheduled.",
input_schema={
"type": "object",
"properties": {
"start_date": {
"type": "string",
"description": "Start date in YYYY-MM-DD format (defaults to today)",
},
"end_date": {
"type": "string",
"description": "End date in YYYY-MM-DD format (defaults to 7 days from start)",
}
},
"required": [],
},
function=get_calendar_events,
)
GET_TODAY_EVENTS_TOOL = AgentTool(
name="get_today_events",
description="Get today's calendar events. Quick way to see what's scheduled for today.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
function=get_today_events,
)
ADD_CALENDAR_EVENT_TOOL = AgentTool(
name="add_calendar_event",
description="Add a new event to the calendar with optional time, context, and project.",
input_schema={
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "Event date in YYYY-MM-DD format",
},
"description": {
"type": "string",
"description": "Event description",
},
"time": {
"type": "string",
"description": "Event time in HH:MM format (24-hour). Omit for all-day events.",
},
"context": {
"type": "string",
"description": "Context tag without @ prefix (e.g., 'telefon', 'personal')",
},
"project": {
"type": "string",
"description": "Project tag without + prefix",
}
},
"required": ["date", "description"],
},
function=add_calendar_event,
)
def get_calendar_tools() -> List[AgentTool]:
"""Get all calendar operation tools.
Returns:
List of calendar tools
"""
return [
GET_CALENDAR_EVENTS_TOOL,
GET_TODAY_EVENTS_TOOL,
ADD_CALENDAR_EVENT_TOOL,
]

248
src/tools/file_ops.py Normal file
View File

@@ -0,0 +1,248 @@
"""File operation tools for the agent."""
import os
from pathlib import Path
from typing import List, Dict, Any
from src.config import settings
from src.agent.core import AgentTool
def _validate_path(path: str) -> Path:
"""Validate that a path is within the myorg repository.
Args:
path: Relative or absolute path
Returns:
Validated absolute Path object
Raises:
ValueError: If path is outside myorg repository
"""
repo_path = Path(settings.myorg_repo_path).resolve()
# Convert to Path and resolve
if os.path.isabs(path):
full_path = Path(path).resolve()
else:
full_path = (repo_path / path).resolve()
# Check if path is within repository
try:
full_path.relative_to(repo_path)
except ValueError:
raise ValueError(f"Path {path} is outside myorg repository")
return full_path
def read_file(path: str) -> str:
"""Read a file from the myorg repository.
Args:
path: Relative path to the file (e.g., "todo.txt", "goals/q1-2026.md")
Returns:
File contents as string
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If path is invalid
"""
file_path = _validate_path(path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {path}")
if not file_path.is_file():
raise ValueError(f"Path is not a file: {path}")
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def write_file(path: str, content: str) -> str:
"""Write content to a file in the myorg repository.
This will overwrite the file if it exists.
Args:
path: Relative path to the file
content: Content to write
Returns:
Success message
Raises:
ValueError: If path is invalid
"""
file_path = _validate_path(path)
# Create parent directories if needed
file_path.parent.mkdir(parents=True, exist_ok=True)
# Backup existing file
if file_path.exists():
backup_path = file_path.with_suffix(file_path.suffix + '.backup')
with open(file_path, 'r', encoding='utf-8') as f:
backup_content = f.read()
with open(backup_path, 'w', encoding='utf-8') as f:
f.write(backup_content)
# Write new content
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return f"Successfully wrote {len(content)} characters to {path}"
def append_to_file(path: str, content: str) -> str:
"""Append content to a file in the myorg repository.
Args:
path: Relative path to the file
content: Content to append
Returns:
Success message
Raises:
ValueError: If path is invalid
FileNotFoundError: If file doesn't exist
"""
file_path = _validate_path(path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {path}. Use write_file to create it.")
with open(file_path, 'a', encoding='utf-8') as f:
f.write(content)
return f"Successfully appended {len(content)} characters to {path}"
def list_files(directory: str = "") -> str:
"""List files in a directory of the myorg repository.
Args:
directory: Relative path to directory (empty string for root)
Returns:
Formatted list of files and directories
Raises:
ValueError: If path is invalid
FileNotFoundError: If directory doesn't exist
"""
dir_path = _validate_path(directory if directory else ".")
if not dir_path.exists():
raise FileNotFoundError(f"Directory not found: {directory}")
if not dir_path.is_dir():
raise ValueError(f"Path is not a directory: {directory}")
items = []
for item in sorted(dir_path.iterdir()):
if item.name.startswith('.'):
continue # Skip hidden files
if item.is_dir():
items.append(f"📁 {item.name}/")
else:
size = item.stat().st_size
items.append(f"📄 {item.name} ({size} bytes)")
if not items:
return f"Directory {directory or 'root'} is empty"
return f"Contents of {directory or 'root'}:\n" + "\n".join(items)
# Create AgentTool instances for each function
READ_FILE_TOOL = AgentTool(
name="read_file",
description="Read a file from the myorg repository. Use this to check current content before making changes.",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path to the file (e.g., 'todo.txt', 'goals/q1-2026.md')",
}
},
"required": ["path"],
},
function=read_file,
)
WRITE_FILE_TOOL = AgentTool(
name="write_file",
description="Write content to a file in the myorg repository. This overwrites the file if it exists. Always read the file first to check current content.",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path to the file",
},
"content": {
"type": "string",
"description": "Content to write to the file",
}
},
"required": ["path", "content"],
},
function=write_file,
)
APPEND_TO_FILE_TOOL = AgentTool(
name="append_to_file",
description="Append content to an existing file in the myorg repository. Use this for adding entries to logs or working memory.",
input_schema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Relative path to the file",
},
"content": {
"type": "string",
"description": "Content to append",
}
},
"required": ["path", "content"],
},
function=append_to_file,
)
LIST_FILES_TOOL = AgentTool(
name="list_files",
description="List files and directories in the myorg repository. Use this to explore the repository structure.",
input_schema={
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "Relative path to directory (empty string for root)",
"default": "",
}
},
"required": [],
},
function=list_files,
)
def get_file_operation_tools() -> List[AgentTool]:
"""Get all file operation tools.
Returns:
List of file operation tools
"""
return [
READ_FILE_TOOL,
WRITE_FILE_TOOL,
APPEND_TO_FILE_TOOL,
LIST_FILES_TOOL,
]

270
src/tools/git_ops.py Normal file
View File

@@ -0,0 +1,270 @@
"""Git operation tools for the agent."""
from typing import List
from pathlib import Path
from git import Repo, GitCommandError
from src.config import settings
from src.agent.core import AgentTool
def _get_repo() -> Repo:
"""Get the git repository instance.
Returns:
Git Repo object
Raises:
ValueError: If repository doesn't exist
"""
repo_path = Path(settings.myorg_repo_path)
if not repo_path.exists():
raise ValueError(f"Repository path doesn't exist: {repo_path}")
try:
return Repo(repo_path)
except Exception as e:
raise ValueError(f"Not a git repository: {repo_path}. Error: {e}")
def git_status() -> str:
"""Check the status of the git repository.
Returns:
Human-readable status message
"""
try:
repo = _get_repo()
# Check for changes
changed_files = [item.a_path for item in repo.index.diff(None)]
staged_files = [item.a_path for item in repo.index.diff("HEAD")]
untracked_files = repo.untracked_files
status_lines = ["📊 Git Status:\n"]
if not (changed_files or staged_files or untracked_files):
status_lines.append("✅ Working directory clean - no changes")
else:
if staged_files:
status_lines.append(f"📝 Staged changes ({len(staged_files)} files):")
for file in staged_files:
status_lines.append(f" - {file}")
if changed_files:
status_lines.append(f"\n⚠️ Unstaged changes ({len(changed_files)} files):")
for file in changed_files:
status_lines.append(f" - {file}")
if untracked_files:
status_lines.append(f"\n❓ Untracked files ({len(untracked_files)} files):")
for file in untracked_files:
status_lines.append(f" - {file}")
# Check branch and remote status
try:
branch = repo.active_branch.name
status_lines.append(f"\n🌿 Current branch: {branch}")
# Check if ahead/behind remote
try:
tracking_branch = repo.active_branch.tracking_branch()
if tracking_branch:
ahead = len(list(repo.iter_commits(f'{tracking_branch}..{branch}')))
behind = len(list(repo.iter_commits(f'{branch}..{tracking_branch}')))
if ahead > 0:
status_lines.append(f"⬆️ Ahead of remote by {ahead} commit(s)")
if behind > 0:
status_lines.append(f"⬇️ Behind remote by {behind} commit(s)")
if ahead == 0 and behind == 0:
status_lines.append("✅ Up to date with remote")
except Exception:
pass # No tracking branch
except Exception:
status_lines.append("\n⚠️ Not on any branch (detached HEAD)")
return "\n".join(status_lines)
except Exception as e:
return f"Error checking git status: {str(e)}"
def git_commit(message: str) -> str:
"""Commit changes to the git repository.
This will stage all changes and create a commit.
Args:
message: Commit message
Returns:
Success message with commit hash
"""
try:
repo = _get_repo()
# Check if there are any changes
if not repo.is_dirty(untracked_files=True):
return "Nothing to commit - working directory clean"
# Stage all changes
repo.git.add(A=True)
# Commit
commit = repo.index.commit(message)
# Count files changed
stats = commit.stats.total
files_changed = stats['files']
insertions = stats['insertions']
deletions = stats['deletions']
return (
f"✅ Committed changes:\n"
f"Commit: {commit.hexsha[:7]}\n"
f"Message: {message}\n"
f"Files: {files_changed} changed, {insertions} insertions(+), {deletions} deletions(-)"
)
except Exception as e:
return f"Error committing changes: {str(e)}"
def git_pull() -> str:
"""Pull changes from the remote repository.
Returns:
Success message
"""
try:
repo = _get_repo()
# Check if working directory is clean
if repo.is_dirty(untracked_files=True):
return (
"⚠️ Cannot pull: working directory has uncommitted changes.\n"
"Please commit or stash your changes first."
)
# Pull from remote
origin = repo.remote('origin')
pull_info = origin.pull()
if not pull_info:
return "✅ Already up to date"
info = pull_info[0]
if info.flags & info.HEAD_UPTODATE:
return "✅ Already up to date"
return f"✅ Pulled changes successfully\n{info.note}"
except GitCommandError as e:
return f"Error pulling changes: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
def git_push() -> str:
"""Push commits to the remote repository.
Returns:
Success message
"""
try:
repo = _get_repo()
# Check if there are unpushed commits
try:
branch = repo.active_branch.name
tracking_branch = repo.active_branch.tracking_branch()
if tracking_branch:
ahead = len(list(repo.iter_commits(f'{tracking_branch}..{branch}')))
if ahead == 0:
return "Nothing to push - already up to date"
except Exception:
pass # Continue with push anyway
# Push to remote
origin = repo.remote('origin')
push_info = origin.push()
if not push_info:
return "✅ Push completed"
info = push_info[0]
if info.flags & info.ERROR:
return f"❌ Error pushing: {info.summary}"
return f"✅ Pushed commits successfully\n{info.summary}"
except GitCommandError as e:
return f"Error pushing changes: {str(e)}"
except Exception as e:
return f"Error: {str(e)}"
# Create AgentTool instances
GIT_STATUS_TOOL = AgentTool(
name="git_status",
description="Check the status of the git repository. Shows changed files, staged changes, and branch status.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
function=git_status,
)
GIT_COMMIT_TOOL = AgentTool(
name="git_commit",
description="Commit all changes to the git repository with a descriptive message. This stages and commits all modified files.",
input_schema={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "Commit message describing the changes",
}
},
"required": ["message"],
},
function=git_commit,
)
GIT_PULL_TOOL = AgentTool(
name="git_pull",
description="Pull latest changes from the remote repository. Use this to sync with remote before making local changes.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
function=git_pull,
)
GIT_PUSH_TOOL = AgentTool(
name="git_push",
description="Push local commits to the remote repository. Use this after committing changes to sync with remote.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
function=git_push,
)
def get_git_tools() -> List[AgentTool]:
"""Get all git operation tools.
Returns:
List of git tools
"""
return [
GIT_STATUS_TOOL,
GIT_COMMIT_TOOL,
GIT_PULL_TOOL,
GIT_PUSH_TOOL,
]

346
src/tools/task_ops.py Normal file
View File

@@ -0,0 +1,346 @@
"""Task management tools for the agent."""
from datetime import datetime
from typing import List, Optional, Dict, Any
from src.parsers.todo_parser import TodoParser, Task
from src.tools.file_ops import read_file, write_file
from src.agent.core import AgentTool
def add_task(
description: str,
project: Optional[str] = None,
context: Optional[str] = None,
priority: Optional[str] = None,
due_date: Optional[str] = None,
) -> str:
"""Add a new task to todo.txt.
Args:
description: Task description
project: Project tag (without + prefix)
context: Context tag (without @ prefix)
priority: Priority letter (A-Z)
due_date: Due date in YYYY-MM-DD format
Returns:
Success message with the formatted task
"""
# Read current todo.txt
try:
content = read_file("todo.txt")
except FileNotFoundError:
content = "# Todo List\n\n"
# Format the new task
projects = [project] if project else None
contexts = [context] if context else None
metadata = {"due": due_date} if due_date else None
# Parse due date if provided
due_datetime = None
if due_date:
try:
due_datetime = datetime.strptime(due_date, "%Y-%m-%d")
except ValueError:
return f"Error: Invalid due date format. Use YYYY-MM-DD"
formatted_task = TodoParser.format_task(
description=description,
priority=priority,
creation_date=datetime.now(),
projects=projects,
contexts=contexts,
metadata=metadata,
)
# Append to file
new_content = content.rstrip() + "\n" + formatted_task + "\n"
write_file("todo.txt", new_content)
return f"✅ Added task to todo.txt:\n{formatted_task}"
def complete_task(task_description: str) -> str:
"""Mark a task as complete in todo.txt.
Finds the task by description and marks it complete with timestamp.
Args:
task_description: Description or partial description of the task
Returns:
Success message
"""
# Read current todo.txt
try:
content = read_file("todo.txt")
except FileNotFoundError:
return "Error: todo.txt not found"
# Parse tasks
tasks = TodoParser.parse_file(content)
# Find matching task
matching_tasks = [
t for t in tasks
if task_description.lower() in t.description.lower() and not t.completed
]
if not matching_tasks:
return f"Error: No active task found matching '{task_description}'"
if len(matching_tasks) > 1:
task_list = "\n".join([f"- {t.description}" for t in matching_tasks])
return f"Error: Multiple tasks match '{task_description}'. Please be more specific:\n{task_list}"
# Mark the task as complete
task = matching_tasks[0]
completed_task = TodoParser.format_task(
description=task.description,
priority=task.priority,
creation_date=task.creation_date,
projects=task.projects,
contexts=task.contexts,
metadata=task.metadata,
completed=True,
completion_date=datetime.now(),
)
# Replace in content
lines = content.split('\n')
lines[task.line_number - 1] = completed_task
new_content = '\n'.join(lines)
write_file("todo.txt", new_content)
return f"✅ Marked task as complete:\n{completed_task}"
def search_tasks(
project: Optional[str] = None,
context: Optional[str] = None,
priority: Optional[str] = None,
completed: Optional[bool] = None,
has_due_date: Optional[bool] = None,
) -> str:
"""Search and filter tasks from todo.txt.
Args:
project: Filter by project tag (without + prefix)
context: Filter by context tag (without @ prefix)
priority: Filter by priority letter
completed: Filter by completion status (true/false)
has_due_date: Filter by presence of due date
Returns:
Formatted list of matching tasks
"""
# Read current todo.txt
try:
content = read_file("todo.txt")
except FileNotFoundError:
return "No tasks found (todo.txt doesn't exist)"
# Parse tasks
tasks = TodoParser.parse_file(content)
# Apply filters
filtered = TodoParser.filter_tasks(
tasks,
project=project,
context=context,
priority=priority,
completed=completed,
has_due_date=has_due_date,
)
if not filtered:
filters_desc = []
if project:
filters_desc.append(f"project={project}")
if context:
filters_desc.append(f"context={context}")
if priority:
filters_desc.append(f"priority={priority}")
if completed is not None:
filters_desc.append(f"completed={completed}")
if has_due_date is not None:
filters_desc.append(f"has_due_date={has_due_date}")
filters_str = ", ".join(filters_desc) if filters_desc else "no filters"
return f"No tasks found matching filters: {filters_str}"
# Format results
result_lines = [f"Found {len(filtered)} task(s):\n"]
for task in filtered:
status = "" if task.completed else ""
priority_str = f"({task.priority}) " if task.priority else ""
projects_str = " ".join([f"+{p}" for p in task.projects])
contexts_str = " ".join([f"@{c}" for c in task.contexts])
due_str = f" 📅 due:{task.metadata.get('due')}" if "due" in task.metadata else ""
result_lines.append(
f"{status} {priority_str}{task.description} {projects_str} {contexts_str}{due_str}"
)
return "\n".join(result_lines)
def get_tasks_by_priority() -> str:
"""Get active tasks grouped by priority.
Returns:
Formatted list of tasks by priority
"""
# Read current todo.txt
try:
content = read_file("todo.txt")
except FileNotFoundError:
return "No tasks found (todo.txt doesn't exist)"
# Parse tasks
tasks = TodoParser.parse_file(content)
active_tasks = [t for t in tasks if not t.completed]
if not active_tasks:
return "No active tasks"
# Group by priority
priority_groups: Dict[str, List[Task]] = {}
for task in active_tasks:
priority = task.priority or "None"
if priority not in priority_groups:
priority_groups[priority] = []
priority_groups[priority].append(task)
# Format results
result_lines = [f"Active tasks by priority ({len(active_tasks)} total):\n"]
# Sort priorities (A, B, C, ..., None)
sorted_priorities = sorted(
priority_groups.keys(),
key=lambda p: (p == "None", p)
)
for priority in sorted_priorities:
tasks_in_priority = priority_groups[priority]
result_lines.append(f"\n**Priority {priority}:** ({len(tasks_in_priority)} tasks)")
for task in tasks_in_priority:
projects_str = " ".join([f"+{p}" for p in task.projects])
contexts_str = " ".join([f"@{c}" for c in task.contexts])
due_str = f" 📅 {task.metadata.get('due')}" if "due" in task.metadata else ""
result_lines.append(
f" - {task.description} {projects_str} {contexts_str}{due_str}"
)
return "\n".join(result_lines)
# Create AgentTool instances
ADD_TASK_TOOL = AgentTool(
name="add_task",
description="Add a new task to todo.txt with proper formatting. The task will be added with a creation date.",
input_schema={
"type": "object",
"properties": {
"description": {
"type": "string",
"description": "Task description (the main text)",
},
"project": {
"type": "string",
"description": "Project tag without + prefix (e.g., 'myorg-assistant', 'blog-post')",
},
"context": {
"type": "string",
"description": "Context tag without @ prefix (e.g., 'computer-deep', 'telefon', 'recados')",
},
"priority": {
"type": "string",
"description": "Priority letter A-Z (A=highest)",
},
"due_date": {
"type": "string",
"description": "Due date in YYYY-MM-DD format",
}
},
"required": ["description"],
},
function=add_task,
)
COMPLETE_TASK_TOOL = AgentTool(
name="complete_task",
description="Mark a task as complete in todo.txt. Finds the task by description and adds completion timestamp.",
input_schema={
"type": "object",
"properties": {
"task_description": {
"type": "string",
"description": "Description or partial description of the task to complete",
}
},
"required": ["task_description"],
},
function=complete_task,
)
SEARCH_TASKS_TOOL = AgentTool(
name="search_tasks",
description="Search and filter tasks from todo.txt by various criteria. Returns matching tasks with their details.",
input_schema={
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Filter by project tag without + prefix",
},
"context": {
"type": "string",
"description": "Filter by context tag without @ prefix",
},
"priority": {
"type": "string",
"description": "Filter by priority letter (A-Z)",
},
"completed": {
"type": "boolean",
"description": "Filter by completion status (true for completed, false for active)",
},
"has_due_date": {
"type": "boolean",
"description": "Filter by presence of due date",
}
},
"required": [],
},
function=search_tasks,
)
GET_TASKS_BY_PRIORITY_TOOL = AgentTool(
name="get_tasks_by_priority",
description="Get all active tasks grouped by priority. Useful for daily planning and seeing what's most important.",
input_schema={
"type": "object",
"properties": {},
"required": [],
},
function=get_tasks_by_priority,
)
def get_task_management_tools() -> List[AgentTool]:
"""Get all task management tools.
Returns:
List of task management tools
"""
return [
ADD_TASK_TOOL,
COMPLETE_TASK_TOOL,
SEARCH_TASKS_TOOL,
GET_TASKS_BY_PRIORITY_TOOL,
]

0
src/utils/__init__.py Normal file
View File

170
src/utils/context.py Normal file
View File

@@ -0,0 +1,170 @@
"""Context inference utilities."""
from datetime import datetime, time
from typing import List, Optional
from src.parsers.calendar_parser import Event
def infer_context_from_time(current_time: Optional[datetime] = None) -> List[str]:
"""Infer likely contexts based on time of day.
Args:
current_time: Time to check (defaults to now)
Returns:
List of likely contexts
"""
if current_time is None:
current_time = datetime.now()
hour = current_time.hour
day_of_week = current_time.weekday() # 0=Monday, 6=Sunday
contexts = []
# Work hours (Monday-Friday, 9 AM - 6 PM)
if day_of_week < 5 and 9 <= hour < 18:
contexts.append("work")
contexts.append("computer-deep")
# Evening (6 PM - 11 PM)
elif 18 <= hour < 23:
contexts.append("personal")
contexts.append("computer-light")
# Early morning (6 AM - 9 AM)
elif 6 <= hour < 9:
contexts.append("personal")
contexts.append("computer-light")
# Weekend
if day_of_week >= 5: # Saturday or Sunday
contexts.append("personal")
contexts.append("bcn") # Likely at home location
# Lunch time (12 PM - 2 PM)
if 12 <= hour < 14:
contexts.append("recados") # Errands during lunch
return contexts
def infer_context_from_calendar(events: List[Event]) -> List[str]:
"""Infer contexts from current/upcoming calendar events.
Args:
events: List of events to analyze
Returns:
List of inferred contexts
"""
if not events:
return []
contexts = []
# Look at contexts in events
for event in events:
contexts.extend(event.contexts)
# Deduplicate
return list(set(contexts))
def infer_current_context(events: Optional[List[Event]] = None) -> List[str]:
"""Infer current context from time and calendar.
Args:
events: Optional list of current/upcoming events
Returns:
List of likely contexts (ordered by relevance)
"""
# Start with time-based inference
contexts = infer_context_from_time()
# Add calendar-based inference
if events:
calendar_contexts = infer_context_from_calendar(events)
# Calendar contexts have higher priority
contexts = calendar_contexts + [c for c in contexts if c not in calendar_contexts]
return contexts
def suggest_tasks_for_context(
contexts: List[str],
time_available: Optional[int] = None,
energy_level: Optional[str] = None,
) -> str:
"""Generate suggestions for task selection based on context.
Args:
contexts: Current contexts
time_available: Minutes available (optional)
energy_level: "high", "medium", "low" (optional)
Returns:
Suggestion text
"""
suggestions = []
# Context-based suggestions
if "computer-deep" in contexts:
suggestions.append("Focus on deep work tasks requiring concentration")
suggestions.append("Good time for coding, writing, or complex problem-solving")
if "computer-light" in contexts:
suggestions.append("Handle quick tasks: emails, reviews, light admin")
suggestions.append("Good for planning and organizing")
if "telefon" in contexts:
suggestions.append("Make those phone calls or join video meetings")
suggestions.append("Good for collaboration and communication")
if "recados" in contexts:
suggestions.append("Run errands, shopping, appointments")
suggestions.append("Handle location-specific tasks")
if "personal" in contexts:
suggestions.append("Personal tasks and family time")
suggestions.append("Home maintenance and personal projects")
# Time-based suggestions
if time_available:
if time_available < 15:
suggestions.append(f"You have {time_available} minutes - focus on quick wins")
elif time_available < 60:
suggestions.append(f"You have {time_available} minutes - good for medium-sized tasks")
else:
hours = time_available / 60
suggestions.append(f"You have {hours:.1f} hours - tackle larger projects")
# Energy-based suggestions
if energy_level:
if energy_level == "high":
suggestions.append("High energy - tackle your most challenging tasks")
elif energy_level == "medium":
suggestions.append("Medium energy - good balance of work and admin")
elif energy_level == "low":
suggestions.append("Low energy - focus on easier, routine tasks")
if not suggestions:
suggestions.append("Review your priority tasks and choose what fits best")
return "\n".join(f"{s}" for s in suggestions)
def format_context_info(contexts: List[str]) -> str:
"""Format context information for display.
Args:
contexts: List of contexts
Returns:
Formatted context string
"""
if not contexts:
return "No specific context detected"
context_str = " ".join([f"*@{c}*" for c in contexts])
return f"Current context: {context_str}"

0
src/web/__init__.py Normal file
View File

View File

View File

View File

@@ -0,0 +1,299 @@
/* MyOrg Assistant Styles */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
:root {
--primary: #2563eb;
--secondary: #64748b;
--success: #10b981;
--warning: #f59e0b;
--danger: #ef4444;
--bg: #f8fafc;
--surface: #ffffff;
--text: #1e293b;
--text-muted: #64748b;
--border: #e2e8f0;
}
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
background: var(--bg);
color: var(--text);
line-height: 1.6;
}
/* Navigation */
.navbar {
background: var(--surface);
border-bottom: 1px solid var(--border);
padding: 1rem 2rem;
display: flex;
justify-content: space-between;
align-items: center;
}
.nav-brand h1 {
font-size: 1.5rem;
color: var(--primary);
}
.nav-links {
display: flex;
gap: 1.5rem;
}
.nav-links a {
text-decoration: none;
color: var(--text-muted);
font-weight: 500;
transition: color 0.2s;
}
.nav-links a:hover,
.nav-links a.active {
color: var(--primary);
}
/* Container */
.container {
max-width: 1200px;
margin: 2rem auto;
padding: 0 2rem;
}
/* Dashboard */
.dashboard-header {
margin-bottom: 2rem;
}
.date {
color: var(--text-muted);
font-size: 0.95rem;
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
margin-bottom: 2rem;
}
.stat-card {
background: var(--surface);
padding: 1.5rem;
border-radius: 8px;
border: 1px solid var(--border);
}
.stat-value {
font-size: 2rem;
font-weight: bold;
color: var(--primary);
}
.stat-label {
color: var(--text-muted);
font-size: 0.9rem;
margin-top: 0.5rem;
}
.dashboard-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 1.5rem;
}
.dashboard-section {
background: var(--surface);
padding: 1.5rem;
border-radius: 8px;
border: 1px solid var(--border);
}
.dashboard-section h3 {
margin-bottom: 1rem;
color: var(--text);
}
.empty-state {
color: var(--text-muted);
font-style: italic;
}
/* Task Items */
.task-list, .event-list, .project-list {
display: flex;
flex-direction: column;
gap: 0.75rem;
}
.task-item, .event-item, .project-item {
padding: 0.75rem;
background: var(--bg);
border-radius: 4px;
display: flex;
align-items: center;
gap: 0.5rem;
flex-wrap: wrap;
}
.priority-badge {
background: var(--warning);
color: white;
padding: 0.25rem 0.5rem;
border-radius: 4px;
font-size: 0.85rem;
font-weight: bold;
}
.priority-A { background: var(--danger); }
.priority-B { background: var(--warning); }
.priority-C { background: var(--secondary); }
.tag {
background: var(--primary);
color: white;
padding: 0.25rem 0.5rem;
border-radius: 4px;
font-size: 0.85rem;
}
.tag.context {
background: var(--success);
}
.tag.project {
background: var(--primary);
}
.due-date {
color: var(--warning);
font-size: 0.9rem;
margin-left: auto;
}
.event-time {
font-weight: 600;
color: var(--primary);
min-width: 60px;
}
.project-tag {
font-weight: 600;
color: var(--primary);
}
/* Chat */
.chat-container {
max-width: 800px;
margin: 0 auto;
}
.chat-messages {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 8px;
padding: 1rem;
height: 500px;
overflow-y: auto;
margin-bottom: 1rem;
}
.message {
margin-bottom: 1rem;
padding: 0.75rem;
border-radius: 4px;
}
.message.assistant {
background: var(--bg);
}
.message.user {
background: var(--primary);
color: white;
margin-left: 20%;
}
.chat-form {
display: flex;
gap: 0.5rem;
margin-bottom: 1rem;
}
.chat-form input {
flex: 1;
padding: 0.75rem;
border: 1px solid var(--border);
border-radius: 4px;
font-size: 1rem;
}
.chat-form button {
padding: 0.75rem 1.5rem;
background: var(--primary);
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-weight: 600;
}
.chat-form button:hover {
opacity: 0.9;
}
/* Buttons */
.btn-link {
color: var(--primary);
text-decoration: none;
font-size: 0.9rem;
margin-top: 0.5rem;
display: inline-block;
}
.btn-secondary {
background: var(--secondary);
color: white;
padding: 0.5rem 1rem;
border: none;
border-radius: 4px;
cursor: pointer;
}
.btn-secondary:hover {
opacity: 0.9;
}
/* Footer */
footer {
text-align: center;
padding: 2rem;
color: var(--text-muted);
font-size: 0.9rem;
}
/* Responsive */
@media (max-width: 768px) {
.navbar {
flex-direction: column;
gap: 1rem;
}
.nav-links {
flex-wrap: wrap;
justify-content: center;
}
.dashboard-grid {
grid-template-columns: 1fr;
}
.stats-grid {
grid-template-columns: repeat(2, 1fr);
}
}

View File

View File

View File

@@ -0,0 +1,35 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}MyOrg Assistant{% endblock %}</title>
<link rel="stylesheet" href="/static/css/style.css">
<script src="https://unpkg.com/htmx.org@1.9.10"></script>
{% block extra_head %}{% endblock %}
</head>
<body>
<nav class="navbar">
<div class="nav-brand">
<h1>🤖 MyOrg Assistant</h1>
</div>
<div class="nav-links">
<a href="/dashboard" class="{% if page == 'dashboard' %}active{% endif %}">📊 Dashboard</a>
<a href="/chat" class="{% if page == 'chat' %}active{% endif %}">💬 Chat</a>
<a href="/tasks" class="{% if page == 'tasks' %}active{% endif %}">✅ Tasks</a>
<a href="/calendar" class="{% if page == 'calendar' %}active{% endif %}">📅 Calendar</a>
<a href="/projects" class="{% if page == 'projects' %}active{% endif %}">📂 Projects</a>
</div>
</nav>
<main class="container">
{% block content %}{% endblock %}
</main>
<footer>
<p>MyOrg Assistant - Powered by Claude Sonnet 4.5</p>
</footer>
{% block extra_scripts %}{% endblock %}
</body>
</html>

View File

@@ -0,0 +1,23 @@
{% extends "base.html" %}
{% block title %}Calendar - MyOrg Assistant{% endblock %}
{% block content %}
<h2>📅 Calendar</h2>
<h3>Today - {{ today }}</h3>
<div class="event-list">
{% for event in today_events %}
<div class="event-item">
<span class="event-time">{% if event.time %}{{ event.time.strftime('%H:%M') }}{% else %}All day{% endif %}</span>
{{ event.description }}
</div>
{% endfor %}
</div>
<h3>Upcoming (Next 7 Days)</h3>
<div class="event-list">
{% for event in upcoming_events %}
<div class="event-item">
<span>{{ event.date.strftime('%Y-%m-%d') }} {% if event.time %}{{ event.time.strftime('%H:%M') }}{% endif %}</span>
{{ event.description }}
</div>
{% endfor %}
</div>
{% endblock %}

View File

@@ -0,0 +1,32 @@
{% extends "base.html" %}
{% block title %}Chat - MyOrg Assistant{% endblock %}
{% block content %}
<div class="chat-container">
<h2>💬 Chat with Assistant</h2>
<div class="chat-messages" id="messages">
<div class="message assistant">
<strong>Assistant:</strong> Hi! I'm your MyOrg assistant. Ask me anything about your tasks, calendar, or projects!
</div>
</div>
<form class="chat-form" hx-post="/api/chat" hx-target="#messages" hx-swap="beforeend" hx-on::after-request="this.reset()">
<input type="text" name="message" placeholder="Type your message..." required autofocus>
<button type="submit">Send</button>
</form>
<button hx-post="/api/chat/reset" hx-swap="none" class="btn-secondary">Clear History</button>
</div>
{% endblock %}
{% block extra_scripts %}
<script>
// Auto-scroll to bottom when new messages arrive
htmx.on('htmx:afterSwap', function(evt) {
var messages = document.getElementById('messages');
messages.scrollTop = messages.scrollHeight;
});
</script>
{% endblock %}

View File

@@ -0,0 +1,111 @@
{% extends "base.html" %}
{% block title %}Dashboard - MyOrg Assistant{% endblock %}
{% block content %}
<div class="dashboard">
<div class="dashboard-header">
<h2>📊 Dashboard</h2>
<p class="date">{{ today }}</p>
</div>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value">{{ stats.events_today }}</div>
<div class="stat-label">Events Today</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ stats.priority_tasks }}</div>
<div class="stat-label">Priority Tasks</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ stats.due_soon }}</div>
<div class="stat-label">Due Soon</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ stats.active_projects }}</div>
<div class="stat-label">Active Projects</div>
</div>
</div>
<div class="dashboard-grid">
<!-- Today's Events -->
<div class="dashboard-section">
<h3>📅 Today's Schedule</h3>
{% if events %}
<div class="event-list">
{% for event in events %}
<div class="event-item">
<span class="event-time">
{% if event.time %}{{ event.time.strftime('%H:%M') }}{% else %}All day{% endif %}
</span>
<span class="event-desc">{{ event.description }}</span>
{% for context in event.contexts %}
<span class="tag context">@{{ context }}</span>
{% endfor %}
</div>
{% endfor %}
</div>
{% else %}
<p class="empty-state">No events scheduled today</p>
{% endif %}
</div>
<!-- Priority Tasks -->
<div class="dashboard-section">
<h3>✅ Priority Tasks</h3>
{% if priority_tasks %}
<div class="task-list">
{% for task in priority_tasks %}
<div class="task-item">
<span class="priority-badge priority-{{ task.priority }}">{{ task.priority }}</span>
<span class="task-desc">{{ task.description }}</span>
{% for project in task.projects %}
<span class="tag project">+{{ project }}</span>
{% endfor %}
</div>
{% endfor %}
</div>
<a href="/tasks?priority=A" class="btn-link">View all →</a>
{% else %}
<p class="empty-state">No priority tasks</p>
{% endif %}
</div>
<!-- Due Soon -->
<div class="dashboard-section">
<h3>⏰ Due Soon</h3>
{% if due_soon %}
<div class="task-list">
{% for task in due_soon %}
<div class="task-item">
<span class="task-desc">{{ task.description }}</span>
<span class="due-date">📅 {{ task.due_date.strftime('%Y-%m-%d') }}</span>
</div>
{% endfor %}
</div>
{% else %}
<p class="empty-state">Nothing due soon</p>
{% endif %}
</div>
<!-- Active Projects -->
<div class="dashboard-section">
<h3>📂 Active Projects</h3>
{% if active_projects %}
<div class="project-list">
{% for project in active_projects %}
<div class="project-item">
<span class="project-tag">+{{ project.tag }}</span>
<span class="project-desc">{{ project.description }}</span>
</div>
{% endfor %}
</div>
<a href="/projects" class="btn-link">View all →</a>
{% else %}
<p class="empty-state">No active projects</p>
{% endif %}
</div>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,16 @@
{% extends "base.html" %}
{% block title %}Projects - MyOrg Assistant{% endblock %}
{% block content %}
<h2>📂 Projects</h2>
<p>Active: {{ stats.active }} | Waiting: {{ stats.waiting }} | Someday: {{ stats.someday }}</p>
<div class="project-list">
{% for project in projects %}
<div class="project-item">
<strong>+{{ project.tag }}</strong> - {{ project.description }} [{{ project.status }}]
{% if project_task_counts.get(project.tag) %}
<span class="badge">{{ project_task_counts[project.tag] }} tasks</span>
{% endif %}
</div>
{% endfor %}
</div>
{% endblock %}

View File

@@ -0,0 +1,17 @@
{% extends "base.html" %}
{% block title %}Tasks - MyOrg Assistant{% endblock %}
{% block content %}
<h2>✅ Tasks</h2>
<p>Total: {{ total_tasks }} active, {{ completed_tasks }} completed</p>
<div class="task-list">
{% for task in tasks %}
<div class="task-item">
<span class="{% if task.priority %}priority-{{ task.priority }}{% endif %}">
{% if task.priority %}({{ task.priority }}){% endif %} {{ task.description }}
</span>
{% for p in task.projects %}<span class="tag">+{{ p }}</span>{% endfor %}
{% for c in task.contexts %}<span class="tag">@{{ c }}</span>{% endfor %}
</div>
{% endfor %}
</div>
{% endblock %}