| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import httpx
- from typing import List, Dict, Any
- from app.config import get_settings
- class AudiobookshelfClient:
- """Client for interacting with Audiobookshelf API."""
- def __init__(self, abs_url: str, api_token: str):
- """
- Initialize Audiobookshelf client with credentials.
- Args:
- abs_url: Base URL of Audiobookshelf server
- api_token: API token for authentication (unencrypted)
- """
- self.base_url = abs_url.rstrip("/")
- self.api_token = api_token
- self.headers = {"Authorization": f"Bearer {self.api_token}"}
- async def get_libraries(self) -> List[Dict[str, Any]]:
- """Get all libraries from Audiobookshelf."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/libraries", headers=self.headers
- )
- response.raise_for_status()
- return response.json().get("libraries", [])
- async def get_library_items(self, library_id: str) -> List[Dict[str, Any]]:
- """Get all items in a library."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/libraries/{library_id}/items",
- headers=self.headers,
- )
- response.raise_for_status()
- return response.json().get("results", [])
- async def get_user_listening_sessions(self) -> List[Dict[str, Any]]:
- """Get user's listening sessions."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/me/listening-sessions", headers=self.headers
- )
- response.raise_for_status()
- return response.json().get("sessions", [])
- async def get_user_progress(self) -> List[Dict[str, Any]]:
- """Get user's items in progress."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/me/items-in-progress", headers=self.headers
- )
- response.raise_for_status()
- # The response is an array of library items with progress
- result = response.json()
- return result.get("libraryItems", result if isinstance(result, list) else [])
- async def get_item_details(self, item_id: str) -> Dict[str, Any]:
- """Get detailed information about a specific library item."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/items/{item_id}", headers=self.headers
- )
- response.raise_for_status()
- return response.json()
- async def get_user_info(self) -> Dict[str, Any]:
- """Get current user information."""
- async with httpx.AsyncClient() as client:
- response = await client.get(
- f"{self.base_url}/api/me", headers=self.headers
- )
- response.raise_for_status()
- return response.json()
- def get_abs_client(user) -> AudiobookshelfClient:
- """
- Create an Audiobookshelf client for a specific user.
- Args:
- user: User model instance with abs_url and abs_api_token
- Returns:
- Configured AudiobookshelfClient instance
- """
- from app.auth import decrypt_token
- # Decrypt the user's API token
- decrypted_token = decrypt_token(user.abs_api_token)
- return AudiobookshelfClient(
- abs_url=user.abs_url,
- api_token=decrypted_token
- )
|