# Advanced Usage Guide
## Overview
This guide covers advanced Convoscope usage patterns, customization techniques, and integration strategies for power users and developers who want to extend the application's capabilities.
## Custom LLM Provider Integration
### Creating Custom Provider Adapters
Extend Convoscope to work with new LLM providers by implementing the provider interface:
# src/providers/custom_provider.py
from typing import List, Dict, Optional, AsyncGenerator
import aiohttp
from src.services.llm_service import ILLMProvider, LLMServiceError
class CustomLLMProvider(ILLMProvider):
    """Custom LLM provider implementation for XYZ AI service.

    The provider can be used in two ways:

    * as an async context manager (``async with CustomLLMProvider(...) as p``),
      where one ``aiohttp`` session is shared by every request made inside
      the block; or
    * standalone, where each request opens and closes its own short-lived
      session. This is what makes the synchronous ``get_completion`` and
      ``validate_api_key`` usable without entering the context manager.
    """

    def __init__(self, api_key: str, base_url: str, timeout: int = 30):
        """Store connection settings; no network resources are created here.

        Args:
            api_key: Secret sent as a bearer token on every request.
            base_url: Root URL of the provider API; ``/completions`` is
                appended to it for requests.
            timeout: Total per-request timeout in seconds.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        # Only set while inside ``async with``; ``None`` otherwise.
        self.session = None

    def _new_session(self):
        """Build an ``aiohttp`` session carrying the auth headers and timeout."""
        return aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout),
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
        )

    async def __aenter__(self):
        """Async context manager entry: open the shared session."""
        self.session = self._new_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close and forget the shared session."""
        if self.session:
            await self.session.close()
            # Drop the closed session so later calls fall back to a fresh one
            # instead of posting through a closed connection pool.
            self.session = None

    def get_completion(self, model: str, messages: List[Dict], **kwargs) -> Optional[str]:
        """Synchronous completion method.

        Runs the async implementation on a new event loop via
        ``asyncio.run``, so it must not be called from code that is already
        running inside an event loop.

        Args:
            model: Model identifier sent to the provider.
            messages: OpenAI-format messages (``role`` / ``content`` dicts).
            **kwargs: Optional ``temperature`` and ``max_tokens`` overrides.

        Returns:
            The completion text, or ``None`` if the response carried none.

        Raises:
            LLMServiceError: On rate limiting, bad credentials, any other
                non-200 status, or a network-level ``aiohttp`` error.
        """
        import asyncio

        return asyncio.run(self._async_get_completion(model, messages, **kwargs))

    async def _async_get_completion(self, model: str, messages: List[Dict], **kwargs) -> Optional[str]:
        """Asynchronous completion implementation.

        Uses the shared session when one is open, otherwise a temporary
        session that is closed before returning.
        """
        payload = self._build_payload(model, messages, kwargs, stream=False)

        if self.session is not None:
            return await self._post_completion(self.session, payload)

        async with self._new_session() as session:
            return await self._post_completion(session, payload)

    async def _post_completion(self, session, payload: Dict) -> Optional[str]:
        """POST *payload* and map the HTTP outcome to text or an error."""
        try:
            async with session.post(f"{self.base_url}/completions", json=payload) as response:
                if response.status == 200:
                    result = await response.json()
                    return self._extract_content(result, "message")
                if response.status == 429:
                    raise LLMServiceError("Rate limit exceeded")
                if response.status == 401:
                    raise LLMServiceError("Invalid API key")
                error_text = await response.text()
                raise LLMServiceError(f"API error: {response.status} - {error_text}")
        except aiohttp.ClientError as e:
            # Chain the cause so the underlying transport failure stays
            # visible in tracebacks.
            raise LLMServiceError(f"Network error: {e}") from e

    def _build_payload(self, model: str, messages: List[Dict], options: Dict, stream: bool) -> Dict:
        """Assemble the request body shared by plain and streaming calls."""
        return {
            "model": model,
            "messages": self._transform_messages(messages),
            "temperature": options.get("temperature", 0.7),
            "max_tokens": options.get("max_tokens", 2000),
            "stream": stream,
        }

    @staticmethod
    def _extract_content(body: Dict, field: str) -> Optional[str]:
        """Return ``body["choices"][0][field]["content"]`` or ``None``.

        Tolerates a missing or empty ``choices`` list and a missing or null
        *field*, so an unexpected response shape yields ``None`` instead of
        an ``IndexError`` / ``AttributeError``.
        """
        choices = body.get("choices") or [{}]
        first = choices[0] or {}
        return (first.get(field) or {}).get("content")

    def _transform_messages(self, messages: List[Dict]) -> List[Dict]:
        """Transform OpenAI format messages to provider format."""
        # Example transformation - adapt to your provider's format
        return [
            {
                "role": msg["role"],
                "text": msg["content"],  # Different field name
                "timestamp": msg.get("timestamp"),
            }
            for msg in messages
        ]

    def validate_api_key(self) -> bool:
        """Validate API key with provider.

        Issues a one-message test completion against ``"default-model"``.

        Returns:
            ``True`` when the call yields text, ``False`` when it yields
            nothing or raises ``LLMServiceError``.
        """
        try:
            # Make a test call to validate
            test_messages = [{"role": "user", "content": "test"}]
            result = self.get_completion("default-model", test_messages)
            return result is not None
        except LLMServiceError:
            return False

    def get_available_models(self) -> List[str]:
        """Get list of available models from provider."""
        # Implementation depends on provider's model listing API
        return ["custom-model-1", "custom-model-2", "custom-model-3"]

    def supports_streaming(self) -> bool:
        """Check if provider supports streaming responses."""
        return True

    async def stream_completion(self, model: str, messages: List[Dict], **kwargs) -> AsyncGenerator[str, None]:
        """Stream completion responses.

        Yields each non-empty content fragment as it arrives. Uses the
        shared session when one is open, otherwise a temporary session that
        lives for the duration of the stream.

        Raises:
            LLMServiceError: If the provider answers with a non-200 status.
        """
        payload = self._build_payload(model, messages, kwargs, stream=True)

        if self.session is not None:
            async for piece in self._stream_chunks(self.session, payload):
                yield piece
            return

        async with self._new_session() as session:
            async for piece in self._stream_chunks(session, payload):
                yield piece

    async def _stream_chunks(self, session, payload: Dict) -> AsyncGenerator[str, None]:
        """POST *payload* and yield content from each ``data: `` line."""
        import json

        async with session.post(f"{self.base_url}/completions", json=payload) as response:
            if response.status != 200:
                raise LLMServiceError(f"Streaming failed: {response.status}")

            async for raw_line in response.content:
                line = raw_line.decode('utf-8').strip()
                if not line.startswith('data: '):
                    continue

                data = line[6:]  # Remove 'data: ' prefix
                if data == '[DONE]':
                    break

                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    # Skip lines that are not valid JSON rather than
                    # aborting the whole stream.
                    continue

                if not isinstance(chunk, dict):
                    continue
                content = self._extract_content(chunk, "delta")
                if content:
                    yield content
