Source code for neuroconv.datainterfaces.ecephys.cellexplorer.cellexplorerdatainterface

from pathlib import Path
from typing import Literal, Optional

import numpy as np
import scipy
from pynwb import NWBFile

from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ..basesortingextractorinterface import BaseSortingExtractorInterface
from ....tools import get_package
from ....utils import FilePathType, FolderPathType


[docs]def add_channel_metadata_to_recoder(recording_extractor, folder_path: FolderPathType): """ Main function to add channel metadata to a recording extractor from a CellExplorer session. The metadata is added as channel properties to the recording extractor. Parameters ---------- recording_extractor : BaseRecording from spikeinterface The recording extractor to which the metadata will be added. folder_path : str or Path The path to the directory containing the CellExplorer session. Returns ------- RecordingExtractor The same recording extractor passed in the `recording_extractor` argument, but with added metadata as channel properties. Notes ----- The metadata for the channels is extracted from the `basename.session.mat` file. The logic of the extraction is described on the function: `add_channel_metadata_to_recorder_from_session_file`. Note that, while all the new data produced by CellExplorer should have a `session.mat` file, it is not clear if all the channel metadata is always available. Besides, the current implementation also supports extracting channel metadata from the `chanMap.mat` file used by Kilosort. The logic of the extraction is described on the function: `add_channel_metadata_to_recorder_from_channel_map_file`. Bear in mind that this file is not always available for all datasets. From the documentation we also know that channel data can also be found in the following files: * `basename.ChannelName.channelinfo.mat`: general data container for channel-wise dat * `basename.chanCoords.channelinfo.mat`: contains the coordinates of the electrodes in the probe * `basename.ccf.channelinfo.mat`: Allen Institute's Common Coordinate Framework (CCF) Detailed information can be found in the following link https://cellexplorer.org/datastructure/data-structure-and-format/#channels Future versions of this function will support the extraction of this metadata from these files as well """ recording_extractor = add_channel_metadata_to_recorder_from_session_file( recording_extractor=recording_extractor, folder_path=folder_path ) recording_extractor = add_channel_metadata_to_recorder_from_channel_map_file( recording_extractor=recording_extractor, folder_path=folder_path ) return recording_extractor
[docs]def add_channel_metadata_to_recorder_from_session_file( recording_extractor, folder_path: FolderPathType, ): """ Extracts channel metadata from the CellExplorer's `session.mat` file and adds it to the given recording extractor as properties. The metadata includes electrode groups, channel locations, and brain regions. The function will skip addition if the `session.mat` file is not found in the given session path. This is done to support calling the when using files produced by the old cellexplorer format (Buzcode) which does not have a `session.mat` file. Parameters ---------- recording_extractor : BaseRecording from spikeinterface The recording extractor to which the metadata will be added. folder_path : str or Path The path to the directory containing the CellExplorer session. Returns ------- RecordingExtractor The same recording extractor passed in the `recording_extractor` argument, but with added metadata as channel properties. Notes ----- 1. The channel locations are retrieved from the `chanCoords` field in the `extracellular` section of the `session.mat` file. They are set in the recording extractor using the `set_channel_locations` method. 2. The electrode group information is extracted from the `electrodeGroups` field in the `extracellular` section of the `session.mat` file. The groups are set in the recording extractor using the `set_property` method with the `group` key. 3. The brain region data is fetched from the `brainRegions` section of the `session.mat` file. The brain regions are set in the recording extractor using the `set_property` method with the `brain_region` key. """ session_path = Path(folder_path) session_id = session_path.stem session_path = session_path / f"{session_id}.session.mat" if not session_path.is_file(): return recording_extractor from pymatreader import read_mat ignore_fields = ["animal", "behavioralTracking", "timeSeries", "spikeSorting", "epochs"] session_data = read_mat(session_path, ignore_fields=ignore_fields)["session"] channel_ids = recording_extractor.get_channel_ids() if "extracellular" in session_data: extracellular_data = session_data["extracellular"] if "chanCoords" in extracellular_data: channel_coordinates = extracellular_data["chanCoords"] x_coords = channel_coordinates["x"] y_coords = channel_coordinates["y"] locations = np.array((x_coords, y_coords)).T.astype("float32") recording_extractor.set_channel_locations(channel_ids=channel_ids, locations=locations) if "electrodeGroups" in extracellular_data: electrode_groups_data = extracellular_data["electrodeGroups"] channels = electrode_groups_data["channels"] # Channels is a list of arrays where each array corresponds to a group. We flatten it to a single array num_electrode_groups = len(channels) group_labels = [[f"Group {index + 1}"] * len(channels[index]) for index in range(num_electrode_groups)] channels = np.concatenate(channels).astype("int") values = np.concatenate(group_labels) corresponding_channel_ids = [str(channel) for channel in channels] recording_extractor.set_property(key="group", ids=corresponding_channel_ids, values=values) if "brainRegions" in session_data: brain_region_data = session_data["brainRegions"] # The data in the `brainRegions` field is a struct where the keys are the brain region ids and the values # are dictionaries with the brain region data. # Each of those inner diciontaries has a key called "channels" whose values is an array of channel ids channel_id_to_brain_region = dict() for brain_region_id, brain_region_dict in brain_region_data.items(): # Also, each inner dictionary can have a key called "brainRegion" which is the name of the brain region. brain_region_name = brain_region_dict.get("brainRegion", brain_region_id) channels = brain_region_dict["channels"].astype("int") channels = [str(channel) for channel in channels] for channel_id in channels: # This is a fine grained brain region data. # Channels are assigned to more than one brain region (e.g. CA1sp and CA1so) if channel_id not in channel_id_to_brain_region: channel_id_to_brain_region[channel_id] = brain_region_name else: channel_id_to_brain_region[channel_id] += " - " + brain_region_name ids = list(channel_id_to_brain_region.keys()) values = list(channel_id_to_brain_region.values()) recording_extractor.set_property( key="brain_area", ids=ids, values=values, ) return recording_extractor
[docs]def add_channel_metadata_to_recorder_from_channel_map_file( recording_extractor, folder_path: FolderPathType, ): """ Extracts channel metadata from the `chanMap.mat` file used by Kilosort and adds the properties to the given recording extractor as channel properties. The metadata includes channel groups, channel locations, and channel names. The function will skip addition of properties if the `chanMap.mat` file is not found in the given session path. Parameters ---------- recording_extractor : BaseRecording from spikeinterface The recording extractor to which the metadata will be added. folder_path : Path or str The path to the directory containing the session. Returns ------- RecordingExtractor The same recording extractor passed in the `recording_extractor` argument, but with added metadata. Notes ----- 1. The channel locations are retrieved from the `xcoords` and `ycoords` fields in the `chanMap.mat` file. They are set in the recording extractor using the `set_channel_locations` method. 2. The channel groups are extracted from the `connected` field in the `chanMap.mat` file. The groups are set in the recording extractor using the `set_property` method with the `group` key. 3. The channel names are composed of the channel index and group, and are set in the recording extractor using the `set_property` method with the `channel_name` key. 4. Channel group names are created based on the group index and are set in the recording extractor using the `set_property` method with the `group_name` key. """ session_path = Path(folder_path) chan_map_file_path = session_path / f"chanMap.mat" if not chan_map_file_path.is_file(): return recording_extractor recorder_properties = recording_extractor.get_property_keys() from pymatreader import read_mat channel_map_data = read_mat(chan_map_file_path) channel_ids = channel_map_data["chanMap"] channel_ids = [str(channel_id) for channel_id in channel_ids] add_group_to_recorder = "group" not in recorder_properties and "kcoords" in channel_map_data if add_group_to_recorder: channel_groups = channel_map_data["kcoords"] recording_extractor.set_property(key="group", ids=channel_ids, values=channel_groups) add_coordinates_to_recorder = "location" not in recorder_properties and "xcoords" in channel_map_data if add_coordinates_to_recorder: x_coords = channel_map_data["xcoords"] y_coords = channel_map_data["ycoords"] locations = np.array((x_coords, y_coords)).T.astype("float32") recording_extractor.set_channel_locations(channel_ids=channel_ids, locations=locations) return recording_extractor
[docs]class CellExplorerRecordingInterface(BaseRecordingExtractorInterface): """ Adds raw and lfp data from binary files with the new CellExplorer format: https://cellexplorer.org/ Parameters ---------- folder_path : Path or str The folder where the session data is located. It should contain a `{folder.name}.session.mat` file and the binary files `{folder.name}.dat` or `{folder.name}.lfp` for the LFP interface. verbose : bool, default: True Whether to output verbose text. es_key : str, default: "ElectricalSeries" and "ElectricalSeriesLFP" for the LFP interface Notes ----- CellExplorer's new format contains a `basename.session.mat` file containing rich metadata about the session. basename is the name of the session folder / directory and works as a session identifier. Link to the documentation detailing the `basename.session.mat` structure: https://cellexplorer.org/datastructure/data-structure-and-format/#session-metadata Specifically, we can use the following fields from `basename.session.mat` to create a recording extractor using `BinaryRecordingExtractor` from spikeinterface: * Sampling frequency * Gains * Dtype Where the binary file is named `basename.dat` for the raw data and `basename.lfp` for lfp data. The extraction of channel metadata is described in the function: `add_channel_metadata_to_recoder` """ sampling_frequency_key = "sr" binary_file_extension = "dat" help = "Interface for spike sorted data in the CellExplorer format." display_name = "CellExplorer" def __init__(self, folder_path: FolderPathType, verbose: bool = True, es_key: str = "ElectricalSeries"): self.session_path = Path(folder_path) # No super here, we need to do everything by hand self.verbose = verbose self.es_key = es_key self.subset_channels = None self.source_data = dict(folder_path=folder_path) self._number_of_segments = 1 # CellExplorer is mono segment self.session_id = self.session_path.stem session_data_file_path = self.session_path / f"{self.session_id}.session.mat" assert session_data_file_path.is_file(), f"File {session_data_file_path} does not exist" from pymatreader import read_mat ignore_fields = ["animal", "behavioralTracking", "timeSeries", "spikeSorting", "epochs"] session_data = read_mat(filename=session_data_file_path, ignore_fields=ignore_fields)["session"] extracellular_data = session_data["extracellular"] num_channels = int(extracellular_data["nChannels"]) gain = float(extracellular_data["leastSignificantBit"]) # Usually a value of 0.195 when intan is used gains_to_uv = np.ones(num_channels) * gain dtype = np.dtype(extracellular_data["precision"]) sampling_frequency = float(extracellular_data[self.sampling_frequency_key]) # Channels in CellExplorer are 1-indexed channel_ids = [str(1 + i) for i in range(num_channels)] binary_file_path = self.session_path / f"{self.session_id}.{self.binary_file_extension}" assert binary_file_path.is_file(), f"Binary file {binary_file_path.name} does not exist in `folder_path`" from spikeinterface.core.binaryrecordingextractor import ( BinaryRecordingExtractor, ) self.recording_extractor = BinaryRecordingExtractor( file_paths=[binary_file_path], sampling_frequency=sampling_frequency, num_chan=num_channels, dtype=dtype, t_starts=None, file_offset=0, gain_to_uV=gains_to_uv, offset_to_uV=None, channel_ids=channel_ids, ) self.recording_extractor = add_channel_metadata_to_recoder( recording_extractor=self.recording_extractor, folder_path=folder_path )
[docs] def get_original_timestamps(self): num_frames = self.recording_extractor.get_num_frames() sampling_frequency = self.recording_extractor.get_sampling_frequency() timestamps = np.arange(num_frames) / sampling_frequency return timestamps
[docs]class CellExplorerLFPInterface(CellExplorerRecordingInterface): keywords = BaseRecordingExtractorInterface.keywords + [ "extracellular electrophysiology", "LFP", "local field potential", "LF", ] sampling_frequency_key = "srLfp" binary_file_extension = "lfp" def __init__(self, folder_path: FolderPathType, verbose: bool = True, es_key: str = "ElectricalSeriesLFP"): super().__init__(folder_path, verbose, es_key)
[docs] def add_to_nwbfile( self, nwbfile: NWBFile, metadata: Optional[dict] = None, stub_test: bool = False, starting_time: Optional[float] = None, write_as: Literal["raw", "lfp", "processed"] = "lfp", write_electrical_series: bool = True, compression: Optional[str] = "gzip", compression_opts: Optional[int] = None, iterator_type: str = "v2", iterator_opts: Optional[dict] = None, ): super().add_to_nwbfile( nwbfile, metadata, stub_test, starting_time, write_as, write_electrical_series, compression, compression_opts, iterator_type, iterator_opts, )
[docs]class CellExplorerSortingInterface(BaseSortingExtractorInterface): """Primary data interface class for converting Cell Explorer spiking data.""" def __init__(self, file_path: FilePathType, verbose: bool = True): """ Initialize read of Cell Explorer file. Parameters ---------- file_path: FilePathType Path to .spikes.cellinfo.mat file. verbose: bool, default: True """ hdf5storage = get_package(package_name="hdf5storage") file_path = Path(file_path) self.session_path = Path(file_path).parent self.session_id = self.session_path.stem # Temporary hack to get sampling frequency from the spikes cellinfo file until next SI release from pymatreader import read_mat matlab_file = read_mat(file_path) if "spikes" not in matlab_file.keys(): raise KeyError(f"CellExplorer file '{file_path}' does not contain 'spikes' field.") spikes_mat = matlab_file["spikes"] sampling_frequency = spikes_mat.get("sr", None) # If sampling rate is not available in the spikes cellinfo file, try to get it from the session file session_path = self.session_path / f"{self.session_id}.session.mat" if sampling_frequency is None and session_path.is_file(): matlab_file = read_mat(session_path) session_data = matlab_file["session"] if "extracellular" in session_data.keys(): sampling_frequency = session_data["extracellular"].get("sr", None) super().__init__(file_path=file_path, sampling_frequency=sampling_frequency, verbose=verbose) self.source_data = dict(file_path=file_path) spikes_matfile_path = Path(file_path) assert ( spikes_matfile_path.is_file() ), f"The file_path should point to an existing .spikes.cellinfo.mat file ({spikes_matfile_path})" try: spikes_mat = scipy.io.loadmat(file_name=str(spikes_matfile_path)) self.read_spikes_info_with_scipy = True except NotImplementedError: spikes_mat = hdf5storage.loadmat(file_name=str(spikes_matfile_path)) self.read_spikes_info_with_scipy = False cell_info = spikes_mat.get("spikes", np.empty(0)) self.cell_info_fields = cell_info.dtype.names unit_ids = self.sorting_extractor.get_unit_ids() if self.read_spikes_info_with_scipy: if "cluID" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="clu_id", values=[int(x) for x in cell_info["cluID"][0][0][0]] ) if "shankID" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="group_id", values=[f"Group{x}" for x in cell_info["shankID"][0][0][0]] ) if "region" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="location", values=[str(x[0]) for x in cell_info["region"][0][0][0]] ) else: # Logic for hdf5storage if "cluID" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="clu_id", values=[int(x) for x in cell_info["cluID"][0][0]] ) if "shankID" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="group_id", values=[f"Group{x}" for x in cell_info["shankID"][0][0]] ) if "region" in self.cell_info_fields: self.sorting_extractor.set_property( ids=unit_ids, key="location", values=[str(x[0]) for x in cell_info["region"][0][0]] ) celltype_mapping = {"pE": "excitatory", "pI": "inhibitory", "[]": "unclassified"} celltype_file_path = self.session_path / f"{self.session_id}.CellClass.cellinfo.mat" if celltype_file_path.is_file(): celltype_info = scipy.io.loadmat(celltype_file_path).get("CellClass", np.empty(0)) if "label" in celltype_info.dtype.names: self.sorting_extractor.set_property( ids=unit_ids, key="cell_type", values=[str(celltype_mapping[str(x[0])]) for x in celltype_info["label"][0][0][0]], ) def generate_recording_with_channel_metadata(self): session_data_file_path = self.session_path / f"{self.session_id}.session.mat" if session_data_file_path.is_file(): from pymatreader import read_mat ignore_fields = ["animal", "behavioralTracking", "timeSeries", "spikeSorting", "epochs"] session_data = read_mat(filename=session_data_file_path, ignore_fields=ignore_fields)["session"] extracellular_data = session_data["extracellular"] num_channels = int(extracellular_data["nChannels"]) num_samples = int(extracellular_data["nSamples"]) sampling_frequency = int(extracellular_data["sr"]) # Create a dummy recording extractor from spikeinterface.core.numpyextractors import NumpyRecording traces_list = [np.empty(shape=(1, num_channels))] channel_ids = [str(1 + i) for i in range(num_channels)] dummy_recording_extractor = NumpyRecording( traces_list=traces_list, sampling_frequency=sampling_frequency, channel_ids=channel_ids, ) # Add the channel metadata dummy_recording_extractor = add_channel_metadata_to_recoder( recording_extractor=dummy_recording_extractor, folder_path=self.session_path ) return dummy_recording_extractor
[docs] def get_metadata(self) -> dict: metadata = super().get_metadata() session_path = Path(self.source_data["file_path"]).parent session_id = session_path.stem # TODO: add condition for retrieving ecephys metadata if no recording or lfp are included in conversion metadata["NWBFile"].update(session_id=session_id) unit_properties = [] cellinfo_file_path = session_path / f"{session_id}.spikes.cellinfo.mat" if cellinfo_file_path.is_file(): cell_info_fields = self.cell_info_fields if "cluID" in cell_info_fields: unit_properties.append( dict( name="clu_id", description="0-indexed id of cluster identified from the shank.", ) ) if "shankID" in cell_info_fields: unit_properties.append( dict( name="group_id", description="The electrode group ID that each unit was identified by.", ) ) if "region" in cell_info_fields: unit_properties.append( dict( name="location", description="Brain region where each unit was detected.", ) ) celltype_filepath = session_path / f"{session_id}.CellClass.cellinfo.mat" if celltype_filepath.is_file(): celltype_info = scipy.io.loadmat(celltype_filepath).get("CellClass", np.empty(0)) if "label" in celltype_info.dtype.names: unit_properties.append( dict( name="cell_type", description="Type of cell this has been classified as.", ) ) metadata.update(Ecephys=dict(UnitProperties=unit_properties)) return metadata