Skip to content

API Reference

Comprehensive reference for Digital Memory Chest's internal APIs, database operations, and extension points.

Core API Architecture

graph TB
    subgraph "API Layers"
        REST[REST-like Interface]
        BUSINESS[Business Logic Layer]
        DATA[Data Access Layer]
        STORAGE[Storage Layer]
    end

    subgraph "Core Managers"
        CM[ChestManager]
        AM[AssetManager]
        SM[StoryManager]
        CON[ContributionManager]
    end

    subgraph "AI Services"
        TR[TranscriptionService]
        IT[ImageTaggerService]
        SG[StoryGeneratorService]
        MP[MediaProcessorService]
    end

    REST --> BUSINESS
    BUSINESS --> CM
    BUSINESS --> AM
    BUSINESS --> SM
    BUSINESS --> CON

    AM --> TR
    AM --> IT
    SM --> SG
    AM --> MP

    CM --> DATA
    AM --> DATA
    SM --> DATA
    CON --> DATA

    DATA --> STORAGE

    style REST fill:#e3f2fd
    style BUSINESS fill:#e8f5e8
    style DATA fill:#f3e5f5
    style STORAGE fill:#fff3e0

Database API (SQLModel)

Core Models

Chest Model

from datetime import datetime, date
from typing import Optional, List

from sqlmodel import SQLModel, Field, Relationship

class ChestBase(SQLModel):
    """Base model for memory chests.

    Holds the user-editable fields; the `Chest` table model adds the
    primary key, timestamps, soft-delete flag, and share token.
    """
    # Name of the person the chest commemorates.
    person_name: str = Field(max_length=200)
    # Free-text biography/description shown with the chest.
    person_description: str = Field(max_length=2000)
    birth_date: Optional[date] = None
    passing_date: Optional[date] = None
    # Storage path of the banner image, if one was uploaded.
    hero_image_path: Optional[str] = Field(default=None, max_length=500)
    # Visibility/contribution flags -- presumably gate access beyond the
    # share token; confirm exact semantics against the UI layer.
    is_public: bool = Field(default=False)
    allow_contributions: bool = Field(default=False)

class Chest(ChestBase, table=True):
    """Main chest table with relationships"""
    id: Optional[int] = Field(default=None, primary_key=True)
    # Row bookkeeping; `updated_at` is maintained by the operations layer,
    # not by the database.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    # Soft-delete flag: queries filter on this instead of removing rows.
    is_deleted: bool = Field(default=False)
    # Unique token embedded in public share links (set at creation time).
    share_token: Optional[str] = Field(default=None, unique=True)

    # Relationships
    assets: List["Asset"] = Relationship(back_populates="chest")
    stories: List["Story"] = Relationship(back_populates="chest")
    contributions: List["Contribution"] = Relationship(back_populates="chest")

Asset Model

class AssetBase(SQLModel):
    """Base model for digital assets (photos, videos, audio, documents)"""
    # Original filename as uploaded; `file_path` is where it was stored.
    filename: str = Field(max_length=255)
    file_path: str = Field(max_length=1000)
    file_type: str = Field(max_length=50)  # image, video, audio, document
    # Size in bytes -- presumably; confirm against the upload handler.
    file_size: int
    mime_type: str = Field(max_length=100)

    # AI Processing Results
    transcription: Optional[str] = None
    ai_tags: Optional[str] = None  # JSON string (list of tag strings)
    processing_status: str = Field(default="pending")  # pending, processing, complete, failed

    # Metadata
    # When the media was captured (may be absent); distinct from created_at.
    date_taken: Optional[datetime] = None
    location_data: Optional[str] = None  # JSON string
    thumbnail_path: Optional[str] = None

