Source code for dataset

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Data fusion project dataset module

The class for converting and mapping raw data from multi-dataset
to a standard data structure that can be used in Diedrichsen lab

"""
from pathlib import Path
import numpy as np
import pandas as pd
import os, time, sys
import os.path as op

import Functional_Fusion.util as util
import Functional_Fusion.matrix as matrix
import Functional_Fusion.atlas_map as am
import scipy.linalg as sl
import nibabel as nb
from numpy import eye, zeros, ones, empty, nansum, sqrt
from numpy.linalg import pinv, solve
import warnings
import glob
import re


[docs] def get_dataset_class(base_dir, dataset): """ Returns the dataset object without loading the data Args: base_dir (str): Functional Fusion base directory dataset (str): _description_ Returns: my_dataset (Dataset): Dataset object """ T = pd.read_csv(base_dir + '/dataset_description.tsv', sep='\t') T.name = [n.casefold() for n in T.name] i = np.where(dataset.casefold() == T.name)[0] if len(i) == 0: raise (NameError(f'Unknown dataset: {dataset}')) t = T.iloc[int(i)] dsclass = getattr(sys.modules[__name__], t.class_name) dir_name = t.dir_name if dir_name[0] == '/': abs_path = dir_name elif dir_name[0] == '.': # Relative path relative to fusion project abs_path = Path(base_dir).parent + Path(dir_name[1:]) else: abs_path = base_dir + '/' + t.dir_name my_dataset = dsclass(abs_path) my_dataset.sessions = eval(t.sessions) my_dataset.default_type = t.default_type my_dataset.cond_ind = t.cond_ind # Condition Indicator (field in tsv file ) my_dataset.part_ind = t.part_ind # Partition Indicator (field in tsv file ) my_dataset.subtract_baseline = bool(t.subtract_baseline) # If True, baseline is subtracted from the data return my_dataset
[docs] def get_dataset(base_dir, dataset, atlas='SUIT3', sess='all', subj=None, type=None, ext=None,exclude_subjects=True): """get_dataset tensor and data set object Args: base_dir (str): Basis directory for the Functional Fusion datastructure dataset (str): Data set indicator atlas (str): Atlas indicator. Defaults to 'SUIT3'. sess (str or list): Sessions. Defaults to 'all'. subj (ndarray, str, or list): Subject numbers /names to get [None = all] type (str): 'CondHalf','CondRun', etc.... ext (str): added qualifier (smoothing, etc.) default None exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. Returns: data (nd.array):nsubj x ncond x nvox data tensor info (pd.DataFrame): Dataframe with info about the data my_dataset (DataSet): Dataset object """ my_dataset = get_dataset_class(base_dir, dataset) # Get defaults sessions from dataset itself if sess == 'all': sess = my_dataset.sessions elif not isinstance(sess, (list, np.ndarray)): sess = [sess] # Empty default type (Future change: define per file?) if type is None: type = my_dataset.default_type # Load all data and concatenate # across sessions info_l = [] data_l = [] for s in sess: dat, inf = my_dataset.get_data(atlas, s, type, subj, ext=ext,exclude_subjects=exclude_subjects) data_l.append(dat) inf['sess'] = [s] * inf.shape[0] info_l.append(inf) info = pd.concat(info_l, ignore_index=True, sort=False) data = np.concatenate(data_l, axis=1) return data, info, my_dataset
def build_dataset_from_fusionProject(dataset, atlas, base_dir, sess='all', cond_ind='cond_num_uni', type='CondHalf', part_ind='half', subj=None, join_sess=False, join_sess_part=False, smooth=None): """ Builds dataset, cond_vec, part_vec, subj_ind from the given dataset in Functional fusion project Args: dataset (str): Names of the dataset to build atlas (object): Atlas indicator sess (list): list of 'all' or list of sessions design_ind (list, optional): _description_. Defaults to None. part_ind (list, optional): _description_. Defaults to None. subj (list, optional): _description_. Defaults to None. join_sess (bool, optional): Model the sessions with a single model. Defaults to True. Returns: data, cond_vec, part_vec, subj_ind """ sub = 0 data, cond_vec, part_vec, subj_ind = [],[],[],[] # Run over datasets get data + design dat, info, tds = get_dataset(base_dir, dataset, atlas=atlas.name, sess=sess, type=type, smooth=smooth) # Sub-index the subjects: (REMOVE AND PASS TO GET_DATA) if subj is not None: dat = dat[subj, :, :] n_subj = dat.shape[0] # Find correct indices if cond_ind is None: cond_ind = tds.cond_ind if part_ind is None: part_ind = tds.part_ind # Make different sessions either the same or different if join_sess: data.append(dat) cond_vec.append(info[cond_ind].values.reshape(-1, )) # Check if we want to set no partition after join sessions if join_sess_part: part_vec.append(np.ones(info[part_ind].shape)) else: part_vec.append(info[part_ind].values.reshape(-1, )) subj_ind.append(np.arange(sub, sub + n_subj)) else: if sess == 'all': sessions = tds.sessions else: sessions = sess # Now build and split across the correct sessions: for s in sessions: indx = info.sess == s data.append(dat[:, indx, :]) cond_vec.append(info[cond_ind].values[indx].reshape(-1, )) part_vec.append(info[part_ind].values[indx].reshape(-1, )) subj_ind.append(np.arange(sub, sub + n_subj)) return data, cond_vec, part_vec, subj_ind
[docs] def prewhiten_data(data): """ prewhitens a list of data matrices. It assumes that the last row of each data matrix is the ResMS-value Returns a list of data matrices that is one shorter Args: data (list of ndarrays): List of data arrays """ # Get the resms and prewhiten the data data_n = [] for i in range(len(data)): # Prewhiten the data univariately resms = data[i][-1, :] data_n.append(data[i][0:-1, :]) resms[resms <= 0] = np.nan data_n[i] = data_n[i] / np.sqrt(resms) return data_n
[docs] def agg_data(info, by, over, subset=None): """ Aggregates data over rows (condition) safely by sorting them by the fields in "by" while integrating out "over". Adds a n_rep field to count how many instances are of each Returns condensed data frame + Contrast matrix. Args: info (DataFrame): Original DataFrame by (list): Fields that define the index of the new data over (list): Fields to ignore / integrate over. All other fields will be pulled through. subset (bool array): If given, ignores certain rows from the original data frame Return data_info (DataFrame): Reduced data frame C (ndarray): Indicator matrix defining the mapping from full to reduced Example: data,info,mdtb= ds.get_data('MDTB','MNISymDentate1',ses_id='ses-s1',type='CondRun') cinfo,C = ds.agg_data(info,['cond_num_uni'],['run','half','reg_num','names']) cdata = np.linalg.pinv(C) @ data """ # Subset original data frame as needed info_n = info.copy() if subset is None: indx = np.arange(info.shape[0]) else: indx = np.nonzero(subset.values)[0] info_n = info_n[subset] # Generate n_rep field if not present if 'n_rep' not in info.columns: info_n['n_rep'] = np.ones((info_n.shape[0],)) # Other contains the fields to remain constant other = list(info.columns.values) for ov in over + by: other.remove(ov) # Define operations on data operations = {'n_rep': 'sum'} for o in other: operations[o] = 'max' # Group the new data frame info_gb = info_n.groupby(by,sort=False) data_info = info_gb.agg(operations).reset_index() # Build indicator matrix for averaging C = np.zeros((info.shape[0], data_info.shape[0])) for i, d in data_info.iterrows(): rows = np.ones(info.shape[0], dtype=bool) for j in by: rows &= (info[j] == d[j]) C[rows, i] = 1 return data_info, C
[docs] def agg_parcels(data, label_vec, fcn=np.nanmean): """ Aggregates data over colums to condense to parcels Args: data (ndarray): Either 2d or 3d data structure, P has to be the last dimension labels (ndarray): 1d-array that gives the labels (P-vector) fcn (function): Function to use to aggregate over these Returns: aggdata (ndarray): Aggregated either 2d or 3d data structure labels (ndarray): Region number corresponding to each "column" """ # Subset original data frame as needed labels = np.unique(label_vec[label_vec > 0]) n_parcels = len(labels) psize = np.array(data.shape) psize[-1] = n_parcels parcel_data = np.zeros(psize) for i, l in enumerate(labels): parcel_data[..., i] = fcn( data[..., label_vec == l], axis=len(psize) - 1) return parcel_data, labels
def combine_parcel_labels(labels_org,labels_new, labelvec_org=None): """ Combines parcel labels from a new atlas to an existing atlas Example call: mapping, lv = combine_parcel_labels(labels_org,['0','A.L','A.R','S..','M3.'],labelvec_org) To get different aggregations of the Nettekoven atlas * A.L includes A1-4L * A.R includes A1-4R * S.. includes all S-areas * M3. includes M3L and M3R Args: labels_org (list of str): N-lenght original label names (should include '0' for 0) labels_new (list of str): List of regexpressions for new labels (should include '0' for 0) labelvec_org (ndarray, optional): Original label vector to remap (P-vector) Returns: mapping (ndarray): New label indices for the old labels (N-length vector) labelvec_new (ndarray): New label vector (P-vector) - returned if labelvec_org is given """ mapping = np.zeros(len(labels_org)) for i,l in enumerate(labels_new): for j,lo in enumerate(labels_org): if re.match(l,lo): mapping[j] = i if labelvec_org is None: return mapping else : labelvec_new = np.zeros(labelvec_org.shape) for i in np.arange(len(mapping)): labelvec_new[labelvec_org == i] = mapping[i] return mapping, labelvec_new
[docs] def optimal_contrast(data, C, X, reg_in=None): """Recombines betas from a GLM into an optimal new contrast, taking into account a design matrix For mathematical background and motivation, see: Args: data (list of ndarrays): List of N x P_i arrays of beta estimates of the original GLM C (ndarray): Contrast matrix (N x Q) going from the original GLM to the new GLM X (ndarray): Original (T x Nx) design matrix used in estimation of the data. Nx could be longer than N by regressors of no interest reg_in (ndarray): Contrast of interest: Logical vector indicating which rows of C we will put in the matrix (defaults to all) """ # Check the sizes N, Q = C.shape T, Np = X.shape # infer number of regressors of no interest that are not in the data structure num_nointerest = Np - N # Add to the contrast matrix Cn = sl.block_diag(C, np.eye(num_nointerest)) # Make new design matrix Xn = X @ Cn # If no subset of regressors is given, use all: if reg_in is None: reg_in = np.arange(Q) # Loop over the data: data_new = [] for i in range(len(data)): # Append the regressors of no interest regressors dat = np.concatenate([data[i], np.zeros((num_nointerest, data[i].shape[1]))]) # Do the averaging / reweighting: d = pinv(Xn) @ X @ dat # Subset to the contrast of interest d = d[reg_in, :] # Put the data in a list: data_new.append(d) return data_new
def remove_baseline(data, baseline): """ Removes a baseline from the data Arg: data (ndarray): (nsubj x N x P) OR (N x P) array baseline (narray): 1-dimensional array (N,) of partition numbers or (N x npart) indicator matrix Returns: data_sub (ndarray): Baseline subtracted data """ if baseline is None: return data N = data.shape[-2] # This is the trial dimension if baseline.ndim == 1: B = matrix.indicator(baseline) else: B = baseline R = eye(N) - B @ pinv(B) # Residual forming matrix return R @ data # Uses broadcasting for >2 dim arrays
[docs] def reliability_maps(base_dir, dataset_name, atlas='MNISymC3', type='CondHalf', subtract_mean=True, voxel_wise=True, subject_wise=False): """ Calculates the average within subject reliability maps across sessions for a single dataset Args: base_dir (str / path): Base directory dataset_name (str): Name of data set atlas (str): _description_. Defaults to 'MNISymC3'. subtract_mean (bool): Remove the mean per voxel before correlation calc? Returns: _type_: _description_ """ data, info, dataset = get_dataset(base_dir, dataset_name, atlas=atlas, type = type) n_sess = len(dataset.sessions) n_vox = data.shape[2] Rel = np.zeros((n_sess, n_vox)) if subject_wise: Rel = np.zeros((n_sess, data.shape[0], n_vox)) for i, s in enumerate(dataset.sessions): indx = info.sess == s r = reliability_within_subj(data[:, indx, :], part_vec=info[dataset.part_ind][indx], cond_vec=info[dataset.cond_ind][indx], voxel_wise=voxel_wise, subtract_mean=subtract_mean) if subject_wise: Rel[i, :, :] = np.nanmean(r, axis=1) else: Rel[i, :] = np.nanmean(np.nanmean(r, axis=0), axis=0) return Rel, dataset.sessions
def reliability_within_subj(X, part_vec, cond_vec,voxel_wise=False, subtract_mean=True): raise(NameError('Depreciated. Use reliability.within_subj_loo')) def reliability_between_subj(X, cond_vec=None,voxel_wise=False, subtract_mean=True): raise(NameError('Depreciated. Use reliability.between_subj_loo')) def decompose_pattern_into_group_indiv_noise(data, criterion='global'): raise(NameError('Depreciated. Use reliability.decompose_subj_group'))
[docs] class DataSet: def __init__(self, base_dir): """DataSet class: Implements the interface for each of the data set Note that the actual preprocessing and glm estimate do not have to be performed with functionality provided by this class. The class is just a instrument to present the user with a uniform interface of how to get subject info Args: base_dir (str): base directory for dataset """ self.base_dir = base_dir self.surface_dir = base_dir + '/derivatives/ffimport/{0}/anat' self.anatomical_dir = base_dir + '/derivatives/ffimport/{0}/anat' self.estimates_dir = base_dir + '/derivatives/ffimport/{0}/func' self.func_dir = base_dir + '/derivatives/ffimport/{0}/func' self.suit_dir = base_dir + '/derivatives/ffimport/{0}/anat' self.data_dir = base_dir + '/derivatives/ffextract/{0}' # assume that the common atlas directory is on the level before self.atlas_dir = util.default_atlas_dir # Some information that a standard data set should have self.sessions = [None] self.default_type = None self.cond_ind = None # Condition Indicator (field in tsv file ) self.part_ind = None # Partition Indicator (field in tsv file ) self.cond_name = None # Condition Names (field in tsv file ) self.subtract_baseline = False # If True, baseline is subtracted from the data
[docs] def get_participants(self, exclude_subjects=True): """ returns a data frame with all participants available in the study. The fields in the data frame correspond to the standard columns in participant.tsv. https://bids-specification.readthedocs.io/en/stable/03-modality-agnostic-files.html Args: exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. Returns: Pinfo (pandas data frame): participant information in standard bids format """ self.part_info = pd.read_csv( self.base_dir + '/participants.tsv', delimiter='\t') if exclude_subjects and 'exclude' in self.part_info.columns: # Exclude subjects that have been specified in the exclude column # 1 = exclude, 0 = include self.part_info = self.part_info[self.part_info.exclude == 0].reset_index() return self.part_info
[docs] def get_data_fnames(self, participant_id, session_id=None, type='Cond'): """ Gets all raw data files Args: participant_id (str): Subject session_id (str): Session ID. Defaults to None. type (str): Type of data. Defaults to 'Cond' for task-based data. For rest data use 'Tseries'. Returns: fnames (list): List of fnames, last one is the resMS image T (pd.DataFrame): Info structure for regressors (reginfo) """ dirw = self.estimates_dir.format(participant_id) + f'/{session_id}' if type[:4] == 'Cond' or type[:4] == 'Task': T = pd.read_csv( dirw + f'/{participant_id}_{session_id}_reginfo.tsv', sep='\t') fnames = [f'{dirw}/{participant_id}_{session_id}_run-{t.run:02}_reg-{t.reg_id:02}_beta.nii' for i, t in T.iterrows()] fnames.append(f'{dirw}/{participant_id}_{session_id}_resms.nii') elif type == 'Tseries' or type == 'FixTseries': # Find all run files of the structure f'{dirw}/{participant_id}_{session_id}_run-??.nii' fnames = glob.glob(f'{dirw}/{participant_id}_{session_id}_run-??.nii') # If no files are found, throw error if fnames == []: raise ValueError('No timepoints found in timeseries files') runs = [int(fname.split('run-')[-1].split('_')[0].split('.')[0]) for fname in fnames] runs = np.unique(runs) fnames = [f'{dirw}/{participant_id}_{session_id}_run-{r:02}.nii' for r in runs] if type == 'FixTseries': # Make sure to load fix-cleaned timeseries fnames = [f'{dirw}/{participant_id}_{session_id}_run-{r:02}_fix.nii' for r in runs] try: # Load timeseries info file if it exists T = pd.read_csv( dirw + f'/{participant_id}_{session_id}_tinfo.tsv', sep='\t') except: # Create timeseries info yourself if it doesn't exist timepoints = [nb.load(fname).shape[-1] for fname in fnames] runs = np.repeat(runs, timepoints) # Timepoints start counting at 1, not 0! timepoints = np.arange(sum(timepoints))+1 timepoints_string = [f'T{timepoint:04d}' for timepoint in timepoints] T = pd.DataFrame({'run': runs, 'timepoint': timepoints_string, 'time_id':timepoints}, index=None) return fnames, T
[docs] def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1): """This function generates atlas map for the data of a specific subject into a specific atlas space. The general DataSet.get_atlasmaps defines atlas maps for different spaces - SUIT: Using individual normalization from source space. - MNI152NLin2009cSymC: Via indivual SUIT normalization + group - MNI152NLin6AsymC: Via indivual SUIT normalization + group - MNI152Lin2009cSym: Via individual MNI normalization - MNI152NLin6Asym: Via individual MNI normalization fs32k: Via individual pial and white surfaces (need to be in source space) Other dataset classes will overwrite and extend this function. Args: atlas (FunctionFusion.Atlas): Functional Fusion atlas object sub (str): Subject_id for the individual subject ses_id (str): Session_id for the individual subject if atlasmap is session dependent. (defaults to none) smooth (float): Width of smoothing kernel for extraction. Defaults to None. Returns: AtlasMap: List of AtlasMap object - usually but for fs32k these are two """ cerebellar_spaces = ['SUIT','MNI152NLin2009cSymC','MNI152NLin6AsymC'] wholebrain_spaces = ['MNI152Lin2009cSym','MNI152NLin6Asym'] atlas_maps = [] adir = self.anatomical_dir.format(sub) edir = self.estimates_dir.format(sub) if atlas.space in cerebellar_spaces: deform = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_space-{atlas.space}_xfm.nii') mask = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii') if deform is None: print('Direct transform not found, trying to build from SUIT normalization') deform1 = am.get_deform(atlas.space, 'SUIT') deform2 = util.file_nii_or_gz(self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii') if deform2 is None: raise ValueError(f'Neither direct transform nor SUIT transform found. Run import_data.run SUIT using desired space.') else: deform = [deform1, deform2] atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(interpolation=interpolation, smooth=smooth) elif atlas.space in wholebrain_spaces: # This is direct MNI normalization deform = adir + f'/{sub}_space-{atlas.space}_xfm.nii' mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii' atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(interpolation=interpolation, smooth=smooth) elif atlas.space == 'fs32k': for i, struc in enumerate(atlas.structure): if struc=='cortex_left': hem = 'L' elif struc=='cortex_right': hem = 'R' else: raise ValueError('Structure for fs32k needs to be cortex_left or cortex_right.') pial = adir + f'/{sub}_space-32k_hemi-{hem}_pial.surf.gii' white = adir + f'/{sub}_space-32k_hemi-{hem}_white.surf.gii' mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii' atlas_maps.append(am.AtlasMapSurf(atlas.vertex[i], white, pial, mask)) atlas_maps[i].build() else: raise ValueError(f'Atlas space {atlas.space} not supported for extraction') return atlas_maps
[docs] def condense_data(self, data, info, type='CondHalf', participant_id=None, ses_id=None, subset=None): """ Condense the data across the measures to a certain level If a design matrix file exisits, it is used to combine betas optimally 'CondHalf': Conditions with seperate estimates for first and second half of experiment (Default) 'CondRun': Conditions with seperate estimates per run. 'CondAll': Conditions with a single estimate averaging over all runs. 'TaskHalf': Task with seperate estimates for first and second half of experiment 'TaskRun': Task with seperate estimates per run. 'TaskAll': Task with a single estimate averaging over all runs. if dataset.subtract_baseline is True, the baseline is subtracted from the data. Args: data (ndarray): List of extracted datasets info (DataFrame): Data Frame with description of data - row-wise type (str): Type of extraction: participant_id (str): ID of participant ses_id (str): Name of session subset (bool array): If given, ignores certain rows from the Returns: Y (list of np.ndarray): A list (len = numatlas) with N x P_i numpy array of prewhitened data T (pd.DataFrame): A data frame with information about the N numbers provided """ # Time series: do not condence, just return the data if type == 'Tseries' or type == 'FixTseries': info['names'] = info['timepoint'] return data, info # Task-based data (betas) info['cond_code'] = info['cond_code'].fillna('task') # for tasks that have no condition code if subset is None: subset = (info.task_code != 'instrct') # By defaults, ignore instruction regressors # Depending on the type, make a new contrast if type == 'CondHalf': data_info, C = agg_data(info, ['half', 'task_code','cond_code'], ['run','reg_id'], subset=subset) data_info['names'] = [ f'{d.task_code}_{d.cond_code}_half{d.half}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.half, positive=True) elif type == 'CondRun': data_info, C = agg_data(info, ['run', 'task_code','cond_code'], ['half','reg_id'], subset=subset) data_info['names'] = [ f'{d.task_code}_{d.cond_code}_run{d.run:02d}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.run, positive=True) elif type == 'CondAll': data_info, C = agg_data(info, ['task_code','cond_code'], ['run', 'half','reg_id'], subset=subset) data_info['names'] = [ f'{d.task_code}_{d.cond_code}' for i, d in data_info.iterrows()] # Baseline substraction B = np.ones((data_info.shape[0],1)) elif type == 'TaskRun': data_info, C = agg_data(info, by=['task_code','run'], over=['half'], subset=subset) data_info['names'] = [ f'{d.task_code}_run{d.run}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.run, positive=True) elif type == 'TaskHalf': data_info, C = agg_data(info, by=['task_code', 'half'], over=['run'], subset=subset) data_info['names'] = [ f'{d.task_code}_half{d.half}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.half, positive=True) elif type == 'TaskAll': data_info, C = agg_data(info, by=['task_code'], over=['run', 'half'], subset=subset) data_info['names'] = [ f'{d.task_code}' for i, d in data_info.iterrows()] # Baseline substraction B = np.ones((data_info.shape[0],1)) # Prewhiten the data data_n = prewhiten_data(data) # Load the designmatrix and perform optimal contrast dir = self.estimates_dir.format(participant_id) + f'/{ses_id}' design_matrix_file = dir + f'/{participant_id}_{ses_id}_designmatrix.npy' if os.path.exists(design_matrix_file): X = np.load(design_matrix_file) data_n = optimal_contrast(data_n, C, X) else: for i in range(len(data_n)): data_n[i] = pinv(C) @ data_n[i] # Subtract baseline if needed if self.subtract_baseline: data_n = [remove_baseline(d, B) for d in data_n] return data_n, data_info
[docs] def extract_all(self, ses_id='ses-s1', type='CondHalf', atlas='SUIT3', smooth=None, interpolation=1, subj='all', exclude_subjects=True): """Extracts data in Volumetric space from a dataset in which the data is stored in Native space. Saves the results as CIFTI files in the data directory. Args: ses_id (str): Session. Defaults to 'ses-s1'. type (str): Type for condense_data. Defaults to 'CondHalf'. atlas (str): Short atlas string. Defaults to 'SUIT3'. smooth (float): Smoothing kernel. Defaults to 2.0. subj (list / str): List of Subject numbers to get use. Default = 'all' exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. """ myatlas, _ = am.get_atlas(atlas) # create and calculate the atlas map for each participant T = self.get_participants(exclude_subjects=exclude_subjects) if subj != 'all': T = T.iloc[subj] for s in T.participant_id: print(f'Atlasmap {s}') atlas_maps = self.get_atlasmaps(myatlas, s, ses_id, smooth=smooth, interpolation=interpolation) print(f'Extract {s}') fnames, info = self.get_data_fnames(s, ses_id, type=type) data = am.get_data_nifti(fnames, atlas_maps) data, info = self.condense_data(data, info, type, participant_id=s, ses_id=ses_id) # Write out data as CIFTI file C = myatlas.data_to_cifti(data, info.names) dest_dir = self.data_dir.format(s) Path(dest_dir).mkdir(parents=True, exist_ok=True) nb.save(C, dest_dir + f'/{s}_space-{atlas}_{ses_id}_{type}.dscalar.nii') info.to_csv( dest_dir + f'/{s}_{ses_id}_{type}.tsv', sep='\t', index=False)
[docs] def get_info(self, ses_id='ses-s1', type=None, subj=None, fields=None, exclude_subjects=True): """Loads the tsv-files and returns the most complete info structure Args: ses_id (str): Session ID (Defaults to 'ses-s1'). type (str): Type of data (Defaults to 'CondHalf'). subj (ndarray): Subject numbers to get - by default none (all) fields (list): Column names of info stucture that are returned these are also be tested to be equivalent across subjects exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. Returns: Data (ndarray): (n_subj, n_contrast, n_voxel) array of data info (DataFramw): Data frame with common descriptor """ T = self.get_participants(exclude_subjects=exclude_subjects) # Deal with subset of subject option if subj is None: subj = np.arange(T.shape[0]) else: subj = [T.participant_id.tolist().index(i) for i in subj] if type is None: type = self.default_type max = 0 # Loop over the different subjects to find the most complete info for s in T.participant_id.iloc[subj]: # Get an check the information info_raw = pd.read_csv(self.data_dir.format(s) + f'/{s}_{ses_id}_{type}.tsv', sep='\t') # Reduce tsv file when fields are given if fields is not None: info = info_raw[fields] else: info = info_raw # Keep the most complete info if info.shape[0] > max: info_com = info max = info.shape[0] # Add the cond_num column to the info structure if task_code or cond_code is present # if 'task_code' in info_com.columns: if 'cond_code' in info_com.columns: code = info_com['task_code'] + '_' + info_com['cond_code'] else: code = info_com['task_code'] info_com['cond_num'] = pd.factorize(code)[0] + 1 return info_com
[docs] def get_data(self, space='SUIT3', ses_id='ses-s1', type=None, subj=None, exclude_subjects=True, fields=None, ext=None, verbose=False): """Loads all the CIFTI files in the data directory of a certain space / type and returns they content as a Numpy array Args: space (str): Atlas space (Defaults to 'SUIT3'). ses_id (str): Session ID (Defaults to 'ses-s1'). type (str): Type of data (Defaults to 'CondHalf'). subj (ndarray, str, or list): Subject numbers /names to get [None = all] exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. fields (list): Column names of info stucture that are returned these are also be tested to be equivalent across subjects Returns: Data (ndarray): (n_subj, n_contrast, n_voxel) array of data info (DataFrame): Data frame with common descriptor """ T = self.get_participants(exclude_subjects=exclude_subjects) is_group = False # Assemble the data Data = None # Deal with subset of subject option if subj is None: subj = T.participant_id # JD: This is removed to avoid complications - please use 'subj' argument to select specific subjects # only get data from subjects that have rest, if specified in dataset description # if type == 'Tseries' and 'ses-rest' in T.columns: # subj = T[T['ses-rest'] == 1].participant_id.tolist() elif isinstance(subj, str) and subj == 'group': is_group = True subj = [subj] elif isinstance(subj, str) and subj in T.participant_id.tolist(): subj = [subj] elif isinstance(subj, (int,np.integer)): subj = [T.participant_id.iloc[subj]] elif isinstance(subj, (list, np.ndarray)): if isinstance(subj[0], (int,np.integer)): subj = T.participant_id.iloc[subj] elif isinstance(subj[0], str): subj = subj else: raise (NameError('subj must be a list of strings or integers')) else: raise (NameError('subj must be a str, int, list or ndarray')) if type is None: type = self.default_type if is_group: info_com = self.get_info(subj=None, ses_id=ses_id, type=type, fields=fields,exclude_subjects=exclude_subjects) else: info_com = self.get_info(subj=subj, ses_id=ses_id, type=type, fields=fields,exclude_subjects=exclude_subjects) # Loop again to assemble the data Data_list = [] for i, s in enumerate(subj): # If you add verbose printout, make it so # that by default it is suppressed by a verbose=False option if verbose: print(f'- Getting data for {s} in {space}') # Load the data if ext is not None: C = nb.load(self.data_dir.format(s) + f'/{s}_space-{space}_{ses_id}_{type}_{ext}.dscalar.nii') else: C = nb.load(self.data_dir.format(s) + f'/{s}_space-{space}_{ses_id}_{type}.dscalar.nii') this_data = C.get_fdata() # Check if this subject data in incomplete if this_data.shape[0] != info_com.shape[0]: this_info = pd.read_csv(self.data_dir.format(s) + f'/{s}_{ses_id}_{type}.tsv', sep='\t') base = np.asarray(info_com['names']) incomplete = np.asarray(this_info['names']) for j in range(base.shape[0]): if base[j] != incomplete[j]: warnings.warn( f'{s}, {ses_id}, {type} - missing data {base[j]}') incomplete = np.insert( np.asarray(incomplete), j, np.nan) this_data = np.insert(this_data, j, np.nan, axis=0) Data_list.append(this_data[np.newaxis, ...]) # concatenate along the first dimension (subjects) Data = np.concatenate(Data_list, axis=0) # Ensure that infinite values (from div / 0) show up as NaNs Data[np.isinf(Data)] = np.nan return Data, info_com
[docs] def group_average_data(self, ses_id=None, type=None, atlas='SUIT3', subj=None): """Loads data from all subjects in for a certain session, type and atlas. Averages data across subjects. Saves the results as CIFTI files in the data/group directory. Args: ses_id (str, optional): Session ID. If not provided, the first session ID in the dataset will be used. type (str, optional): Type of data. If not provided, the default type will be used. atlas (str, optional): Short atlas string. Defaults to 'SUIT3'. subj (list or None, optional): Subset of subjects to include in the group average. If None, all subjects will be included. """ if ses_id is None: ses_id = self.sessions[0] if type is None: type = self.default_type data, info = self.get_data(space=atlas, ses_id=ses_id, type=type, subj=subj) # average across participants X = np.nanmean(data, axis=0) # make output cifti s = self.get_participants().participant_id[0] C = nb.load(self.data_dir.format(s) + f'/{s}_space-{atlas}_{ses_id}_{type}.dscalar.nii') C = nb.Cifti2Image(dataobj=X, header=C.header) # save output dest_dir = op.join(self.data_dir.format('group')) Path(dest_dir).mkdir(parents=True, exist_ok=True) nb.save(C, dest_dir + f'/group_space-{atlas}_{ses_id}_{type}.dscalar.nii') if 'sn' in info.columns: info = info.drop(columns=['sn']) info.to_csv(dest_dir + f'/group_{ses_id}_{type}.tsv', sep='\t', index=False)
[docs] class DataSetNative(DataSet): """Data set with estimates data stored as nifti-files in Native space. """
[docs] def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1): """This function generates atlas map for the data of a specific subject into a specific atlas space. For Native space, we are using indivdual maps for SUIT and surface space. Addtiionally, we defines deformations MNI space via the individual normalization into MNI152NLin6Asym (FSL, SPM Segement). Other MNI space (symmetric etc) are not implemented yet. Args: atlas (FunctionFusion.Atlas): Functional Fusion atlas object sub (str): Subject_id for the individual subject ses_id (str): Session_id for the individual subject if atlasmap is session dependent. (defaults to none) smooth (float): Width of smoothing kernel for extraction. Defaults to None. Returns: AtlasMap: Built AtlasMap object """ atlas_maps = [] if atlas.space == ['MNI152NLin2009cSym','MNI152NLin6Asym']: # This is for MNI standard space) deform = self.anatomical_dir.format(sub) + f'/{sub}_space-{atlas.space}_xfm.nii' if not os.path.exists(deform): warnings.warn(f'No individual deformation found for {atlas.space} in {sub} - resortinh to MNI_xfm.nii') deform = self.anatomical_dir.format(sub) + f'/{sub}_space-MNI_xfm.nii' edir = self.estimates_dir.format(sub) mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii' atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(interpolation=interpolation,smooth=None) else: atlas_maps = super().get_atlasmaps(atlas,sub,ses_id,smooth=smooth, interpolation=interpolation) return atlas_maps
[docs] class DataSetMNIVol(DataSet): def __init__(self, base_dir,space='MNI152NLin6Asym'): """Data set with estimates data stored as nifti-files in a standard group space. The exact MNI template should be indicated in the space-argument ('MNI152NLin6Asym','MNI152N2009cAsym','MNI152N2009cSym'). The small deformations between the different MNI spaces are implemented when extracting the data. Args: base_dir (str): basis directory space (str): Group Space in which data is stored (Defaults to 'MNI152NLin6Asym'). """ super().__init__(base_dir) self.space = space
[docs] def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1): """This function generates atlas map for the data stored in MNI space. For SUIT and surface space, it goes over deformations estimated on the individual anatomy. If atlas.space matches dataset.space, it uses no deformation, but a direct readout. For mismatching MNI space it tries to find the correct transformation file. Args: atlas (FunctionFusion.Atlas): Functional Fusion atlas object sub (str): Subject_id for the individual subject ses_id (str): Session_id for the individual subject if atlasmap is session dependent. (defaults to none) smooth (float): Width of smoothing kernel for extraction. Defaults to None. Returns: AtlasMap: Built AtlasMap object """ atlas_spaces = ['MNI152NLin6Asym','MNI152NLin2009cAsym','MNI152NLin2009cSym'] atlas_maps = [] edir = self.estimates_dir.format(sub) mask = edir + f'/{ses_id}/{sub}_{ses_id}_mask.nii' # Matching MNI space if atlas.space == self.space: atlas_maps.append(am.AtlasMapDeform(atlas.world, None, mask)) atlas_maps[0].build(interpolation=interpolation,smooth=smooth) # Mis-matching MNI space elif atlas.space in atlas_spaces: deform = am.get_deform(atlas.space, self.space) atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(interpolation=interpolation,smooth=smooth) # Any other space (SUIT + fs32k) else: atlas_maps = super().get_atlasmaps(atlas,sub,ses_id,smooth=smooth, interpolation=interpolation) return atlas_maps
[docs] class DataSetCifti(DataSet): """Data set that comes in HCP-format in already pre-extracted cifti files. """
[docs] def get_data_fnames(self, participant_id, session_id=None): """ Gets all raw data files Args: participant_id (str): Subject session_id (str): Session ID. Defaults to None. Returns: fnames (list): List of fnames, last one is the resMS image T (pd.DataFrame): Info structure for regressors (reginfo) """ dirw = self.estimates_dir.format(participant_id) + f'/{session_id}' T = pd.read_csv( dirw + f'/{participant_id}_{session_id}_reginfo.tsv', sep='\t') fnames = [f'{dirw}/{participant_id}_{session_id}_beta.dscalar.nii'] fnames.append( f'{dirw}/{participant_id}_{session_id}_resms.dscalar.nii') return fnames, T
[docs] def extract_all(self, ses_id='ses-s1', type='CondHalf', atlas='SUIT3', exclude_subjects=True,interpolation=1, smooth=None): """Extracts cerebellar data. Saves the results as CIFTI files in the data directory. Args: ses_id (str, optional): Session. Defaults to 'ses-s1'. type (str, optional): Type - defined in ger_data. Defaults to 'CondHalf'. atlas (str, optional): Short atlas string. Defaults to 'SUIT3'. exclude_subjects (bool): If True, excludes subjects that have been specified in the exclude column of the participants.tsv file. """ myatlas, _ = am.get_atlas(atlas) # Get the correct map into CIFTI-format if isinstance(myatlas, am.AtlasVolumetric): deform = am.get_deform(myatlas.space,'MNI152NLin6Asym') mask = util.default_atlas_dir + '/tpl-MNI152NLin6Asym/tpl-MNI152NLin6Asym_desc-subcortexmask.nii.gz' atlas_map = am.AtlasMapDeform(myatlas.world,deform,mask) atlas_map.build(interpolation=interpolation,smooth=smooth) elif isinstance(myatlas, am.AtlasSurface): atlas_map = myatlas # Extract the data for each participant T = self.get_participants(exclude_subjects=exclude_subjects) for s in T.participant_id: print(f'Extract {s}') fnames, info = self.get_data_fnames(s, ses_id) data = am.get_data_cifti(fnames, [atlas_map]) data, info = self.condense_data(data, info, type,participant_id=s, ses_id=ses_id) C = myatlas.data_to_cifti(data[0], info.names) dest_dir = self.data_dir.format(s) Path(dest_dir).mkdir(parents=True, exist_ok=True) nb.save(C, dest_dir + f'/{s}_space-{atlas}_{ses_id}_{type}.dscalar.nii') info.to_csv( dest_dir + f'/{s}_{ses_id}_{type}.tsv', sep='\t', index=False)
[docs] class DataSetMDTB(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-s1', 'ses-s2'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = True
class DataSetPontine(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-01'] self.default_type = 'TaskHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = True def condense_data(self, data, info, type='CondHalf', participant_id=None, ses_id=None): """ Condense the data from the pontine project after extraction Args: data (list of ndarray) info (dataframe) type (str): Type of extraction: 'CondHalf': Conditions with seperate estimates for first and second half of experient (Default) 'CondRun': Conditions with seperate estimates per run participant_id (str): ID of participant ses_id (str): Name of session Returns: Y (list of np.ndarray): A list (len = numatlas) with N x P_i numpy array of prewhitened data T (pd.DataFrame): A data frame with information about the N numbers provide names: Names for CIFTI-file per row """ # Depending on the type, make a new contrast n_cond = np.max(info.reg_id) if type == 'CondHalf': data_info, C = agg_data(info, ['half', 'reg_id'], ['run'], subset=(info.reg_id > 0)) data_info['names'] = [ f'{d.task_code}_{d.cond_code}_half{d.half}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.half, positive=True) elif type == 'CondRun': data_info, C = agg_data(info, ['run', 'reg_id'], ['half'], subset=(info.reg_id > 0)) data_info['names'] = [ f'{d.task_code}_{d.cond_code}_run{d.run}' for i, d in data_info.iterrows()] # Baseline substraction B = matrix.indicator(data_info.run, positive=True) # Prewhiten the data data_n = prewhiten_data(data) # Load the designmatrix and perform optimal contrast X = np.load(self.estimates_dir.format(participant_id) + f'/{ses_id}/{participant_id}_{ses_id}_designmatrix.npy',allow_pickle=True).item() reg_in = np.arange(C.shape[1], dtype=int) CI = matrix.indicator(info.run * info.instruction, positive=True) C = np.c_[C, CI] data_new = optimal_contrast(data_n, C, X['nKX'], reg_in) return data_new, data_info class DataSetNishi(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-01', 'ses-02'] self.default_type = 'CondHalf' self.cond_ind = 'reg_id' self.cond_name = 'task_name' self.part_ind = 'half' self.subtract_baseline = True class DataSetIBC(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-archi', 'ses-clips4', 'ses-enumeration', 'ses-hcp1', 'ses-hcp2', 'ses-lyon1', 'ses-lyon2', 'ses-mathlang', 'ses-mtt1', 'ses-mtt2', 'ses-preference', 'ses-rsvplanguage', 'ses-spatialnavigation', 'ses-tom'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = False def get_atlasmaps(self, atlas, sub, ses_id, smooth=None, interpolation=1): """This function generates atlas map for the data of a specific subject into a specific atlas space. This uses the general functionality, but uses a session specific functional mask. Args: atlas (FunctionFusion.Atlas): Functional Fusion atlas object sub (str): Subject_id for the individual subject ses_id (str): Session_id for the individual subject if atlasmap is session dependent. (defaults to none) smooth (float): Width of smoothing kernel for extraction. Defaults to None. Returns: AtlasMap: Built AtlasMap object """ atlas_maps = [] if atlas.space == 'SUIT': deform = self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii' mask = self.estimates_dir.format(sub) + f'/{ses_id}/{sub}_{ses_id}_mask.nii' add_mask = self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii' atlas_map = am.AtlasMapDeform(atlas.world, deform, mask) atlas_map.build(smooth=2.0, additional_mask=add_mask) elif atlas.space in ['MNI152NLin2009cSymC','MNI152NLin6AsymC']: # This is nornmalization over SUIT->MNI (cerebellum only) deform1 = am.get_deform(atlas.space, 'SUIT') deform2 = self.suit_dir.format(sub) + f'/{sub}_space-SUIT_xfm.nii' deform = [deform1, deform2] mask = self.estimates_dir.format(sub) + f'/{ses_id}/{sub}_{ses_id}_mask.nii' add_mask = self.suit_dir.format(sub) + f'/{sub}_desc-cereb_mask.nii' atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(smooth=smooth,additional_mask=add_mask) else: atlas_maps=super().get_atlasmaps(atlas, sub, ses_id, smooth=smooth, interpolation=interpolation) return atlas_maps class DataSetDemand(DataSetCifti): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-01'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = False class DataSetWMFS(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-01', 'ses-02'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = False def condense_data(self, data, info, type='CondHalf', participant_id=None, ses_id=None): """ Condense the data in a certain way optimally Args: data (list): List of extracted datasets info (DataFrame): Data Frame with description of data - row-wise type (str): Type of extraction: 'CondHalf': Conditions with seperate estimates for first and second half of experient (Default) 'CondRun': Conditions with seperate estimates per run Defaults to 'CondHalf'. participant_id (str): ID of participant ses_id (str): Name of session Returns: Y (list of np.ndarray): A list (len = numatlas) with N x P_i numpy array of prewhitened data T (pd.DataFrame): A data frame with information about the N numbers provide names: Names for CIFTI-file per row """ # Depending on the type, make a new contrast subset = info.cond_code != 'err' data_new, data_info = super().condense_data(data, info, type, participant_id=participant_id, ses_id=ses_id, subset=subset) return data_new, data_info class DataSetSomatotopic(DataSetMNIVol): def __init__(self, dir): super().__init__(dir) self.space = 'MNI152NLin6Asym' self.sessions = ['ses-motor'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = False def get_atlasmaps(self, atlas, sub=None, ses_id = None, smooth=None, interpolation=1): """ Gets group atlasmaps. Assumes that all scans are in the same space (self.space) Args: sub (str; optional): Subject Returns: atlas_maps (list): List of atlasmaps """ # if you have group xfm file, then you get it from the atlas directory # if you have xfm files per subject, then you can get it from the anat dir under individual subject atlas_maps = [] if atlas.structure == 'cerebellum': deform = self.atlas_dir + \ f'/tpl-{atlas.space}/tpl-{atlas.space}_from-{self.space}_mode-image_xfm.nii' mask = self.atlas_dir + \ f'/tpl-{self.space}/tpl-{self.space}_desc-cereb_mask.nii' atlas_maps.append(am.AtlasMapDeform(atlas.world, deform, mask)) atlas_maps[0].build(interpolation=interpolation,smooth=smooth) elif atlas.space == 'fs32k': for i, hem in enumerate(['L', 'R']): adir = self.anatomical_dir.format(sub) pial = adir + f'/{sub}_space-32k_hemi-{hem}_pial.surf.gii' white = adir + f'/{sub}_space-32k_hemi-{hem}_white.surf.gii' mask = self.atlas_dir + \ f'/tpl-{self.space}/tpl-{self.space}_mask.nii' atlas_maps.append(am.AtlasMapSurf(atlas.vertex[i], white, pial, mask)) atlas_maps[i].build() else: atlas_maps = super().get_atlasmaps(atlas, sub, ses_id, smooth=smooth, interpolation=interpolation) return atlas_maps class DataSetLanguage(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-localizer','ses-rest'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = True class DataSetHcpTask(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-task2'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = False class DataSetSocial(DataSetNative): def __init__(self, dir): super().__init__(dir) self.sessions = ['ses-social', 'ses-rest'] self.default_type = 'CondHalf' self.cond_ind = 'cond_num' self.part_ind = 'half' self.subtract_baseline = True