""" Reading statistics service. Calculates various statistics about user's reading habits. """ from datetime import datetime, timedelta from typing import Dict, Any, List, Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ import json from app.models import ListeningSession, Book class ReadingStatsService: """Service for calculating reading statistics.""" def __init__(self, db: AsyncSession, user_id: int): """ Initialize statistics service for a user. Args: db: Database session user_id: User ID to calculate stats for """ self.db = db self.user_id = user_id async def calculate_stats( self, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ) -> Dict[str, Any]: """ Calculate comprehensive reading statistics. Args: start_date: Optional start date filter end_date: Optional end date filter Returns: Dictionary with various statistics """ # Build base query query = select(ListeningSession).where( ListeningSession.user_id == self.user_id ) if start_date: query = query.where(ListeningSession.started_at >= start_date) if end_date: query = query.where(ListeningSession.started_at <= end_date) result = await self.db.execute(query) sessions = result.scalars().all() # Calculate finished books finished_sessions = [s for s in sessions if s.is_finished and s.finished_at] # Calculate total listening time total_hours = 0.0 for session in finished_sessions: if session.started_at and session.finished_at: duration = (session.finished_at - session.started_at).total_seconds() / 3600 total_hours += duration # Get book details for finished books finished_book_ids = [s.book_id for s in finished_sessions] books_dict = {} if finished_book_ids: books_result = await self.db.execute( select(Book).where(Book.id.in_(finished_book_ids)) ) books_dict = {book.id: book for book in books_result.scalars().all()} # Calculate average rating rated_sessions = [s for s in finished_sessions if s.rating] avg_rating = ( sum(s.rating for s in rated_sessions) / len(rated_sessions) if rated_sessions else None ) # Calculate books per month books_by_month = await self._calculate_books_by_month(finished_sessions) # Calculate books by genre books_by_genre = await self._calculate_books_by_genre(finished_sessions, books_dict) # Calculate current streak streak = await self._calculate_streak(finished_sessions) # Calculate recent books recent_books = await self._get_recent_books(finished_sessions, books_dict, limit=10) return { "total_books": len(finished_sessions), "total_hours": round(total_hours, 1), "average_rating": round(avg_rating, 1) if avg_rating else None, "books_in_progress": len([s for s in sessions if not s.is_finished]), "books_by_month": books_by_month, "books_by_genre": books_by_genre, "current_streak": streak, "recent_books": recent_books, "total_sessions": len(sessions) } async def _calculate_books_by_month( self, finished_sessions: List[ListeningSession] ) -> List[Dict[str, Any]]: """ Calculate books finished per month. Returns: List of {month, year, count} dictionaries """ books_by_month = {} for session in finished_sessions: if not session.finished_at: continue month_key = (session.finished_at.year, session.finished_at.month) books_by_month[month_key] = books_by_month.get(month_key, 0) + 1 # Convert to list and sort result = [ { "year": year, "month": month, "count": count, "month_name": datetime(year, month, 1).strftime("%B") } for (year, month), count in sorted(books_by_month.items()) ] return result async def _calculate_books_by_genre( self, finished_sessions: List[ListeningSession], books_dict: Dict[str, Book] ) -> List[Dict[str, Any]]: """ Calculate books finished by genre. Returns: List of {genre, count} dictionaries sorted by count """ genre_counts = {} for session in finished_sessions: book = books_dict.get(session.book_id) if not book or not book.genres: continue try: genres = json.loads(book.genres) if isinstance(book.genres, str) else book.genres for genre in genres: genre_counts[genre] = genre_counts.get(genre, 0) + 1 except (json.JSONDecodeError, TypeError): continue # Sort by count descending result = [ {"genre": genre, "count": count} for genre, count in sorted( genre_counts.items(), key=lambda x: x[1], reverse=True ) ] return result async def _calculate_streak( self, finished_sessions: List[ListeningSession] ) -> int: """ Calculate current reading streak (consecutive days with finished books). Returns: Number of consecutive days """ if not finished_sessions: return 0 # Get unique finish dates, sorted descending finish_dates = sorted( {s.finished_at.date() for s in finished_sessions if s.finished_at}, reverse=True ) if not finish_dates: return 0 # Check if most recent is today or yesterday today = datetime.now().date() if finish_dates[0] not in [today, today - timedelta(days=1)]: return 0 # Count consecutive days streak = 1 for i in range(len(finish_dates) - 1): diff = (finish_dates[i] - finish_dates[i + 1]).days if diff == 1: streak += 1 elif diff == 0: # Same day, continue continue else: break return streak async def _get_recent_books( self, finished_sessions: List[ListeningSession], books_dict: Dict[str, Book], limit: int = 10 ) -> List[Dict[str, Any]]: """ Get recently finished books with details. Returns: List of book details with finish date and rating """ # Sort by finish date descending sorted_sessions = sorted( [s for s in finished_sessions if s.finished_at], key=lambda x: x.finished_at, reverse=True )[:limit] recent = [] for session in sorted_sessions: book = books_dict.get(session.book_id) if not book: continue listening_duration = None if session.started_at and session.finished_at: duration_hours = (session.finished_at - session.started_at).total_seconds() / 3600 listening_duration = round(duration_hours, 1) recent.append({ "book_id": book.id, "title": book.title, "author": book.author, "finished_at": session.finished_at.isoformat(), "rating": session.rating, "listening_duration": listening_duration, "cover_url": book.cover_url }) return recent