class Asset(AssetBase, table=True):
    """Asset table with relationships"""
    id: Optional[int] = Field(default=None, primary_key=True)
    # Owning chest; every asset belongs to exactly one chest.
    chest_id: int = Field(foreign_key="chest.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    # Soft-delete flag, mirroring Chest.is_deleted.
    is_deleted: bool = Field(default=False)

    # Relationships
    chest: Chest = Relationship(back_populates="assets")

Database Operations

ChestOperations

from src.db.operations import ChestOperations
from sqlmodel import Session

class ChestOperations:
    """Low-level database operations for memory chests."""

    @staticmethod
    async def create_chest(session: Session, chest_data: ChestBase) -> Chest:
        """Persist a new chest and assign it a fresh share token."""
        new_chest = Chest.from_orm(chest_data)
        new_chest.share_token = generate_share_token()
        session.add(new_chest)
        session.commit()
        session.refresh(new_chest)
        return new_chest

    @staticmethod
    async def get_chest_by_token(session: Session, share_token: str) -> Optional[Chest]:
        """Look up a non-deleted chest by its public share token."""
        query = session.query(Chest).filter(
            Chest.share_token == share_token,
            Chest.is_deleted == False
        )
        return query.first()

    @staticmethod
    async def update_chest(session: Session, chest_id: int, updates: dict) -> Optional[Chest]:
        """Apply a dict of field updates; returns None if missing or deleted.

        Keys that do not match an attribute on the model are ignored.
        """
        target = session.get(Chest, chest_id)
        if target is None or target.is_deleted:
            return None

        for name, new_value in updates.items():
            if not hasattr(target, name):
                continue
            setattr(target, name, new_value)

        target.updated_at = datetime.utcnow()
        session.commit()
        session.refresh(target)
        return target

    @staticmethod
    async def soft_delete_chest(session: Session, chest_id: int) -> bool:
        """Mark a chest as deleted without removing the row."""
        target = session.get(Chest, chest_id)
        if target is None:
            return False

        target.is_deleted = True
        target.updated_at = datetime.utcnow()
        session.commit()
        return True

AssetOperations

class AssetOperations:
    """Low-level database operations for digital assets."""

    @staticmethod
    async def create_asset(
        session: Session, 
        chest_id: int, 
        file_info: dict,
        processing_status: str = "pending"
    ) -> Asset:
        """Create a new asset record from saved-file metadata.

        Args:
            session: Active database session.
            chest_id: Owning chest's primary key.
            file_info: Dict with 'filename', 'file_path', 'file_type',
                'file_size', and 'mime_type' keys (KeyError if missing).
            processing_status: Initial AI-pipeline status.
        """
        asset = Asset(
            chest_id=chest_id,
            filename=file_info['filename'],
            file_path=file_info['file_path'],
            file_type=file_info['file_type'],
            file_size=file_info['file_size'],
            mime_type=file_info['mime_type'],
            processing_status=processing_status
        )

        session.add(asset)
        session.commit()
        session.refresh(asset)
        return asset

    @staticmethod
    async def update_processing_results(
        session: Session,
        asset_id: int,
        transcription: Optional[str] = None,
        ai_tags: Optional[List[str]] = None,
        status: str = "complete"
    ) -> Optional[Asset]:
        """Store AI processing results on an asset.

        Returns the refreshed asset, or None when the id is unknown.
        """
        import json

        asset = session.get(Asset, asset_id)
        if not asset:
            return None

        # Check `is not None` rather than truthiness so a legitimately
        # empty transcription (silent recording) or empty tag list is
        # still recorded instead of silently dropped.
        if transcription is not None:
            asset.transcription = transcription

        if ai_tags is not None:
            asset.ai_tags = json.dumps(ai_tags)

        asset.processing_status = status
        asset.updated_at = datetime.utcnow()

        session.commit()
        session.refresh(asset)
        return asset

    @staticmethod
    async def get_assets_by_chest(
        session: Session, 
        chest_id: int,
        file_type: Optional[str] = None
    ) -> List[Asset]:
        """Get all non-deleted assets for a chest, optionally filtered by type.

        Ordered newest-first by date_taken, then created_at. NOTE(review):
        NULL ordering for date_taken is backend-dependent -- confirm on the
        target database.
        """
        query = session.query(Asset).filter(
            Asset.chest_id == chest_id,
            Asset.is_deleted == False
        )

        if file_type:
            query = query.filter(Asset.file_type == file_type)

        return query.order_by(Asset.date_taken.desc(), Asset.created_at.desc()).all()

Business Logic API

ChestManager

from src.db.models import Chest, Asset
from src.db.operations import ChestOperations, AssetOperations
from src.ai.media_processor import MediaProcessor

class ChestManager:
    """High-level business logic for memory chest management."""

    def __init__(self, session: Session):
        self.session = session
        self.media_processor = MediaProcessor()

    async def create_memory_chest(
        self,
        person_name: str,
        person_description: str,
        birth_date: Optional[date] = None,
        passing_date: Optional[date] = None,
        hero_image: Optional[UploadedFile] = None
    ) -> Chest:
        """Create a complete memory chest with an optional hero image.

        The hero image, when given, is stored first so the chest record
        can reference its path.
        """
        hero_image_path = None
        if hero_image:
            hero_image_path = await self._process_hero_image(hero_image)

        chest_data = ChestBase(
            person_name=person_name,
            person_description=person_description,
            birth_date=birth_date,
            passing_date=passing_date,
            hero_image_path=hero_image_path
        )

        return await ChestOperations.create_chest(self.session, chest_data)

    async def add_memory(
        self,
        chest_id: int,
        uploaded_file: UploadedFile,
        process_with_ai: bool = True
    ) -> Asset:
        """Save an upload as a new asset and optionally queue AI processing."""
        file_info = await self._save_uploaded_file(uploaded_file)
        asset = await AssetOperations.create_asset(
            self.session, 
            chest_id, 
            file_info
        )

        if process_with_ai:
            await self.media_processor.queue_for_processing(asset.id)

        return asset

    async def get_chest_summary(self, chest_id: int) -> dict:
        """Return the chest, aggregate statistics, and its 10 newest assets.

        Raises:
            ValueError: when the chest id is unknown.
        """
        chest = await ChestOperations.get_chest_by_id(self.session, chest_id)
        if not chest:
            raise ValueError(f"Chest {chest_id} not found")

        assets = await AssetOperations.get_assets_by_chest(self.session, chest_id)

        # Single pass over the assets instead of one list comprehension
        # per statistic (the original walked the list six times).
        by_type = {"image": 0, "video": 0, "audio": 0, "document": 0}
        processed = pending = 0
        for asset in assets:
            if asset.file_type in by_type:
                by_type[asset.file_type] += 1
            if asset.processing_status == "complete":
                processed += 1
            elif asset.processing_status in ("pending", "processing"):
                pending += 1

        return {
            "chest": chest,
            "stats": {
                "total_assets": len(assets),
                "photos": by_type["image"],
                "videos": by_type["video"],
                "audio": by_type["audio"],
                "documents": by_type["document"],
                "processed": processed,
                "pending": pending
            },
            "recent_assets": assets[:10]  # list is already ordered newest-first
        }

StoryManager

from src.ai.story_generation import StoryGenerator
from typing import Dict, Any

class StoryManager:
    """Manages AI story generation and persistence for memory chests."""

    def __init__(self, session: Session):
        self.session = session
        self.story_generator = StoryGenerator()

    async def generate_story(
        self,
        chest_id: int,
        style: str = "comprehensive",
        include_timeline: bool = True,
        include_themes: bool = True
    ) -> Story:
        """Generate and persist an AI story for a memory chest.

        Only assets whose processing is complete contribute to the story.

        Raises:
            ValueError: when the chest has no fully-processed assets yet.
        """
        assets = await AssetOperations.get_assets_by_chest(self.session, chest_id)
        processed_assets = [a for a in assets if a.processing_status == "complete"]

        if not processed_assets:
            raise ValueError("No processed memories available for story generation")

        context = await self._prepare_story_context(processed_assets)

        story_content = await self.story_generator.generate_narrative(
            context=context,
            style=style,
            include_timeline=include_timeline,
            include_themes=include_themes
        )

        # "narrative" is required; everything else is optional output.
        story = Story(
            chest_id=chest_id,
            title=story_content.get("title", f"The Story of {context['person_name']}"),
            content=story_content["narrative"],
            timeline_data=story_content.get("timeline"),
            themes_data=story_content.get("themes"),
            generation_metadata=story_content.get("metadata", {})
        )

        self.session.add(story)
        self.session.commit()
        self.session.refresh(story)

        return story

    async def _prepare_story_context(self, assets: List[Asset]) -> Dict[str, Any]:
        """Build a chronologically sorted context dict from processed assets."""
        # Fix: `json` was used below without being imported anywhere in
        # this module's visible imports.
        import json

        context = {
            "person_name": assets[0].chest.person_name if assets else "Unknown",
            "memories": []
        }

        for asset in assets:
            memory = {
                "type": asset.file_type,
                "filename": asset.filename,
                # created_at is always set, so "date" is never None here.
                "date": asset.date_taken or asset.created_at,
                "transcription": asset.transcription,
                "tags": json.loads(asset.ai_tags) if asset.ai_tags else [],
                "metadata": {}
            }

            if asset.location_data:
                memory["location"] = json.loads(asset.location_data)

            context["memories"].append(memory)

        # Chronological order gives the generator a timeline to narrate.
        context["memories"].sort(key=lambda m: m["date"] or datetime.min)

        return context

AI Services API

MediaProcessor

import asyncio
from typing import Optional, Dict, Any
from src.ai.transcription import TranscriptionService
from src.ai.image_tagging import ImageTaggerService

class MediaProcessor:
    """Coordinates all AI processing for uploaded media."""

    def __init__(self):
        # NOTE(review): TranscriptionService's __init__ elsewhere in this
        # document takes a `config` argument -- confirm which signature is
        # current.
        self.transcription_service = TranscriptionService()
        self.image_tagger = ImageTaggerService()
        # Work queue of asset ids awaiting processing.
        self.processing_queue = asyncio.Queue()
        self.active_processors = {}

    async def queue_for_processing(self, asset_id: int) -> None:
        """Add an asset to the processing queue."""
        await self.processing_queue.put(asset_id)

    async def process_asset(self, asset_id: int) -> Dict[str, Any]:
        """Process a single asset with the AI service for its media type.

        Returns the raw results dict from the type-specific processor.

        Raises:
            ValueError: unknown asset id.
            Exception: re-raised from the underlying service after the
                asset has been marked "failed".
        """
        with get_db_session() as session:
            asset = session.get(Asset, asset_id)
            if not asset:
                raise ValueError(f"Asset {asset_id} not found")

            # Mark in-flight so status queries reflect reality.
            asset.processing_status = "processing"
            session.commit()

            results = {}

            try:
                if asset.file_type == "image":
                    results = await self._process_image(asset)
                elif asset.file_type in ["video", "audio"]:
                    results = await self._process_audio_video(asset)
                elif asset.file_type == "document":
                    results = await self._process_document(asset)
                # NOTE(review): any other file_type is marked complete
                # with empty results -- confirm that is intended.

                await AssetOperations.update_processing_results(
                    session,
                    asset_id,
                    transcription=results.get("transcription"),
                    ai_tags=results.get("tags"),
                    status="complete"
                )

            except Exception:
                asset.processing_status = "failed"
                session.commit()
                # Bare `raise` preserves the original traceback;
                # `raise e` re-anchored it at this line.
                raise

            return results

    async def _process_image(self, asset: Asset) -> Dict[str, Any]:
        """Tag an image via the CLIP-based tagger service."""
        tags = await self.image_tagger.tag_image(asset.file_path)

        return {
            "tags": tags,
            "processing_type": "image_classification"
        }

    async def _process_audio_video(self, asset: Asset) -> Dict[str, Any]:
        """Transcribe audio/video and derive keyword tags from the text."""
        transcription = await self.transcription_service.transcribe_file(asset.file_path)

        # `_extract_keywords` is defined elsewhere in this class/module.
        keywords = self._extract_keywords(transcription)

        return {
            "transcription": transcription["text"],
            "tags": keywords,
            "confidence": transcription.get("confidence", 0.0),
            "processing_type": "transcription"
        }

    async def get_processing_status(self, asset_id: int) -> Dict[str, Any]:
        """Report the current processing status for an asset."""
        with get_db_session() as session:
            asset = session.get(Asset, asset_id)
            if not asset:
                return {"status": "not_found"}

            return {
                "status": asset.processing_status,
                "updated_at": asset.updated_at,
                "has_transcription": bool(asset.transcription),
                "has_tags": bool(asset.ai_tags)
            }

TranscriptionService

from typing import Dict, Any, Optional
import whisper
from openai import OpenAI

class TranscriptionService:
    """Handles audio/video transcription with multiple backends."""

    def __init__(self, config: Config):
        """Set up whichever transcription backends the config enables."""
        self.config = config
        self.local_model = None
        self.openai_client = None

        if config.USE_LOCAL_WHISPER:
            # Loaded once at construction; "base" trades accuracy for memory.
            self.local_model = whisper.load_model("base")

        if config.OPENAI_API_KEY:
            self.openai_client = OpenAI(api_key=config.OPENAI_API_KEY)

    async def transcribe_file(
        self, 
        file_path: str,
        prefer_local: bool = True
    ) -> Dict[str, Any]:
        """Transcribe an audio/video file using the best available method.

        Order: local Whisper (if loaded and preferred), then the OpenAI
        API, then a metadata-only fallback. Backend failures are logged,
        never raised; the fallback dict carries an "error" key instead.
        """
        # Fix: `logger` and `Path` were referenced below without being
        # defined/imported anywhere, so every failure path raised
        # NameError instead of falling through.
        import logging
        from pathlib import Path

        logger = logging.getLogger(__name__)

        # Try local processing first if available and preferred
        if prefer_local and self.local_model:
            try:
                return await self._transcribe_local(file_path)
            except Exception as e:
                logger.warning(f"Local transcription failed: {e}")

        # Fall back to OpenAI API
        if self.openai_client:
            try:
                return await self._transcribe_openai(file_path)
            except Exception as e:
                logger.warning(f"OpenAI transcription failed: {e}")

        # Final fallback - basic metadata only
        return {
            "text": f"Audio file: {Path(file_path).name}",
            "confidence": 0.0,
            "method": "fallback",
            "error": "No transcription service available"
        }

    async def _transcribe_local(self, file_path: str) -> Dict[str, Any]:
        """Transcribe using the in-process Whisper model.

        NOTE(review): `model.transcribe` is a blocking call; consider
        running it in a thread executor so it does not stall the loop.
        """
        result = self.local_model.transcribe(file_path)

        return {
            "text": result["text"].strip(),
            "confidence": 0.9,  # Whisper doesn't provide confidence scores
            "method": "local_whisper",
            "segments": result.get("segments", []),
            "language": result.get("language", "unknown")
        }

    async def _transcribe_openai(self, file_path: str) -> Dict[str, Any]:
        """Transcribe using the OpenAI Whisper API (blocking HTTP call)."""
        with open(file_path, "rb") as audio_file:
            transcript = self.openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="verbose_json",
                timestamp_granularities=["segment"]
            )

        return {
            "text": transcript.text,
            "confidence": 0.95,  # heuristic; the API reports no confidence
            "method": "openai_api",
            "segments": getattr(transcript, 'segments', []),
            "language": getattr(transcript, 'language', 'unknown')
        }

