#!/usr/bin/env python
# -*- coding: utf-8 -*-
# db.sql_connector.py
"""Tools used to communicate with the SQL database"""
# Copyright (c) 2016-2019 Dan Cutright
# This file is part of DVH Analytics, released under a BSD license.
# See the file LICENSE included with this distribution, also
# available at https://github.com/cutright/DVH-Analytics
from wx import CallAfter
import psycopg2
import sqlite3
from datetime import datetime
from dateutil.parser import parse as date_parser
from os.path import dirname, join, isfile
from dvha.db.sql_columns import categorical, numerical
from dvha.options import Options
from dvha.paths import CREATE_PGSQL_TABLES, CREATE_SQLITE_TABLES, DATA_DIR
from dvha.tools.errors import SQLError, push_to_log
import json
[docs]class DVH_SQL:
"""This class is used to communicate to the SQL database
Parameters
----------
config : dict
optional SQL login credentials, stored values used if nothing provided.
Allowed keys are 'host', 'port', 'dbname', 'user', 'password'
db_type : str, optional
either 'pgsql' or 'sqlite'
group : int, optional
use a group-specific connection, either 1 or 2
"""
def __init__(self, *config, db_type=None, group=1):
stored_options = Options()
if config:
self.db_type = db_type if db_type is not None else "pgsql"
config = config[0]
else:
# Read SQL configuration file
if group == 2 and stored_options.SYNC_SQL_CNX:
group = 1
self.db_type = (
stored_options.DB_TYPE_GRPS[group]
if db_type is None
else db_type
)
config = stored_options.SQL_LAST_CNX_GRPS[group][self.db_type]
if self.db_type == "sqlite":
db_file_path = config["host"]
if not dirname(
db_file_path
): # file_path has not directory, assume it lives in DATA_DIR
db_file_path = join(DATA_DIR, db_file_path)
self.db_name = None
self.cnx = sqlite3.connect(db_file_path)
else:
self.db_name = config["dbname"]
self.cnx = psycopg2.connect(**config)
self.cursor = self.cnx.cursor()
self.tables = ["DVHs", "Plans", "Rxs", "Beams", "DICOM_Files"]
def __enter__(self):
return self
def __exit__(self, ctx_type, ctx_value, ctx_traceback):
self.close()
[docs] def close(self):
"""Close the SQL DB connection"""
self.cnx.close()
[docs] def execute_file(self, sql_file_name):
"""Executes lines within provided text file to SQL
Parameters
----------
sql_file_name : str
absolute file path of a text file containing SQL commands
"""
for line in open(sql_file_name):
if not line.startswith("--"): # ignore commented lines
self.cursor.execute(line)
self.cnx.commit()
[docs] def execute_str(self, command_str):
"""Execute and commit a string in proper SQL syntax, can handle
multiple lines split by \n
Parameters
----------
command_str : str
command or commands to be executed and committed
"""
for line in command_str.split("\n"):
if line:
self.cursor.execute(line)
self.cnx.commit()
[docs] def check_table_exists(self, table_name):
"""Check if a table exists
Parameters
----------
table_name : st
the SQL table to check
Returns
-------
bool
True if ``table_name`` exists
"""
self.cursor.execute(
"""
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_name = '{0}'
""".format(
table_name.replace("'", "''")
)
)
return self.cursor.fetchone()[0] == 1
[docs] def query(self, table_name, return_col_str, *condition_str, **kwargs):
"""A generalized query function for DVHA
Parameters
----------
table_name : str
DVHs', 'Plans', 'Rxs', 'Beams', or 'DICOM_Files'
return_col_str : str: str
a csv of SQL columns to be returned
condition_str : str: str
a condition in SQL syntax
kwargs :
optional parameters order, order_by, and bokeh_cds
Returns
-------
list, dict
Returns a list of lists by default, or a dict of lists if
bokeh_cds in kwargs and is true
"""
order, order_by = None, None
if kwargs:
if "order" in kwargs:
order = kwargs["order"]
if "order_by" in kwargs:
order_by = kwargs["order_by"]
if not order:
order = "ASC"
query = "Select %s from %s;" % (return_col_str, table_name)
if condition_str and condition_str[0]:
query = "Select %s from %s where %s;" % (
return_col_str,
table_name,
condition_str[0],
)
if order and order_by:
query = "%s Order By %s %s;" % (query[:-1], order_by, order)
try:
self.cursor.execute(query)
results = self.cursor.fetchall()
except Exception as e:
raise SQLError(str(e), query)
if "bokeh_cds" in kwargs and kwargs["bokeh_cds"]:
keys = [c.strip() for c in return_col_str.split(",")]
results = {
key: [results[r][i] for r in range(len(results))]
for i, key in enumerate(keys)
}
for key in list(results):
if "time" in key or "date" in key:
results[key] = [str(value) for value in results[key]]
return results
[docs] def query_generic(self, query_str):
"""A generic query function that executes the provided string
Parameters
----------
query_str : str
SQL command
Returns
-------
list
Results of ``cursor.fetchall()``
"""
self.cursor.execute(query_str)
return self.cursor.fetchall()
@property
def now(self):
"""Get a datetime object for now
Returns
-------
datetime
The current time reported by the database
"""
return self.query_generic("SELECT %s" % self.sql_cmd_now)[0][0]
@property
def sql_cmd_now(self):
"""Get the SQL command for now, based on database type
Returns
-------
str
SQL command for now
"""
if self.db_type == "sqlite":
sql_cmd = "strftime('%Y-%m-%d %H:%M:%f', 'now', 'localtime')"
else:
sql_cmd = "NOW()"
return sql_cmd
[docs] def process_value(self, value):
try:
float(value)
value_is_numeric = True
except ValueError:
value_is_numeric = False
if "::date" in str(value):
amend_type = ["", "::date"][
self.db_type == "pgsql"
] # sqlite3 does not support ::date
value = "'%s'%s" % (
value.strip("::date"),
amend_type,
) # augment value for postgresql date formatting
elif value_is_numeric:
value = str(value)
elif "null" == str(value.lower()):
value = "NULL"
else:
value = "'%s'" % str(value) # need quotes to input a string
return value
[docs] def update(self, table_name, column, value, condition_str):
"""Change the data in the database
Parameters
----------
table_name : str
DVHs', 'Plans', 'Rxs', 'Beams', or 'DICOM_Files'
column : str
SQL column to be updated
value : str, float, int
value to be set
condition_str : str
a condition in SQL syntax
"""
self.update_multicolumn(table_name, [column], [value], condition_str)
[docs] def update_multicolumn(self, table_name, columns, values, condition_str):
"""Change the data in the database
Parameters
----------
table_name : str
DVHs', 'Plans', 'Rxs', 'Beams', or 'DICOM_Files'
columns : list
list of SQL column to be updated
values : list
list value to be set
condition_str : str
a condition in SQL syntax
"""
values = [self.process_value(v) for v in values]
set_str = [f"{columns[i]} = {value}" for i, value in enumerate(values)]
set_str = ','.join(set_str)
update = f"Update {table_name} SET {set_str} WHERE {condition_str}"
try:
self.cursor.execute(update)
self.cnx.commit()
except Exception as e:
push_to_log(e, msg="Database update failure!")
[docs] def is_study_instance_uid_in_table(self, table_name, study_instance_uid):
"""Check if a study instance uid exists in the provided table
Parameters
----------
table_name : str
SQL table name
study_instance_uid : str
study instance uid
Returns
-------
bool
True if ``study_instance_uid`` exists in the study_instance_uid
column of ``table_name``
"""
# As of DVH v0.7.5, study_instance_uid may end with _N where N is the
# nth plan of a file set
query = (
"SELECT DISTINCT study_instance_uid FROM %s WHERE "
"study_instance_uid LIKE '%s%%';"
% (table_name, study_instance_uid)
)
self.cursor.execute(query)
results = self.cursor.fetchall()
return bool(results)
[docs] def is_mrn_in_table(self, table_name, mrn):
"""
Parameters
----------
table_name : str
SQL table name
mrn : str
medical record number
Returns
-------
bool
True if ``mrn`` exists in the mrn column of ``table_name``
"""
return self.is_value_in_table(table_name, mrn, "mrn")
[docs] def is_value_in_table(self, table_name, value, column):
"""Check if a str value exists in a SQL table
Parameters
----------
table_name : sr
SQL table name
value : str
value of interest (str only)
column :
SQL table column
Returns
-------
bool
True if ``value`` exists in ``table_name.column``
"""
query = "Select Distinct %s from %s where %s = '%s';" % (
column,
table_name,
column,
value,
)
self.cursor.execute(query)
results = self.cursor.fetchall()
return bool(results)
[docs] def insert_row(self, table, row):
"""Generic function to import data to the database
Parameters
----------
table : str
SQL table name
row : dict
data returned from DICOM_Parser.get_<table>_row()
"""
columns = list(row)
allowed_columns = self.get_column_names(table)
used_columns = []
values = []
for column in columns:
if column in allowed_columns:
used_columns.append(column)
if (
row[column] is None
or row[column][0] is None
or row[column][0] == ""
):
if column == "import_time_stamp":
values.append(self.sql_cmd_now)
else:
values.append("NULL")
else:
value = row[column][0]
value_type = row[column][1]
if "varchar" in value_type:
max_length = int(
value_type.replace("varchar(", "").replace(")", "")
)
values.append(
"'%s'" % truncate_string(value, max_length)
)
elif value_type in {"time_stamp", "date"}:
date = date_parser(value)
value = date.date()
if value_type == "time_stamp":
value = "%s %s" % (value, date.time())
if (
self.db_type == "pgsql"
): # sqlite3 does not support ::date
values.append("'%s'::date" % value)
else:
values.append("'%s'" % value)
else:
values.append("'%s'" % value)
else:
msg = (
"Failed to update SQL column %s in table %s, "
"it does not exist" % (column, table)
)
push_to_log(msg=msg)
cmd = "INSERT INTO %s (%s) VALUES (%s);\n" % (
table,
",".join(used_columns),
",".join(values),
)
self.execute_str(cmd)
[docs] def insert_data_set(self, data_set):
"""Insert an entire data set for a plan
Parameters
----------
data_set : dict
a dictionary of data with table names for keys, and a list of row
data for values
"""
for key in list(data_set):
for row in data_set[key]:
self.insert_row(key, row)
[docs] def get_dicom_file_paths(self, mrn=None, uid=None):
"""Lookup the dicom file paths of imported data
Parameters
----------
mrn : str, optional
medical record number
uid : str, optional
study instance uid
Returns
-------
list
Query return from DICOM_Files table
"""
condition = None
if uid:
condition = "study_instance_uid = '%s'" % uid
elif mrn:
condition = "mrn = '%s'" % mrn
if condition is not None:
columns = (
"mrn, study_instance_uid, folder_path, plan_file, "
"structure_file, dose_file"
)
return self.query(
"DICOM_Files", columns, condition, bokeh_cds=True
)
[docs] def delete_rows(self, condition_str, ignore_tables=None):
"""Delete all rows from all tables not in ignore_table for a given
condition. Useful when deleting a plan/patient
Parameters
----------
condition_str : str: str
a condition in SQL syntax
ignore_tables : list, optional
tables to be excluded from row deletions
"""
tables = set(self.tables)
if ignore_tables:
tables = tables - set(ignore_tables)
for table in tables:
self.cursor.execute(
"DELETE FROM %s WHERE %s;" % (table, condition_str)
)
self.cnx.commit()
[docs] def change_mrn(self, old, new):
"""Edit all mrns in database
Parameters
----------
old : str
current mrn
new : str
new mrn
"""
condition = "mrn = '%s'" % old
for table in self.tables:
self.update(table, "mrn", new, condition)
[docs] def change_uid(self, old, new):
"""Edit study instance uids in database
Parameters
----------
old : str
current study instance uid
new : str
new study instance uid
"""
condition = "study_instance_uid = '%s'" % old
for table in self.tables:
self.update(table, "study_instance_uid", new, condition)
[docs] def delete_dvh(self, roi_name, study_instance_uid):
"""Delete a specified DVHs table row
Parameters
----------
roi_name : str
the roi name for the row to be deleted
study_instance_uid : str
the associated study instance uid
"""
self.cursor.execute(
"DELETE FROM DVHs WHERE roi_name = '%s' and "
"study_instance_uid = '%s';" % (roi_name, study_instance_uid)
)
self.cnx.commit()
[docs] def ignore_dvh(self, variation, study_instance_uid, unignore=False):
"""Change an uncategorized roi name to ignored so that it won't show
up in the list of uncategorized rois, so that
the user doesn't have to evaluate its need everytime they cleanup the
misc rois imported
Parameters
----------
variation : str
roi name
study_instance_uid : str
the associated study instance uid
unignore : bool, optional
if set to True, sets the variation to 'uncategorized'
"""
physician_roi = ["ignored", "uncategorized"][unignore]
self.update(
"dvhs",
"physician_roi",
physician_roi,
"roi_name = '%s' and study_instance_uid = '%s'"
% (variation, study_instance_uid),
)
[docs] def drop_tables(self):
"""Delete all tables in the database if they exist"""
for table in self.tables:
self.cursor.execute("DROP TABLE IF EXISTS %s;" % table)
self.cnx.commit()
[docs] def drop_table(self, table):
"""Delete a table in the database if it exists
Parameters
----------
table : str
SQL table
"""
self.cursor.execute("DROP TABLE IF EXISTS %s;" % table)
self.cnx.commit()
[docs] def initialize_database(self):
"""Ensure that all of the latest SQL columns exist in the database"""
create_tables_file = [CREATE_PGSQL_TABLES, CREATE_SQLITE_TABLES][
self.db_type == "sqlite"
]
self.execute_file(create_tables_file)
[docs] def reinitialize_database(self):
"""Delete all data and create all tables with latest columns"""
self.drop_tables()
self.vacuum()
self.initialize_database()
[docs] def vacuum(self):
"""Call to reclaim space in the database"""
if self.db_type == "sqlite":
self.cnx.isolation_level = None
self.cnx.execute("VACUUM")
self.cnx.isolation_level = ""
else:
# TODO: PGSQL VACUUM needs testing
# self.cnx.execute('VACUUM')
pass
[docs] def does_db_exist(self):
"""Check if database exists
Returns
-------
bool
True if the database exists
"""
if self.db_name:
line = (
"SELECT datname FROM pg_catalog.pg_database WHERE "
"lower(datname) = lower('%s');" % self.db_name
)
self.cursor.execute(line)
return bool(len(self.cursor.fetchone()))
else:
return True
[docs] def is_sql_table_empty(self, table):
"""Check if specified SQL table is empty
Parameters
----------
table : str
SQL table
Returns
-------
bool
True if ``table`` is empty
"""
line = "SELECT COUNT(*) FROM %s;" % table
self.cursor.execute(line)
count = self.cursor.fetchone()[0]
return not (bool(count))
[docs] def get_unique_values(self, table, column, *condition, **kwargs):
"""Uses SELECT DISTINCT to get distinct values in database
Parameters
----------
table : str
SQL table
column : str
SQL column
condition : str, optional
Condition in SQL syntax
kwargs :
option to ignore null values in return
Returns
-------
list
Unique values in table.column
"""
if condition and condition[0]:
query = "select distinct %s from %s where %s;" % (
column,
table,
str(condition[0]),
)
else:
query = "select distinct %s from %s;" % (column, table)
self.cursor.execute(query)
cursor_return = self.cursor.fetchall()
if "ignore_null" in kwargs and kwargs["ignore_null"]:
unique_values = [str(uv[0]) for uv in cursor_return if str(uv[0])]
else:
unique_values = [str(uv[0]) for uv in cursor_return]
unique_values.sort()
return unique_values
[docs] def get_column_names(self, table_name):
"""Get all of the column names for a specified table
Parameters
----------
table_name : str
SQL table
Returns
-------
list
All columns names in ``table_name``
"""
if self.db_type == "sqlite":
query = "PRAGMA table_info(%s);" % table_name.lower()
index = 1
else:
query = (
"select column_name from information_schema.columns where "
"table_name = '%s';" % table_name.lower()
)
index = 0
self.cursor.execute(query)
cursor_return = self.cursor.fetchall()
columns = [str(c[index]) for c in cursor_return]
columns.sort()
return columns
[docs] def is_sqlite_column_datetime(self, table_name, column):
"""Check if a sqlite column is a datetime data type
Parameters
----------
table_name : str
SQL table
column : str
SQL column
Returns
-------
bool
True if the ``table_name.column`` store datetime data
"""
if self.db_type == "sqlite":
query = "PRAGMA table_info(%s);" % table_name.lower()
self.cursor.execute(query)
cursor_return = self.cursor.fetchall()
columns = {str(c[1]): str(c[2]) for c in cursor_return}
if column in list(columns):
column_type = columns[column]
return (
"time" in column_type.lower()
or "date" in column_type.lower()
)
return False
[docs] def get_min_value(self, table, column, condition=None):
"""Get the minimum value in the database for a given table and column
Parameters
----------
table : str
SQL table
column : str
SQL column
condition : str, optional
Condition in SQL syntax
Returns
-------
any
The minimum value for table.column
"""
return self.get_sql_function_value(
"MIN", table, column, condition=condition
)
[docs] def get_max_value(self, table, column, condition=None):
"""Get the maximum value in the database for a given table and column
Parameters
----------
table : str
SQL table
column : str
SQL column
condition : str, optional
Condition in SQL syntax
Returns
-------
any
The maximum value for table.column
"""
return self.get_sql_function_value(
"MAX", table, column, condition=condition
)
[docs] def get_sql_function_value(
self, func, table, column, condition=None, first_value_only=True
):
"""Helper function used by get_min_values and get_max_values
Parameters
----------
func :
SQL compatible function
table : str
SQL table
column : str
SQL column
condition : str, optional
Condition in SQL syntax (Default value = None)
first_value_only : bool, optional
if true, only return the first value, otherwise all values returned
Returns
-------
list, any
Results of ``cursor.fetchone()`` or ``cursor.fetchone()[0]`` if
``first_value_only`` is True
"""
if condition:
query = "SELECT %s(%s) FROM %s WHERE %s;" % (
func,
column,
table,
condition,
)
else:
query = "SELECT %s(%s) FROM %s;" % (func, column, table)
self.cursor.execute(query)
cursor_return = self.cursor.fetchone()
if first_value_only:
return cursor_return[0]
return cursor_return
[docs] def get_roi_count_from_query(self, uid=None, dvh_condition=None):
"""Counts the DVH rows that match the provided conditions
Parameters
----------
uid : str, optional
study instance uid
dvh_condition : str, optional
condition in SQL syntax for the DVHs table
Returns
-------
int
Number of rows in the DVHs table matching the provided parameters
"""
if uid:
condition = "study_instance_uid in ('%s')" % "', '".join(
dvh_condition
)
if dvh_condition:
condition = " and " + condition
else:
condition = ""
if dvh_condition:
condition = dvh_condition + condition
return len(self.query("DVHs", "mrn", condition))
[docs] def is_uid_imported(self, uid):
"""Check all tables to see if study instance uid is used
Parameters
----------
uid : str
study instance uid
Returns
-------
bool
True if ``uid`` exists in any table
"""
for table in self.tables:
if self.is_study_instance_uid_in_table(table, uid):
return True
return False
[docs] def is_mrn_imported(self, mrn):
"""Check all tables to see if MRN is used
Parameters
----------
mrn : str
medical record number
Returns
-------
bool
True if ``mrn`` exists in any table
"""
for table in self.tables:
if self.is_mrn_in_table(table, mrn):
return True
return False
[docs] def is_roi_imported(self, roi_name, study_instance_uid):
"""Check if a study is already using a specified roi name
Parameters
----------
roi_name : str
roi name to check
study_instance_uid : str
restrict search to this study_instance_uid
Returns
-------
bool
True if ``roi_name`` is used in the DVHs table for the given
``study_instance_uid``
"""
condition = "roi_name = '%s' and study_instance_uid = '%s'" % (
roi_name,
study_instance_uid,
)
roi_names = self.get_unique_values("DVHs", "roi_name", condition)
return bool(roi_names)
[docs] def get_row_count(self, table, condition=None):
"""
Parameters
----------
table : str
SQL table
condition : str
SQL condition
Returns
-------
int
Number of rows in ``table`` meeting ``condition``
"""
ans = self.query(table, "COUNT(mrn)", condition)
if ans:
return ans[0][0]
return 0
[docs] def export_to_sqlite(self, file_path, callback=None, force=False):
"""Create a new SQLite database and import this database's data
Parameters
----------
file_path : str
Path where the new SQLite database will be created
callback : callable, optional
optional function to be called on each row insertion. Should accept
table (str), current row (int), total_row_count (int) as parameters
force : bool, optional
ignore duplicate StudyInstanceUIDs if False
"""
config = {"host": file_path}
new_cnx = DVH_SQL(config, db_type="sqlite")
new_cnx.initialize_database()
self.import_db(self, new_cnx, callback=callback, force=force)
[docs] @staticmethod
def import_db(cnx_src, cnx_dst, callback=None, force=False):
"""
Parameters
----------
cnx_src : DVH_SQL
the source DVHA DB connection
cnx_dst : DVH_SQL
the destination DVHA DB connection
callback : callable, optional
optional function to be called on each row insertion. Should accept
table (str), current row (int), total_row_count (int) as parameters
force : bool, optional
ignore duplicate StudyInstanceUIDs if False
"""
for table in cnx_src.tables:
columns = cnx_src.get_column_names(table)
study_uids_1 = cnx_src.get_unique_values(
table, "study_instance_uid"
)
condition = None
if not force:
study_uids_2 = cnx_dst.get_unique_values(
table, "study_instance_uid"
)
study_uids_1 = list(set(study_uids_1) - set(study_uids_2))
condition = "study_instance_uid IN ('%s')" % "','".join(
study_uids_1
)
total_row_count = cnx_src.get_row_count(table, condition)
counter = 0
for uid in study_uids_1:
study_data = cnx_src.query(
table, ",".join(columns), "study_instance_uid = '%s'" % uid
)
for row in study_data:
if callback is not None:
CallAfter(callback, table, counter, total_row_count)
counter += 1
row_str = "'" + "','".join([str(v) for v in row]) + "'"
row_str = row_str.replace("'None'", "NULL")
cmd = "INSERT INTO %s (%s) VALUES (%s);\n" % (
table,
",".join(columns),
row_str,
)
cnx_dst.execute_str(cmd)
[docs] def save_to_json(self, file_path, callback=None):
"""Export SQL database to a JSON file
Parameters
----------
file_path : str
file_path to new JSON file
callback : callable, optional
optional function to be called on each table insertion. Should
accept table_name (str), table (int), table_count (int) as
parameters
"""
json_data = {
"columns": {"categorical": categorical, "numerical": numerical}
}
for i, table in enumerate(self.tables):
if callback is not None:
CallAfter(callback, table, i, len(self.tables))
columns = self.get_column_names(table)
json_data[table] = self.query(
table, ",".join(columns), bokeh_cds=True
)
with open(file_path, "w") as fp:
json.dump(json_data, fp)
[docs] def get_ptv_counts(self):
"""Get number of PTVs for each study instance uid
Returns
-------
dict
PTV counts stored by ``study_instance_uid``
"""
results = self.query(
"DVHs", "study_instance_uid", "roi_type like 'PTV%'"
)
uids = [uid[0] for uid in results]
return {uid: uids.count(uid) for uid in list(set(uids))}
[docs]def truncate_string(input_string, character_limit):
"""Used to truncate a string to ensure it may be imported into database
Parameters
----------
input_string : str
string to be truncated
character_limit : int
the maximum number of allowed characters
Returns
-------
str
truncated string
"""
if len(input_string) > character_limit:
return input_string[0 : (character_limit - 1)]
return input_string
[docs]def echo_sql_db(config=None, db_type="pgsql", group=1):
"""Echo the database using stored or provided credentials
Parameters
----------
config : dict, optional
database login credentials
db_type : str, optional
either 'pgsql' or 'sqlite'
group : int, optional
either group 1 or 2
Returns
-------
bool
True if echo is successful
"""
try:
if config:
if db_type == "pgsql" and (
"dbname" not in list(config) or "port" not in list(config)
):
return False
cnx = DVH_SQL(config, db_type=db_type, group=group)
else:
cnx = DVH_SQL(group=group)
cnx.close()
return True
except Exception as e:
if type(e) not in [
psycopg2.OperationalError,
sqlite3.OperationalError,
]:
push_to_log(e, msg="Unknown Error during SQL Echo")
return False
[docs]def write_test(
config=None, db_type="pgsql", group=1, table=None, column=None, value=None
):
"""Write test data to database, verify with a query
Parameters
----------
config : dict, optional
database login credentials
db_type : str, optional
either 'pgsql' or 'sqlite'
group : int, optional
either group 1 or 2
table : str, optional
SQL table
column : str, optional
SQL column
value : str, optional
test value
Returns
-------
dict
Write and Read test statuses
"""
try:
if config:
if db_type == "pgsql" and (
"dbname" not in list(config) or "port" not in list(config)
):
return None
cnx = DVH_SQL(config, db_type=db_type, group=group)
else:
cnx = DVH_SQL(group=group)
except Exception as e:
push_to_log(
e, msg="Write Test: Connection to SQL could not be established"
)
return {"write": False, "delete": False}
try:
cnx.initialize_database()
except Exception as e:
push_to_log(e, msg="Write Test: DVH_SQL.initialize_database failed")
if table is None:
table = cnx.tables[-1]
if column is None:
column = "mrn"
if value is None:
# Find a test value that does not exist in the database
value_init = "SqlTest_"
i = 0
value = value_init + str(i)
current_values = cnx.get_unique_values(table, column)
while value in current_values:
value = value_init + str(i)
i += 1
condition_str = "%s = '%s'" % (column, value)
insert_cmd = "INSERT INTO %s (%s) VALUES ('%s');" % (table, column, value)
delete_cmd = "DELETE FROM %s WHERE %s;" % (table, condition_str)
try:
cnx.execute_str(insert_cmd)
except Exception as e:
push_to_log(e, msg="Write Test: SQL test insert command failed")
try:
test_return = cnx.query(table, column, condition_str)
write_test_success = len(test_return) > 0
except Exception as e:
write_test_success = False
push_to_log(e, msg="Write Test: SQL query of test insert failed")
if not write_test_success:
delete_test_success = None
else:
try:
cnx.execute_str(delete_cmd)
test_return = cnx.query(table, column, condition_str)
delete_test_success = len(test_return) == 0
except Exception as e:
delete_test_success = False
push_to_log(
e, msg="Write Test: SQL delete command of test insert failed"
)
try:
cnx.close()
except Exception as e:
push_to_log(e, msg="Write Test: Failed to close SQL connection")
return {"write": write_test_success, "delete": delete_test_success}
[docs]def initialize_db():
"""Initialize the database"""
with DVH_SQL() as cnx:
cnx.initialize_database()
[docs]def is_file_sqlite_db(sqlite_db_file):
"""Check if file is a sqlite database
Parameters
----------
sqlite_db_file : str
path to file to be checked
Returns
-------
bool
True if ``sqlite_db_file`` is a sqlite database
"""
if isfile(sqlite_db_file):
try:
cnx = sqlite3.connect(sqlite_db_file)
cnx.close()
return True
except sqlite3.OperationalError:
pass
return False