abs_client.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. import httpx
  2. from typing import List, Dict, Any
  3. from app.config import get_settings
  4. class AudiobookshelfClient:
  5. """Client for interacting with Audiobookshelf API."""
  6. def __init__(self):
  7. settings = get_settings()
  8. self.base_url = settings.abs_url.rstrip("/")
  9. self.api_token = settings.abs_api_token
  10. self.headers = {"Authorization": f"Bearer {self.api_token}"}
  11. async def get_libraries(self) -> List[Dict[str, Any]]:
  12. """Get all libraries from Audiobookshelf."""
  13. async with httpx.AsyncClient() as client:
  14. response = await client.get(
  15. f"{self.base_url}/api/libraries", headers=self.headers
  16. )
  17. response.raise_for_status()
  18. return response.json().get("libraries", [])
  19. async def get_library_items(self, library_id: str) -> List[Dict[str, Any]]:
  20. """Get all items in a library."""
  21. async with httpx.AsyncClient() as client:
  22. response = await client.get(
  23. f"{self.base_url}/api/libraries/{library_id}/items",
  24. headers=self.headers,
  25. )
  26. response.raise_for_status()
  27. return response.json().get("results", [])
  28. async def get_user_listening_sessions(self) -> List[Dict[str, Any]]:
  29. """Get user's listening sessions."""
  30. async with httpx.AsyncClient() as client:
  31. response = await client.get(
  32. f"{self.base_url}/api/me/listening-sessions", headers=self.headers
  33. )
  34. response.raise_for_status()
  35. return response.json().get("sessions", [])
  36. async def get_user_progress(self) -> List[Dict[str, Any]]:
  37. """Get user's items in progress."""
  38. async with httpx.AsyncClient() as client:
  39. response = await client.get(
  40. f"{self.base_url}/api/me/items-in-progress", headers=self.headers
  41. )
  42. response.raise_for_status()
  43. # The response is an array of library items with progress
  44. result = response.json()
  45. return result.get("libraryItems", result if isinstance(result, list) else [])
  46. async def get_item_details(self, item_id: str) -> Dict[str, Any]:
  47. """Get detailed information about a specific library item."""
  48. async with httpx.AsyncClient() as client:
  49. response = await client.get(
  50. f"{self.base_url}/api/items/{item_id}", headers=self.headers
  51. )
  52. response.raise_for_status()
  53. return response.json()
  54. async def get_user_info(self) -> Dict[str, Any]:
  55. """Get current user information."""
  56. async with httpx.AsyncClient() as client:
  57. response = await client.get(
  58. f"{self.base_url}/api/me", headers=self.headers
  59. )
  60. response.raise_for_status()
  61. return response.json()