Extension Points

Custom AI Providers

from abc import ABC, abstractmethod

class AIProvider(ABC):
    """Abstract interface that every AI service provider must implement.

    Concrete providers supply all three capabilities below and are
    registered by name via `register_provider`.
    """

    @abstractmethod
    async def transcribe(self, audio_file: str) -> Dict[str, Any]:
        """Transcribe the audio file at the given path into text."""
        ...

    @abstractmethod
    async def generate_tags(self, image_file: str) -> List[str]:
        """Produce descriptive tags for the image at the given path."""
        ...

    @abstractmethod
    async def generate_story(self, context: Dict[str, Any]) -> str:
        """Produce a narrative story from the prepared context dict."""
        ...

class CustomAIProvider(AIProvider):
    """Example custom AI provider implementation (skeleton).

    Replace each `pass` with a real implementation; all three methods
    are required by the `AIProvider` interface.
    """

    async def transcribe(self, audio_file: str) -> Dict[str, Any]:
        # Your custom transcription logic
        pass

    async def generate_tags(self, image_file: str) -> List[str]:
        # Your custom image tagging logic
        pass

    async def generate_story(self, context: Dict[str, Any]) -> str:
        # Your custom story generation logic
        pass

# Register the custom provider under a name the app can select at runtime.
from src.ai.providers import register_provider
register_provider("custom", CustomAIProvider())

