import numpy as np
from numpy.linalg import inv, pinv
import nibabel as nb
import os, subprocess
import Functional_Fusion.atlas_map as am
from pathlib import Path
default_atlas_dir = os.path.dirname(am.__file__) + '/Atlases'
def get_base_dir():
possible_dirs = ['/Volumes/diedrichsen_data$/data/FunctionalFusion_new',
'/cifs/diedrichsen/data/FunctionalFusion_new',
'Y:/data/FunctionalFusion_new']
for directory in possible_dirs:
if Path(directory).exists():
return directory
raise FileNotFoundError('Could not find base_dir')
[docs]
def file_nii_or_gz(file_path):
""" Checks if the file exists with .nii or .nii.gz extension and returns the correct path
returns None if the file does not exist with either extension
Args:
file_path (str): the file path to check, can end with .nii or .nii.gz
Returns:
str: the file path with the correct extension that exists
"""
if Path(file_path).exists():
return file_path
elif file_path.endswith('.nii') and Path(file_path + '.gz').exists():
return file_path + '.gz'
elif file_path.endswith('.nii.gz') and Path(file_path[:-3]).exists():
return file_path[:-3]
else:
return None
[docs]
def sq_eucl_distances(coordA,coordB):
""" Calculates the squared euclidean distance between two sets of coordinates
Args:
coordA (ndarray): 3 x N array of coordinates
coordB (ndarray): 3 x M array of coordinates
Returns:
D (ndarray): N x M array of squared euclidean distances between each pair of coordinates in A and B
"""
D = coordA.reshape(3,-1,1)-coordB.reshape(3,1,-1)
D = np.sum(D**2,axis=0)
return D
[docs]
def nan_linear_model(X,Y,unknowns_to_nan = True):
""" Calculates the Nansafe estimates of the regression model Y on design matrix X
Y = X @ B
If an entire row in Y is missing, the B-estimate will skip that observation
If this there is no information in the data about that B anymore, the B's are set to NaN
Args:
X (ndarray): N x Q Design matrix
Y (ndarray): N x P data matrix
unknowns_to_nan (bool): Set regression coefficients without information to Nan (instead of to zero)
Returns:
B (ndarray): Q x P regression weights - non-estimable weights are set to NaN
"""
indx = np.logical_not(np.isnan(Y).all(axis=1))
B = pinv(X[indx,:]) @ Y[indx,:]
if unknowns_to_nan:
unknown = np.abs(X[indx,:]).sum(axis=0)==0
B[unknown,:]=np.nan
return B
[docs]
def volume_from_cifti(ts_cifti, struct_names=None):
"""
Gets the 4D nifti object containing the time series
for all the subcortical structures
Args:
ts_cifti (cifti obj ) - cifti object of the time series
Returns:
nii_vol(nifti vol object) - nifti object containing the time series of subcorticals
"""
# get brain axis models
bmf = ts_cifti.header.get_axis(1)
# get the data array with all the time points, all the structures
ts_array = ts_cifti.get_fdata(dtype=np.float32)
# initialize a matrix representing 4D data (x, y, z, time_point)
subcorticals_vol = np.zeros(bmf.volume_shape + (ts_array.shape[0],))
for idx, (nam,slc,bm) in enumerate(bmf.iter_structures()):
# if (struct_names is None) | (nam in struct_names):
# get the voxels/vertices corresponding to the current brain model
ijk = bm.voxel
bm_data = ts_array[:, slc]
i = (ijk[:,0] > -1)
# fill in data
subcorticals_vol[ijk[i, 0], ijk[i, 1], ijk[i, 2], :]=bm_data[:,i].T
# save as nii
nii_vol_4d = nb.Nifti1Image(subcorticals_vol,bmf.affine)
# if save:
# ts_nifti = dir+'/sub-100307_ses-01_task-rest_space-subcortex_run-01_bold.nii'
# nb.save(nii_vol,ts_nifti)
return nii_vol_4d
[docs]
def surf_from_cifti(ts_cifti,
struct_names=['CIFTI_STRUCTURE_CORTEX_LEFT',
'CIFTI_STRUCTURE_CORTEX_RIGHT'],
mask_gii=None):
"""Gets the time series of cortical surface vertices (Left and Right)
Args:
ts_cifti (cifti obj) - cifti object of time series
struct_names (list): the struct name of left and right cortex
mask_gii: (list of Obj.) the mask gifti object for each hemisphere
if given. Default is None,
indicating no mask for return.
(list of str.) the file path of mask gifti for each
hemisphere if given. Default is None,
indicating no mask for return.
Returns:
cii (cifti object) - contains the time series for the cortex
"""
# get brain axis models
bmf = ts_cifti.header.get_axis(1)
# print(dir(bmf))
# get the data array with all the time points, all the structures
ts_array = ts_cifti.get_fdata()
ts_list = []
for idx, (nam,slc,bm) in enumerate(bmf.iter_structures()):
# just get the cortical surfaces
if nam in struct_names:
values = np.full((ts_array.shape[0],bmf.nvertices[nam]),np.nan)
# get the values corresponding to the brain model
values[:,bm.vertex] = ts_array[:, slc]
if mask_gii is not None:
Xmask = mask_gii[struct_names.index(nam)]
if isinstance(Xmask, str):
Xmask = nb.load(Xmask).agg_data()
elif isinstance(Xmask, object):
Xmask = Xmask.agg_data()
else:
raise ValueError("The input mask_gii must be either a string of path"
"or a nibabel gii image object!")
values = values[:, np.where(Xmask>0)[0]]
ts_list.append(values)
else:
break
return ts_list
def zstandarize_ts(X):
X = X - X.mean(axis = 0, keepdims = True)
X = X / np.sqrt(np.nansum(X**2, axis=0)/X.shape[0])
return X
[docs]
def correlate(X, Y):
""" Correlate X and Y numpy arrays after standardizing them"""
X = zstandarize_ts(X)
Y = zstandarize_ts(Y)
return Y.T @ X / X.shape[0]
[docs]
def get_volumes(data, atlas_name='MNISymC2'):
"""
Projects CIFTI data from any space (e.g., SUIT3/MNISymC2) back to volume space.
Args:
data (list, np.ndarray): Input data. Can be a list, NumPy array.
atlas (str): Atlas code to specify the projection space (e.g., 'SUIT3', 'MNISymC3'). Default is 'MNISymC2'.
Returns:
list: A list of NIfTI volume objects projected from the input data.
"""
atlas,_ = am.get_atlas(atlas_name)
# Determine number of volumes based on data type
if isinstance(data, np.ndarray):
n_vols = data.shape[0]
elif isinstance(data, list):
n_vols = len(data)
else:
raise TypeError("data must be a list, np.ndarray, or torch.Tensor")
nii_vols = []
# Project each data volume back to NIfTI volume space
for i in range(n_vols):
cifti_data = data[i]
vol = atlas.data_to_nifti(cifti_data)
nii_vols.append(vol)
return nii_vols
[docs]
def split_string(s):
""" Split string into integer and string
Args:
s: input string to be splitted
Returns:
integer part, and string part
"""
# Split string into integer and string
for i, char in enumerate(s):
if not char.isdigit():
return int(s[:i]), s[i:]
[docs]
def smooth_fs32k_data(input_file, smooth=1, kernel='gaussian',
return_data_only=False):
""" Smooth cortical data in fs32k space from cifti file, then save as cifti
format using workbench command
Args:
input_file: the input cifti file to be smoothed
smooth: width of smoothing kernel in mm
kernel: the type of smoothing kernel, either "gaussian" or "fwhm"
return_data_only: only return the smoothed data but not store any
Returns:
Write in the cifti file of smoothed data
"""
# get the surfaces for smoothing
surf_L = default_atlas_dir + f'/tpl-fs32k/tpl-fs32k_hemi-L_sphere.surf.gii'
surf_R = default_atlas_dir + f'/tpl-fs32k/tpl-fs32k_hemi-R_sphere.surf.gii'
# Making in / out file names
dir_path, file_name = os.path.split(input_file)
base_name = file_name.split('.')[0]
ext = '.' + '.'.join(file_name.split('.')[1:])
if kernel == 'gaussian':
smooth_suffix = f'_desc-sm{smooth}'
elif kernel == 'fwhm':
smooth_suffix = f'_desc-sm{smooth}fwhm'
else:
raise ValueError('Only gaussian and fwhm kernels are supported!')
cifti_out = os.path.join(dir_path, base_name + smooth_suffix + ext)
# Load data and fill nan with zeros if unsmoothed data contains any
contain_nan = False ## a flag
C = nb.load(input_file)
if np.isnan(C.get_fdata()).any():
contain_nan = True
mask = np.isnan(C.get_fdata())
C = nb.Cifti2Image(dataobj=np.nan_to_num(C.get_fdata()), header=C.header)
input_file = dir_path + f'/tmp' + ext
nb.save(C, input_file)
# Write in smoothed surface data (filled with 0)
smooth_cmd = f"wb_command -cifti-smoothing {input_file} " \
f"{smooth} {smooth} COLUMN {cifti_out} " \
f"{f'-{kernel} ' if kernel == 'fwhm' else ''}" \
f"-left-surface {surf_L} -right-surface {surf_R} " \
f"-fix-zeros-surface"
subprocess.run(smooth_cmd, shell=True)
# Double-check if the original data contain NaN values
if contain_nan:
os.remove(dir_path + f'/tmp' + ext)
# Replace 0s back to NaN (we don't want the 0s impact model learning)
data = nb.load(cifti_out).get_fdata()
data[mask] = np.nan
C = nb.Cifti2Image(dataobj=data, header=C.header)
nb.save(C, cifti_out)
if return_data_only:
os.remove(cifti_out)
return data