import os import time import random import string import logging import psycopg2 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class Database: def __init__( self, host=os.getenv('DB_HOST', 'localhost'), port=os.getenv('DB_PORT', ''), user=os.getenv('DB_USER', ''), password=os.getenv('DB_PASSWORD', ''), database=os.getenv('DB_DATABASE', ''), ): self.host = host self.port = port self.user = user self.password = password self.database = database self.connection = None def _open(self): if self.connection is None: try: logging.debug("Пытаюсь установить соединение с базой данных.") self.connection = psycopg2.connect( host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, connect_timeout=5 ) logging.info(f"Соединение с базой данными {self.database} - установлено.") except (Exception, psycopg2.Error) as e: logging.error(f"Ошибка при подключении к базе данных: {e}") def close(self): if self.connection is not None: self.connection.close() logging.debug("Соединение с базой данных закрыто.") def execute_query(self, query, params=None): self._open() try: with self.connection.cursor() as cursor: cursor.execute(query, params) if query.strip().upper().startswith("SELECT"): result = cursor.fetchall() logging.debug(f"Выполнен запрос: {query} | Возвращено {len(result)} строк") return result else: self.connection.commit() logging.debug(f"Выполнен запрос: {query}") except Exception as e: logging.error(f"Ошибка выполнения запроса:\n{e}") def create_table(self, table_name, columns): columns_with_types = ', '.join([f"{col} {typ}" for col, typ in columns.items()]) create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_with_types});" self.execute_query(create_table_query) def insert_row(self, table_name, **kwargs): columns = ', '.join(kwargs.keys()) placeholders = ', '.join(['%s'] * len(kwargs)) insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders});" self.execute_query(insert_query, list(kwargs.values())) def update_row(self, table_name, set_values, condition): set_clause = ', '.join([f"{col} = %s" for col in set_values.keys()]) update_query = f"UPDATE {table_name} SET {set_clause} WHERE {condition};" self.execute_query(update_query, list(set_values.values())) def delete_row(self, table_name, condition): delete_query = f"DELETE FROM {table_name} WHERE {condition};" self.execute_query(delete_query) if __name__ == '__main__': db = Database() db.create_table( 'rows', { 'id': 'SERIAL PRIMARY KEY', 'text': 'VARCHAR(255) NOT NULL', } ) for _ in range(5): random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) db.insert_row('rows', text=random_string) logging.info(f'Создана новая строка: {random_string}') while True: all_rows = db.execute_query('SELECT * FROM rows;') random_string = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) if all_rows: action = random.choice(['insert', 'update', 'delete']) random_row = random.choice(all_rows) if action == 'insert': db.insert_row('rows', text=random_string) logging.info(f'Вставлена строка: {random_string}') elif action == 'update': db.update_row('rows', {'text': random_string}, f'id = {random_row[0]}') logging.info(f'Обновлена строка с id {random_row[0]}: новый текст - {random_string}') elif action == 'delete': db.delete_row('rows', f'id = {random_row[0]}') logging.info(f'Удалена строка с id {random_row[0]}') else: logging.warning("Нет строк для обработки.") time.sleep(60)