Source code for dvha.tools.stats

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# tools.stats.py
"""
Take numerical data from the main app and convert it to a format suitable for
statistical analysis in the Regression and Control Chart tabs

"""
# Copyright (c) 2016-2019 Dan Cutright
# This file is part of DVH Analytics, released under a BSD license.
#    See the file LICENSE included with this distribution, also
#    available at https://github.com/cutright/DVH-Analytics

import numpy as np
from scipy import stats as scipy_stats
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from regressors import stats as regressors_stats
from dvha.db import sql_columns


class StatsData:
    """Class used to collect data for Regression and Control Chart

    This process is different from Time Series since regressions require
    all variables to be the same length.

    Parameters
    ----------
    dvhs : DVH
        data from DVH query
    table_data : dict
        table data other than from DVHs. Has keys of 'Plans', 'Rxs',
        'Beams' with values of QuerySQL objects
    group : int, optional
        group number of the data (1 or 2)
    """

    def __init__(self, dvhs, table_data, group=1):
        self.dvhs = dvhs
        self.table_data = table_data
        self.group = group

        self.column_info = sql_columns.numerical
        self.correlation_variables = list(self.column_info)
        self.correlation_variables.sort()

        self.__map_data()

    def __map_data(self):
        self.data = {}
        stat_types = ["min", "mean", "median", "max"]
        for var in self.correlation_variables:
            if var in self.column_info.keys():
                var_name = self.column_info[var]["var_name"]
                table = self.column_info[var]["table"]

                if table == "DVHs" and hasattr(self.dvhs, var_name):
                    self.data[var] = {
                        "units": self.column_info[var]["units"],
                        "values": getattr(self.dvhs, var_name),
                    }

                # single value variables
                elif table == "Plans" and hasattr(
                    self.table_data[table], var_name
                ):
                    src = self.table_data[table]
                    self.data[var] = {
                        "units": self.column_info[var]["units"],
                        "values": [
                            getattr(src, var_name)[self.get_plan_index(uid)]
                            for uid in self.uids
                        ],
                    }

                # multi value variables
                elif table == "Beams" and hasattr(
                    self.table_data[table], var_name
                ):
                    src = self.table_data[table]

                    if str_starts_with_any_in_list(
                        var,
                        [
                            "Beam Complexity",
                            "Beam Area",
                            "Control Point MU",
                            "Beam Perimeter",
                            "Beam Energy",
                        ],
                    ):
                        # Stats of these variable types have min, mean, median,
                        # and max values in the DB. The following takes the min,
                        # mean, median, or max of all values for a UID based on
                        # the var type. For example, if var is
                        # "Beam Complexity (Max)", the following returns the max
                        # of those values.
                        temp = []
                        for uid in self.uids:
                            indices = self.get_beam_indices(uid)
                            beam_data = getattr(
                                self.table_data["Beams"], var_name
                            )
                            values = [
                                beam_data[i]
                                for i in indices
                                if beam_data[i] != "None"
                            ]
                            for stat in stat_types:
                                if stat in var.lower():
                                    if values:
                                        temp.append(getattr(np, stat)(values))
                                    else:
                                        temp.append(None)
                        self.data[var] = {
                            "units": self.column_info[var]["units"],
                            "values": temp,
                        }
                    else:
                        temp = {s: [] for s in stat_types}
                        for uid in self.uids:
                            for stat in stat_types:
                                values = self._get_src_values(
                                    src, var_name, uid
                                )
                                values = [v for v in values if v != "None"]
                                if values:
                                    temp[stat].append(
                                        getattr(np, stat)(values)
                                    )
                                else:
                                    temp[stat].append(None)

                        for stat in stat_types:
                            corr_key = "%s (%s)" % (var, stat.capitalize())
                            self.data[corr_key] = {
                                "units": self.column_info[var]["units"],
                                "values": temp[stat],
                            }

        self._validate_data()

    def _validate_data(self):
        """Remove any variables that are constant to avoid crash on regression"""
        bad_vars = []
        for var_name, var_obj in self.data.items():
            if "Date" in var_name:
                if var_name != "Simulation Date":
                    bad_vars.append(var_name)
            else:
                values = [
                    float(val)
                    for val in var_obj["values"]
                    if val != "None" and val is not None
                ]
                if not any(np.diff(values).tolist()):
                    bad_vars.append(var_name)
        for var in bad_vars:
            self.data.pop(var)
    def update_endpoints_and_radbio(self):
        """Update endpoint and radbio data in self.data. This function is
        needed since all of these values are calculated after a query and
        the user may change these values.
        """
        if self.dvhs:
            if self.dvhs.endpoints["defs"]:
                for var in self.dvhs.endpoints["defs"]["label"]:
                    if var not in self.variables:
                        self.data[var] = {
                            "units": "",
                            "values": self.dvhs.endpoints["data"][var],
                        }

                for var in self.variables:
                    if var[0:2] in {"D_", "V_"}:
                        if var not in self.dvhs.endpoints["defs"]["label"]:
                            self.data.pop(var)

            if self.dvhs.eud:
                self.data["EUD"] = {"units": "Gy", "values": self.dvhs.eud}
            if self.dvhs.ntcp_or_tcp:
                self.data["NTCP or TCP"] = {
                    "units": "",
                    "values": self.dvhs.ntcp_or_tcp,
                }

        self._validate_data()
    @staticmethod
    def _get_src_values(src, var_name, uid):
        """
        Parameters
        ----------
        src : QuerySQL
            table data
        var_name : str
            attribute to be extracted from table data
        uid : str
            StudyInstanceUID stored in the SQL database

        Returns
        -------
        list
            values of ``var_name`` from table data that match ``uid``
        """
        uid_indices = [
            i for i, x in enumerate(src.study_instance_uid) if x == uid
        ]
        return [getattr(src, var_name)[i] for i in uid_indices]
    def get_plan_index(self, uid):
        """Get the index of ``uid`` from the Plans table

        Parameters
        ----------
        uid : str
            StudyInstanceUID as stored in the SQL database

        Returns
        -------
        int
            Plans table index for ``uid``
        """
        return self.table_data["Plans"].study_instance_uid.index(uid)
    def get_beam_indices(self, uid):
        """Get the indices of the Beams table with ``uid``

        Parameters
        ----------
        uid : str
            StudyInstanceUID as stored in the SQL database

        Returns
        -------
        list
            Beams table indices that match ``uid``
        """
        return [
            i
            for i, x in enumerate(self.table_data["Beams"].study_instance_uid)
            if x == uid
        ]
    def get_bokeh_data(self, x, y):
        """Get data in a format compatible with bokeh's ColumnDataSource.data

        Parameters
        ----------
        x : str
            x-variable name
        y : str
            y-variable name

        Returns
        -------
        dict
            x and y data
        """
        if x in list(self.data) and y in list(self.data):
            # TODO: potential data length issue with study_instance_uid
            # Received the following error with 0.6.9, can't reproduce
            # BokehUserWarning: ColumnDataSource's columns must be of the
            # same length. Current lengths:
            # ('date', 69), ('mrn', 69), ('uid', 70), ('x', 69), ('y', 69)
            # Appears to point to this function's return?
            return {
                "uid": self.uids,
                "mrn": self.mrns,
                "date": self.sim_study_dates,
                "x": self.data[x]["values"],
                "y": self.data[y]["values"],
            }
        return {key: [] for key in ["uid", "mrn", "date", "x", "y"]}
    @property
    def uids(self):
        """StudyInstanceUIDs from DVH object

        Returns
        -------
        list
            ``DVH.study_instance_uid``
        """
        return self.dvhs.study_instance_uid

    @property
    def mrns(self):
        """MRNs from DVH object

        Returns
        -------
        list
            ``DVH.mrn``
        """
        return self.dvhs.mrn

    @property
    def sim_study_dates(self):
        """Simulation dates from Plans table

        Returns
        -------
        list
            Simulation dates
        """
        return self.data["Simulation Date"]["values"]

    @property
    def variables(self):
        """Get variable names for plotting

        Returns
        -------
        list
            keys of ``StatsData.data`` sans 'Simulation Date'
        """
        return [var for var in list(self.data) if var != "Simulation Date"]

    @property
    def vars_with_nan_values(self):
        """Find variable names that contain non-numerical values

        Returns
        -------
        list
            Variable names that cannot be converted to ``float``
        """
        ans = []
        for var in self.variables:
            for val in self.data[var]["values"]:
                try:
                    float(val)
                except Exception:
                    ans.append(var)
                    break
        return ans
    def get_axis_title(self, variable):
        """Get the plot axis title for ``variable``

        Parameters
        ----------
        variable : str
            A key of ``StatsData.data``

        Returns
        -------
        str
            ``variable`` with units if stored
        """
        if self.data[variable]["units"]:
            return "%s (%s)" % (variable, self.data[variable]["units"])
        return variable
    def get_X_and_y(
        self, y_variable, x_variables, include_patient_info=False
    ):
        """Collect data for input into multi-variable regression

        Parameters
        ----------
        y_variable : str
            dependent variable
        x_variables : list
            independent variables
        include_patient_info : bool
            If True, return mrn, uid, dates with X and y

        Returns
        -------
        tuple
            X, y or X, y, mrn, uid, dates
        """
        data, mrn, uid, dates = [], [], [], []

        y_var_data = []
        for i, value in enumerate(self.data[y_variable]["values"]):
            y_var_data.append([value, np.nan][value == "None"])
            mrn.append(self.mrns[i])
            uid.append(self.uids[i])
            dates.append(self.sim_study_dates[i])
        data.append(y_var_data)

        for var in x_variables:
            x_var_data = []
            for value in self.data[var]["values"]:
                x_var_data.append(
                    [value, np.nan][str(value).lower() == "none"]
                )
            data.append(x_var_data)

        data = np.array(data)
        bad_indices = get_index_of_nan(data)
        for bad_index in bad_indices[::-1]:
            data = np.delete(data, bad_index, 1)
            mrn.pop(bad_index)
            uid.pop(bad_index)
            dates.pop(bad_index)

        X = np.transpose(data[1:])
        y = data[0]

        if not include_patient_info:
            return X, y
        return X, y, mrn, uid, dates
    def add_variable(self, variable, values, units=""):
        """Add a new variable to ``StatsData.data``, will not overwrite

        Parameters
        ----------
        variable : str
            variable name to be used as a key and plot title
        values : list
            values to be stored for ``variable``
        units : str, optional
            Define units for display on plot
        """
        if variable not in list(self.data):
            self.data[variable] = {"units": units, "values": values}
        if variable not in self.correlation_variables:
            self.correlation_variables.append(variable)
            self.correlation_variables.sort()
    def del_variable(self, variable):
        """Delete a variable from ``StatsData.data``

        Parameters
        ----------
        variable : str
            variable name
        """
        if variable in list(self.data):
            self.data.pop(variable)
        if variable in self.correlation_variables:
            index = self.correlation_variables.index(variable)
            self.correlation_variables.pop(index)
    def set_variable_data(self, variable, data, units=None):
        """Replace the data for the given variable in ``StatsData.data``

        Parameters
        ----------
        variable : str
            variable name
        data : list
            new data
        units : str, optional
            Define units for display on plot
        """
        self.data[variable]["values"] = data
        if units is not None:
            self.data[variable]["units"] = units
    def set_variable_units(self, variable, units):
        """Set the units for the given variable in ``StatsData.data``

        Parameters
        ----------
        variable : str
            variable name
        units : str
            units for display on plot
        """
        self.data[variable]["units"] = units
    def get_corr_matrix_data(
        self, options, included_vars=None, extra_vars=None
    ):
        """Get a Pearson-R correlation matrix

        Parameters
        ----------
        options : dvha.options.Options
            DVHA options class object. Used to get colors.
        included_vars : list, optional
            variables to be included in matrix
        extra_vars : list, optional
            variables to be appended to the matrix axes but excluded
            from the correlation calculations

        Returns
        -------
        tuple (dict, list)
            The dictionary has keys of 'source_data', 'x_factors' and
            'y_factors'. 'source_data' is used for bokeh plotting, the
            factors are for axis tick labels. The 2nd parameter of the
            tuple is a list of removed mrns
        """
        if included_vars is None:
            included_vars = list(self.data)
        if extra_vars is not None:
            included_vars = included_vars + extra_vars
        else:
            extra_vars = []

        categories = [
            c
            for c in list(self.data)
            if "date" not in c.lower() and c in included_vars
        ]
        categories.extend(extra_vars)
        categories = list(set(categories))
        categories.sort()

        var_count = len(categories)

        categories_for_label = [
            category.replace("Control Point", "CP") for category in categories
        ]
        categories_for_label = [
            category.replace("control point", "CP")
            for category in categories_for_label
        ]
        categories_for_label = [
            category.replace("Distance", "Dist")
            for category in categories_for_label
        ]

        for i, category in enumerate(categories_for_label):
            if category.startswith("DVH"):
                categories_for_label[i] = category.split("DVH Endpoint: ")[1]

        x_factors = categories_for_label
        y_factors = categories_for_label[::-1]

        s_keys = [
            "x",
            "y",
            "x_name",
            "y_name",
            "color",
            "alpha",
            "r",
            "p",
            "size",
            "x_normality",
            "y_normality",
            "group",
        ]
        source_data = {
            "corr": {sk: [] for sk in s_keys},
            "line": {"x": [0.5, var_count - 0.5], "y": [var_count - 0.5, 0.5]},
        }

        min_size, max_size = 3, 20
        removed_mrns = set()
        for x in range(var_count):
            for y in range(var_count):
                if x > y and self.group == 1 or x < y and self.group == 2:
                    if (
                        categories[x] not in extra_vars
                        and categories[y] not in extra_vars
                    ):
                        bad_indices = [
                            i
                            for i, v in enumerate(
                                self.data[categories[x]]["values"]
                            )
                            if type(v) in [str, type(None)]
                        ]
                        bad_indices.extend(
                            [
                                i
                                for i, v in enumerate(
                                    self.data[categories[y]]["values"]
                                )
                                if type(v) in [str, type(None)]
                            ]
                        )
                        bad_indices = list(set(bad_indices))
                        removed_mrns = removed_mrns.union(
                            set(self.mrns[i] for i in bad_indices)
                        )

                        x_data = [
                            v
                            for i, v in enumerate(
                                self.data[categories[x]]["values"]
                            )
                            if i not in bad_indices
                        ]
                        y_data = [
                            v
                            for i, v in enumerate(
                                self.data[categories[y]]["values"]
                            )
                            if i not in bad_indices
                        ]

                        if x_data and len(x_data) == len(y_data):
                            r, p_value = scipy_stats.pearsonr(x_data, y_data)
                        else:
                            r, p_value = 0, 0
                        if np.isnan(r):
                            r = 0

                        sign = ["neg", "pos"][r >= 0]
                        color = getattr(
                            options,
                            "CORRELATION_%s_COLOR_%s"
                            % (sign.upper(), self.group),
                        )

                        source_data["corr"]["color"].append(color)
                        source_data["corr"]["r"].append(r)
                        source_data["corr"]["p"].append(p_value)
                        source_data["corr"]["alpha"].append(abs(r))
                        source_data["corr"]["size"].append(
                            ((max_size - min_size) * abs(r)) + min_size
                        )
                        source_data["corr"]["x"].append(
                            x + 0.5
                        )  # 0.5 offset due to bokeh 0.12.9 bug
                        source_data["corr"]["y"].append(
                            var_count - y - 0.5
                        )  # 0.5 offset due to bokeh 0.12.9 bug
                        source_data["corr"]["x_name"].append(
                            categories_for_label[x]
                        )
                        source_data["corr"]["y_name"].append(
                            categories_for_label[y]
                        )
                        source_data["corr"]["group"].append(self.group)

                        try:
                            x_norm, x_p = scipy_stats.normaltest(x_data)
                        except ValueError:
                            x_p = "N/A"
                        try:
                            y_norm, y_p = scipy_stats.normaltest(y_data)
                        except ValueError:
                            y_p = "N/A"
                        source_data["corr"]["x_normality"].append(x_p)
                        source_data["corr"]["y_normality"].append(y_p)

        return {
            "source_data": source_data,
            "x_factors": x_factors,
            "y_factors": y_factors,
        }, removed_mrns
def get_index_of_nan(numpy_array):
    """Find indices of np.nan values

    Parameters
    ----------
    numpy_array : np.ndarray
        A 2D numpy array (variables x observations)

    Returns
    -------
    list
        observation (column) indices of ``numpy_array`` that contain
        ``np.nan`` in any row
    """
    bad_indices = []
    nan_data = np.isnan(numpy_array)
    for var_data in nan_data:
        indices = np.where(var_data)[0].tolist()
        if indices:
            bad_indices.extend(indices)
    bad_indices = list(set(bad_indices))
    bad_indices.sort()
    return bad_indices
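
# A brief usage sketch (illustration only, not part of the original
# module): ``get_index_of_nan`` expects an array shaped like the ``data``
# array built in ``StatsData.get_X_and_y`` (one row per variable) and
# returns the observation indices containing np.nan in any row.
def _example_get_index_of_nan():  # hypothetical demo helper
    demo = np.array(
        [
            [1.0, np.nan, 3.0, 4.0],  # variable 1
            [5.0, 6.0, np.nan, 8.0],  # variable 2
        ]
    )
    assert get_index_of_nan(demo) == [1, 2]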
def str_starts_with_any_in_list(string_a, string_list):
    """Check if ``string_a`` starts with any string in the provided list

    Parameters
    ----------
    string_a : str
        Any string
    string_list : list
        A list of strings

    Returns
    -------
    bool
        True if ``string_a`` starts with any string in ``string_list``
    """
    for string_b in string_list:
        if string_a.startswith(string_b):
            return True
    return False
def get_p_values(X, y, predictions, params):
    """Get p-values using sklearn
    based on https://stackoverflow.com/questions/27928275/find-p-value-significance-in-scikit-learn-linearregression

    Parameters
    ----------
    X : np.ndarray
        independent data
    y : np.ndarray
        dependent data
    predictions :
        output from linear_model.LinearRegression.predict
    params : np.ndarray
        np.array([y_intercept, slope])

    Returns
    -------
    tuple
        p-values, standard deviations, and t-values of the coefficients
    """
    newX = np.append(np.ones((len(X), 1)), X, axis=1)
    MSE = (sum((y - predictions) ** 2)) / (len(newX) - len(newX[0]))

    var_b = MSE * (np.linalg.inv(np.dot(newX.T, newX)).diagonal())
    sd_b = np.sqrt(var_b)
    ts_b = params / sd_b

    return (
        [
            2 * (1 - scipy_stats.t.cdf(np.abs(i), (len(newX) - 1)))
            for i in ts_b
        ],
        sd_b,
        ts_b,
    )
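
# A small sanity-check sketch (illustration only, not part of the
# original module): fit a single-variable sklearn regression on toy data
# and inspect the p-values from ``get_p_values``. Note the degrees of
# freedom used above (len(newX) - 1) differ slightly from the usual
# n - 2 of scipy's linregress, so values agree only approximately for
# small n.
def _example_get_p_values():  # hypothetical demo helper
    X = np.array([[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]])
    y = np.array([1.1, 1.9, 3.2, 3.8, 5.1, 6.2])
    reg = linear_model.LinearRegression().fit(X, y)
    predictions = reg.predict(X)
    params = np.append(reg.intercept_, reg.coef_)
    p_values, sd_b, ts_b = get_p_values(X, y, predictions, params)
    print("intercept p-value:", p_values[0])
    print("slope p-value:", p_values[1])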
class MultiVariableRegression:
    """Perform a multi-variable regression using sklearn

    Parameters
    ----------
    X : np.array
        independent data
    y : list
        dependent data
    saved_reg : MultiVariableRegression, optional
        reuse a previously fit regression rather than fitting a new one
    """

    def __init__(self, X, y, saved_reg=None):

        if saved_reg is None:
            self.reg = linear_model.LinearRegression()
            self.ols = self.reg.fit(X, y)
        else:
            self.reg = saved_reg.reg
            self.ols = saved_reg.ols

        self.y_intercept = self.reg.intercept_
        self.slope = self.reg.coef_
        params = np.append(self.y_intercept, self.slope)
        self.predictions = self.reg.predict(X)

        self.r_sq = r2_score(y, self.predictions)
        self.mse = mean_squared_error(y, self.predictions)

        self.p_values, self.sd_b, self.ts_b = get_p_values(
            X, y, self.predictions, params
        )

        self.residuals = np.subtract(y, self.predictions)

        self.norm_prob_plot = scipy_stats.probplot(
            self.residuals, dist="norm", fit=False, plot=None, rvalue=False
        )

        reg_prob = linear_model.LinearRegression()
        reg_prob.fit(
            [[val] for val in self.norm_prob_plot[0]], self.norm_prob_plot[1]
        )
        self.y_intercept_prob = reg_prob.intercept_
        self.slope_prob = reg_prob.coef_
        self.x_trend_prob = [
            min(self.norm_prob_plot[0]),
            max(self.norm_prob_plot[0]),
        ]
        self.y_trend_prob = np.add(
            np.multiply(self.x_trend_prob, self.slope_prob),
            self.y_intercept_prob,
        )

        self.f_stat = regressors_stats.f_stat(self.ols, X, y)
        self.df_error = len(X[:, 0]) - len(X[0, :]) - 1
        self.df_model = len(X[0, :])
        self.f_p_value = scipy_stats.f.cdf(
            self.f_stat, self.df_model, self.df_error
        )
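
# A minimal usage sketch (illustration only, not part of the original
# module) with random data; the shapes mirror what
# ``StatsData.get_X_and_y`` returns: X is (n_observations, n_variables)
# and y has n_observations entries.
def _example_multi_variable_regression():  # hypothetical demo helper
    np.random.seed(42)
    X = np.random.normal(size=(50, 3))
    y = 2.0 * X[:, 0] - X[:, 1] + np.random.normal(size=50)
    mvr = MultiVariableRegression(X, y)
    print("R^2:", mvr.r_sq)
    print("MSE:", mvr.mse)
    print("slopes:", mvr.slope)
    print("coefficient p-values:", mvr.p_values)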
def get_control_limits(y, std_devs=3):
    """Calculate control limits for Control Chart

    Parameters
    ----------
    y : list
        data
    std_devs : int or float
        values greater than ``std_devs`` away are out-of-control
        (Default value = 3)

    Returns
    -------
    tuple
        center line, upper control limit, and lower control limit
    """
    y = np.array(y)

    center_line = np.mean(y)
    avg_moving_range = np.mean(np.absolute(np.diff(y)))

    # d2 constant, since moving range is calculated from 2 consecutive points
    scalar_d = 1.128

    ucl = center_line + std_devs * avg_moving_range / scalar_d
    lcl = center_line - std_devs * avg_moving_range / scalar_d

    return center_line, ucl, lcl
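
# A short worked sketch (illustration only, not part of the original
# module): for the series below, the moving ranges are |2-1|, |5-2|,
# |3-5|, |4-3|, |6-4| = 1, 3, 2, 1, 2, so the average moving range is
# 1.8, the center line is 3.5, and the limits are
# 3.5 +/- 3 * 1.8 / 1.128.
def _example_get_control_limits():  # hypothetical demo helper
    center, ucl, lcl = get_control_limits([1, 2, 5, 3, 4, 6])
    print(center, ucl, lcl)  # 3.5, ~8.287, ~-1.287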
def sync_variables_in_stats_data_objects(stats_data_1, stats_data_2):
    """Ensure both stats_data objects have the same variables

    Parameters
    ----------
    stats_data_1 : StatsData
        A StatsData object (e.g., Group 1)
    stats_data_2 : StatsData
        Another StatsData object (e.g., Group 2)
    """
    stats_data = {1: stats_data_1, 2: stats_data_2}
    variables = {grp: list(sd.data) for grp, sd in stats_data.items()}
    for grp, sd in stats_data.items():
        for var in variables[grp]:
            # 3 - grp maps group 1 to group 2 and vice versa
            if var not in variables[3 - grp]:
                values = ["None"] * len(stats_data[3 - grp].mrns)
                units = stats_data[grp].data[var]["units"]
                stats_data[3 - grp].add_variable(var, values, units)