dbi.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from typing import Dict, List, Union, Tuple, Optional, Any, Final, NamedTuple
  2. import abc
  3. from dataclasses import dataclass
  4. import os
  5. from collections import namedtuple
  6. import pandas as pd
  7. class DBI(abc.ABC):
  8. OUTPUT = print
  9. OUTPUT_KWARGS: dict = {}
  10. BACKEND = None
  11. #WriteReturn: Final = namedtuple('WriteReturn', ['rows', 'id'])
  12. @dataclass
  13. class WriteReturn:
  14. rows: int
  15. key: Optional[Any]
  16. def __init__(self,
  17. host: str,
  18. user: Optional[str] = None,
  19. password: Optional[str] = None,
  20. scheme: Optional[str] = None):
  21. self.host: str = os.environ.get(host, 'localhost')
  22. self.user: Optional[str] = os.environ.get(user or '', None)
  23. self.password: Optional[str] = os.environ.get(password or '', None)
  24. self.scheme: Optional[str] = os.environ.get(scheme or '', None)
  25. self.OUTPUT(f'Connecting to {self.host} -> {self.scheme if self.scheme else "*"}',
  26. **self.OUTPUT_KWARGS)
  27. @abc.abstractmethod
  28. def write(self,
  29. query) \
  30. -> WriteReturn:
  31. """Execute a query that writes data to the database (INSERT, UPDATE, DELETE)"""
  32. pass
  33. @abc.abstractmethod
  34. def read(self,
  35. query,
  36. single_row=False) \
  37. -> Union[pd.DataFrame, Optional[pd.Series]]:
  38. """Execute a query that reads data from the database (SELECT, SHOW)"""
  39. pass
  40. @abc.abstractmethod
  41. def insert(self,
  42. table: str,
  43. rows: Union[pd.DataFrame, Dict[str, Any]],
  44. ignore: bool = False,
  45. update: bool = False,
  46. schema: Optional[str] = None) \
  47. -> WriteReturn:
  48. """Prepared function to insert new data to a database"""
  49. pass
  50. @abc.abstractmethod
  51. def update(self,
  52. table: str,
  53. ids: Dict[str, Any],
  54. values: Dict[str, Any],
  55. schema: Optional[str] = None) \
  56. -> WriteReturn:
  57. """Prepared function to update data in a database"""
  58. pass
  59. @abc.abstractmethod
  60. def close(self) -> None:
  61. """Close the database connection"""
  62. self.OUTPUT(f'Closing connection to {self.host} -> {self.scheme}',
  63. **self.OUTPUT_KWARGS)
  64. @abc.abstractmethod
  65. def reconnect(self) -> None:
  66. """Reconnect the database connection"""
  67. self.OUTPUT(f'Reconnecting to {self.host} -> {self.scheme}',
  68. **self.OUTPUT_KWARGS)