API Reference¶
Comprehensive reference for Digital Memory Chest's internal APIs, database operations, and extension points.
Core API Architecture¶
graph TB
subgraph "API Layers"
REST[REST-like Interface]
BUSINESS[Business Logic Layer]
DATA[Data Access Layer]
STORAGE[Storage Layer]
end
subgraph "Core Managers"
CM[ChestManager]
AM[AssetManager]
SM[StoryManager]
CON[ContributionManager]
end
subgraph "AI Services"
TR[TranscriptionService]
IT[ImageTaggerService]
SG[StoryGeneratorService]
MP[MediaProcessorService]
end
REST --> BUSINESS
BUSINESS --> CM
BUSINESS --> AM
BUSINESS --> SM
BUSINESS --> CON
AM --> TR
AM --> IT
SM --> SG
AM --> MP
CM --> DATA
AM --> DATA
SM --> DATA
CON --> DATA
DATA --> STORAGE
style REST fill:#e3f2fd
style BUSINESS fill:#e8f5e8
style DATA fill:#f3e5f5
style STORAGE fill:#fff3e0
Database API (SQLModel)¶
Core Models¶
Chest Model¶
from datetime import date, datetime
from typing import List, Optional

from sqlmodel import Field, Relationship, SQLModel
class ChestBase(SQLModel):
    """Base model for memory chests"""
    person_name: str = Field(max_length=200)  # display name of the remembered person
    person_description: str = Field(max_length=2000)  # free-text biography/summary
    birth_date: Optional[date] = None
    passing_date: Optional[date] = None
    # Storage path of the hero image; populated after upload processing
    # (see ChestManager.create_memory_chest).
    hero_image_path: Optional[str] = Field(default=None, max_length=500)
    is_public: bool = Field(default=False)  # NOTE(review): visibility semantics enforced elsewhere — confirm
    allow_contributions: bool = Field(default=False)  # whether third-party contributions are accepted — confirm
class Chest(ChestBase, table=True):
    """Main chest table with relationships"""
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)  # naive UTC timestamp
    updated_at: datetime = Field(default_factory=datetime.utcnow)  # bumped on every mutation
    is_deleted: bool = Field(default=False)  # soft-delete flag; rows are never hard-deleted here
    # Unique token used for public share links (looked up by
    # ChestOperations.get_chest_by_token).
    share_token: Optional[str] = Field(default=None, unique=True)
    # Relationships (requires `Relationship` to be imported from sqlmodel)
    assets: List["Asset"] = Relationship(back_populates="chest")
    stories: List["Story"] = Relationship(back_populates="chest")
    contributions: List["Contribution"] = Relationship(back_populates="chest")
Asset Model¶
class AssetBase(SQLModel):
    """Base model for digital assets (photos, videos, audio, documents)"""
    filename: str = Field(max_length=255)  # original upload filename
    file_path: str = Field(max_length=1000)  # storage location of the saved file
    file_type: str = Field(max_length=50)  # image, video, audio, document
    file_size: int  # presumably bytes — confirm against upload handler
    mime_type: str = Field(max_length=100)
    # AI Processing Results
    transcription: Optional[str] = None  # text written by MediaProcessor after transcription
    ai_tags: Optional[str] = None  # JSON string (serialized list of tags)
    processing_status: str = Field(default="pending")  # pending, processing, complete, failed
    # Metadata
    date_taken: Optional[datetime] = None  # capture time when extractable from the media
    location_data: Optional[str] = None  # JSON string
    thumbnail_path: Optional[str] = None  # generated preview path, if any — confirm
class Asset(AssetBase, table=True):
    """Asset table with relationships"""
    id: Optional[int] = Field(default=None, primary_key=True)
    chest_id: int = Field(foreign_key="chest.id")  # owning chest
    created_at: datetime = Field(default_factory=datetime.utcnow)  # naive UTC timestamp
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    is_deleted: bool = Field(default=False)  # soft-delete flag; filtered out by queries
    # Relationships (requires `Relationship` to be imported from sqlmodel)
    chest: Chest = Relationship(back_populates="assets")
