Skip to content

Commit

Permalink
Merge pull request #2032 from chaoss/augur-new-fix-db-error
Browse files Browse the repository at this point in the history
Augur new fix db error
  • Loading branch information
sgoggins authored Nov 21, 2022
2 parents 2a741de + 4a4466a commit a29c83b
Show file tree
Hide file tree
Showing 33 changed files with 232 additions and 61 deletions.
2 changes: 2 additions & 0 deletions augur/application/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def new_func(ctx, *args, **kwargs):
usage = re.search(r"Usage:\s(.*)\s\[OPTIONS\]", str(ctx.get_usage())).groups()[0]
try:
engine.connect()
engine.dispose()
return ctx.invoke(function_db_connection, *args, **kwargs)
except OperationalError as e:

Expand Down Expand Up @@ -65,6 +66,7 @@ def new_func(ctx, *args, **kwargs):
if incorrect_values:
print(f"\n\n{usage} command setup failed\nERROR: connecting to database\nHINT: The {incorrect_values} may be incorrectly specified in {location}\n")

engine.dispose()
sys.exit()

return update_wrapper(new_func, function_db_connection)
Expand Down
13 changes: 12 additions & 1 deletion augur/application/cli/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def get_repo_groups():
connection,
)
print(df)
engine.dispose()

return df

Expand Down Expand Up @@ -127,6 +128,8 @@ def add_repo_groups(filename):
f"Repo group with ID {row[1]} for repo group {row[1]} already exists, skipping..."
)

engine.dispose()


@cli.command("add-github-org")
@click.argument("organization_name")
Expand Down Expand Up @@ -157,7 +160,11 @@ def get_db_version():
engine = create_database_engine()
with engine.connect() as connection:

return int(connection.execute(db_version_sql).fetchone()[2])
result = int(connection.execute(db_version_sql).fetchone()[2])

engine.dispose()
return result



@cli.command("print-db-version")
Expand Down Expand Up @@ -241,6 +248,8 @@ def update_api_key(api_key):
connection.execute(update_api_key_sql, api_key=api_key)
logger.info(f"Updated Augur API key to: {api_key}")

engine.dispose()


@cli.command("get-api-key")
@test_connection
Expand All @@ -259,6 +268,8 @@ def get_api_key():
except TypeError:
print("No Augur API key found.")

engine.dispose()


@cli.command(
"check-pgpass",
Expand Down
8 changes: 7 additions & 1 deletion augur/application/cli/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from augur.application.db.engine import create_database_engine
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=create_database_engine())

