| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
import abc
import os
from collections import namedtuple
from dataclasses import dataclass
from typing import (Any, ClassVar, Dict, Final, List, NamedTuple, Optional,
                    Tuple, Union)

import pandas as pd
class DBI(abc.ABC):
    """Abstract base class describing a database connection interface.

    Concrete backends subclass this and implement every abstract method.
    ``close`` and ``reconnect`` are abstract but still carry a body that
    reports the action through ``OUTPUT``; an override can reuse it with
    ``super().close()`` / ``super().reconnect()``.

    Class attributes:
        OUTPUT: Callable used for status messages (``print`` by default).
        OUTPUT_KWARGS: Keyword arguments forwarded to every ``OUTPUT`` call.
        BACKEND: Not read anywhere in this class; presumably identifies the
            concrete backend in subclasses -- TODO confirm.
    """

    # NOTE: ``print`` is a builtin, so ``self.OUTPUT(...)`` is called without
    # ``self``. A plain Python function assigned here by a subclass would be
    # bound as a method; wrap it in ``staticmethod`` in that case.
    OUTPUT = print
    # Shared by all instances unless a subclass/instance rebinds it.
    OUTPUT_KWARGS: ClassVar[Dict[str, Any]] = {}
    BACKEND = None

    @dataclass
    class WriteReturn:
        """Result object returned by the writing methods."""

        # Presumably the number of affected rows -- confirm against backends.
        rows: int
        # Presumably the key/id of the written row, if any -- confirm.
        key: Optional[Any]

    def __init__(self,
                 host: str,
                 user: Optional[str] = None,
                 password: Optional[str] = None,
                 scheme: Optional[str] = None):
        """Resolve the connection settings from environment variables.

        Every argument is the *name* of an environment variable, not the
        value itself. Nothing is connected here; the settings are stored and
        a status message is emitted through ``OUTPUT``.

        Args:
            host: Variable holding the host; ``'localhost'`` when it is unset.
            user: Variable holding the user name; ``None`` when the argument
                is omitted or the variable is unset.
            password: Variable holding the password; same fallback as ``user``.
            scheme: Variable holding the scheme (database) name; same fallback
                as ``user``.
        """
        self.host: str = os.environ.get(host, 'localhost')
        # Skip the lookup entirely when no variable name was supplied instead
        # of querying the environment for an empty-string key.
        self.user: Optional[str] = os.environ.get(user) if user else None
        self.password: Optional[str] = (
            os.environ.get(password) if password else None)
        self.scheme: Optional[str] = os.environ.get(scheme) if scheme else None
        self.OUTPUT(f'Connecting to {self.host} -> {self.scheme or "*"}',
                    **self.OUTPUT_KWARGS)

    @abc.abstractmethod
    def write(self,
              query) \
            -> WriteReturn:
        """Execute a query that writes data to the database (INSERT, UPDATE, DELETE)"""

    @abc.abstractmethod
    def read(self,
             query,
             single_row: bool = False) \
            -> Union[pd.DataFrame, Optional[pd.Series]]:
        """Execute a query that reads data from the database (SELECT, SHOW)

        NOTE(review): the return annotation suggests a ``Series`` (or ``None``)
        when ``single_row`` is true and a ``DataFrame`` otherwise -- confirm
        against the concrete backends.
        """

    @abc.abstractmethod
    def insert(self,
               table: str,
               rows: Union[pd.DataFrame, Dict[str, Any]],
               ignore: bool = False,
               update: bool = False,
               schema: Optional[str] = None) \
            -> WriteReturn:
        """Prepared function to insert new data to a database

        NOTE(review): ``ignore`` / ``update`` presumably select how conflicting
        rows are handled, and ``schema`` presumably overrides ``self.scheme``
        -- both are defined by the implementing backend; confirm there.
        """

    @abc.abstractmethod
    def update(self,
               table: str,
               ids: Dict[str, Any],
               values: Dict[str, Any],
               schema: Optional[str] = None) \
            -> WriteReturn:
        """Prepared function to update data in a database

        NOTE(review): ``ids`` presumably identifies the rows to change and
        ``values`` the new column values -- confirm against the backends.
        """

    @abc.abstractmethod
    def close(self) -> None:
        """Close the database connection

        The base body only emits a status message; the actual teardown is up
        to the overriding backend.
        """
        # Same "*" placeholder as in __init__ so an unset scheme never shows
        # up as the literal text "None".
        self.OUTPUT(f'Closing connection to {self.host} -> {self.scheme or "*"}',
                    **self.OUTPUT_KWARGS)

    @abc.abstractmethod
    def reconnect(self) -> None:
        """Reconnect the database connection

        The base body only emits a status message; the actual reconnect is up
        to the overriding backend.
        """
        self.OUTPUT(f'Reconnecting to {self.host} -> {self.scheme or "*"}',
                    **self.OUTPUT_KWARGS)
|