Database Operations¶
ChestOperations¶
from src.db.operations import ChestOperations
from sqlmodel import Session
class ChestOperations:
    """Database operations for memory chests.

    All methods are stateless staticmethods operating on a caller-supplied
    session; the caller owns the session's lifecycle.
    """

    @staticmethod
    async def create_chest(session: Session, chest_data: ChestBase) -> Chest:
        """Create a new memory chest and assign it a unique share token.

        Args:
            session: Active database session.
            chest_data: Validated chest fields.

        Returns:
            The persisted chest with DB-generated fields loaded.
        """
        # `from_orm` is deprecated under Pydantic v2 / recent SQLModel;
        # `model_validate` is the supported replacement.
        chest = Chest.model_validate(chest_data)
        # NOTE(review): `generate_share_token` is assumed to be a module-level
        # helper defined elsewhere — confirm.
        chest.share_token = generate_share_token()
        session.add(chest)
        session.commit()
        session.refresh(chest)  # load generated id/defaults from the DB
        return chest

    @staticmethod
    async def get_chest_by_token(session: Session, share_token: str) -> Optional[Chest]:
        """Retrieve a non-deleted chest by its public share token, or None."""
        # `== False` is intentional: this builds a SQL expression, not a
        # Python boolean test.
        return session.query(Chest).filter(
            Chest.share_token == share_token,
            Chest.is_deleted == False
        ).first()

    @staticmethod
    async def update_chest(session: Session, chest_id: int, updates: dict) -> Optional[Chest]:
        """Apply a partial update to a chest.

        Keys in ``updates`` that are not model attributes are silently
        ignored rather than raising.

        Returns:
            The refreshed chest, or None if it is missing or soft-deleted.
        """
        chest = session.get(Chest, chest_id)
        if not chest or chest.is_deleted:
            return None
        for field, value in updates.items():
            if hasattr(chest, field):
                setattr(chest, field, value)
        chest.updated_at = datetime.utcnow()
        session.commit()
        session.refresh(chest)
        return chest

    @staticmethod
    async def soft_delete_chest(session: Session, chest_id: int) -> bool:
        """Soft delete a chest (mark as deleted).

        Idempotent: deleting an already-deleted chest also returns True.
        Returns False only when the chest id does not exist.
        """
        chest = session.get(Chest, chest_id)
        if not chest:
            return False
        chest.is_deleted = True
        chest.updated_at = datetime.utcnow()
        session.commit()
        return True
AssetOperations¶
class AssetOperations:
    """Database operations for digital assets."""

    @staticmethod
    async def create_asset(
        session: Session,
        chest_id: int,
        file_info: dict,
        processing_status: str = "pending"
    ) -> Asset:
        """Create a new asset record from uploaded-file metadata.

        Args:
            session: Active database session.
            chest_id: Owning chest's primary key.
            file_info: Must contain filename, file_path, file_type,
                file_size and mime_type.
            processing_status: Initial AI-processing state.

        Raises:
            KeyError: if ``file_info`` is missing a required key.
        """
        asset = Asset(
            chest_id=chest_id,
            filename=file_info['filename'],
            file_path=file_info['file_path'],
            file_type=file_info['file_type'],
            file_size=file_info['file_size'],
            mime_type=file_info['mime_type'],
            processing_status=processing_status
        )
        session.add(asset)
        session.commit()
        session.refresh(asset)
        return asset

    @staticmethod
    async def update_processing_results(
        session: Session,
        asset_id: int,
        transcription: Optional[str] = None,
        ai_tags: Optional[List[str]] = None,
        status: str = "complete"
    ) -> Optional[Asset]:
        """Update an asset with AI processing results.

        Returns the refreshed asset, or None if the id does not exist.
        """
        import json

        asset = session.get(Asset, asset_id)
        if not asset:
            return None
        # Explicit None checks (rather than truthiness) so an empty
        # transcription string or an empty tag list is still persisted
        # instead of being silently dropped while status flips to complete.
        if transcription is not None:
            asset.transcription = transcription
        if ai_tags is not None:
            asset.ai_tags = json.dumps(ai_tags)
        asset.processing_status = status
        asset.updated_at = datetime.utcnow()
        session.commit()
        session.refresh(asset)
        return asset

    @staticmethod
    async def get_assets_by_chest(
        session: Session,
        chest_id: int,
        file_type: Optional[str] = None
    ) -> List[Asset]:
        """Get all non-deleted assets for a chest, optionally filtered by type.

        Ordered newest-first by capture date, then creation time.
        NOTE(review): rows with NULL date_taken sort per the database's
        NULL-ordering rules — confirm that matches the UI's expectation.
        """
        query = session.query(Asset).filter(
            Asset.chest_id == chest_id,
            Asset.is_deleted == False
        )
        if file_type:
            query = query.filter(Asset.file_type == file_type)
        return query.order_by(Asset.date_taken.desc(), Asset.created_at.desc()).all()