engine = create_database_engine()
Session = sessionmaker(bind=engine)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -53,4 +55,8 @@ def add_user(username, email, firstname, lastname, admin, phone_number, password
user_type = "admin user" if admin else "user"
message = f"Successfully added new: {username}"
click.secho(message, bold=True)

session.close()
engine.dispose()

return 0
24 changes: 13 additions & 11 deletions augur/application/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import json
from typing import List, Any, Optional
import os

from augur.application.db.models import Config

from augur.application.db.util import execute_session_query

def get_development_flag_from_config():
from augur.application.db.session import DatabaseSession

from logging import getLogger
from augur.application.db.session import DatabaseSession

logger = getLogger(__name__)
with DatabaseSession(logger) as session:
Expand All @@ -25,8 +25,6 @@ def get_development_flag():





default_config = {
"Augur": {
"developer": 0,
Expand Down Expand Up @@ -151,7 +149,8 @@ def get_section(self, section_name) -> dict:
Returns:
The section data as a dict
"""
section_data = self.session.query(Config).filter_by(section_name=section_name).all()
query = self.session.query(Config).filter_by(section_name=section_name)
section_data = execute_session_query(query, 'all')

section_dict = {}
for setting in section_data:
Expand All @@ -178,8 +177,8 @@ def get_value(self, section_name: str, setting_name: str) -> Optional[Any]:
The value from config if found, and None otherwise
"""
try:
config_setting = self.session.query(Config).filter(Config.section_name == section_name, Config.setting_name == setting_name).one()
# config_setting = Config.query.filter_by(section_name=section_name, setting_name=setting_name).one()
query = self.session.query(Config).filter(Config.section_name == section_name, Config.setting_name == setting_name)
config_setting = execute_session_query(query, 'one')
except s.orm.exc.NoResultFound:
return None

Expand All @@ -197,7 +196,8 @@ def load_config(self) -> dict:
The config from the database
"""
# get all the sections in the config table
section_names = self.session.query(Config.section_name).all()
query = self.session.query(Config.section_name)
section_names = execute_session_query(query, 'all')

config = {}
# loop through and get the data for each section
Expand Down Expand Up @@ -225,7 +225,8 @@ def empty(self) -> bool:
Returns:
True if the config is empty, and False if it is not
"""
return self.session.query(Config).first() is None
query = self.session.query(Config)
return execute_session_query(query, 'first') is None

def is_section_in_config(self, section_name: str) -> bool:
"""Determine if a section is in the config.
Expand All @@ -236,7 +237,8 @@ def is_section_in_config(self, section_name: str) -> bool:
Returns:
True if section is in the config, and False if it is not
"""
return self.session.query(Config).filter(Config.section_name == section_name).first() is not None
query = self.session.query(Config).filter(Config.section_name == section_name)
return execute_session_query(query, 'first') is not None


def add_or_update_settings(self, settings: List[dict]):
Expand Down
37 changes: 27 additions & 10 deletions augur/application/db/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
from psycopg2.errors import DeadlockDetected

# from augur.tasks.util.random_key_auth import RandomKeyAuth
from augur.application.db.engine import create_database_engine
from augur.application.config import AugurConfig
from augur.application.db.models import Platform
from augur.tasks.util.worker_util import remove_duplicate_dicts, remove_duplicate_naturals
from augur.application.db.util import get_connection


def remove_null_characters_from_string(string):
Expand Down Expand Up @@ -58,7 +58,13 @@ def __init__(self, logger, engine=None):
self.config = AugurConfig(logger=logger, session=self)

self.engine = engine
self.engine_created = False

if self.engine is None:
from augur.application.db.engine import create_database_engine

self.engine_created = True

self.engine = create_database_engine()

super().__init__(self.engine)
Expand All @@ -67,12 +73,17 @@ def __enter__(self):
return self

def __exit__(self, exception_type, exception_value, exception_traceback):

if self.engine_created:
self.engine.dispose()

self.close()

def execute_sql(self, sql_text):
return_data = {}
with self.engine.connect() as connection:
return_data = connection.execute(sql_text)

connection = get_connection(self.engine)
return_data = connection.execute(sql_text)
connection.close()

return return_data

Expand Down Expand Up @@ -151,14 +162,16 @@ def insert_data(self, data: Union[List[dict], dict], table, natural_keys: List[s
sleep_time_list = list(range(1,11))
deadlock_detected = False


connection = get_connection(self.engine)

# if there is no data to return then it executes the insert then returns nothing
if not return_columns:

while attempts < 10:
try:
with self.engine.connect() as connection:
connection.execute(stmnt)
break
connection.execute(stmnt)
break
except s.exc.OperationalError as e:
# print(str(e).split("Process")[1].split(";")[0])
if isinstance(e.orig, DeadlockDetected):
Expand All @@ -170,6 +183,7 @@ def insert_data(self, data: Union[List[dict], dict], table, natural_keys: List[s
attempts += 1
continue

connection.close()
raise e

else:
Expand All @@ -185,9 +199,8 @@ def insert_data(self, data: Union[List[dict], dict], table, natural_keys: List[s
# othewise it gets the requested return columns and returns them as a list of dicts
while attempts < 10:
try:
with self.engine.connect() as connection:
return_data_tuples = connection.execute(stmnt).fetchall()
break
return_data_tuples = connection.execute(stmnt).fetchall()
break
except s.exc.OperationalError as e:
if isinstance(e.orig, DeadlockDetected):
sleep_time = random.choice(sleep_time_list)
Expand All @@ -197,12 +210,16 @@ def insert_data(self, data: Union[List[dict], dict], table, natural_keys: List[s
attempts += 1
continue

connection.close()
raise e

else:
self.logger.error("Unable to insert and return data in 10 attempts")
connection.close()
return None

connection.close()

if deadlock_detected is True:
self.logger.error("Made it through even though Deadlock was detected")

Expand Down
48 changes: 48 additions & 0 deletions augur/application/db/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from sqlalchemy.exc import OperationalError
import time



def catch_operational_error(func):

attempts = 0
while attempts < 4:

# do the sleep here instead of instead of in the exception
# so it doesn't sleep after the last failed time
if attempts > 0:
time.sleep(240)
try:
return func()
except OperationalError:
pass

attempts += 1

raise Exeption("Unable to Resolve Operational Error")


def execute_session_query(query, query_type="all"):

func = None
if query_type == "all":
func = query.all
elif query_type == "one":
func = query.one
elif query_type == "first":
func = query.first
else:
raise Exception(f"ERROR: Unsupported query type '{query_type}'")

return catch_operational_error(func)


def get_connection(engine):

func = engine.connect

return catch_operational_error(func)




7 changes: 6 additions & 1 deletion augur/application/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from augur.application.db.models import Config
from augur.application.config import convert_type_of_value
from augur.application.db.util import execute_session_query

ROOT_AUGUR_DIRECTORY = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))

Expand Down Expand Up @@ -83,7 +84,11 @@ def get_log_config():
engine = create_database_engine()
session = Session(engine)

section_data = session.query(Config).filter_by(section_name="Logging").all()
query = session.query(Config).filter_by(section_name="Logging")
section_data = execute_session_query(query, 'all')

session.close()
engine.dispose()

section_dict = {}
for setting in section_data:
Expand Down
5 changes: 4 additions & 1 deletion augur/tasks/data_analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from augur.tasks.data_analysis.message_insights.tasks import message_insight_model
from augur.tasks.data_analysis.pull_request_analysis_worker.tasks import pull_request_analysis_model
from augur.application.db.session import DatabaseSession
from augur.application.db.util import execute_session_query


def machine_learning_phase(logger):

with DatabaseSession(logger) as session:
repos = session.query(Repo).all()
query = session.query(Repo)
repos = execute_session_query(query, 'all')

ml_tasks = []
clustering_tasks = []
Expand Down
5 changes: 4 additions & 1 deletion augur/tasks/data_analysis/clustering_worker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from augur.application.db.session import DatabaseSession
from augur.application.db.models import Repo, RepoClusterMessage, RepoTopic, TopicWord
from augur.application.db.engine import create_database_engine
from augur.application.db.util import execute_session_query


MODEL_FILE_NAME = "kmeans_repo_messages"
stemmer = nltk.stem.snowball.SnowballStemmer("english")
Expand All @@ -49,7 +51,8 @@ def clustering_model(repo_git: str) -> None:

with DatabaseSession(logger) as session:

repo_id = session.query(Repo).filter(Repo.repo_git == repo_git).one().repo_id
query = session.query(Repo).filter(Repo.repo_git == repo_git)
repo_id = execute_session_query(query, 'one').repo_id

num_clusters = session.config.get_value("Clustering_Task", 'num_clusters')
max_df = session.config.get_value("Clustering_Task", 'max_df')
Expand Down
4 changes: 3 additions & 1 deletion augur/tasks/data_analysis/discourse_analysis/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from augur.application.db.session import DatabaseSession
from augur.application.db.models import Repo, DiscourseInsight
from augur.application.db.engine import create_database_engine
from augur.application.db.util import execute_session_query

#import os, sys, time, requests, json
# from sklearn.model_selection import train_test_split
Expand Down Expand Up @@ -41,7 +42,8 @@ def discourse_analysis_model(repo_git: str) -> None:

with DatabaseSession(logger) as session:

repo_id = session.query(Repo).filter(Repo.repo_git == repo_git).one().repo_id
query = session.query(Repo).filter(Repo.repo_git == repo_git)
repo_id = execute_session_query(query, 'one').repo_id

get_messages_for_repo_sql = s.sql.text("""
(SELECT r.repo_group_id, r.repo_id, r.repo_git, r.repo_name, i.issue_id thread_id,m.msg_text,i.issue_title thread_title,m.msg_id
Expand Down
Loading

0 comments on commit a29c83b

Please sign in to comment.