| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
from typing import List, Dict, Any
from urllib.parse import quote

import httpx

from app.config import get_settings
class AudiobookshelfClient:
    """Client for interacting with the Audiobookshelf API.

    Every public method issues one authenticated ``GET`` request against
    ``<base_url>/api/...`` using a short-lived ``httpx.AsyncClient`` and
    returns the decoded JSON payload (or one list field of it).

    All request methods raise ``httpx.HTTPStatusError`` for 4xx/5xx
    responses (via ``Response.raise_for_status``) and propagate any
    transport error raised by ``httpx``.
    """

    def __init__(self):
        """Read the server URL and API token from the application settings."""
        settings = get_settings()
        # Strip trailing slashes so joining with "/api/..." never yields "//".
        self.base_url = settings.abs_url.rstrip("/")
        self.api_token = settings.abs_api_token
        self.headers = {"Authorization": f"Bearer {self.api_token}"}

    def _build_url(self, *segments: str) -> str:
        """Return the absolute API URL for the given path *segments*.

        Each segment is percent-encoded on its own (``safe=""``), so a
        caller-supplied identifier containing ``/``, ``?`` or ``#`` cannot
        add path segments, a query string or a fragment to the request.
        """
        path = "/".join(quote(str(segment), safe="") for segment in segments)
        return f"{self.base_url}/api/{path}"

    @staticmethod
    def _list_field(payload: Any, key: str) -> List[Dict[str, Any]]:
        """Return ``payload[key]``, or ``[]`` when it is missing or null."""
        # `.get(key, [])` alone would leak None for an explicit JSON null.
        return payload.get(key) or []

    async def _get_json(self, *segments: str) -> Any:
        """GET the API path built from *segments* and return the decoded JSON.

        Raises:
            httpx.HTTPStatusError: if the response status is 4xx or 5xx.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                self._build_url(*segments), headers=self.headers
            )
            response.raise_for_status()
            return response.json()

    async def get_libraries(self) -> List[Dict[str, Any]]:
        """Get all libraries from Audiobookshelf.

        Returns:
            The ``libraries`` list of ``GET /api/libraries`` (empty if absent).
        """
        payload = await self._get_json("libraries")
        return self._list_field(payload, "libraries")

    async def get_library_items(self, library_id: str) -> List[Dict[str, Any]]:
        """Get all items in a library.

        Args:
            library_id: Identifier of the library; percent-encoded before
                being placed in the URL path.

        Returns:
            The ``results`` list of ``GET /api/libraries/<library_id>/items``
            (empty if absent).
        """
        payload = await self._get_json("libraries", library_id, "items")
        return self._list_field(payload, "results")

    async def get_user_listening_sessions(self) -> List[Dict[str, Any]]:
        """Get the current user's listening sessions.

        Returns:
            The ``sessions`` list of ``GET /api/me/listening-sessions``
            (empty if absent).
        """
        payload = await self._get_json("me", "listening-sessions")
        return self._list_field(payload, "sessions")

    async def get_user_progress(self) -> List[Dict[str, Any]]:
        """Get the current user's media progress for all items.

        Returns:
            The ``libraryItems`` list of ``GET /api/me/progress``
            (empty if absent).
        """
        payload = await self._get_json("me", "progress")
        return self._list_field(payload, "libraryItems")

    async def get_item_details(self, item_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific library item.

        Args:
            item_id: Identifier of the library item; percent-encoded before
                being placed in the URL path.

        Returns:
            The decoded JSON body of ``GET /api/items/<item_id>``.
        """
        return await self._get_json("items", item_id)

    async def get_user_info(self) -> Dict[str, Any]:
        """Get current user information.

        Returns:
            The decoded JSON body of ``GET /api/me``.
        """
        return await self._get_json("me")
|