Business Logic API¶
ChestManager¶
from src.db.models import Chest, Asset
from src.db.operations import ChestOperations, AssetOperations
from src.ai.media_processor import MediaProcessor
class ChestManager:
    """High-level business logic for memory chest management"""

    def __init__(self, session: Session):
        self.session = session
        self.media_processor = MediaProcessor()

    async def create_memory_chest(
        self,
        person_name: str,
        person_description: str,
        birth_date: Optional[date] = None,
        passing_date: Optional[date] = None,
        hero_image: Optional[UploadedFile] = None
    ) -> Chest:
        """Create a complete memory chest with optional hero image"""
        # Store the hero image first so its path can go on the chest record.
        hero_path = await self._process_hero_image(hero_image) if hero_image else None
        payload = ChestBase(
            person_name=person_name,
            person_description=person_description,
            birth_date=birth_date,
            passing_date=passing_date,
            hero_image_path=hero_path,
        )
        return await ChestOperations.create_chest(self.session, payload)

    async def add_memory(
        self,
        chest_id: int,
        uploaded_file: UploadedFile,
        process_with_ai: bool = True
    ) -> Asset:
        """Add a new memory to the chest"""
        # Persist the file, record it, then optionally hand it to the AI queue.
        saved_info = await self._save_uploaded_file(uploaded_file)
        new_asset = await AssetOperations.create_asset(self.session, chest_id, saved_info)
        if process_with_ai:
            await self.media_processor.queue_for_processing(new_asset.id)
        return new_asset

    async def get_chest_summary(self, chest_id: int) -> dict:
        """Get comprehensive chest summary with statistics"""
        chest = await ChestOperations.get_chest_by_id(self.session, chest_id)
        if not chest:
            raise ValueError(f"Chest {chest_id} not found")
        assets = await AssetOperations.get_assets_by_chest(self.session, chest_id)

        def count_of(kind: str) -> int:
            # Tally assets of one media type.
            return sum(1 for item in assets if item.file_type == kind)

        in_flight = sum(
            1 for item in assets
            if item.processing_status in ["pending", "processing"]
        )
        done = sum(1 for item in assets if item.processing_status == "complete")
        return {
            "chest": chest,
            "stats": {
                "total_assets": len(assets),
                "photos": count_of("image"),
                "videos": count_of("video"),
                "audio": count_of("audio"),
                "documents": count_of("document"),
                "processed": done,
                "pending": in_flight,
            },
            "recent_assets": assets[:10],  # Most recent 10
        }
