dbi.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. from typing import Dict, List, Union, Tuple, Optional, Any, Final, NamedTuple
  2. import abc
  3. from dataclasses import dataclass
  4. import os
  5. from enum import Enum
  6. import pandas as pd
  7. class DBI(abc.ABC):
  8. OUTPUT = print
  9. OUTPUT_KWARGS: dict = {}
  10. BACKEND = None
  11. #WriteReturn: Final = namedtuple('WriteReturn', ['rows', 'id'])
  12. @dataclass
  13. class WriteReturn:
  14. rows: int
  15. key: Optional[Any]
  16. class ConfigSupplier(Enum):
  17. FILE = 'FILE'
  18. DIRECT = 'DIRECT'
  19. ENV = 'ENV'
  20. def __init__(self,
  21. host: Optional[str] = None,
  22. user: Optional[str] = None,
  23. password: Optional[str] = None,
  24. scheme: Optional[str] = None,
  25. config_supply: ConfigSupplier = ConfigSupplier.ENV):
  26. """Initialize a DBI class"""
  27. self.OUTPUT(f'Using {config_supply.name} supplied DBI configuration', **self.OUTPUT_KWARGS)
  28. if config_supply == DBI.ConfigSupplier.ENV:
  29. self.host: Optional[str] = os.environ.get(host or '', 'localhost')
  30. self.user: Optional[str] = os.environ.get(user or '', None)
  31. self.password: Optional[str] = os.environ.get(password or '', None)
  32. self.scheme: Optional[str] = os.environ.get(scheme or '', None)
  33. elif config_supply == DBI.ConfigSupplier.FILE:
  34. raise NotImplementedError()
  35. else:
  36. self.host = host
  37. self.user = user
  38. self.password = password
  39. self.scheme = scheme
  40. self.OUTPUT(f'Connecting to {self.host} -> {self.scheme if self.scheme else "*"}',
  41. **self.OUTPUT_KWARGS)
  42. @abc.abstractmethod
  43. def write(self,
  44. query) \
  45. -> WriteReturn:
  46. """Execute a query that writes data to the database (INSERT, UPDATE, DELETE)"""
  47. pass
  48. @abc.abstractmethod
  49. def read(self,
  50. query,
  51. single_row=False) \
  52. -> Union[pd.DataFrame, Optional[pd.Series]]:
  53. """Execute a query that reads data from the database (SELECT, SHOW)"""
  54. pass
  55. @abc.abstractmethod
  56. def insert(self,
  57. table: str,
  58. rows: Union[pd.DataFrame, Dict[str, Any]],
  59. ignore: bool = False,
  60. update: bool = False,
  61. schema: Optional[str] = None) \
  62. -> WriteReturn:
  63. """Prepared function to insert new data to a database"""
  64. pass
  65. @abc.abstractmethod
  66. def update(self,
  67. table: str,
  68. ids: Dict[str, Any],
  69. values: Dict[str, Any],
  70. schema: Optional[str] = None) \
  71. -> WriteReturn:
  72. """Prepared function to update data in a database"""
  73. pass
  74. @abc.abstractmethod
  75. def close(self) -> None:
  76. """Close the database connection"""
  77. self.OUTPUT(f'Closing connection to {self.host} -> {self.scheme}',
  78. **self.OUTPUT_KWARGS)
  79. @abc.abstractmethod
  80. def reconnect(self) -> None:
  81. """Reconnect the database connection"""
  82. self.OUTPUT(f'Reconnecting to {self.host} -> {self.scheme}',
  83. **self.OUTPUT_KWARGS)