class MongoDB(Database): class Query: def __init__(self, table: str, select: Dict = None, condition: Dict = None, data: List[Dict] = None): self.table = table self.select = select self.condition = condition self.data = data def __init__(self, host: str = 'localhost', user: str = None, password: str = None, scheme: str = ''): super(MongoDB, self).__init__(host, user, password, scheme) self.conn = pymongo.MongoClient(f'mongodb://{host}:27017/{scheme or ""}') def write(self, query: "MongoDB.Query", single_insert: bool = True) -> bool: super(MongoDB, self).write(query) db = self.conn[self.scheme][query.table] if single_insert: res = db.insert_one(query.data[0]) return res is not None else: res = db.insert_many(query.data) return len(res.inserted_ids) == len(query.data) def read_raw(self, query: "MongoDB.Query") -> pymongo.cursor.Cursor: db = self.conn[self.scheme][query.table] res = db.find(query.condition, query.select) return res def read(self, query: "MongoDB.Query", single_row: bool = False) -> Optional[pd.DataFrame]: super(MongoDB, self).read(query, single_row) db = self.conn[self.scheme][query.table] res = db.find(query.condition, query.select) return self.raw_to_dataframe(res, single_row=single_row) def update(self, query: "MongoDB.Query") -> bool: db = self.conn[self.scheme][query.table] res = db.update_many(query.condition, {'$push': query.data[0]}, upsert=True) return res.matched_count > 0 def close(self) -> None: super(MongoDB, self).close() self.conn.close() def reconnect(self) -> None: super(MongoDB, self).reconnect() @staticmethod def raw_to_dataframe(raw: pymongo.cursor.Cursor, single_row: bool = False) -> Optional[pd.DataFrame]: if raw is None: return None data = list(raw) if len(data) == 0: return None if single_row: return pd.DataFrame(data).iloc[0] else: return pd.DataFrame(data)