StoryManager¶
from src.ai.story_generation import StoryGenerator
from typing import Dict, Any
class StoryManager:
    """Manages AI story generation and management"""

    def __init__(self, session: Session):
        self.session = session
        self.story_generator = StoryGenerator()

    async def generate_story(
        self,
        chest_id: int,
        style: str = "comprehensive",
        include_timeline: bool = True,
        include_themes: bool = True
    ) -> Story:
        """Generate and persist an AI story for a memory chest.

        Args:
            chest_id: Chest to narrate.
            style: Narrative style forwarded to the generator.
            include_timeline: Request timeline data from the generator.
            include_themes: Request theme data from the generator.

        Returns:
            The persisted ``Story`` row.

        Raises:
            ValueError: if the chest has no fully processed assets yet.
        """
        # Only assets whose AI processing has finished contribute context.
        assets = await AssetOperations.get_assets_by_chest(self.session, chest_id)
        processed_assets = [a for a in assets if a.processing_status == "complete"]
        if not processed_assets:
            raise ValueError("No processed memories available for story generation")
        # Prepare context for AI
        context = await self._prepare_story_context(processed_assets)
        # Generate story content
        story_content = await self.story_generator.generate_narrative(
            context=context,
            style=style,
            include_timeline=include_timeline,
            include_themes=include_themes
        )
        # Save story to database
        story = Story(
            chest_id=chest_id,
            title=story_content.get("title", f"The Story of {context['person_name']}"),
            content=story_content["narrative"],
            timeline_data=story_content.get("timeline"),
            themes_data=story_content.get("themes"),
            generation_metadata=story_content.get("metadata", {})
        )
        self.session.add(story)
        self.session.commit()
        self.session.refresh(story)
        return story

    async def _prepare_story_context(self, assets: List[Asset]) -> Dict[str, Any]:
        """Prepare context data for story generation."""
        # Local imports fix a NameError: neither `json` nor `datetime` is
        # imported by this module's header.
        import json
        from datetime import datetime

        context = {
            "person_name": assets[0].chest.person_name if assets else "Unknown",
            "memories": []
        }
        for asset in assets:
            memory = {
                "type": asset.file_type,
                "filename": asset.filename,
                # created_at is always populated, so this never stays None.
                "date": asset.date_taken or asset.created_at,
                "transcription": asset.transcription,
                "tags": json.loads(asset.ai_tags) if asset.ai_tags else [],
                "metadata": {}
            }
            # Add location data if available
            if asset.location_data:
                memory["location"] = json.loads(asset.location_data)
            context["memories"].append(memory)
        # Sort by date for chronological context
        context["memories"].sort(key=lambda x: x["date"] or datetime.min)
        return context
AI Services API¶
MediaProcessor¶
import asyncio
from typing import Optional, Dict, Any
from src.ai.transcription import TranscriptionService
from src.ai.image_tagging import ImageTaggerService
class MediaProcessor:
    """Coordinates all AI processing for uploaded media.

    Routes each asset to a service based on ``file_type`` and writes the
    results back onto the Asset row via ``AssetOperations``.
    """

    def __init__(self):
        # NOTE(review): TranscriptionService as documented later in this file
        # takes a required `config` argument — confirm this no-arg call.
        self.transcription_service = TranscriptionService()
        self.image_tagger = ImageTaggerService()
        # FIFO of asset ids awaiting processing; the consumer/worker is not
        # visible in this module.
        self.processing_queue = asyncio.Queue()
        # In-flight bookkeeping; not used by the methods shown here.
        self.active_processors = {}

    async def queue_for_processing(self, asset_id: int) -> None:
        """Add asset to processing queue"""
        await self.processing_queue.put(asset_id)

    async def process_asset(self, asset_id: int) -> Dict[str, Any]:
        """Process single asset with appropriate AI services.

        Returns the raw results dict (empty for unrecognized file types).

        Raises:
            ValueError: if the asset id does not exist.
        """
        # NOTE(review): `get_db_session` is assumed to be a session context
        # manager defined elsewhere — confirm.
        with get_db_session() as session:
            asset = session.get(Asset, asset_id)
            if not asset:
                raise ValueError(f"Asset {asset_id} not found")
            # Update status to processing so callers polling
            # get_processing_status see progress immediately.
            asset.processing_status = "processing"
            session.commit()
            results = {}
            try:
                if asset.file_type == "image":
                    results = await self._process_image(asset)
                elif asset.file_type in ["video", "audio"]:
                    results = await self._process_audio_video(asset)
                elif asset.file_type == "document":
                    # NOTE(review): _process_document is not defined in this
                    # snippet — assumed implemented elsewhere.
                    results = await self._process_document(asset)
                # Update asset with results and mark complete.
                await AssetOperations.update_processing_results(
                    session,
                    asset_id,
                    transcription=results.get("transcription"),
                    ai_tags=results.get("tags"),
                    status="complete"
                )
            except Exception as e:
                # Mark as failed, then re-raise for the caller/worker to log.
                asset.processing_status = "failed"
                session.commit()
                raise e
            return results

    async def _process_image(self, asset: Asset) -> Dict[str, Any]:
        """Process image with CLIP tagging"""
        tags = await self.image_tagger.tag_image(asset.file_path)
        return {
            "tags": tags,
            "processing_type": "image_classification"
        }

    async def _process_audio_video(self, asset: Asset) -> Dict[str, Any]:
        """Process audio/video with transcription"""
        transcription = await self.transcription_service.transcribe_file(asset.file_path)
        # Extract keywords from transcription
        # NOTE(review): _extract_keywords is not defined in this snippet —
        # assumed implemented elsewhere.
        keywords = self._extract_keywords(transcription)
        return {
            "transcription": transcription["text"],
            "tags": keywords,
            "confidence": transcription.get("confidence", 0.0),
            "processing_type": "transcription"
        }

    async def get_processing_status(self, asset_id: int) -> Dict[str, Any]:
        """Get current processing status for an asset.

        Returns ``{"status": "not_found"}`` when the id does not exist.
        """
        with get_db_session() as session:
            asset = session.get(Asset, asset_id)
            if not asset:
                return {"status": "not_found"}
            return {
                "status": asset.processing_status,
                "updated_at": asset.updated_at,
                "has_transcription": bool(asset.transcription),
                "has_tags": bool(asset.ai_tags)
            }
