"""
Reliability module for establishing pattern-reliability on fMRI data
In general, this module provides functions to calculate the reliability of a fMRI dataset across different conditions, sessions, or subjects.
The data is assumed to be in the form of (n_subjects x n_measures x n_voxels) or simple in (n_measures x n_voxels) matrix.
"""
import numpy as np
import Functional_Fusion.matrix as matrix
import Functional_Fusion.util as util
from numpy import eye,sqrt,nansum
from numpy.linalg import inv
[docs]
def within_subj(data, cond_vec, part_vec,
separate = 'none',
subtract_mean=True):
""" Calculates the within-subject reliability of a data set
Data (X) is grouped by condition vector, and the
partition vector indicates the independent measurements
Does the calculation for each subject if X is a 3d array
Args:
data (ndarray): (num_subj x) num_trials x num_voxel tensor of data
cond_vec (ndarray): num_trials condition vector
part_vec (ndarray): num_trials partition vector
separate (str): {'none','voxel_wise','condition_wise'}
subtract_mean (bool): Remove the mean per voxel in partition across conditions?
Returns:
r (ndarray): (n_subjects x) n_separate array of correlations
"""
partitions = np.unique(part_vec)
n_part = partitions.shape[0]
if data.ndim == 2:
X = data.reshape(1, data.shape[0], data.shape[1])
single_subj = True
else:
single_subj = False
X = data
n_subj = X.shape[0]
# Transform the data into a 4d array
X = flat2ndarray(X, part_vec, cond_vec)
[n_subjects, n_part,n_cond, n_voxels] = X.shape
# remove mean for each voxel for each partition
if subtract_mean:
X = X - np.nanmean(X, axis=2, keepdims=True)
# rearrange the data to be in the form of (n_separate, n_subjects,n_partitions, n_features)
if separate == 'voxel_wise':
Y = X.transpose([0, 3, 1, 2])
elif separate == 'condition_wise':
Y = X.transpose([0, 2, 1, 3])
elif separate in ['none']:
Y = X.reshape((n_subjects, 1, n_part, n_cond * n_voxels))
else:
raise(NameError('separate needs to be none, voxel_wise, or condition_wise'))
[n_split, _, _, n_features] = Y.shape
# This computes the sums of squares-matrix for each split separately (broadcasting)
YY = np.matmul(Y,Y.transpose([0,1,3,2]))
# Get the mean of ondiagonal and offdiagonal elements
ondiag = np.where(np.eye(n_part))
offdiag = np.where(1-np.eye(n_part))
# Average the within-partition and across-partition cross-products for each subject and separate split
SS_1 = np.nanmean(YY[:,:,ondiag[0],ondiag[1]],axis=2)
SS_2 = np.nanmean(YY[:,:,offdiag[0],offdiag[1]],axis=2)
# Compute the variances from the sums of squares
if subtract_mean and separate == 'voxel_wise':
n_df = (n_cond - 1)
elif subtract_mean and separate == 'cond_wise':
n_df = n_voxels
elif subtract_mean:
n_df = (n_cond-1) * n_voxels
else:
n_df = n_features
v_e = (SS_1 - SS_2) / n_df
v_s = (SS_2) / n_df
r = v_s / (v_s + v_e)
if single_subj:
r = r[0]
return r
[docs]
def between_subj(data, cond_vec=None,
separate='none',
subtract_mean=True):
""" Calculates the average between-subject reliability
The data is averaged across multiple measurements
first - so the reliability is for the mean patterns
Args:
datas (ndarray): num_subj x num_trials x num_voxel tensor of data
cond_vec (ndarray): num_trials condition vector (otherwise assumed to be identity)
separate (str): {'none','voxel_wise','condition_wise'}
subtract_mean (bool): Remove the mean per voxel before correlation calc?
Returns:
r (ndarray): num_subj vector of correlations
"""
n_subj,n_trials,n_voxels = data.shape
if cond_vec is not None:
Z = matrix.indicator(cond_vec)
else:
Z = eye(n_trials)
n_cond = Z.shape[1]
X = np.zeros((n_subj,n_cond,n_voxels))
for s in range(n_subj):
X[s,:,:] = util.nan_linear_model(Z, data[s,:,:])
if subtract_mean:
X=X-np.nanmean(X,axis=1,keepdims=True)
# rearrange the data to be in the form of (n_separate, n_subjects,n_partitions, n_features)
if separate == 'voxel_wise':
Y = X.transpose([2, 0, 1])
elif separate == 'condition_wise':
Y = X.transpose([1, 0, 2])
elif separate in ['none']:
Y = X.reshape((1,n_subj, n_cond * n_voxels))
else:
raise(NameError('separate needs to be none, voxel_wise, or condition_wise'))
# This computes the sums of squares-matrix for each split separately (broadcasting)
YY = np.matmul(Y,Y.transpose([0,2,1]))
# Get the mean of ondiagonal and offdiagonal elements
ondiag = np.where(np.eye(n_subj))
offdiag = np.where(1-np.eye(n_subj))
# Average the within-partition and across-partition cross-products for each subject and separate split
SS_1 = np.nanmean(YY[:,ondiag[0],ondiag[1]],axis=1)
SS_2 = np.nanmean(YY[:,offdiag[0],offdiag[1]],axis=1)
v_e = (SS_1 - SS_2)
v_s = (SS_2)
r = v_s / (v_s + v_e)
return r
[docs]
def within_subj_loo(data, cond_vec,
part_vec,
separate='voxel_wise',
subtract_mean=True):
""" Calculates the within-subject reliability of a data set
Data (X) is grouped by condition vector, and the
partition vector indicates the independent measurements
Does the calculation for each subejct if X is a 3d array
Args:
X (ndarray): (num_subj x) num_trials x num_voxel tensor of data
cond_vec (ndarray): num_trials condition vector
part_vec (ndarray): num_trials partition vector
separate (str): {'none','voxel_wise','condition_wise'}
subtract_mean (bool): Remove the mean per voxel before correlation calc?
Returns:
r (ndarray): (num_subj x) num_partition matrix of correlations
"""
partitions = np.unique(part_vec)
n_part = partitions.shape[0]
if len(data.shape) == 2:
X = data.reshape(1, data.shape[0], data.shape[1])
single_subj = True
else:
single_subj = False
X= data.copy()
n_subj = X.shape[0]
if separate=='voxel_wise':
r = np.zeros((n_subj, n_part, data.shape[2]))
elif separate=='condition_wise':
raise(NameError('condition_wise not implemented yet'))
elif separate=='none':
r = np.zeros((n_subj, n_part))
else:
raise(NameError('separate needs to be none, voxel_wise, or condition_wise'))
Z = matrix.indicator(cond_vec)
for s in np.arange(n_subj):
for pn, part in enumerate(partitions):
i1 = part_vec == part
i2 = part_vec != part
X1 = util.nan_linear_model(Z[i1, :], X[s, i1, :])
X2 = util.nan_linear_model(Z[i2, :], X[s, i2, :])
# Check if this partition contains nan row
if subtract_mean:
X1 -= np.nanmean(X1, axis=0)
X2 -= np.nanmean(X2, axis=0)
if separate=='voxel_wise':
r[s, pn, :] = nansum(X1 * X2, axis=0) / \
sqrt(nansum(X1 * X1, axis=0)
* nansum(X2 * X2, axis=0))
else:
r[s, pn] = nansum(X1 * X2) / \
sqrt(nansum(X1 * X1) * nansum(X2 * X2))
if single_subj:
r = r[0, :]
return r
[docs]
def between_subj_loo(data, cond_vec=None,
separate='none',
subtract_mean=True):
""" Calculates the correlation of the responses of each of the subjects with the mean of the other subjects.
This serves as a lower noise ceiling for any group model (a model that predicts the same value for all subjects).
If cond_vec is given, the data is averaged across multiple measurem first.
Args:
data (ndarray): num_subj x num_trials x num_voxel tensor of data
cond_vec (ndarray): num_trials condition vector
separate (str): {'none','voxel_wise','condition_wise'}
subtract_mean (bool): Remove the mean per voxel before correlation calc?
Returns:
r (ndarray): num_subj vector of correlations
"""
n_subj = data.shape[0]
n_trials = data.shape[1]
if cond_vec is not None:
Z = matrix.indicator(cond_vec)
else:
Z = eye(n_trials)
subj_vec = np.arange(n_subj)
if separate == 'voxel_wise':
r = np.zeros((n_subj, data.shape[2]))
elif separate=='condition_wise':
raise(NameError('condition_wise not implemented yet'))
elif separate=='none':
r = np.zeros((n_subj,))
else:
raise(NameError('separate needs to be none, voxel_wise, or condition_wise'))
for s, i in enumerate(subj_vec):
X1 = util.nan_linear_model(Z, data[s, :, :])
i2 = subj_vec != s
X2 = util.nan_linear_model(Z, np.nanmean(data[i2, :, :], axis=0))
if subtract_mean:
X1 -= np.nanmean(X1, axis=0)
X2 -= np.nanmean(X2, axis=0)
if separate=='voxel_wise':
r[i, :] = nansum(X1 * X2, axis=0) / \
sqrt(nansum(X1 * X1, axis=0)
* nansum(X2 * X2, axis=0))
else:
r[i] = nansum(X1 * X2) / sqrt(nansum(X1 * X1) * nansum(X2 * X2))
return r
def between_subj_avrg(data, cond_vec=None,
separate='none',
subtract_mean=True):
""" Calculates the correlation of the responses of each of the subjects with the mean of all subjects.
This serves as a upper noise ceiling for any group model (a model that predicts the same value for all subjects).
If cond_vec is given, the data is averaged across multiple measure of each condition first.
Args:
data (ndarray): num_subj x num_trials x num_voxel tensor of data
cond_vec (ndarray): num_trials condition vector
separate (str): {'none','voxel_wise','condition_wise'}
subtract_mean (bool): Remove the mean per voxel before correlation calc?
Returns:
r (ndarray): num_subj vector of correlations
"""
n_subj = data.shape[0]
n_trials = data.shape[1]
if cond_vec is not None:
Z = matrix.indicator(cond_vec)
else:
Z = eye(n_trials)
subj_vec = np.arange(n_subj)
if separate == 'voxel_wise':
r = np.zeros((n_subj, data.shape[2]))
elif separate=='condition_wise':
raise(NameError('condition_wise not implemented yet'))
elif separate=='none':
r = np.zeros((n_subj,))
else:
raise(NameError('separate needs to be none, voxel_wise, or condition_wise'))
X2 = util.nan_linear_model(Z, np.nanmean(data, axis=0))
if subtract_mean:
X2 -= np.nanmean(X2, axis=0)
for s, i in enumerate(subj_vec):
X1 = util.nan_linear_model(Z, data[s, :, :])
if subtract_mean:
X1 -= np.nanmean(X1, axis=0)
if separate=='voxel_wise':
r[i, :] = nansum(X1 * X2, axis=0) / \
sqrt(nansum(X1 * X1, axis=0)
* nansum(X2 * X2, axis=0))
else:
r[i] = nansum(X1 * X2) / sqrt(nansum(X1 * X1) * nansum(X2 * X2))
return r
[docs]
def decompose_subj_group(data, cond_vec, part_vec,
separate='none',
subtract_mean=True):
"""
this function decompose a collection of (across subjects and partitions) activity patterns (N condition x P voxels)
into group, individual and noise components, returns the variance estimates of each component.
Args:
data (ndarray):
n_subjects x n_trials x n_voxels array
cond_vec (ndarray):
n_trials condition vector
part_vec (ndarray):
n_trials partition vector
separate (str):
* 'none': partition variance components for the whole pattern (N x P) -> returns a single row
* 'voxel_wise': partition variance components for each voxel separately -> returns as many rows as voxels
* 'condition_wise': partition variance components for each condition separately -> returns as many rows as conditions
* 'subject_wise': partition variance components for the whole pattern (NxP) -> but return split by Subjects
Returns:
variances: (K x 3 ndarray): v_g, v_s, v_e (variance for group, subject, and noise), where K is the number of voxels, conditions, subjects, or 1
"""
if data.ndim != 3:
raise(NameError('data needs to be a 3d array'))
# Transform the data into a 4d array
X = flat2ndarray(data, part_vec, cond_vec)
[n_subjects, n_part,n_cond, n_voxels] = X.shape
# remove mean for each voxel for each partition
if subtract_mean:
X = X - np.nanmean(X, axis=2, keepdims=True)
# rearrange the data to be in the form of (n_separate, n_subjects,n_partitions, n_features)
if separate == 'voxel_wise':
Y = X.transpose([3, 0, 1, 2])
elif separate == 'condition_wise':
Y = X.transpose([2, 0, 1, 3])
elif separate in ['global','none','subject_wise']:
Y = X.reshape((1, n_subjects, n_part, n_cond * n_voxels))
else:
raise(NameError('separate needs to be none, voxel_wise, condition_wise, or subject_wise'))
[n_split, _, _, n_features] = Y.shape
# reshape the data to be in the form of (n_split, n_subjects*n_partitions, n_features)
Y = Y.reshape((n_split, n_subjects * n_part, n_features))
subj_vec = np.kron(np.arange(n_subjects), np.ones(n_part))
part_vec = np.kron(np.ones(n_subjects), np.arange(n_part))
N = n_subjects * n_part
same_subj = np.equal(subj_vec.reshape(N,1),subj_vec.reshape(1,N))
same_part = np.equal(part_vec.reshape(N,1),part_vec.reshape(1,N))
# This computes the sums of squares-matrix for each split separately (broadcasting)
YY = np.matmul(Y,Y.transpose((0,2,1)))
# Find cross-products of same subject and same partition
if separate == 'subject_wise':
SS_1 = np.zeros((n_subjects,))
SS_2 = np.zeros((n_subjects,))
SS_3 = np.zeros((n_subjects,))
for s in np.arange(n_subjects):
# Get the rows pertaining to the subject
YYs = YY[:,subj_vec==s,:]
same_subj_s = same_subj[subj_vec==s,:]
same_part_s = same_part[subj_vec==s,:]
SS_1[s] = np.nanmean(YYs[:,~same_subj_s],axis=1)
SS_2[s] = np.nanmean(YYs[:,same_subj_s & ~same_part_s],axis=1)
SS_3[s] = np.nanmean(YYs[:,same_subj_s & same_part_s],axis=1)
else:
SS_1 = np.nanmean(YY[:,~same_subj],axis=1)
SS_2 = np.nanmean(YY[:,same_subj & ~same_part],axis=1)
SS_3 = np.nanmean(YY[:,same_subj & same_part],axis=1)
# Compute the variances from the sums of squares
if subtract_mean and separate == 'voxel_wise':
n_df = (n_cond - 1)
elif subtract_mean and separate == 'cond_wise':
n_df = n_voxels
elif subtract_mean:
n_df = (n_cond-1) * n_voxels
else:
n_df = n_features
v_e = (SS_3 - SS_2) / n_df
v_s = (SS_2 - SS_1) / n_df
v_g = SS_1 / n_df
variances = np.c_[v_g, v_s, v_e]
return variances
def flat2ndarray(flat_data, part_vec, cond_vec):
"""
convert flat data (n_subjects x n_trials x n_voxels) into a 4d ndarray (n_subjects x n_partitions x n_conditions x n_voxels). Also works on
(n_trials x n_voxels) array that is converted into a
(n_partitions x n_conditions x n_voxels) array.
Args:
flat_data (nd-array): (n_subjects x ) n_trials x n_voxels array
part_vec: (nd-array): n_trials -vector for partition index
cond_vec: (nd-array): n_trials -vector for condition index
Returns:
data: (nd-array): (n_subjects x ) n_partitions x n_conditions x n_voxels
"""
[n_subjects, n_trials, n_voxels] = flat_data.shape
unique_partitions = np.unique(part_vec)
n_partitions = unique_partitions.size
unique_conditions = np.unique(cond_vec)
n_conditions = unique_conditions.size
data = np.zeros((n_subjects, n_partitions, n_conditions, n_voxels))
for p,part in enumerate(unique_partitions):
for c,cond in enumerate(unique_conditions):
trial_inds = np.where((cond_vec == cond) & (part_vec == part))[0]
data[:, p, c, :] = np.mean(flat_data[:, trial_inds, :], axis=1)
return data