# Register the custom provider
def register_custom_provider(llm_service):
    """Attach a ``CustomLLMProvider`` to *llm_service* when credentials exist.

    Reads ``CUSTOM_API_KEY`` (required) and ``CUSTOM_BASE_URL`` (optional)
    from the environment and registers the provider under the name
    ``"custom"``.

    Returns:
        ``True`` if the provider was registered, ``False`` if
        ``CUSTOM_API_KEY`` is unset or empty.
    """
    import os

    key = os.getenv("CUSTOM_API_KEY")
    if not key:
        return False

    endpoint = os.getenv("CUSTOM_BASE_URL", "https://api.customai.com/v1")
    llm_service.add_provider("custom", CustomLLMProvider(key, endpoint))
    return True
### Multi-Model Ensemble
Implement ensemble methods that combine responses from multiple models:
# src/services/ensemble_service.py
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

from src.services.llm_service import LLMService
class EnsembleLLMService:
    """Ensemble service that combines multiple LLM responses.

    Each provider is queried on a worker thread; the collected results are
    then reduced to one answer by a named strategy (``"vote"``,
    ``"weighted"`` or ``"best"``).
    """

    # Seconds to wait for each provider's worker before recording a failure.
    RESPONSE_TIMEOUT_SECONDS = 30

    # Relative trust per provider, used by the "weighted" strategy.
    PROVIDER_WEIGHTS = {
        "openai": 1.0,
        "anthropic": 0.9,
        "google": 0.8,
        "custom": 0.7,
    }
    # Weight applied to providers missing from PROVIDER_WEIGHTS.
    DEFAULT_PROVIDER_WEIGHT = 0.5

    def __init__(self, llm_service: "LLMService"):
        """Wrap *llm_service* and create the worker pool (5 threads)."""
        self.llm_service = llm_service
        self.executor = ThreadPoolExecutor(max_workers=5)

    def close(self) -> None:
        """Release the worker pool; the service must not be used afterwards."""
        self.executor.shutdown(wait=False)

    def get_ensemble_completion(self,
                                messages: List[Dict],
                                providers: Optional[List[str]] = None,
                                strategy: str = "vote",
                                **kwargs) -> Dict[str, Any]:
        """Get completion using ensemble of providers.

        Args:
            messages: Conversation messages forwarded to every provider.
            providers: Provider names to query. ``None`` selects the first
                three providers reported by the wrapped service.
            strategy: ``"vote"``, ``"weighted"`` or ``"best"``.
            **kwargs: Passed through to each provider's completion call.

        Returns:
            A dict with at least ``response``, ``confidence`` and
            ``strategy`` keys; ``response`` is ``None`` when every provider
            failed.

        Raises:
            ValueError: If *strategy* is not one of the supported names.
        """
        combiners = {
            "vote": self._majority_vote,
            "weighted": self._weighted_average,
            "best": self._select_best,
        }
        # Validate before querying anything: an unsupported strategy used to
        # surface only after all providers had been called, as an
        # AttributeError on an undefined fallback method.
        try:
            combine = combiners[strategy]
        except KeyError:
            supported = ", ".join(sorted(combiners))
            raise ValueError(
                f"Unknown ensemble strategy {strategy!r}; expected one of: {supported}"
            ) from None

        if providers is None:
            providers = list(self.llm_service.get_available_providers().keys())[:3]

        # Get responses from multiple providers
        responses = self._get_multiple_responses(messages, providers, **kwargs)

        # Apply ensemble strategy
        return combine(responses)

    def _get_multiple_responses(self, messages: List[Dict], providers: List[str], **kwargs) -> List[Dict]:
        """Get responses from multiple providers concurrently.

        Returns one record per provider, in the order given, each with
        ``provider``, ``response`` and ``success`` keys, plus ``error`` when
        the worker raised or timed out.
        """
        # Submit everything first so the providers run in parallel; results
        # are then collected in submission order.
        pending = [
            (provider, self.executor.submit(self._get_single_response, provider, messages, **kwargs))
            for provider in providers
        ]

        responses = []
        for provider, future in pending:
            try:
                response = future.result(timeout=self.RESPONSE_TIMEOUT_SECONDS)
            except Exception as e:
                # A failing provider must not sink the whole ensemble, so the
                # failure is recorded and the remaining providers still count.
                responses.append({
                    "provider": provider,
                    "response": None,
                    "success": False,
                    "error": str(e),
                })
            else:
                responses.append({
                    "provider": provider,
                    "response": response,
                    "success": response is not None,
                })
        return responses

    def _get_single_response(self, provider: str, messages: List[Dict], **kwargs) -> Optional[str]:
        """Get response from single provider.

        Uses the provider's first listed model. Returns ``None`` when the
        provider is unknown or lists no models.
        """
        available_providers = self.llm_service.get_available_providers()
        if provider not in available_providers:
            return None

        models = available_providers[provider].models
        if not models:
            return None

        return self.llm_service.get_completion(
            provider=provider,
            model=models[0],
            messages=messages,
            **kwargs
        )

    @staticmethod
    def _successful(responses: List[Dict]) -> List[Dict]:
        """Keep only the records whose ``success`` flag is truthy."""
        return [r for r in responses if r["success"]]

    def _majority_vote(self, responses: List[Dict]) -> Dict[str, Any]:
        """Select response by majority vote (similarity-based).

        Votes are counted on exact text equality; ties go to the text that
        was seen first. ``confidence`` is the winning share of successful
        responses (fixed at 0.5 when only one provider succeeded).
        """
        successful_responses = self._successful(responses)

        if not successful_responses:
            return {"response": None, "confidence": 0.0, "strategy": "vote"}

        if len(successful_responses) == 1:
            only = successful_responses[0]
            return {
                "response": only["response"],
                "confidence": 0.5,
                "strategy": "vote",
                "providers_used": [only["provider"]],
            }

        # Simple implementation - in practice, use semantic similarity
        response_counts = {}
        for resp in successful_responses:
            text = resp["response"]
            response_counts[text] = response_counts.get(text, 0) + 1

        best_response = max(response_counts, key=response_counts.get)
        confidence = response_counts[best_response] / len(successful_responses)

        return {
            "response": best_response,
            "confidence": confidence,
            "strategy": "vote",
            "providers_used": [r["provider"] for r in successful_responses],
            "vote_counts": response_counts,
        }

    def _weighted_average(self, responses: List[Dict]) -> Dict[str, Any]:
        """Weighted average based on provider reliability.

        Picks the successful response from the highest-weighted provider;
        that weight is reported as ``confidence``.
        """
        successful_responses = self._successful(responses)

        if not successful_responses:
            return {"response": None, "confidence": 0.0, "strategy": "weighted"}

        # Simple weighted selection (in practice, might blend responses)
        weighted_responses = [
            (resp, self.PROVIDER_WEIGHTS.get(resp["provider"], self.DEFAULT_PROVIDER_WEIGHT))
            for resp in successful_responses
        ]
        winner, weight = max(weighted_responses, key=lambda pair: pair[1])

        return {
            "response": winner["response"],
            "confidence": weight,
            "strategy": "weighted",
            "providers_used": [r["provider"] for r in successful_responses],
        }

    def _select_best(self, responses: List[Dict]) -> Dict[str, Any]:
        """Select best response based on quality metrics.

        Scores every successful response with ``_quality_score`` and returns
        the top one; ``confidence`` is that score scaled to 0-1.
        """
        successful_responses = self._successful(responses)

        if not successful_responses:
            return {"response": None, "confidence": 0.0, "strategy": "best"}

        # Quality scoring (length, coherence, etc.)
        scored_responses = [
            (resp, self._quality_score(resp["response"]))
            for resp in successful_responses
        ]
        winner, score = max(scored_responses, key=lambda pair: pair[1])

        return {
            "response": winner["response"],
            "confidence": score / 100,  # Normalize score
            "strategy": "best",
            "quality_score": score,
            "providers_used": [r["provider"] for r in successful_responses],
        }

    def _quality_score(self, response: str) -> float:
        """Calculate response quality score.

        Heuristic score in the range 0-100 built from length, punctuation,
        capitalisation and word diversity. Empty input scores 0.
        """
        if not response:
            return 0.0

        score = 0.0

        # Length scoring (moderate length preferred)
        length = len(response)
        if 50 <= length <= 1000:
            score += 30
        elif length > 1000:
            score += 20
        else:
            score += 10

        # Coherence indicators
        if '. ' in response:  # Sentence structure
            score += 20
        if '?' in response or '!' in response:  # Engagement
            score += 10
        if response[0].isupper():  # Proper capitalization
            score += 10

        # Avoid repetition
        words = response.lower().split()
        if words:
            diversity = len(set(words)) / len(words)
            score += diversity * 30

        return min(score, 100.0)