TranscriptionService¶
from typing import Dict, Any, Optional
import whisper
from openai import OpenAI
class TranscriptionService:
    """Handles audio/video transcription with multiple backends.

    Backends are tried in order: local Whisper (if loaded and preferred),
    the OpenAI API, then a metadata-only fallback.
    """

    def __init__(self, config: Optional["Config"] = None):
        """Initialize available backends from ``config``.

        Args:
            config: Application configuration. Now optional so the service
                can be constructed with no arguments (as MediaProcessor
                does); with no config only the metadata fallback is used.
        """
        self.config = config
        self.local_model = None
        self.openai_client = None
        if config is not None and config.USE_LOCAL_WHISPER:
            self.local_model = whisper.load_model("base")
        if config is not None and config.OPENAI_API_KEY:
            self.openai_client = OpenAI(api_key=config.OPENAI_API_KEY)

    async def transcribe_file(
        self,
        file_path: str,
        prefer_local: bool = True
    ) -> Dict[str, Any]:
        """Transcribe audio/video file using best available method.

        Args:
            file_path: Path to the media file on disk.
            prefer_local: Try the local Whisper model before the API.

        Returns:
            Dict with at least ``text``, ``confidence`` and ``method`` keys.
        """
        # Try local processing first if available and preferred
        if prefer_local and self.local_model:
            try:
                return await self._transcribe_local(file_path)
            except Exception as e:
                # NOTE(review): `logger` must be defined at module level — confirm.
                logger.warning(f"Local transcription failed: {e}")
        # Fall back to OpenAI API
        if self.openai_client:
            try:
                return await self._transcribe_openai(file_path)
            except Exception as e:
                logger.warning(f"OpenAI transcription failed: {e}")
        # Final fallback - basic metadata. Local import: Path is not
        # imported by this module's header.
        from pathlib import Path
        return {
            "text": f"Audio file: {Path(file_path).name}",
            "confidence": 0.0,
            "method": "fallback",
            "error": "No transcription service available"
        }

    async def _transcribe_local(self, file_path: str) -> Dict[str, Any]:
        """Transcribe using local Whisper model"""
        result = self.local_model.transcribe(file_path)
        return {
            "text": result["text"].strip(),
            "confidence": 0.9,  # Whisper doesn't provide confidence scores
            "method": "local_whisper",
            "segments": result.get("segments", []),
            "language": result.get("language", "unknown")
        }

    async def _transcribe_openai(self, file_path: str) -> Dict[str, Any]:
        """Transcribe using OpenAI Whisper API"""
        with open(file_path, "rb") as audio_file:
            transcript = self.openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["segment"]
            )
        return {
            "text": transcript.text,
            "confidence": 0.95,  # OpenAI generally higher quality
            "method": "openai_api",
            "segments": getattr(transcript, 'segments', []),
            "language": getattr(transcript, 'language', 'unknown')
        }