Custom Storage Backends

from abc import ABC, abstractmethod
from typing import BinaryIO

class StorageBackend(ABC):
    """Abstract storage backend interface.

    Implementations adapt a concrete store (local disk, cloud object
    storage, etc.) to this async file API.
    """

    @abstractmethod
    async def save_file(
        self, 
        file_content: BinaryIO, 
        file_path: str
    ) -> str:
        """Persist the stream under `file_path` and return the storage path."""
        ...

    @abstractmethod
    async def get_file(self, file_path: str) -> BinaryIO:
        """Open and return the stored file as a binary stream."""
        ...

    @abstractmethod
    async def delete_file(self, file_path: str) -> bool:
        """Remove the file; True on success."""
        ...

    @abstractmethod
    async def get_file_url(self, file_path: str) -> str:
        """Return a publicly reachable URL for the stored file."""
        ...

class CustomStorageBackend(StorageBackend):
    """Example custom storage implementation (skeleton).

    A complete backend must also implement get_file, delete_file, and
    get_file_url from `StorageBackend`.
    """

    async def save_file(self, file_content: BinaryIO, file_path: str) -> str:
        # Your custom storage logic (e.g. Azure Blob, Google Cloud)
        pass

# Register the backend under a name the app can select at runtime.
from src.storage import register_backend
register_backend("custom", CustomStorageBackend())

API Versioning

The API is currently in version 1.0. All endpoints and interfaces are considered stable, but we recommend using the provided SDK classes rather than direct database access.

Database Migrations

Always use the provided migration tools when modifying database schema. Direct SQL changes may cause compatibility issues.

Performance Tips

  • Use async methods for all AI processing operations
  • Implement connection pooling for database access
  • Cache frequently accessed data using the built-in cache layer
  • Monitor memory usage during bulk operations