## Advanced Conversation Management
### Conversation Analytics
Implement comprehensive conversation analysis:
# src/services/conversation_analytics.py
from typing import List, Dict, Any, Optional
import json
from datetime import datetime, timedelta
from collections import Counter, defaultdict
import re
class ConversationAnalytics:
    """Advanced conversation analysis and metrics.

    A conversation is a list of message dicts. Every message is read through
    ``msg["role"]`` and ``msg["content"]``; ``msg.get("timestamp")`` is
    optional and may be an ISO-8601 string or a ``datetime``.

    The scoring helpers are deliberately simple keyword/threshold
    heuristics; the thresholds live in the class constants below so they
    can be tuned in one place.
    """

    # Whole-word sentiment vocabularies.
    POSITIVE_WORDS = ('good', 'great', 'excellent', 'perfect', 'amazing', 'wonderful', 'fantastic')
    NEGATIVE_WORDS = ('bad', 'terrible', 'awful', 'horrible', 'disappointing', 'frustrating')

    # Substrings looked for in assistant messages by ``_quality_metrics``.
    HELPFUL_PHRASES = ('i can help', "here's how", 'try this', 'solution', 'answer')
    CLARITY_INDICATORS = ('.', ':', 'first', 'second', 'then', 'finally')
    ERROR_PHRASES = ('sorry', 'apologize', 'mistake', 'unclear', 'confusion')

    # Words ignored when extracting topic keywords.
    COMMON_WORDS = frozenset({
        'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does',
        'did', 'will', 'would', 'could', 'should', 'can', 'may', 'might', 'must',
        'i', 'you', 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them',
        'my', 'your', 'his', 'its', 'our', 'their', 'this', 'that', 'these', 'those',
        'a', 'an',
    })

    SHORT_MESSAGE_CHARS = 100      # below this a message counts as "short"
    DETAILED_RESPONSE_CHARS = 500  # above this a message counts as "long"/detailed
    FAST_PACE_SECONDS = 30         # average gap below this is a "fast" pace
    MODERATE_PACE_SECONDS = 120    # average gap below this is "moderate"

    def __init__(self, conversation_manager):
        """Keep the manager used to list and load saved conversations."""
        self.conversation_manager = conversation_manager

    def analyze_conversation(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Comprehensive conversation analysis.

        Returns:
            A dict with the sections ``basic_stats``, ``temporal_analysis``,
            ``content_analysis``, ``interaction_patterns`` and
            ``quality_metrics``. A section that cannot be computed is a dict
            with a single ``error`` key.
        """
        return {
            "basic_stats": self._basic_statistics(conversation),
            "temporal_analysis": self._temporal_analysis(conversation),
            "content_analysis": self._content_analysis(conversation),
            "interaction_patterns": self._interaction_patterns(conversation),
            "quality_metrics": self._quality_metrics(conversation),
        }

    def _basic_statistics(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Basic conversation statistics."""
        role_counts = Counter(msg["role"] for msg in conversation)
        total_characters = sum(len(msg["content"]) for msg in conversation)

        return {
            "total_messages": len(conversation),
            "user_messages": role_counts.get("user", 0),
            "assistant_messages": role_counts.get("assistant", 0),
            "system_messages": role_counts.get("system", 0),
            "total_characters": total_characters,
            "average_message_length": total_characters / len(conversation) if conversation else 0,
            "message_length_distribution": self._length_distribution(conversation),
        }

    def _length_distribution(self, conversation: List[Dict]) -> Dict[str, int]:
        """Count messages per length bucket (``short`` / ``medium`` / ``long``).

        Baseline heuristic: ``short`` is under ``SHORT_MESSAGE_CHARS``
        characters, ``long`` is over ``DETAILED_RESPONSE_CHARS``.
        """
        buckets = {"short": 0, "medium": 0, "long": 0}
        for msg in conversation:
            length = len(msg["content"])
            if length < self.SHORT_MESSAGE_CHARS:
                buckets["short"] += 1
            elif length > self.DETAILED_RESPONSE_CHARS:
                buckets["long"] += 1
            else:
                buckets["medium"] += 1
        return buckets

    @staticmethod
    def _parse_timestamp(ts: Any) -> Optional[datetime]:
        """Return *ts* as a ``datetime``, or ``None`` if it is unusable.

        Accepts ``datetime`` objects and ISO-8601 strings (a trailing ``Z``
        is read as UTC). Anything else is rejected, because later interval
        arithmetic needs ``datetime`` subtraction.
        """
        if isinstance(ts, datetime):
            return ts
        if isinstance(ts, str):
            try:
                return datetime.fromisoformat(ts.replace('Z', '+00:00'))
            except ValueError:
                return None
        return None

    def _temporal_analysis(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Analyze temporal patterns in conversation.

        Gaps are measured between consecutive timestamped messages in list
        order. Returns an ``error`` dict when fewer than two usable
        timestamps exist or when they cannot be compared.
        """
        timestamps = [msg.get("timestamp") for msg in conversation if msg.get("timestamp")]

        if not timestamps:
            return {"error": "No timestamps available"}

        # Convert to datetime objects, skipping values that cannot be parsed
        parsed = (self._parse_timestamp(ts) for ts in timestamps)
        dt_timestamps = [dt for dt in parsed if dt is not None]

        if len(dt_timestamps) < 2:
            return {"error": "Insufficient timestamp data"}

        # Calculate intervals
        try:
            intervals = [
                (later - earlier).total_seconds()
                for earlier, later in zip(dt_timestamps, dt_timestamps[1:])
            ]
            duration_seconds = (dt_timestamps[-1] - dt_timestamps[0]).total_seconds()
        except TypeError:
            # Subtracting a naive datetime from a timezone-aware one raises.
            return {"error": "Timestamps mix naive and timezone-aware values"}

        average_gap = sum(intervals) / len(intervals)
        if average_gap < self.FAST_PACE_SECONDS:
            pace = "fast"
        elif average_gap < self.MODERATE_PACE_SECONDS:
            pace = "moderate"
        else:
            pace = "slow"

        return {
            "conversation_duration_minutes": duration_seconds / 60,
            "average_response_time_seconds": average_gap,
            "response_time_distribution": self._time_distribution(intervals),
            "conversation_pace": pace,
        }

    def _time_distribution(self, intervals: List[float]) -> Dict[str, int]:
        """Count message gaps per pace bucket.

        Baseline heuristic that reuses the pace thresholds: under
        ``FAST_PACE_SECONDS``, under ``MODERATE_PACE_SECONDS``, or longer.
        """
        buckets = {"under_30s": 0, "30s_to_2min": 0, "over_2min": 0}
        for gap in intervals:
            if gap < self.FAST_PACE_SECONDS:
                buckets["under_30s"] += 1
            elif gap < self.MODERATE_PACE_SECONDS:
                buckets["30s_to_2min"] += 1
            else:
                buckets["over_2min"] += 1
        return buckets

    def _content_analysis(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Analyze conversation content patterns."""
        all_content = " ".join(msg["content"] for msg in conversation)

        # Word frequency analysis
        words = re.findall(r'\b\w+\b', all_content.lower())
        word_freq = Counter(words)
        unique_count = len(word_freq)

        # Question patterns
        questions = sum(1 for msg in conversation if '?' in msg["content"])

        # Sentiment indicators (simple heuristic). Counted on whole words so
        # that e.g. "goodbye" is not read as "good" nor "badge" as "bad".
        positive_count = sum(word_freq[word] for word in self.POSITIVE_WORDS)
        negative_count = sum(word_freq[word] for word in self.NEGATIVE_WORDS)

        return {
            "total_words": len(words),
            "unique_words": unique_count,
            "vocabulary_diversity": unique_count / len(words) if words else 0,
            "most_common_words": word_freq.most_common(10),
            "question_count": questions,
            "exclamation_count": all_content.count('!'),
            "sentiment_indicators": {
                "positive_signals": positive_count,
                "negative_signals": negative_count,
                # +1 keeps the ratio defined when no signal words occur.
                "sentiment_ratio": positive_count / (positive_count + negative_count + 1),
            },
            "topic_keywords": self._extract_topic_keywords(words),
        }

    def _interaction_patterns(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Analyze user-assistant interaction patterns."""
        patterns = {
            "turn_taking": [],
            "response_lengths": {"user": [], "assistant": []},
            "conversation_flow": [],
        }

        for i, msg in enumerate(conversation):
            role = msg["role"]
            if role not in ("user", "assistant"):
                continue
            content = msg["content"]

            # Track turn-taking
            switched = i > 0 and conversation[i - 1]["role"] != role
            patterns["turn_taking"].append("switch" if switched else "continue")

            # Track response lengths by role
            patterns["response_lengths"][role].append(len(content))

            # Conversation flow analysis
            if role == "user":
                if '?' in content:
                    flow = "question"
                elif any(word in content.lower() for word in ('thanks', 'thank you')):
                    flow = "gratitude"
                else:
                    flow = "statement"
            elif len(content) > self.DETAILED_RESPONSE_CHARS:
                flow = "detailed_response"
            else:
                flow = "brief_response"
            patterns["conversation_flow"].append(flow)

        # Analysis of patterns
        flow_analysis = Counter(patterns["conversation_flow"])
        user_lengths = patterns["response_lengths"]["user"]
        assistant_lengths = patterns["response_lengths"]["assistant"]

        return {
            "conversation_style": self._determine_conversation_style(flow_analysis),
            "user_engagement_level": self._calculate_engagement(patterns),
            "response_balance": {
                "avg_user_length": sum(user_lengths) / len(user_lengths) if user_lengths else 0,
                "avg_assistant_length": sum(assistant_lengths) / len(assistant_lengths) if assistant_lengths else 0,
            },
            "interaction_flow": flow_analysis,
        }

    def _determine_conversation_style(self, flow_analysis: Counter) -> str:
        """Label the conversation from the mix of user turn types.

        Baseline heuristic: ``"inquisitive"`` when at least half of the
        user turns are questions, ``"conversational"`` otherwise, and
        ``"unknown"`` when there are no user turns.
        """
        question_turns = flow_analysis.get("question", 0)
        user_turns = (
            question_turns
            + flow_analysis.get("statement", 0)
            + flow_analysis.get("gratitude", 0)
        )
        if user_turns == 0:
            return "unknown"
        return "inquisitive" if question_turns * 2 >= user_turns else "conversational"

    def _calculate_engagement(self, patterns: Dict[str, Any]) -> str:
        """Grade user engagement from the number of user messages.

        Baseline heuristic: ``"none"`` (0), ``"low"`` (1-3),
        ``"medium"`` (4-9) or ``"high"`` (10 or more).
        """
        user_turns = len(patterns["response_lengths"]["user"])
        if user_turns == 0:
            return "none"
        if user_turns < 4:
            return "low"
        if user_turns < 10:
            return "medium"
        return "high"

    @staticmethod
    def _phrase_rate(messages: List[Dict], phrases) -> float:
        """Average number of distinct *phrases* present per message."""
        hits = sum(
            sum(1 for phrase in phrases if phrase in msg["content"].lower())
            for msg in messages
        )
        return hits / len(messages)

    def _quality_metrics(self, conversation: List[Dict]) -> Dict[str, Any]:
        """Calculate conversation quality metrics.

        Only assistant messages are scored; without any, an ``error`` dict
        is returned.
        """
        assistant_messages = [msg for msg in conversation if msg["role"] == "assistant"]

        if not assistant_messages:
            return {"error": "No assistant messages to analyze"}

        # Helpfulness, clarity and error/confusion indicators
        helpfulness_score = self._phrase_rate(assistant_messages, self.HELPFUL_PHRASES)
        clarity_score = self._phrase_rate(assistant_messages, self.CLARITY_INDICATORS)
        error_score = self._phrase_rate(assistant_messages, self.ERROR_PHRASES)

        return {
            "helpfulness_score": min(helpfulness_score, 5.0),  # Cap at 5
            "clarity_score": min(clarity_score, 5.0),
            "error_rate": min(error_score, 1.0),
            "overall_quality": (helpfulness_score + clarity_score - error_score) / 2,
            "response_completeness": self._calculate_completeness(assistant_messages),
        }

    def _calculate_completeness(self, assistant_messages: List[Dict]) -> float:
        """Share of assistant messages that look finished.

        Baseline heuristic: a message counts as complete when, ignoring
        trailing whitespace, it ends with ``.``, ``!``, ``?`` or a closing
        code fence. Returns 0.0 for an empty list.
        """
        if not assistant_messages:
            return 0.0
        endings = ('.', '!', '?', '```')
        complete = sum(
            1 for msg in assistant_messages
            if msg["content"].rstrip().endswith(endings)
        )
        return complete / len(assistant_messages)

    def _extract_topic_keywords(self, words: List[str]) -> List[str]:
        """Extract likely topic keywords.

        Returns up to ten words that are longer than three characters, are
        not in ``COMMON_WORDS`` and occur more than once, most frequent
        first.
        """
        # Get words longer than 3 characters that aren't common words
        topic_candidates = [
            word for word in words
            if len(word) > 3 and word not in self.COMMON_WORDS
        ]

        # Count frequency and return top candidates
        word_freq = Counter(topic_candidates)
        return [word for word, count in word_freq.most_common(10) if count > 1]

    def get_conversation_insights(self, filename: str) -> Optional[Dict[str, Any]]:
        """Get comprehensive insights for a specific conversation.

        Returns:
            The analysis dict extended with ``filename`` and
            ``analysis_timestamp``, or ``None`` if loading failed.
        """
        success, conversation = self.conversation_manager.load_conversation(filename)

        if not success:
            return None

        insights = self.analyze_conversation(conversation)
        insights["filename"] = filename
        insights["analysis_timestamp"] = datetime.now().isoformat()

        return insights

    def batch_analyze_conversations(self, limit: Optional[int] = None) -> Dict[str, Any]:
        """Analyze multiple conversations and provide aggregated insights.

        Args:
            limit: Analyze at most this many conversations from the start
                of the manager's list; a falsy value means no limit.

        Returns:
            Aggregated statistics, or an ``error`` dict when no conversation
            could be analyzed.
        """
        conversation_list = self.conversation_manager.get_conversation_list()

        if limit:
            conversation_list = conversation_list[:limit]

        all_insights = []
        for filename in conversation_list:
            insights = self.get_conversation_insights(filename)
            if insights:
                all_insights.append(insights)

        # Aggregate statistics
        if not all_insights:
            return {"error": "No conversations could be analyzed"}

        return {
            "total_conversations": len(conversation_list),
            "analyzed_conversations": len(all_insights),
            "aggregate_statistics": self._aggregate_insights(all_insights),
            "top_conversations": self._rank_conversations(all_insights),
            "analysis_timestamp": datetime.now().isoformat(),
        }

    def _rank_conversations(self, insights_list: List[Dict], top_n: int = 5) -> List[Dict[str, Any]]:
        """Return the *top_n* conversations ordered by overall quality.

        Baseline heuristic: conversations without a quality score (no
        assistant messages) rank as 0. Each entry carries ``filename``,
        ``overall_quality`` and ``total_messages``.
        """
        def quality(insight: Dict) -> float:
            return insight["quality_metrics"].get("overall_quality", 0)

        ranked = sorted(insights_list, key=quality, reverse=True)
        return [
            {
                "filename": insight.get("filename"),
                "overall_quality": quality(insight),
                "total_messages": insight["basic_stats"]["total_messages"],
            }
            for insight in ranked[:top_n]
        ]

    def _aggregate_insights(self, insights_list: List[Dict]) -> Dict[str, Any]:
        """Aggregate insights from multiple conversations.

        Expects a non-empty list; averages divide by its length.
        """
        conversation_count = len(insights_list)
        total_messages = sum(insight["basic_stats"]["total_messages"] for insight in insights_list)
        total_words = sum(insight["content_analysis"]["total_words"] for insight in insights_list)
        avg_quality = sum(
            insight["quality_metrics"].get("overall_quality", 0) for insight in insights_list
        ) / conversation_count

        # Collect all topic keywords
        all_topics = []
        for insight in insights_list:
            all_topics.extend(insight["content_analysis"]["topic_keywords"])

        return {
            "total_messages_across_all": total_messages,
            "total_words_across_all": total_words,
            "average_conversation_length": total_messages / conversation_count,
            "average_quality_score": avg_quality,
            "most_common_topics": Counter(all_topics).most_common(20),
            "conversation_count": conversation_count,
        }
## Custom UI Components
### Advanced Streamlit Components
Create reusable UI components for enhanced functionality:
# src/ui/components.py
import streamlit as st
from typing import List, Dict, Any, Optional, Callable
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
class ConversationDashboard:
    """Advanced conversation dashboard with analytics.

    A collection of static Streamlit render helpers; each one writes
    directly to the current Streamlit page and returns nothing.
    """

    @staticmethod
    def render_provider_status(llm_service) -> None:
        """Render provider status dashboard.

        Shows one status card per provider reported by
        ``llm_service.get_available_providers()``, with an expander that
        lists the provider's models.
        """
        st.subheader("🔌 Provider Status")

        available_providers = llm_service.get_available_providers()

        if not available_providers:
            st.error("❌ No providers available")
            return

        # Create columns for provider status
        cols = st.columns(len(available_providers))

        for col, (name, provider) in zip(cols, available_providers.items()):
            with col:
                # Provider status card. The markup is kept flush-left so the
                # emitted string is exactly the HTML below, with no leading
                # indentation.
                st.markdown(f"""
<div style="
border: 2px solid #28a745;
border-radius: 8px;
padding: 1rem;
text-align: center;
background: linear-gradient(135deg, #28a745 0%, #20c997 100%);
color: white;
margin-bottom: 1rem;
">
<h4 style="margin: 0; color: white;">{name.title()}</h4>
<p style="margin: 0.5rem 0; font-size: 0.9rem;">✅ Available</p>
<p style="margin: 0; font-size: 0.8rem;">{len(provider.models)} models</p>
</div>
""", unsafe_allow_html=True)

                # Models dropdown
                with st.expander(f"{name.title()} Models"):
                    for model in provider.models:
                        st.text(f"• {model}")

    @staticmethod
    def render_conversation_metrics(analytics_service, conversation: List[Dict]) -> None:
        """Render conversation metrics dashboard.

        Runs ``analytics_service.analyze_conversation`` and displays four
        headline metrics, a messages-by-role pie chart and a bar chart of
        the five most common words.
        """
        if not conversation:
            st.info("No conversation data to analyze")
            return

        # Get analysis
        analysis = analytics_service.analyze_conversation(conversation)
        basic_stats = analysis["basic_stats"]
        content_stats = analysis["content_analysis"]

        # Key metrics row
        col1, col2, col3, col4 = st.columns(4)

        with col1:
            st.metric(
                "Total Messages",
                basic_stats["total_messages"],
                delta=None
            )

        with col2:
            st.metric(
                "Avg Message Length",
                f"{basic_stats['average_message_length']:.0f} chars"
            )

        with col3:
            # The section is an error dict when there are no assistant
            # messages, hence the default.
            quality_score = analysis["quality_metrics"].get("overall_quality", 0)
            st.metric(
                "Quality Score",
                f"{quality_score:.2f}/5.0",
                delta=quality_score - 2.5  # Relative to neutral
            )

        with col4:
            vocab_diversity = content_stats["vocabulary_diversity"]
            st.metric(
                "Vocabulary Diversity",
                f"{vocab_diversity:.3f}",
                delta=vocab_diversity - 0.5  # Relative to average
            )

        # Detailed metrics
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("📊 Message Distribution")
            role_counts = {
                "User": basic_stats["user_messages"],
                "Assistant": basic_stats["assistant_messages"],
                "System": basic_stats["system_messages"]
            }

            fig = px.pie(
                values=list(role_counts.values()),
                names=list(role_counts.keys()),
                title="Messages by Role"
            )
            st.plotly_chart(fig, use_container_width=True)

        with col2:
            st.subheader("🎯 Content Analysis")

            # Top words
            top_words = content_stats["most_common_words"][:5]
            if top_words:
                words, counts = zip(*top_words)
                fig = px.bar(
                    x=list(words),
                    y=list(counts),
                    title="Most Common Words",
                    labels={"x": "Words", "y": "Count"}
                )
                st.plotly_chart(fig, use_container_width=True)

    @staticmethod
    def _coerce_timestamp(value) -> Optional[datetime]:
        """Return *value* as a ``datetime``, or ``None`` if it is unusable.

        Accepts ``datetime`` objects and ISO-8601 strings (a trailing ``Z``
        is read as UTC), so the timeline handles both timestamp forms
        instead of raising on non-string values.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError:
                return None
        return None

    @staticmethod
    def render_conversation_timeline(conversation: List[Dict]) -> None:
        """Render conversation timeline visualization.

        Plots message length against time, one trace per role. Messages
        without a usable timestamp are skipped; a warning is shown when
        none remain.
        """
        if not any(msg.get("timestamp") for msg in conversation):
            st.warning("No timestamp data available for timeline")
            return

        # Convert timestamps and create timeline data
        timeline_data = []
        for i, msg in enumerate(conversation):
            if not msg.get("timestamp"):
                continue
            dt = ConversationDashboard._coerce_timestamp(msg["timestamp"])
            if dt is None:
                continue
            content = msg["content"]
            timeline_data.append({
                "time": dt,
                "message_index": i,
                "role": msg["role"],
                "length": len(content),
                "content_preview": content[:50] + "..." if len(content) > 50 else content
            })

        if not timeline_data:
            st.warning("Could not parse timestamp data")
            return

        st.subheader("⏰ Conversation Timeline")

        # Create timeline plot
        fig = go.Figure()

        for role in ["user", "assistant", "system"]:
            role_data = [d for d in timeline_data if d["role"] == role]
            if not role_data:
                continue
            fig.add_trace(go.Scatter(
                x=[d["time"] for d in role_data],
                y=[d["length"] for d in role_data],
                mode='markers+lines',
                name=role.title(),
                text=[d["content_preview"] for d in role_data],
                # Only the first fragment is an f-string: the %{...} tokens
                # are plotly placeholders and must reach plotly verbatim.
                hovertemplate=f"<b>{role.title()}</b><br>" +
                              "Time: %{x}<br>" +
                              "Length: %{y} chars<br>" +
                              "Content: %{text}<br>" +
                              "<extra></extra>"
            ))

        fig.update_layout(
            title="Message Length Over Time",
            xaxis_title="Time",
            yaxis_title="Message Length (characters)",
            hovermode="closest"
        )

        st.plotly_chart(fig, use_container_width=True)
class AdvancedChatInterface:
    """Enhanced chat interface with advanced features.

    Renders the chat input, the sidebar response options and the sidebar
    export controls with Streamlit.
    """

    def __init__(self, llm_service, conversation_manager):
        """Keep the services the render methods read from."""
        self.llm_service = llm_service
        self.conversation_manager = conversation_manager

    def render_chat_input_with_tools(self) -> Optional[str]:
        """Render enhanced chat input with additional tools.

        Returns:
            The typed message; or, when a non-empty file is uploaded, a
            prompt embedding the file content (and the typed message, if
            any); or ``None`` when there is nothing to send.
        """
        # Main input area
        col1, col2, col3 = st.columns([6, 1, 1])

        with col1:
            user_input = st.chat_input("Type your message...")

        with col2:
            # Voice input placeholder (would integrate with speech-to-text)
            if st.button("🎤", help="Voice input"):
                st.info("Voice input would be implemented here")

        with col3:
            # File upload for context
            uploaded_file = st.file_uploader(
                "📎",
                type=['txt', 'md', 'json'],
                label_visibility="collapsed",
                help="Upload file for context"
            )

            if uploaded_file:
                # Replace undecodable bytes instead of crashing the page on
                # a file that is not valid UTF-8.
                content = uploaded_file.read().decode("utf-8", errors="replace")
                if content:
                    if user_input:
                        return f"Context from {uploaded_file.name}:\n\n{content}\n\nUser question: {user_input}"
                    return f"Please analyze this file content:\n\n{content}"

        return user_input

    def render_response_options(self) -> Dict[str, Any]:
        """Render response customization options.

        Returns:
            The selected options: ``provider``, ``model`` (only when a
            provider is selected), ``temperature``, ``max_tokens``,
            ``use_ensemble``, ``ensemble_strategy`` (only when the ensemble
            is enabled) and ``system_prompt``.
        """
        st.sidebar.subheader("🎛️ Response Options")

        options = {}

        # Provider selection
        available_providers = list(self.llm_service.get_available_providers().keys())
        options["provider"] = st.sidebar.selectbox(
            "Provider",
            available_providers,
            help="Select LLM provider"
        )

        # Model selection
        if options["provider"]:
            models = self.llm_service.get_available_models(options["provider"])
            options["model"] = st.sidebar.selectbox(
                "Model",
                models,
                help="Select specific model"
            )

        # Response parameters
        options["temperature"] = st.sidebar.slider(
            "Temperature",
            min_value=0.0,
            max_value=1.0,
            value=0.7,
            step=0.1,
            help="Response creativity (0=focused, 1=creative)"
        )

        options["max_tokens"] = st.sidebar.slider(
            "Max Tokens",
            min_value=100,
            max_value=4000,
            value=2000,
            step=100,
            help="Maximum response length"
        )

        # Advanced options
        with st.sidebar.expander("Advanced Options"):
            options["use_ensemble"] = st.checkbox(
                "Use Ensemble",
                help="Get responses from multiple providers"
            )

            if options["use_ensemble"]:
                options["ensemble_strategy"] = st.selectbox(
                    "Ensemble Strategy",
                    ["vote", "weighted", "best"],
                    help="How to combine multiple responses"
                )

            options["system_prompt"] = st.text_area(
                "System Prompt",
                value="You are a helpful assistant.",
                help="Custom system prompt"
            )

        return options

    def render_conversation_export(self) -> None:
        """Render conversation export options.

        Reads the conversation from ``st.session_state.conversation``; JSON
        and Markdown produce a download button, PDF and HTML are
        placeholders.
        """
        st.sidebar.subheader("💾 Export Options")

        if 'conversation' not in st.session_state or not st.session_state.conversation:
            st.sidebar.info("No conversation to export")
            return

        export_format = st.sidebar.selectbox(
            "Export Format",
            ["JSON", "Markdown", "PDF", "HTML"]
        )

        if st.sidebar.button("Export Conversation"):
            conversation = st.session_state.conversation

            if export_format == "JSON":
                self._export_as_json(conversation)
            elif export_format == "Markdown":
                self._export_as_markdown(conversation)
            elif export_format == "PDF":
                st.info("PDF export would be implemented here")
            elif export_format == "HTML":
                st.info("HTML export would be implemented here")

    @staticmethod
    def _json_default(value: Any) -> str:
        """Serialize ``datetime`` values as ISO-8601; reject anything else.

        Only ``datetime`` is converted on purpose, so unexpected object
        types still fail loudly instead of being silently stringified.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def _export_as_json(self, conversation: List[Dict]) -> None:
        """Export conversation as JSON.

        Offers a sidebar download containing the messages, an export
        timestamp and the message count. ``datetime`` values inside
        messages (e.g. timestamps) are written as ISO-8601 strings.
        """
        import json

        export_data = {
            "conversation": conversation,
            "export_timestamp": datetime.now().isoformat(),
            "total_messages": len(conversation)
        }

        json_string = json.dumps(export_data, indent=2, default=self._json_default)

        st.sidebar.download_button(
            label="Download JSON",
            data=json_string,
            file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
            mime="application/json"
        )

    def _export_as_markdown(self, conversation: List[Dict]) -> None:
        """Export conversation as Markdown.

        Offers a sidebar download with a short header followed by one
        section per message, separated by horizontal rules.
        """
        role_icons = {"user": "👤", "assistant": "🤖", "system": "⚙️"}

        # Collect fragments and join once instead of repeated string +=.
        parts = [
            "# Conversation Export\n\n",
            f"**Exported:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
            f"**Total Messages:** {len(conversation)}\n\n",
            "---\n\n",
        ]

        for i, msg in enumerate(conversation, 1):
            role_icon = role_icons.get(msg["role"], "❓")
            parts.append(f"## {role_icon} {msg['role'].title()} - Message {i}\n\n")
            parts.append(f"{msg['content']}\n\n")

            if msg.get("timestamp"):
                parts.append(f"*Timestamp: {msg['timestamp']}*\n\n")

            parts.append("---\n\n")

        markdown_content = "".join(parts)

        st.sidebar.download_button(
            label="Download Markdown",
            data=markdown_content,
            file_name=f"conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md",
            mime="text/markdown"
        )
This advanced usage guide demonstrates how to extend Convoscope with custom providers, analytics, and enhanced UI components for power users and developers who need more sophisticated functionality.
Next: Troubleshooting Guide - Common issues and solutions