Extension Points¶
Custom AI Providers¶
from abc import ABC, abstractmethod
class AIProvider(ABC):
    """Abstract base class for AI service providers.

    Subclasses must implement all three coroutine methods before an
    instance can be created.
    NOTE(review): ``Dict``, ``Any`` and ``List`` are assumed to be imported
    at module level — confirm.
    """

    @abstractmethod
    async def transcribe(self, audio_file: str) -> Dict[str, Any]:
        """Transcribe audio to text"""
        pass

    @abstractmethod
    async def generate_tags(self, image_file: str) -> List[str]:
        """Generate descriptive tags for images"""
        pass

    @abstractmethod
    async def generate_story(self, context: Dict[str, Any]) -> str:
        """Generate narrative story from context"""
        pass
class CustomAIProvider(AIProvider):
    """Example custom AI provider implementation.

    The bodies below are placeholders: as written each method returns
    ``None``, so a real provider must replace every ``pass`` with logic
    matching the declared return type.
    """

    async def transcribe(self, audio_file: str) -> Dict[str, Any]:
        # Your custom transcription logic
        pass

    async def generate_tags(self, image_file: str) -> List[str]:
        # Your custom image tagging logic
        pass

    async def generate_story(self, context: Dict[str, Any]) -> str:
        # Your custom story generation logic
        pass
# Register custom provider
from src.ai.providers import register_provider
register_provider("custom", CustomAIProvider())
Custom Storage Backends¶
from abc import ABC, abstractmethod
from typing import BinaryIO
class StorageBackend(ABC):
    """Abstract storage backend interface.

    Implementations must provide all four coroutine methods; instances are
    registered via ``register_backend``.
    NOTE(review): return/parameter types use ``BinaryIO`` from typing —
    confirm the module-level import.
    """

    @abstractmethod
    async def save_file(
        self,
        file_content: BinaryIO,
        file_path: str
    ) -> str:
        """Save file and return storage path"""
        pass

    @abstractmethod
    async def get_file(self, file_path: str) -> BinaryIO:
        """Retrieve file by path"""
        pass

    @abstractmethod
    async def delete_file(self, file_path: str) -> bool:
        """Delete file from storage"""
        pass

    @abstractmethod
    async def get_file_url(self, file_path: str) -> str:
        """Get public URL for file"""
        pass
class CustomStorageBackend(StorageBackend):
    """Example custom storage implementation.

    All four abstract methods of ``StorageBackend`` are stubbed here:
    the original example implemented only ``save_file``, so instantiating
    it (as the registration example does) raised TypeError.
    """

    async def save_file(self, file_content: BinaryIO, file_path: str) -> str:
        # Your custom storage logic (e.g., Azure Blob, Google Cloud)
        pass

    async def get_file(self, file_path: str) -> BinaryIO:
        # Your custom retrieval logic
        pass

    async def delete_file(self, file_path: str) -> bool:
        # Your custom deletion logic
        pass

    async def get_file_url(self, file_path: str) -> str:
        # Your custom URL generation logic
        pass
# Register storage backend
from src.storage import register_backend
register_backend("custom", CustomStorageBackend())
API Versioning
The API is currently in version 1.0. All endpoints and interfaces are considered stable, but we recommend using the provided SDK classes rather than direct database access.
Database Migrations
Always use the provided migration tools when modifying database schema. Direct SQL changes may cause compatibility issues.
Performance Tips
- Use async methods for all AI processing operations
- Implement connection pooling for database access
- Cache frequently accessed data using the built-in cache layer
- Monitor memory usage during bulk operations