#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data fusion project dataset module
The class for converting and mapping raw data from multi-dataset
to a standard data structure that can be used in Diedrichsen lab
"""
from pathlib import Path
import numpy as np
import pandas as pd
import os, time, sys
import os.path as op
import Functional_Fusion.util as util
import Functional_Fusion.matrix as matrix
import Functional_Fusion.atlas_map as am
import scipy.linalg as sl
import nibabel as nb
from numpy import eye, zeros, ones, empty, nansum, sqrt
from numpy.linalg import pinv, solve
import warnings
import glob
import re
[docs]
def get_dataset_class(base_dir, dataset):
""" Returns the dataset object without loading the data
Args:
base_dir (str): Functional Fusion base directory
dataset (str): _description_
Returns:
my_dataset (Dataset): Dataset object
"""
T = pd.read_csv(base_dir + '/dataset_description.tsv', sep='\t')
T.name = [n.casefold() for n in T.name]
i = np.where(dataset.casefold() == T.name)[0]
if len(i) == 0:
raise (NameError(f'Unknown dataset: {dataset}'))
t = T.iloc[int(i)]
dsclass = getattr(sys.modules[__name__], t.class_name)
dir_name = t.dir_name
if dir_name[0] == '/':
abs_path = dir_name
elif dir_name[0] == '.': # Relative path relative to fusion project
abs_path = Path(base_dir).parent + Path(dir_name[1:])
else:
abs_path = base_dir + '/' + t.dir_name
my_dataset = dsclass(abs_path)
my_dataset.sessions = eval(t.sessions)
my_dataset.default_type = t.default_type
my_dataset.cond_ind = t.cond_ind # Condition Indicator (field in tsv file )
my_dataset.part_ind = t.part_ind # Partition Indicator (field in tsv file )
my_dataset.subtract_baseline = bool(t.subtract_baseline) # If True, baseline is subtracted from the data
return my_dataset
[docs]
def get_dataset(base_dir, dataset, atlas='SUIT3', sess='all', subj=None,
type=None, ext=None,exclude_subjects=True):
"""get_dataset tensor and data set object
Args:
base_dir (str): Basis directory for the Functional Fusion datastructure
dataset (str): Data set indicator
atlas (str): Atlas indicator. Defaults to 'SUIT3'.
sess (str or list): Sessions. Defaults to 'all'.
subj (ndarray, str, or list): Subject numbers /names to get [None = all]
type (str): 'CondHalf','CondRun', etc....
ext (str): added qualifier (smoothing, etc.) default None
exclude_subjects (bool): If True, excludes subjects that have been specified
in the exclude column of the participants.tsv file.
Returns:
data (nd.array):nsubj x ncond x nvox data tensor
info (pd.DataFrame): Dataframe with info about the data
my_dataset (DataSet): Dataset object
"""
my_dataset = get_dataset_class(base_dir, dataset)
# Get defaults sessions from dataset itself
if sess == 'all':
sess = my_dataset.sessions
elif not isinstance(sess, (list, np.ndarray)):
sess = [sess]
# Empty default type (Future change: define per file?)
if type is None:
type = my_dataset.default_type
# Load all data and concatenate
# across sessions
info_l = []
data_l = []
for s in sess:
dat, inf = my_dataset.get_data(atlas, s, type, subj, ext=ext,exclude_subjects=exclude_subjects)
data_l.append(dat)
inf['sess'] = [s] * inf.shape[0]
info_l.append(inf)
info = pd.concat(info_l, ignore_index=True, sort=False)
data = np.concatenate(data_l, axis=1)
return data, info, my_dataset
def build_dataset_from_fusionProject(dataset, atlas, base_dir, sess='all',
cond_ind='cond_num_uni', type='CondHalf',
part_ind='half', subj=None, join_sess=False,
join_sess_part=False, smooth=None):
""" Builds dataset, cond_vec, part_vec, subj_ind from the given
dataset in Functional fusion project
Args:
dataset (str): Names of the dataset to build
atlas (object): Atlas indicator
sess (list): list of 'all' or list of sessions
design_ind (list, optional): _description_. Defaults to None.
part_ind (list, optional): _description_. Defaults to None.
subj (list, optional): _description_. Defaults to None.
join_sess (bool, optional): Model the sessions with a single model.
Defaults to True.
Returns:
data, cond_vec, part_vec, subj_ind
"""
sub = 0
data, cond_vec, part_vec, subj_ind = [],[],[],[]
# Run over datasets get data + design
dat, info, tds = get_dataset(base_dir, dataset, atlas=atlas.name,
sess=sess, type=type, smooth=smooth)
# Sub-index the subjects: (REMOVE AND PASS TO GET_DATA)
if subj is not None:
dat = dat[subj, :, :]
n_subj = dat.shape[0]
# Find correct indices
if cond_ind is None:
cond_ind = tds.cond_ind
if part_ind is None:
part_ind = tds.part_ind
# Make different sessions either the same or different
if join_sess:
data.append(dat)
cond_vec.append(info[cond_ind].values.reshape(-1, ))
# Check if we want to set no partition after join sessions
if join_sess_part:
part_vec.append(np.ones(info[part_ind].shape))
else:
part_vec.append(info[part_ind].values.reshape(-1, ))
subj_ind.append(np.arange(sub, sub + n_subj))
else:
if sess == 'all':
sessions = tds.sessions
else:
sessions = sess
# Now build and split across the correct sessions:
for s in sessions:
indx = info.sess == s
data.append(dat[:, indx, :])
cond_vec.append(info[cond_ind].values[indx].reshape(-1, ))
part_vec.append(info[part_ind].values[indx].reshape(-1, ))
subj_ind.append(np.arange(sub, sub + n_subj))
return data, cond_vec, part_vec, subj_ind
[docs]
def prewhiten_data(data):
""" prewhitens a list of data matrices.
It assumes that the last row of each data matrix is the ResMS-value
Returns a list of data matrices that is one shorter
Args:
data (list of ndarrays): List of data arrays
"""
# Get the resms and prewhiten the data
data_n = []
for i in range(len(data)):
# Prewhiten the data univariately
resms = data[i][-1, :]
data_n.append(data[i][0:-1, :])
resms[resms <= 0] = np.nan
data_n[i] = data_n[i] / np.sqrt(resms)
return data_n
[docs]
def agg_data(info, by, over, subset=None):
""" Aggregates data over rows (condition) safely by sorting them by the fields in "by"
while integrating out "over".
Adds a n_rep field to count how many instances are of each
Returns condensed data frame + Contrast matrix.
Args:
info (DataFrame): Original DataFrame
by (list): Fields that define the index of the new data
over (list): Fields to ignore / integrate over. All other fields
will be pulled through.
subset (bool array): If given, ignores certain rows from the
original data frame
Return
data_info (DataFrame): Reduced data frame
C (ndarray): Indicator matrix defining the mapping from full to reduced
Example:
data,info,mdtb= ds.get_data('MDTB','MNISymDentate1',ses_id='ses-s1',type='CondRun')
cinfo,C = ds.agg_data(info,['cond_num_uni'],['run','half','reg_num','names'])
cdata = np.linalg.pinv(C) @ data
"""
# Subset original data frame as needed
info_n = info.copy()
if subset is None:
indx = np.arange(info.shape[0])
else:
indx = np.nonzero(subset.values)[0]
info_n = info_n[subset]
# Generate n_rep field if not present
if 'n_rep' not in info.columns:
info_n['n_rep'] = np.ones((info_n.shape[0],))
# Other contains the fields to remain constant
other = list(info.columns.values)
for ov in over + by:
other.remove(ov)
# Define operations on data
operations = {'n_rep': 'sum'}
for o in other:
operations[o] = 'max'
# Group the new data frame
info_gb = info_n.groupby(by,sort=False)
data_info = info_gb.agg(operations).reset_index()
# Build indicator matrix for averaging
C = np.zeros((info.shape[0], data_info.shape[0]))
for i, d in data_info.iterrows():
rows = np.ones(info.shape[0], dtype=bool)
for j in by:
rows &= (info[j] == d[j])
C[rows, i] = 1
return data_info, C
[docs]
def agg_parcels(data, label_vec, fcn=np.nanmean):
""" Aggregates data over colums to condense to parcels
Args:
data (ndarray): Either 2d or 3d data structure, P has to be the last dimension
labels (ndarray): 1d-array that gives the labels (P-vector)
fcn (function): Function to use to aggregate over these
Returns:
aggdata (ndarray): Aggregated either 2d or 3d data structure
labels (ndarray): Region number corresponding to each "column"
"""
# Subset original data frame as needed
labels = np.unique(label_vec[label_vec > 0])
n_parcels = len(labels)
psize = np.array(data.shape)
psize[-1] = n_parcels
parcel_data = np.zeros(psize)
for i, l in enumerate(labels):
parcel_data[..., i] = fcn(
data[..., label_vec == l], axis=len(psize) - 1)
return parcel_data, labels
def combine_parcel_labels(labels_org,labels_new, labelvec_org=None):
""" Combines parcel labels from a new atlas to an existing atlas
Example call:
mapping, lv = combine_parcel_labels(labels_org,['0','A.L','A.R','S..','M3.'],labelvec_org)
To get different aggregations of the Nettekoven atlas
* A.L includes A1-4L
* A.R includes A1-4R
* S.. includes all S-areas
* M3. includes M3L and M3R
Args:
labels_org (list of str): N-lenght original label names (should include '0' for 0)
labels_new (list of str): List of regexpressions for new labels (should include '0' for 0)
labelvec_org (ndarray, optional): Original label vector to remap (P-vector)
Returns:
mapping (ndarray): New label indices for the old labels (N-length vector)
labelvec_new (ndarray): New label vector (P-vector) - returned if labelvec_org is given
"""
mapping = np.zeros(len(labels_org))
for i,l in enumerate(labels_new):
for j,lo in enumerate(labels_org):
if re.match(l,lo):
mapping[j] = i
if labelvec_org is None:
return mapping
else :
labelvec_new = np.zeros(labelvec_org.shape)
for i in np.arange(len(mapping)):
labelvec_new[labelvec_org == i] = mapping[i]
return mapping, labelvec_new
[docs]
def optimal_contrast(data, C, X, reg_in=None):
"""Recombines betas from a GLM into an optimal new contrast, taking into account a design matrix
For mathematical background and motivation, see:
Args:
data (list of ndarrays): List of N x P_i arrays of beta estimates of the original GLM
C (ndarray): Contrast matrix (N x Q) going from the original GLM to the new GLM
X (ndarray): Original (T x Nx) design matrix used in estimation of the data.
Nx could be longer than N by regressors of no interest
reg_in (ndarray): Contrast of interest: Logical vector indicating
which rows of C we will put in the matrix (defaults to all)
"""
# Check the sizes
N, Q = C.shape
T, Np = X.shape
# infer number of regressors of no interest that are not in the data structure
num_nointerest = Np - N
# Add to the contrast matrix
Cn = sl.block_diag(C, np.eye(num_nointerest))
# Make new design matrix
Xn = X @ Cn
# If no subset of regressors is given, use all:
if reg_in is None:
reg_in = np.arange(Q)
# Loop over the data:
data_new = []
for i in range(len(data)):
# Append the regressors of no interest regressors
dat = np.concatenate([data[i],
np.zeros((num_nointerest, data[i].shape[1]))])
# Do the averaging / reweighting:
d = pinv(Xn) @ X @ dat
# Subset to the contrast of interest
d = d[reg_in, :]
# Put the data in a list:
data_new.append(d)
return data_new
def remove_baseline(data, baseline):
""" Removes a baseline from the data
Arg:
data (ndarray): (nsubj x N x P) OR (N x P) array
baseline (narray): 1-dimensional array (N,) of partition numbers or (N x npart) indicator matrix
Returns:
data_sub (ndarray): Baseline subtracted data
"""
if baseline is None:
return data
N = data.shape[-2] # This is the trial dimension
if baseline.ndim == 1:
B = matrix.indicator(baseline)
else:
B = baseline
R = eye(N) - B @ pinv(B) # Residual forming matrix
return R @ data # Uses broadcasting for >2 dim arrays
[docs]
def reliability_maps(base_dir, dataset_name, atlas='MNISymC3', type='CondHalf',
subtract_mean=True, voxel_wise=True, subject_wise=False):
""" Calculates the average within subject reliability maps across sessions for a single dataset
Args:
base_dir (str / path): Base directory
dataset_name (str): Name of data set
atlas (str): _description_. Defaults to 'MNISymC3'.
subtract_mean (bool): Remove the mean per voxel before correlation calc?
Returns:
_type_: _description_
"""
data, info, dataset = get_dataset(base_dir, dataset_name, atlas=atlas, type = type)
n_sess = len(dataset.sessions)
n_vox = data.shape[2]
Rel = np.zeros((n_sess, n_vox))
if subject_wise:
Rel = np.zeros((n_sess, data.shape[0], n_vox))
for i, s in enumerate(dataset.sessions):
indx = info.sess == s
r = reliability_within_subj(data[:, indx, :],
part_vec=info[dataset.part_ind][indx],
cond_vec=info[dataset.cond_ind][indx],
voxel_wise=voxel_wise,
subtract_mean=subtract_mean)
if subject_wise:
Rel[i, :, :] = np.nanmean(r, axis=1)
else:
Rel[i, :] = np.nanmean(np.nanmean(r, axis=0), axis=0)
return Rel, dataset.sessions
def reliability_within_subj(X, part_vec, cond_vec,voxel_wise=False,
subtract_mean=True):
raise(NameError('Depreciated. Use reliability.within_subj_loo'))
def reliability_between_subj(X, cond_vec=None,voxel_wise=False,
subtract_mean=True):
raise(NameError('Depreciated. Use reliability.between_subj_loo'))
def decompose_pattern_into_group_indiv_noise(data, criterion='global'):
raise(NameError('Depreciated. Use reliability.decompose_subj_group'))
[docs]
class DataSet:
def __init__(self, base_dir):
"""DataSet class:
Implements the interface for each of the data set
Note that the actual preprocessing and glm estimate
do not have to be performed with functionality provided by
this class. The class is just a instrument to present the user with
a uniform interface of how to get subject info
Args:
base_dir (str): base directory for dataset
"""
self.base_dir = base_dir
self.surface_dir = base_dir + '/derivatives/ffimport/{0}/anat'
self.anatomical_dir = base_dir + '/derivatives/ffimport/{0}/anat'
self.estimates_dir = base_dir + '/derivatives/ffimport/{0}/func'
self.func_dir = base_dir + '/derivatives/ffimport/{0}/func'
self.suit_dir = base_dir + '/derivatives/ffimport/{0}/anat'
self.data_dir = base_dir + '/derivatives/ffextract/{0}'
# assume that the common atlas directory is on the level before
self.atlas_dir = util.default_atlas_dir
# Some information that a standard data set should have
self.sessions = [None]
self.default_type = None
self.cond_ind = None # Condition Indicator (field in tsv file )
self.part_ind = None # Partition Indicator (field in tsv file )
self.cond_name = None # Condition Names (field in tsv file )
self.subtract_baseline = False # If True, baseline is subtracted from the data
[docs]
def get_participants(self, exclude_subjects=True):
""" returns a data frame with all participants
available in the study. The fields in the data frame correspond to the
standard columns in participant.tsv.
https://bids-specification.readthedocs.io/en/stable/03-modality-agnostic-files.html
Args:
exclude_subjects (bool): If True, excludes subjects that have been specified
in the exclude column of the participants.tsv file.
Returns:
Pinfo (pandas data frame): participant information in standard bids format
"""
self.part_info = pd.read_csv(
self.base_dir + '/participants.tsv', delimiter='\t')
if exclude_subjects and 'exclude' in self.part_info.columns:
# Exclude subjects that have been specified in the exclude column
# 1 = exclude, 0 = include
self.part_info = self.part_info[self.part_info.exclude == 0].reset_index()
return self.part_info
[docs]
def get_data_fnames(self, participant_id, session_id=None, type='Cond'):
""" Gets all raw data files
Args:
participant_id (str): Subject
session_id (str): Session ID. Defaults to None.
type (str): Type of data. Defaults to 'Cond' for task-based data. For rest data use 'Tseries'.
Returns:
fnames (list): List of fnames, last one is the resMS image
T (pd.DataFrame): Info structure for regressors (reginfo)
"""
dirw = self.estimates_dir.format(participant_id) + f'/{session_id}'
if type[:4] == 'Cond' or type[:4] == 'Task':
T = pd.read_csv(
dirw + f'/{participant_id}_{session_id}_reginfo.tsv', sep='\t')
fnames = [f'{dirw}/{participant_id}_{session_id}_run-{t.run:02}_reg-{t.reg_id:02}_beta.nii' for i, t in T.iterrows()]
fnames.append(f'{dirw}/{participant_id}_{session_id}_resms.nii')
elif type == 'Tseries' or type == 'FixTseries':
# Find all run files of the structure f'{dirw}/{participant_id}_{session_id}_run-??.nii'
fnames = glob.glob(f'{dirw}/{participant_id}_{session_id}_run-??.nii')
# If no files are found, throw error
if fnames == []:
raise ValueError('No timepoints found in timeseries files')
runs = [int(fname.split('run-')[-1].split('_')[0].split('.')[0]) for fname in fnames]
runs = np.unique(runs)
fnames = [f'{dirw}/{participant_id}_{session_id}_run-{r:02}.nii' for r in runs]
if type == 'FixTseries':
# Make sure to load fix-cleaned timeseries
fnames = [f'{dirw}/{participant_id}_{session_id}_run-{r:02}_fix.nii' for r in runs]
try:
# Load timeseries info file if it exists
T = pd.read_csv(
dirw + f'/{participant_id}_{session_id}_tinfo.tsv', sep='\t')
except:
# Create timeseries info yourself if it doesn't exist
timepoints = [nb.load(fname).shape[-1] for fname in fnames]
runs = np.repeat(runs, timepoints)
# Timepoints start counting at 1, not 0!
timepoints = np.arange(sum(timepoints))+1
timepoints_string = [f'T{timepoint:04d}' for timepoint in timepoints]
T = pd.DataFrame({'run': runs,
'timepoint': timepoints_string,
'time_id':timepoints}, index=None)
return fnames, T
[docs]
def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1):
"""This function generates atlas map for the data of a specific subject into a specific atlas space. The general DataSet.get_atlasmaps defines atlas maps for different spaces
- SUIT: Using individual normalization from source space.
- MNI152NLin2009cSymC: Via indivual SUIT normalization + group
- MNI152NLin6AsymC: Via indivual SUIT normalization + group
- MNI152Lin2009cSym: Via individual MNI normalization
- MNI152NLin6Asym: Via individual MNI normalization
fs32k: Via individual pial and white surfaces (need to be in source space)
Other dataset classes will overwrite and extend this function.
Args:
atlas (FunctionFusion.Atlas):
Functional Fusion atlas object
sub (str):
Subject_id for the individual subject
ses_id (str):
Session_id for the individual subject if atlasmap is session dependent. (defaults to none)
smooth (float):
Width of smoothing kernel for extraction. Defaults to None.
Returns:
AtlasMap:
List of AtlasMap object - usually but for fs32k these are two
"""
cerebellar_spaces = ['SUIT','MNI152NLin2009cSymC','MNI152NLin6AsymC']
wholebrain_spaces = ['MNI152Lin2009cSym','MNI152NLin6Asym']
atlas_maps = []
adir = self.anatomical_dir.format(sub)
edir = self.estimates_dir.format(sub)
if atlas.space in cerebellar_spaces:
deform = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_space-{atlas.space}_xfm.nii')
mask = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii')
if deform is None:
print('Direct transform not found, trying to build from SUIT normalization')
deform1 = am.get_deform(atlas.space, 'SUIT')
deform2 = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii')
if deform2 is None:
raise ValueError(f'Neither direct transform nor SUIT transform found. Run import_data.run SUIT using desired space.')
else:
deform = [deform1, deform2]
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(interpolation=interpolation, smooth=smooth)
elif atlas.space in wholebrain_spaces:
# This is direct MNI normalization
deform = adir + f'/{sub}_space-{atlas.space}_xfm.nii'
mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(interpolation=interpolation, smooth=smooth)
elif atlas.space == 'fs32k':
for i, struc in enumerate(atlas.structure):
if struc=='cortex_left':
hem = 'L'
elif struc=='cortex_right':
hem = 'R'
else:
raise ValueError('Structure for fs32k needs to be cortex_left or cortex_right.')
pial = adir + f'/{sub}_space-32k_hemi-{hem}_pial.surf.gii'
white = adir + f'/{sub}_space-32k_hemi-{hem}_white.surf.gii'
mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
atlas_maps.append(am.AtlasMapSurf(atlas.vertex[i],
white, pial, mask))
atlas_maps[i].build()
else:
raise ValueError(f'Atlas space {atlas.space} not supported for extraction')
return atlas_maps
[docs]
def condense_data(self, data, info,
type='CondHalf',
participant_id=None,
ses_id=None,
subset=None):
""" Condense the data across the measures to a certain level
If a design matrix file exisits, it is used to combine betas optimally
'CondHalf': Conditions with seperate estimates for first and second half of experiment (Default)
'CondRun': Conditions with seperate estimates per run.
'CondAll': Conditions with a single estimate averaging over all runs.
'TaskHalf': Task with seperate estimates for first and second half of experiment
'TaskRun': Task with seperate estimates per run.
'TaskAll': Task with a single estimate averaging over all runs.
if dataset.subtract_baseline is True, the baseline is subtracted from the data.
Args:
data (ndarray): List of extracted datasets
info (DataFrame): Data Frame with description of data - row-wise
type (str): Type of extraction:
participant_id (str): ID of participant
ses_id (str): Name of session
subset (bool array): If given, ignores certain rows from the
Returns:
Y (list of np.ndarray):
A list (len = numatlas) with N x P_i numpy array of prewhitened data
T (pd.DataFrame):
A data frame with information about the N numbers provided
"""
# Time series: do not condence, just return the data
if type == 'Tseries' or type == 'FixTseries':
info['names'] = info['timepoint']
return data, info
# Task-based data (betas)
info['cond_code'] = info['cond_code'].fillna('task') # for tasks that have no condition code
if subset is None:
subset = (info.task_code != 'instrct') # By defaults, ignore instruction regressors
# Depending on the type, make a new contrast
if type == 'CondHalf':
data_info, C = agg_data(info,
['half', 'task_code','cond_code'],
['run','reg_id'],
subset=subset)
data_info['names'] = [
f'{d.task_code}_{d.cond_code}_half{d.half}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.half, positive=True)
elif type == 'CondRun':
data_info, C = agg_data(info,
['run', 'task_code','cond_code'],
['half','reg_id'],
subset=subset)
data_info['names'] = [
f'{d.task_code}_{d.cond_code}_run{d.run:02d}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.run, positive=True)
elif type == 'CondAll':
data_info, C = agg_data(info,
['task_code','cond_code'],
['run', 'half','reg_id'],
subset=subset)
data_info['names'] = [
f'{d.task_code}_{d.cond_code}' for i, d in data_info.iterrows()]
# Baseline substraction
B = np.ones((data_info.shape[0],1))
elif type == 'TaskRun':
data_info, C = agg_data(info,
by=['task_code','run'],
over=['half'],
subset=subset)
data_info['names'] = [
f'{d.task_code}_run{d.run}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.run, positive=True)
elif type == 'TaskHalf':
data_info, C = agg_data(info,
by=['task_code', 'half'],
over=['run'],
subset=subset)
data_info['names'] = [
f'{d.task_code}_half{d.half}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.half, positive=True)
elif type == 'TaskAll':
data_info, C = agg_data(info,
by=['task_code'],
over=['run', 'half'],
subset=subset)
data_info['names'] = [
f'{d.task_code}' for i, d in data_info.iterrows()]
# Baseline substraction
B = np.ones((data_info.shape[0],1))
# Prewhiten the data
data_n = prewhiten_data(data)
# Load the designmatrix and perform optimal contrast
dir = self.estimates_dir.format(participant_id) + f'/{ses_id}'
design_matrix_file = dir + f'/{participant_id}_{ses_id}_designmatrix.npy'
if os.path.exists(design_matrix_file):
X = np.load(design_matrix_file)
data_n = optimal_contrast(data_n, C, X)
else:
for i in range(len(data_n)):
data_n[i] = pinv(C) @ data_n[i]
# Subtract baseline if needed
if self.subtract_baseline:
data_n = [remove_baseline(d, B) for d in data_n]
return data_n, data_info
[docs]
def get_info(self, ses_id='ses-s1', type=None, subj=None, fields=None, exclude_subjects=True):
"""Loads the tsv-files and returns the most complete info structure
Args:
ses_id (str): Session ID (Defaults to 'ses-s1').
type (str): Type of data (Defaults to 'CondHalf').
subj (ndarray): Subject numbers to get - by default none (all)
fields (list): Column names of info stucture that are returned
these are also be tested to be equivalent across subjects
exclude_subjects (bool): If True, excludes subjects that have been specified
in the exclude column of the participants.tsv file.
Returns:
Data (ndarray): (n_subj, n_contrast, n_voxel) array of data
info (DataFramw): Data frame with common descriptor
"""
T = self.get_participants(exclude_subjects=exclude_subjects)
# Deal with subset of subject option
if subj is None:
subj = np.arange(T.shape[0])
else:
subj = [T.participant_id.tolist().index(i) for i in subj]
if type is None:
type = self.default_type
max = 0
# Loop over the different subjects to find the most complete info
for s in T.participant_id.iloc[subj]:
# Get an check the information
info_raw = pd.read_csv(self.data_dir.format(s)
+ f'/{s}_{ses_id}_{type}.tsv', sep='\t')
# Reduce tsv file when fields are given
if fields is not None:
info = info_raw[fields]
else:
info = info_raw
# Keep the most complete info
if info.shape[0] > max:
info_com = info
max = info.shape[0]
# Add the cond_num column to the info structure if task_code or cond_code is present
#
if 'task_code' in info_com.columns:
if 'cond_code' in info_com.columns:
code = info_com['task_code'] + '_' + info_com['cond_code']
else:
code = info_com['task_code']
info_com['cond_num'] = pd.factorize(code)[0] + 1
return info_com
[docs]
def get_data(self, space='SUIT3', ses_id='ses-s1', type=None,
subj=None, exclude_subjects=True, fields=None, ext=None, verbose=False):
"""Loads all the CIFTI files in the data directory of a certain space / type and returns they content as a Numpy array
Args:
space (str): Atlas space (Defaults to 'SUIT3').
ses_id (str): Session ID (Defaults to 'ses-s1').
type (str): Type of data (Defaults to 'CondHalf').
subj (ndarray, str, or list): Subject numbers /names to get [None = all]
exclude_subjects (bool): If True, excludes subjects that have been specified
in the exclude column of the participants.tsv file.
fields (list): Column names of info stucture that are returned
these are also be tested to be equivalent across subjects
Returns:
Data (ndarray): (n_subj, n_contrast, n_voxel) array of data
info (DataFrame): Data frame with common descriptor
"""
T = self.get_participants(exclude_subjects=exclude_subjects)
is_group = False
# Assemble the data
Data = None
# Deal with subset of subject option
if subj is None:
subj = T.participant_id
# JD: This is removed to avoid complications - please use 'subj' argument to select specific subjects
# only get data from subjects that have rest, if specified in dataset description
# if type == 'Tseries' and 'ses-rest' in T.columns:
# subj = T[T['ses-rest'] == 1].participant_id.tolist()
elif isinstance(subj, str) and subj == 'group':
is_group = True
subj = [subj]
elif isinstance(subj, str) and subj in T.participant_id.tolist():
subj = [subj]
elif isinstance(subj, (int,np.integer)):
subj = [T.participant_id.iloc[subj]]
elif isinstance(subj, (list, np.ndarray)):
if isinstance(subj[0], (int,np.integer)):
subj = T.participant_id.iloc[subj]
elif isinstance(subj[0], str):
subj = subj
else:
raise (NameError('subj must be a list of strings or integers'))
else:
raise (NameError('subj must be a str, int, list or ndarray'))
if type is None:
type = self.default_type
if is_group:
info_com = self.get_info(subj=None, ses_id=ses_id, type=type, fields=fields,exclude_subjects=exclude_subjects)
else:
info_com = self.get_info(subj=subj, ses_id=ses_id, type=type, fields=fields,exclude_subjects=exclude_subjects)
# Loop again to assemble the data
Data_list = []
for i, s in enumerate(subj):
# If you add verbose printout, make it so
# that by default it is suppressed by a verbose=False option
if verbose:
print(f'- Getting data for {s} in {space}')
# Load the data
if ext is not None:
C = nb.load(self.data_dir.format(s)
+ f'/{s}_space-{space}_{ses_id}_{type}_{ext}.dscalar.nii')
else:
C = nb.load(self.data_dir.format(s)
+ f'/{s}_space-{space}_{ses_id}_{type}.dscalar.nii')
this_data = C.get_fdata()
# Check if this subject data in incomplete
if this_data.shape[0] != info_com.shape[0]:
this_info = pd.read_csv(self.data_dir.format(s)
+ f'/{s}_{ses_id}_{type}.tsv', sep='\t')
base = np.asarray(info_com['names'])
incomplete = np.asarray(this_info['names'])
for j in range(base.shape[0]):
if base[j] != incomplete[j]:
warnings.warn(
f'{s}, {ses_id}, {type} - missing data {base[j]}')
incomplete = np.insert(
np.asarray(incomplete), j, np.nan)
this_data = np.insert(this_data, j, np.nan, axis=0)
Data_list.append(this_data[np.newaxis, ...])
# concatenate along the first dimension (subjects)
Data = np.concatenate(Data_list, axis=0)
# Ensure that infinite values (from div / 0) show up as NaNs
Data[np.isinf(Data)] = np.nan
return Data, info_com
[docs]
def group_average_data(self, ses_id=None,
type=None,
atlas='SUIT3',
subj=None):
"""Loads data from all subjects in for a certain session, type and atlas.
Averages data across subjects. Saves the results as CIFTI files in the data/group directory.
Args:
ses_id (str, optional): Session ID. If not provided, the first session ID in the dataset will be used.
type (str, optional): Type of data. If not provided, the default type will be used.
atlas (str, optional): Short atlas string. Defaults to 'SUIT3'.
subj (list or None, optional): Subset of subjects to include in the group average. If None, all subjects will be included.
"""
if ses_id is None:
ses_id = self.sessions[0]
if type is None:
type = self.default_type
data, info = self.get_data(space=atlas, ses_id=ses_id,
type=type, subj=subj)
# average across participants
X = np.nanmean(data, axis=0)
# make output cifti
s = self.get_participants().participant_id[0]
C = nb.load(self.data_dir.format(s) +
f'/{s}_space-{atlas}_{ses_id}_{type}.dscalar.nii')
C = nb.Cifti2Image(dataobj=X, header=C.header)
# save output
dest_dir = op.join(self.data_dir.format('group'))
Path(dest_dir).mkdir(parents=True, exist_ok=True)
nb.save(C, dest_dir +
f'/group_space-{atlas}_{ses_id}_{type}.dscalar.nii')
if 'sn' in info.columns:
info = info.drop(columns=['sn'])
info.to_csv(dest_dir +
f'/group_{ses_id}_{type}.tsv', sep='\t', index=False)
[docs]
class DataSetNative(DataSet):
"""Data set with estimates data stored as
nifti-files in Native space.
"""
[docs]
def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1):
"""This function generates atlas map for the data of a specific subject into a specific atlas space.
For Native space, we are using indivdual maps for SUIT and surface space.
Addtiionally, we defines deformations MNI space via the individual normalization into MNI152NLin6Asym (FSL, SPM Segement).
Other MNI space (symmetric etc) are not implemented yet.
Args:
atlas (FunctionFusion.Atlas):
Functional Fusion atlas object
sub (str):
Subject_id for the individual subject
ses_id (str):
Session_id for the individual subject if atlasmap is session dependent. (defaults to none)
smooth (float):
Width of smoothing kernel for extraction. Defaults to None.
Returns:
AtlasMap: Built AtlasMap object
"""
atlas_maps = []
if atlas.space == ['MNI152NLin2009cSym','MNI152NLin6Asym']:
# This is for MNI standard space)
deform = self.anatomical_dir.format(sub) + f'/{sub}_space-{atlas.space}_xfm.nii'
if not os.path.exists(deform):
warnings.warn(f'No individual deformation found for {atlas.space} in {sub} - resortinh to MNI_xfm.nii')
deform = self.anatomical_dir.format(sub) + f'/{sub}_space-MNI_xfm.nii'
edir = self.estimates_dir.format(sub)
mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(interpolation=interpolation,smooth=None)
else:
atlas_maps = super().get_atlasmaps(atlas,sub,ses_id,smooth=smooth, interpolation=interpolation)
return atlas_maps
[docs]
class DataSetMNIVol(DataSet):
def __init__(self, base_dir,space='MNI152NLin6Asym'):
"""Data set with estimates data stored as nifti-files in a standard group space. The exact MNI template should be indicated in the space-argument ('MNI152NLin6Asym','MNI152N2009cAsym','MNI152N2009cSym'). The small deformations between the different MNI spaces are implemented when extracting the data.
Args:
base_dir (str): basis directory
space (str): Group Space in which data is stored (Defaults to 'MNI152NLin6Asym').
"""
super().__init__(base_dir)
self.space = space
[docs]
def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1):
"""This function generates atlas map for the data stored in MNI space.
For SUIT and surface space, it goes over deformations estimated on the individual anatomy. If atlas.space matches dataset.space, it uses no deformation, but a direct readout. For mismatching MNI space it tries to find the correct transformation file.
Args:
atlas (FunctionFusion.Atlas):
Functional Fusion atlas object
sub (str):
Subject_id for the individual subject
ses_id (str):
Session_id for the individual subject if atlasmap is session dependent. (defaults to none)
smooth (float):
Width of smoothing kernel for extraction. Defaults to None.
Returns:
AtlasMap: Built AtlasMap object
"""
atlas_spaces = ['MNI152NLin6Asym','MNI152NLin2009cAsym','MNI152NLin2009cSym']
atlas_maps = []
edir = self.estimates_dir.format(sub)
mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
# Matching MNI space
if atlas.space == self.space:
atlas_maps.append(am.AtlasMapDeform(atlas.world, None, mask))
atlas_maps[0].build(interpolation=interpolation,smooth=smooth)
# Mis-matching MNI space
elif atlas.space in atlas_spaces:
deform = am.get_deform(atlas.space, self.space)
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(interpolation=interpolation,smooth=smooth)
# Any other space (SUIT + fs32k)
else:
atlas_maps = super().get_atlasmaps(atlas,sub,ses_id,smooth=smooth, interpolation=interpolation)
return atlas_maps
[docs]
class DataSetCifti(DataSet):
"""Data set that comes in HCP-format in already pre-extracted cifti files.
"""
[docs]
def get_data_fnames(self, participant_id, session_id=None):
""" Gets all raw data files
Args:
participant_id (str): Subject
session_id (str): Session ID. Defaults to None.
Returns:
fnames (list): List of fnames, last one is the resMS image
T (pd.DataFrame): Info structure for regressors (reginfo)
"""
dirw = self.estimates_dir.format(participant_id) + f'/{session_id}'
T = pd.read_csv(
dirw + f'/{participant_id}_{session_id}_reginfo.tsv', sep='\t')
fnames = [f'{dirw}/{participant_id}_{session_id}_beta.dscalar.nii']
fnames.append(
f'{dirw}/{participant_id}_{session_id}_resms.dscalar.nii')
return fnames, T
[docs]
class DataSetMDTB(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-s1', 'ses-s2']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = True
class DataSetPontine(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-01']
self.default_type = 'TaskHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = True
def condense_data(self, data, info,
type='CondHalf',
participant_id=None,
ses_id=None):
""" Condense the data from the pontine project after extraction
Args:
data (list of ndarray)
info (dataframe)
type (str): Type of extraction:
'CondHalf': Conditions with seperate estimates for first and second half of experient (Default)
'CondRun': Conditions with seperate estimates per run
participant_id (str): ID of participant
ses_id (str): Name of session
Returns:
Y (list of np.ndarray):
A list (len = numatlas) with N x P_i numpy array of prewhitened data
T (pd.DataFrame):
A data frame with information about the N numbers provide
names: Names for CIFTI-file per row
"""
# Depending on the type, make a new contrast
n_cond = np.max(info.reg_id)
if type == 'CondHalf':
data_info, C = agg_data(info,
['half', 'reg_id'],
['run'],
subset=(info.reg_id > 0))
data_info['names'] = [
f'{d.task_code}_{d.cond_code}_half{d.half}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.half, positive=True)
elif type == 'CondRun':
data_info, C = agg_data(info,
['run', 'reg_id'],
['half'],
subset=(info.reg_id > 0))
data_info['names'] = [
f'{d.task_code}_{d.cond_code}_run{d.run}' for i, d in data_info.iterrows()]
# Baseline substraction
B = matrix.indicator(data_info.run, positive=True)
# Prewhiten the data
data_n = prewhiten_data(data)
# Load the designmatrix and perform optimal contrast
X = np.load(self.estimates_dir.format(participant_id) + f'/{ses_id}/{participant_id}_{ses_id}_designmatrix.npy',allow_pickle=True).item()
reg_in = np.arange(C.shape[1], dtype=int)
CI = matrix.indicator(info.run * info.instruction, positive=True)
C = np.c_[C, CI]
data_new = optimal_contrast(data_n, C, X['nKX'], reg_in)
return data_new, data_info
class DataSetNishi(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-01', 'ses-02']
self.default_type = 'CondHalf'
self.cond_ind = 'reg_id'
self.cond_name = 'task_name'
self.part_ind = 'half'
self.subtract_baseline = True
class DataSetIBC(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-archi',
'ses-clips4',
'ses-enumeration',
'ses-hcp1', 'ses-hcp2',
'ses-lyon1', 'ses-lyon2',
'ses-mathlang',
'ses-mtt1', 'ses-mtt2',
'ses-preference',
'ses-rsvplanguage',
'ses-spatialnavigation',
'ses-tom']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = False
def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1):
"""This function generates atlas map for the data of a specific subject into a specific atlas space.
This uses the general functionality, but uses a session specific functional mask.
Args:
atlas (FunctionFusion.Atlas):
Functional Fusion atlas object
sub (str):
Subject_id for the individual subject
ses_id (str):
Session_id for the individual subject if atlasmap is session dependent. (defaults to none)
smooth (float):
Width of smoothing kernel for extraction. Defaults to None.
Returns:
AtlasMap: Built AtlasMap object
"""
atlas_maps = []
if atlas.space == 'SUIT':
deform = self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii'
mask = self.estimates_dir.format(sub) + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
add_mask = self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii'
atlas_map = am.AtlasMapDeform(atlas.world, deform, mask)
atlas_map.build(smooth=2.0, additional_mask=add_mask)
elif atlas.space in ['MNI152NLin2009cSymC','MNI152NLin6AsymC']:
# This is nornmalization over SUIT->MNI (cerebellum only)
deform1 = am.get_deform(atlas.space, 'SUIT')
deform2 = self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii'
deform = [deform1, deform2]
mask = self.estimates_dir.format(sub) + f'/{ses_id}/{sub}_{ses_id}_mask.nii'
add_mask = self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii'
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(smooth=smooth,additional_mask=add_mask)
else:
atlas_maps=super().get_atlasmaps(atlas, sub, ses_id, smooth=smooth, interpolation=interpolation)
return atlas_maps
class DataSetDemand(DataSetCifti):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-01']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = False
class DataSetWMFS(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-01', 'ses-02']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = False
def condense_data(self, data, info,
type='CondHalf',
participant_id=None,
ses_id=None):
""" Condense the data in a certain way optimally
Args:
data (list): List of extracted datasets
info (DataFrame): Data Frame with description of data - row-wise
type (str): Type of extraction:
'CondHalf': Conditions with seperate estimates for first and second half of experient (Default)
'CondRun': Conditions with seperate estimates per run
Defaults to 'CondHalf'.
participant_id (str): ID of participant
ses_id (str): Name of session
Returns:
Y (list of np.ndarray):
A list (len = numatlas) with N x P_i numpy array of prewhitened data
T (pd.DataFrame):
A data frame with information about the N numbers provide
names: Names for CIFTI-file per row
"""
# Depending on the type, make a new contrast
subset = info.cond_code != 'err'
data_new, data_info = super().condense_data(data, info, type,
participant_id=participant_id,
ses_id=ses_id,
subset=subset)
return data_new, data_info
class DataSetSomatotopic(DataSetMNIVol):
def __init__(self, dir):
super().__init__(dir)
self.space = 'MNI152NLin6Asym'
self.sessions = ['ses-motor']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = False
def get_atlasmaps(self, atlas, sub=None, ses_id = None, smooth=None, interpolation=1):
""" Gets group atlasmaps.
Assumes that all scans are in the same space (self.space)
Args:
sub (str; optional): Subject
Returns:
atlas_maps (list): List of atlasmaps
"""
# if you have group xfm file, then you get it from the atlas directory
# if you have xfm files per subject, then you can get it from the anat dir under individual subject
atlas_maps = []
if atlas.structure == 'cerebellum':
deform = self.atlas_dir + \
f'/tpl-{atlas.space}/tpl-{atlas.space}_from-{self.space}_mode-image_xfm.nii'
mask = self.atlas_dir + \
f'/tpl-{self.space}/tpl-{self.space}_desc-cereb_mask.nii'
atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask))
atlas_maps[0].build(interpolation=interpolation,smooth=smooth)
elif atlas.space == 'fs32k':
for i, hem in enumerate(['L', 'R']):
adir = self.anatomical_dir.format(sub)
pial = adir + f'/{sub}_space-32k_hemi-{hem}_pial.surf.gii'
white = adir + f'/{sub}_space-32k_hemi-{hem}_white.surf.gii'
mask = self.atlas_dir + \
f'/tpl-{self.space}/tpl-{self.space}_mask.nii'
atlas_maps.append(am.AtlasMapSurf(atlas.vertex[i],
white, pial, mask))
atlas_maps[i].build()
else:
atlas_maps = super().get_atlasmaps(atlas, sub, ses_id, smooth=smooth, interpolation=interpolation)
return atlas_maps
class DataSetLanguage(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-localizer','ses-rest']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = True
class DataSetHcpTask(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-task2']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = False
class DataSetSocial(DataSetNative):
def __init__(self, dir):
super().__init__(dir)
self.sessions = ['ses-social', 'ses-rest']
self.default_type = 'CondHalf'
self.cond_ind = 'cond_num'
self.part_ind = 'half'
self.subtract_baseline = True