| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from typing import Dict, List, Union, Tuple, Optional, Any, Final, NamedTuple
- import abc
- from dataclasses import dataclass
- import os
- from enum import Enum
- import pandas as pd
class DBI(abc.ABC):
    """Abstract base class describing a database interface.

    The constructor resolves the connection settings (host, user, password,
    scheme) from one of the sources listed in ``ConfigSupplier`` and reports
    progress through ``OUTPUT``. Concrete subclasses must implement every
    abstract method; ``close`` and ``reconnect`` carry a default body that only
    emits a status message, so an override can call ``super().close()`` /
    ``super().reconnect()`` to keep that message.
    """

    # Callable used for status messages; always invoked as
    # ``self.OUTPUT(message, **self.OUTPUT_KWARGS)``.
    # NOTE: ``print`` is a builtin and is therefore not bound to the instance.
    # A plain Python function assigned here would receive ``self`` as its
    # first argument unless wrapped in ``staticmethod``.
    OUTPUT = print
    # Extra keyword arguments forwarded to every ``OUTPUT`` call.
    OUTPUT_KWARGS: dict = {}
    # Not used by this base class; presumably set by concrete backends
    # (confirm against the subclasses).
    BACKEND = None

    @dataclass
    class WriteReturn:
        """Return type declared by the write-style methods (write/insert/update)."""

        rows: int  # presumably the number of affected rows -- confirm in backends
        key: Optional[Any]  # presumably the generated key, if any -- confirm

    class ConfigSupplier(Enum):
        """Where the connection settings passed to ``__init__`` come from."""

        FILE = 'FILE'      # not implemented: __init__ raises NotImplementedError
        DIRECT = 'DIRECT'  # arguments are the literal connection values
        ENV = 'ENV'        # arguments are names of environment variables

    def __init__(self,
                 host: Optional[str] = None,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 scheme: Optional[str] = None,
                 config_supply: Union[ConfigSupplier, str] = ConfigSupplier.ENV):
        """Resolve the connection settings and announce the connection target.

        Args:
            host, user, password, scheme: Interpreted according to
                ``config_supply``. With ``ENV`` each argument is the *name* of
                an environment variable to read; a missing name or an unset
                variable yields ``None`` (``'localhost'`` for the host). With
                ``DIRECT`` each argument is stored unchanged.
            config_supply: A ``ConfigSupplier`` member, or its string value
                (``'ENV'``, ``'DIRECT'``, ``'FILE'``).

        Raises:
            ValueError: ``config_supply`` is not a valid ``ConfigSupplier`` value.
            NotImplementedError: ``config_supply`` is ``ConfigSupplier.FILE``.
        """
        # Enum lookup by value returns the member itself when handed a member,
        # so callers may pass either the enum or its string value.
        supplier = DBI.ConfigSupplier(config_supply)
        self.OUTPUT(f'Using {supplier.name} supplied DBI configuration', **self.OUTPUT_KWARGS)

        if supplier is DBI.ConfigSupplier.FILE:
            raise NotImplementedError()

        if supplier is DBI.ConfigSupplier.ENV:
            environ = os.environ
            # ``name or ''``: an empty name can never match a variable, so a
            # missing argument falls through to the default.
            self.host: Optional[str] = environ.get(host or '', 'localhost')
            self.user: Optional[str] = environ.get(user or '', None)
            self.password: Optional[str] = environ.get(password or '', None)
            self.scheme: Optional[str] = environ.get(scheme or '', None)
        else:
            self.host = host
            self.user = user
            self.password = password
            self.scheme = scheme

        self.OUTPUT(f'Connecting to {self._target()}', **self.OUTPUT_KWARGS)

    def _target(self) -> str:
        """Return ``'<host> -> <scheme>'`` for status messages (``*`` if no scheme)."""
        return f'{self.host} -> {self.scheme or "*"}'

    @abc.abstractmethod
    def write(self,
              query) \
            -> WriteReturn:
        """Execute a query that writes data to the database (INSERT, UPDATE, DELETE)."""

    @abc.abstractmethod
    def read(self,
             query,
             single_row=False) \
            -> Union[pd.DataFrame, Optional[pd.Series]]:
        """Execute a query that reads data from the database (SELECT, SHOW).

        ``single_row`` presumably switches the result from a DataFrame to a
        single Series (or ``None``) -- confirm against the implementations.
        """

    @abc.abstractmethod
    def insert(self,
               table: str,
               rows: Union[pd.DataFrame, Dict[str, Any]],
               ignore: bool = False,
               update: bool = False,
               schema: Optional[str] = None) \
            -> WriteReturn:
        """Prepared function to insert new data into a database.

        ``rows`` is either a DataFrame or a single column->value mapping.
        The exact semantics of ``ignore`` and ``update`` are defined by the
        implementing backend (presumably ignore-duplicates / upsert -- confirm).
        """

    @abc.abstractmethod
    def update(self,
               table: str,
               ids: Dict[str, Any],
               values: Dict[str, Any],
               schema: Optional[str] = None) \
            -> WriteReturn:
        """Prepared function to update data in a database.

        ``ids`` presumably identifies the rows to change and ``values`` holds
        the new column values -- confirm against the implementations.
        """

    @abc.abstractmethod
    def close(self) -> None:
        """Close the database connection.

        The base implementation only emits a status message.
        """
        self.OUTPUT(f'Closing connection to {self._target()}',
                    **self.OUTPUT_KWARGS)

    @abc.abstractmethod
    def reconnect(self) -> None:
        """Reconnect the database connection.

        The base implementation only emits a status message.
        """
        self.OUTPUT(f'Reconnecting to {self._target()}',
                    **self.OUTPUT_KWARGS)
|