Source code for neuroconv.tools.spikeinterface.spikeinterface

import uuid
import warnings
from collections import defaultdict
from numbers import Real
from pathlib import Path
from typing import List, Literal, Optional, Union

import numpy as np
import psutil
import pynwb
from hdmf.backends.hdf5.h5_utils import H5DataIO
from hdmf.data_utils import AbstractDataChunkIterator, DataChunkIterator
from packaging.version import Version
from spikeinterface import BaseRecording, BaseSorting, WaveformExtractor

from .spikeinterfacerecordingdatachunkiterator import (
    SpikeInterfaceRecordingDataChunkIterator,
)
from ..importing import get_package_version
from ..nwb_helpers import get_module, make_or_load_nwbfile
from ...utils import (
    DeepDict,
    FilePathType,
    calculate_regular_series_rate,
    dict_deep_update,
)


[docs]def get_nwb_metadata(recording: BaseRecording, metadata: dict = None): """ Return default metadata for all recording fields. Parameters ---------- recording: spikeinterface.BaseRecording metadata: dict metadata info for constructing the nwb file (optional). """ metadata = dict( NWBFile=dict( session_description="Auto-generated by NwbRecordingExtractor without description.", identifier=str(uuid.uuid4()), ), Ecephys=dict( Device=[dict(name="Device", description="Ecephys probe. Automatically generated.")], ElectrodeGroup=[ dict(name=str(group_name), description="no description", location="unknown", device="Device") for group_name in np.unique(recording.get_channel_groups()) ], ), ) return metadata
[docs]def add_devices(nwbfile: pynwb.NWBFile, metadata: Optional[DeepDict] = None): """ Add device information to nwbfile object. Will always ensure nwbfile has at least one device, but multiple devices within the metadata list will also be created. Parameters ---------- nwbfile: NWBFile nwb file to which the recording information is to be added metadata: DeepDict metadata info for constructing the nwb file (optional). Should be of the format metadata['Ecephys']['Device'] = [ { 'name': my_name, 'description': my_description }, ... ] Missing keys in an element of metadata['Ecephys']['Device'] will be auto-populated with defaults. """ if nwbfile is not None: assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile" # Default Device metadata defaults = dict(name="Device", description="Ecephys probe. Automatically generated.") if metadata is None: metadata = dict() if "Ecephys" not in metadata: metadata["Ecephys"] = dict() if "Device" not in metadata["Ecephys"]: metadata["Ecephys"]["Device"] = [defaults] for dev in metadata["Ecephys"]["Device"]: if dev.get("name", defaults["name"]) not in nwbfile.devices: nwbfile.create_device(**dict(defaults, **dev))
[docs]def add_electrode_groups(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None): """ Add electrode group information to nwbfile object. Will always ensure nwbfile has at least one electrode group. Will auto-generate a linked device if the specified name does not exist in the nwbfile. Parameters ---------- recording: spikeinterface.BaseRecording nwbfile: pynwb.NWBFile nwb file to which the recording information is to be added metadata: dict metadata info for constructing the nwb file (optional). Should be of the format metadata['Ecephys']['ElectrodeGroup'] = [ { 'name': my_name, 'description': my_description, 'location': electrode_location, 'device': my_device_name }, ... ] Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults. Group names set by RecordingExtractor channel properties will also be included with passed metadata, but will only use default description and location. """ assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile" if len(nwbfile.devices) == 0: warnings.warn("When adding ElectrodeGroup, no Devices were found on nwbfile. Creating a Device now...") add_devices(nwbfile=nwbfile, metadata=metadata) if metadata is None: metadata = dict() if "Ecephys" not in metadata: metadata["Ecephys"] = dict() if "group_name" in recording.get_property_keys(): group_names = np.unique(recording.get_property("group_name")) else: group_names = np.unique(recording.get_channel_groups()).astype("str", copy=False) defaults = [ dict( name=group_name, description="no description", location="unknown", device=[i.name for i in nwbfile.devices.values()][0], ) for group_name in group_names ] if "ElectrodeGroup" not in metadata["Ecephys"]: metadata["Ecephys"]["ElectrodeGroup"] = defaults assert all( [isinstance(x, dict) for x in metadata["Ecephys"]["ElectrodeGroup"]] ), "Expected metadata['Ecephys']['ElectrodeGroup'] to be a list of dictionaries!" for grp in metadata["Ecephys"]["ElectrodeGroup"]: if grp.get("name", defaults[0]["name"]) not in nwbfile.electrode_groups: device_name = grp.get("device", defaults[0]["device"]) if device_name not in nwbfile.devices: new_device_metadata = dict(Ecephys=dict(Device=[dict(name=device_name)])) add_devices(nwbfile=nwbfile, metadata=new_device_metadata) warnings.warn( f"Device '{device_name}' not detected in " "attempted link to electrode group! Automatically generating." ) electrode_group_kwargs = dict(defaults[0], **grp) electrode_group_kwargs.update(device=nwbfile.devices[device_name]) nwbfile.create_electrode_group(**electrode_group_kwargs) if not nwbfile.electrode_groups: device_name = list(nwbfile.devices.keys())[0] device = nwbfile.devices[device_name] if len(nwbfile.devices) > 1: warnings.warn( "More than one device found when adding electrode group " f"via channel properties: using device '{device_name}'. To use a " "different device, indicate it the metadata argument." ) electrode_group_kwargs = dict(defaults[0]) electrode_group_kwargs.update(device=device) for group_name in np.unique(recording.get_channel_groups()).tolist(): electrode_group_kwargs.update(name=str(group_name)) nwbfile.create_electrode_group(**electrode_group_kwargs)
[docs]def add_electrodes(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None, exclude: tuple = ()): """ Add channels from recording object as electrodes to nwbfile object. Parameters ---------- recording: spikeinterface.BaseRecording nwbfile: NWBFile nwb file to which the recording information is to be added metadata: dict metadata info for constructing the nwb file (optional). Should be of the format metadata['Ecephys']['Electrodes'] = [ { 'name': my_name, 'description': my_description }, ... ] Note that data intended to be added to the electrodes table of the NWBFile should be set as channel properties in the RecordingExtractor object. Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults whenever possible. If 'my_name' is set to one of the required fields for nwbfile electrodes (id, x, y, z, imp, location, filtering, group_name), then the metadata will override their default values. Setting 'my_name' to metadata field 'group' is not supported as the linking to nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case. If no group information is passed via metadata, automatic linking to existing electrode groups, possibly including the default, will occur. exclude: tuple An iterable containing the string names of channel properties in the RecordingExtractor object to ignore when writing to the NWBFile. """ assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile" # Test that metadata has the expected structure electrodes_metadata = list() if metadata is not None: electrodes_metadata = metadata.get("Ecephys", dict()).get("Electrodes", list()) required_keys = {"name", "description"} assert all( [isinstance(property, dict) and set(property.keys()) == required_keys for property in electrodes_metadata] ), ( "Expected metadata['Ecephys']['Electrodes'] to be a list of dictionaries, " "containing the keys 'name' and 'description'" ) assert all( [property["name"] != "group" for property in electrodes_metadata] ), "Passing metadata field 'group' is deprecated; pass group_name instead!" # Transform to a dict that maps property name to its description property_descriptions = dict() for property in electrodes_metadata: property_descriptions[property["name"]] = property["description"] # 1. Build columns details from extractor properties: dict(name: dict(description='',data=data, index=False)) data_to_add = defaultdict(dict) recorder_properties = recording.get_property_keys() excluded_properties = list(exclude) + ["contact_vector"] properties_to_extract = [property for property in recorder_properties if property not in excluded_properties] for property in properties_to_extract: data = recording.get_property(property) index = isinstance(data[0], (list, np.ndarray, tuple)) # booleans are parsed as strings if isinstance(data[0], (bool, np.bool_)): data = data.astype(str) # Fill with provided custom descriptions description = property_descriptions.get(property, "no description") data_to_add[property].update(description=description, data=data, index=index) # Channel name logic channel_ids = recording.get_channel_ids() if "channel_name" in data_to_add: # if 'channel_name' is set as a property, it is used to override default channel_ids (and "id") channel_name_array = data_to_add["channel_name"]["data"] else: channel_name_array = channel_ids.astype("str", copy=False) data_to_add["channel_name"].update(description="unique channel reference", data=channel_name_array, index=False) # Location in spikeinterface is equivalent to rel_x, rel_y, rel_z in the nwb standard if "location" in data_to_add: data = data_to_add["location"]["data"] column_number_to_property = {0: "rel_x", 1: "rel_y", 2: "rel_z"} for column_number in range(data.shape[1]): property = column_number_to_property[column_number] data_to_add[property].update(description=property, data=data[:, column_number], index=False) data_to_add.pop("location") # Brain area is the meaning of location in the nwb standard if "brain_area" in data_to_add: data_to_add["location"] = data_to_add["brain_area"] data_to_add["location"].update(description="location") data_to_add.pop("brain_area") # If no group_names are provided, use information from groups or default values if "group_name" in data_to_add: group_name_array = data_to_add["group_name"]["data"].astype("str", copy=False) else: if "group" in data_to_add: group_name_array = data_to_add["group"]["data"].astype("str", copy=False) else: default_group_name = "ElectrodeGroup" group_name_array = np.full(channel_name_array.size, fill_value=default_group_name) group_name_array[group_name_array == ""] = "ElectrodeGroup" data_to_add["group_name"].update(description="group_name", data=group_name_array, index=False) # Add missing groups to the nwb file groupless_names = [group_name for group_name in group_name_array if group_name not in nwbfile.electrode_groups] if len(groupless_names) > 0: electrode_group_list = [dict(name=group_name) for group_name in groupless_names] missing_group_metadata = dict(Ecephys=dict(ElectrodeGroup=electrode_group_list)) add_electrode_groups(recording=recording, nwbfile=nwbfile, metadata=missing_group_metadata) group_list = [nwbfile.electrode_groups[group_name] for group_name in group_name_array] data_to_add["group"].update(description="the ElectrodeGroup object", data=group_list, index=False) # 2 Divide properties to those that will be added as rows (default plus previous) and columns (new properties) # This mapping contains all the defaults that might be required by pre-defined columns on the NWB schema # https://nwb-schema.readthedocs.io/en/latest/format.html#groups-general-extracellular-ephys-electrodes required_schema_property_to_default_value = dict( id=None, group=None, group_name="default", location="unknown", ) optional_schema_property_to_default_value = dict( x=np.nan, y=np.nan, z=np.nan, # There doesn't seem to be a canonical default for impedance, if missing. # The NwbRecordingExtractor follows the -1.0 convention, other scripts sometimes use np.nan imp=-1.0, filtering="none", ) required_schema_properties = set(required_schema_property_to_default_value) optional_schema_properties = set(optional_schema_property_to_default_value) schema_properties = required_schema_properties | optional_schema_properties electrode_table_previous_properties = set(nwbfile.electrodes.colnames) if nwbfile.electrodes else set() extracted_properties = set(data_to_add) properties_to_add_by_rows = required_schema_properties | electrode_table_previous_properties properties_to_add_by_columns = extracted_properties - properties_to_add_by_rows # Find default values for a subset of optional schema defined properties already in the electrode table all_properties_to_default_value = dict(required_schema_property_to_default_value) for optional_property in optional_schema_properties.intersection(electrode_table_previous_properties): all_properties_to_default_value.update( {optional_property: optional_schema_property_to_default_value[optional_property]} ) # Find default values for custom (not schema defined) properties / columns already in the electrode table type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", Real: np.nan} for property in electrode_table_previous_properties - schema_properties: # Find a matching data type and get the default value sample_data = nwbfile.electrodes[property].data[0] matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type)) default_value = type_to_default_value[matching_type] all_properties_to_default_value.update({property: default_value}) # Add data by rows excluding the rows containing channel_names and group_names that were previously added # The same channel_name can be added provided that it belongs to a different group channel_group_names_used_previously = [] if "channel_name" in electrode_table_previous_properties and "group_name" in electrode_table_previous_properties: channel_group_names_used_previously = [ (ch_name, gr_name) for ch_name, gr_name in zip(nwbfile.electrodes["channel_name"].data, nwbfile.electrodes["group_name"].data) ] properties_with_data = [property for property in properties_to_add_by_rows if "data" in data_to_add[property]] rows_in_data = [index for index in range(recording.get_num_channels())] rows_to_add = [ index for index in rows_in_data if (channel_name_array[index], group_name_array[index]) not in channel_group_names_used_previously ] for row in rows_to_add: electrode_kwargs = dict(all_properties_to_default_value) for property in properties_with_data: electrode_kwargs[property] = data_to_add[property]["data"][row] nwbfile.add_electrode(**electrode_kwargs, enforce_unique_id=True) # Add channel_name as a column and fill previously existing rows with channel_name equal to str(ids) previous_table_size = len(nwbfile.electrodes.id[:]) - len(channel_name_array) if "channel_name" in properties_to_add_by_columns: cols_args = data_to_add["channel_name"] data = cols_args["data"] previous_ids = nwbfile.electrodes.id[:previous_table_size] default_value = np.array(previous_ids).astype("str") extended_data = np.hstack([default_value, data]) cols_args["data"] = extended_data nwbfile.add_electrode_column("channel_name", **cols_args) # Build a channel name to electrode table index map electrodes_df = nwbfile.electrodes.to_dataframe().reset_index() channel_name_to_electrode_index = { channel_name: electrodes_df.query(f"channel_name=='{channel_name}'").index[0] for channel_name in channel_name_array } indexes_for_new_data = [channel_name_to_electrode_index[channel_name] for channel_name in channel_name_array] indexes_for_default_values = electrodes_df.index.difference(indexes_for_new_data).values # Add properties as columns for property in properties_to_add_by_columns - {"channel_name"}: cols_args = data_to_add[property] data = cols_args["data"] if np.issubdtype(data.dtype, np.integer): data = data.astype("float") if property in optional_schema_property_to_default_value: default_value = optional_schema_property_to_default_value[property] else: # Find first matching data-type for custom column sample_data = data[0] matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type)) default_value = type_to_default_value[matching_type] extended_data = np.empty(shape=len(nwbfile.electrodes.id[:]), dtype=data.dtype) extended_data[indexes_for_new_data] = data extended_data[indexes_for_default_values] = default_value cols_args["data"] = extended_data nwbfile.add_electrode_column(property, **cols_args)
[docs]def check_if_recording_traces_fit_into_memory(recording: BaseRecording, segment_index: int = 0) -> None: """ Raises an error if the full traces of a recording extractor are larger than psutil.virtual_memory().available. Parameters ---------- recording : spikeinterface.BaseRecording A recording extractor object from spikeinterface. segment_index : int, optional The segment index of the recording extractor object, by default 0 Raises ------ MemoryError """ element_size_in_bytes = recording.get_dtype().itemsize num_channels = recording.get_num_channels() num_frames = recording.get_num_frames(segment_index=segment_index) traces_size_in_bytes = element_size_in_bytes * num_channels * num_frames available_memory_in_bytes = psutil.virtual_memory().available if traces_size_in_bytes > available_memory_in_bytes: message = ( f"Memory error, full electrical series is {round(traces_size_in_bytes/1e9, 2)} GB) but only" f"({round(available_memory_in_bytes/1e9, 2)} GB are available. Use iterator_type='V2'" ) raise MemoryError(message)
[docs]def _recording_traces_to_hdmf_iterator( recording: BaseRecording, segment_index: int = None, return_scaled: bool = False, iterator_type: str = "v2", iterator_opts: dict = None, ) -> AbstractDataChunkIterator: """Function to wrap traces of spikeinterface recording into an AbstractDataChunkIterator. Parameters ---------- recording : spikeinterface.BaseRecording A recording extractor from spikeinterface segment_index : int, optional The recording segment to add to the NWBFile. return_scaled : bool, defaults to False When True recording extractor objects from spikeinterface return their traces in microvolts. iterator_type: {"v2", "v1", None}, default: 'v2' The type of DataChunkIterator to use. 'v1' is the original DataChunkIterator of the hdmf data_utils. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the iterator. See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator for the full list of options. Returns ------- traces_as_iterator: AbstractDataChunkIterator The traces of the recording extractor wrapped in an iterator object. Raises ------ ValueError If the iterator_type is not 'v1', 'v2' or None. """ supported_iterator_types = ["v1", "v2", None] if iterator_type not in supported_iterator_types: message = f"iterator_type {iterator_type} should be either 'v1', 'v2' (recommended) or None" raise ValueError(message) iterator_opts = dict() if iterator_opts is None else iterator_opts if iterator_type is None: check_if_recording_traces_fit_into_memory(recording=recording, segment_index=segment_index) traces_as_iterator = recording.get_traces(return_scaled=return_scaled, segment_index=segment_index) elif iterator_type == "v2": traces_as_iterator = SpikeInterfaceRecordingDataChunkIterator( recording=recording, segment_index=segment_index, return_scaled=return_scaled, **iterator_opts, ) elif iterator_type == "v1": traces_as_iterator = DataChunkIterator( data=recording.get_traces(return_scaled=return_scaled, segment_index=segment_index), **iterator_opts ) else: raise ValueError("iterator_type must be None, 'v1', or 'v2'.") return traces_as_iterator
[docs]def add_electrical_series( recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None, segment_index: int = 0, starting_time: Optional[float] = None, write_as: Literal["raw", "processed", "lfp"] = "raw", es_key: str = None, write_scaled: bool = False, compression: Optional[str] = "gzip", compression_opts: Optional[int] = None, iterator_type: Optional[str] = "v2", iterator_opts: Optional[dict] = None, ): """ Adds traces from recording object as ElectricalSeries to an NWBFile object. Parameters ---------- recording : SpikeInterfaceRecording A recording extractor from spikeinterface nwbfile : NWBFile nwb file to which the recording information is to be added metadata : dict, optional metadata info for constructing the nwb file. Should be of the format metadata['Ecephys']['ElectricalSeries'] = dict( name=my_name, description=my_description ) segment_index : int, default: 0 The recording segment to add to the NWBFile. starting_time : float, optional Sets the starting time of the ElectricalSeries to a manually set value. write_as : {'raw', 'processed', 'lfp'} How to save the traces data in the nwb file. Options: - 'raw': save it in acquisition - 'processed': save it as FilteredEphys, in a processing module - 'lfp': save it as LFP, in a processing module es_key : str, optional Key in metadata dictionary containing metadata info for the specific electrical series write_scaled : bool, default: False If True, writes the traces in uV with the right conversion. If False , the data is stored as it is and the right conversions factors are added to the nwbfile. compression: {None, 'gzip', 'lzf'}, default: 'gzip' Type of compression to use. Set to None to disable all compression. To use the `configure_backend` function, you should set this to None. compression_opts: int, default: 4 Only applies to compression="gzip". Controls the level of the GZIP. iterator_type: {"v2", "v1", None}, default: 'v2' The type of DataChunkIterator to use. 'v1' is the original DataChunkIterator of the hdmf data_utils. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the iterator. See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator for the full list of options. Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults whenever possible. """ assert write_as in [ "raw", "processed", "lfp", ], f"'write_as' should be 'raw', 'processed' or 'lfp', but instead received value {write_as}" modality_signature = write_as.upper() if write_as == "lfp" else write_as.capitalize() default_name = f"ElectricalSeries{modality_signature}" default_description = dict(raw="Raw acquired data", lfp="Processed data - LFP", processed="Processed data") eseries_kwargs = dict(name=default_name, description=default_description[write_as]) # Select and/or create module if lfp or processed data is to be stored. if write_as in ["lfp", "processed"]: ecephys_mod = get_module( nwbfile=nwbfile, name="ecephys", description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.", ) if write_as == "lfp" and "LFP" not in ecephys_mod.data_interfaces: ecephys_mod.add(pynwb.ecephys.LFP(name="LFP")) if write_as == "processed" and "Processed" not in ecephys_mod.data_interfaces: ecephys_mod.add(pynwb.ecephys.FilteredEphys(name="Processed")) if metadata is not None and "Ecephys" in metadata and es_key is not None: assert es_key in metadata["Ecephys"], f"metadata['Ecephys'] dictionary does not contain key '{es_key}'" eseries_kwargs.update(metadata["Ecephys"][es_key]) # If the recording extractor has more than 1 segment, append numbers to the names so that the names are unique. # 0-pad these names based on the number of segments. # If there are 10 segments use 2 digits, if there are 100 segments use 3 digits, etc. if recording.get_num_segments() > 1: width = int(np.ceil(np.log10((recording.get_num_segments())))) eseries_kwargs["name"] += f"{segment_index:0{width}}" # The add_electrodes adds a column with channel name to the electrode table. add_electrodes(recording=recording, nwbfile=nwbfile, metadata=metadata) # That uses either the `channel_name` property or the channel ids as string otherwise. channel_names = recording.get_property("channel_name") if channel_names is None: channel_names = recording.get_channel_ids().astype("str") # We use those channels to select the electrodes to be added to the ElectricalSeries channel_name_column = nwbfile.electrodes["channel_name"][:] mask = np.isin(channel_name_column, channel_names) table_ids = np.nonzero(mask)[0] electrode_table_region = nwbfile.create_electrode_table_region( region=table_ids.tolist(), description="electrode_table_region" ) eseries_kwargs.update(electrodes=electrode_table_region) # Spikeinterface guarantees data in micro volts when return_scaled=True. This multiplies by gain and adds offsets # In nwb to get traces in Volts we take data*channel_conversion*conversion + offset channel_conversion = recording.get_channel_gains() channel_offset = recording.get_channel_offsets() unique_channel_conversion = np.unique(channel_conversion) unique_channel_conversion = unique_channel_conversion[0] if len(unique_channel_conversion) == 1 else None unique_offset = np.unique(channel_offset) if unique_offset.size > 1: raise ValueError("Recording extractors with heterogeneous offsets are not supported") unique_offset = unique_offset[0] if unique_offset[0] is not None else 0 micro_to_volts_conversion_factor = 1e-6 if not write_scaled and unique_channel_conversion is None: eseries_kwargs.update(conversion=micro_to_volts_conversion_factor) eseries_kwargs.update(channel_conversion=channel_conversion) elif not write_scaled and unique_channel_conversion is not None: eseries_kwargs.update(conversion=unique_channel_conversion * micro_to_volts_conversion_factor) if not write_scaled: eseries_kwargs.update(offset=unique_offset * micro_to_volts_conversion_factor) # Iterator ephys_data_iterator = _recording_traces_to_hdmf_iterator( recording=recording, segment_index=segment_index, iterator_type=iterator_type, iterator_opts=iterator_opts, ) if compression is not None: # in this case we assume HDF5 backend and compression eseries_kwargs.update( data=H5DataIO(data=ephys_data_iterator, compression=compression, compression_opts=compression_opts) ) else: eseries_kwargs.update(data=ephys_data_iterator) # Now we decide whether to store the timestamps as a regular series or as an irregular series. if recording.has_time_vector(segment_index=segment_index): # First we check if the recording has a time vector to avoid creating artificial timestamps timestamps = recording.get_times(segment_index=segment_index) rate = calculate_regular_series_rate(series=timestamps) # Returns None if it is not regular recording_t_start = timestamps[0] else: rate = recording.get_sampling_frequency() recording_t_start = recording._recording_segments[segment_index].t_start or 0 starting_time = starting_time if starting_time is not None else 0 if rate: starting_time = float(starting_time + recording_t_start) # Note that we call the sampling frequency again because the estimated rate might be different from the # sampling frequency of the recording extractor by some epsilon. eseries_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency()) else: shifted_timestamps = starting_time + timestamps if compression is not None: # in this case we assume HDF5 backend and compression timestamps_iterator = H5DataIO( data=shifted_timestamps, compression=compression, compression_opts=compression_opts ) else: timestamps_iterator = shifted_timestamps eseries_kwargs.update(timestamps=timestamps_iterator) # Create ElectricalSeries object and add it to nwbfile es = pynwb.ecephys.ElectricalSeries(**eseries_kwargs) if write_as == "raw": nwbfile.add_acquisition(es) elif write_as == "processed": ecephys_mod.data_interfaces["Processed"].add_electrical_series(es) elif write_as == "lfp": ecephys_mod.data_interfaces["LFP"].add_electrical_series(es)
[docs]def add_electrodes_info(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None): """ Add device, electrode_groups, and electrodes info to the nwbfile. Parameters ---------- recording : SpikeInterfaceRecording nwbfile : NWBFile NWB file to which the recording information is to be added metadata : dict, optional metadata info for constructing the nwb file. Should be of the format metadata['Ecephys']['Electrodes'] = [ { 'name': my_name, 'description': my_description }, ... ] Note that data intended to be added to the electrodes table of the NWBFile should be set as channel properties in the RecordingExtractor object. Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults whenever possible. If 'my_name' is set to one of the required fields for nwbfile electrodes (id, x, y, z, imp, location, filtering, group_name), then the metadata will override their default values. Setting 'my_name' to metadata field 'group' is not supported as the linking to nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case. If no group information is passed via metadata, automatic linking to existing electrode groups, possibly including the default, will occur. """ add_devices(nwbfile=nwbfile, metadata=metadata) add_electrode_groups(recording=recording, nwbfile=nwbfile, metadata=metadata) add_electrodes(recording=recording, nwbfile=nwbfile, metadata=metadata)
def add_recording( recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: Optional[dict] = None, starting_time: Optional[float] = None, write_as: Literal["raw", "processed", "lfp"] = "raw", es_key: Optional[str] = None, write_electrical_series: bool = True, write_scaled: bool = False, compression: Optional[str] = "gzip", compression_opts: Optional[int] = None, iterator_type: str = "v2", iterator_opts: Optional[dict] = None, ): if hasattr(recording, "nwb_metadata"): metadata = dict_deep_update(recording.nwb_metadata, metadata) elif metadata is None: metadata = get_nwb_metadata(recording=recording) # Convenience function to add device, electrode groups and electrodes info add_electrodes_info(recording=recording, nwbfile=nwbfile, metadata=metadata) if write_electrical_series: number_of_segments = recording.get_num_segments() for segment_index in range(number_of_segments): add_electrical_series( recording=recording, nwbfile=nwbfile, segment_index=segment_index, starting_time=starting_time, metadata=metadata, write_as=write_as, es_key=es_key, write_scaled=write_scaled, compression=compression, compression_opts=compression_opts, iterator_type=iterator_type, iterator_opts=iterator_opts, )
[docs]def write_recording( recording: BaseRecording, nwbfile_path: Optional[FilePathType] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, starting_time: Optional[float] = None, write_as: Optional[str] = "raw", es_key: Optional[str] = None, write_electrical_series: bool = True, write_scaled: bool = False, compression: Optional[str] = "gzip", compression_opts: Optional[int] = None, iterator_type: Optional[str] = "v2", iterator_opts: Optional[dict] = None, ) -> pynwb.NWBFile: """ Primary method for writing a RecordingExtractor object to an NWBFile. Parameters ---------- recording : spikeinterface.BaseRecording nwbfile_path : FilePathType, optional Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional metadata info for constructing the nwb file (optional). Should be of the format metadata['Ecephys'] = { 'Device': [ { 'name': my_name, 'description': my_description }, ... ] 'ElectrodeGroup': [ { 'name': my_name, 'description': my_description, 'location': electrode_location, 'device': my_device_name }, ... ] 'Electrodes': [ { 'name': my_name, 'description': my_description }, ... ] 'ElectricalSeries' = { 'name': my_name, 'description': my_description } Note that data intended to be added to the electrodes table of the NWBFile should be set as channel properties in the RecordingExtractor object. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. verbose : bool, default: True If 'nwbfile_path' is specified, informs user after a successful write operation. starting_time : float, optional Sets the starting time of the ElectricalSeries to a manually set value. write_as: {'raw', 'processed', 'lfp'}, optional How to save the traces data in the nwb file. - 'raw' will save it in acquisition - 'processed' will save it as FilteredEphys, in a processing module - 'lfp' will save it as LFP, in a processing module es_key: str, optional Key in metadata dictionary containing metadata info for the specific electrical series write_electrical_series: bool, default: True If True, electrical series are written in acquisition. If False, only device, electrode_groups, and electrodes are written to NWB. write_scaled: bool, default: True If True, writes the scaled traces (return_scaled=True) compression: {None, 'gzip', 'lzp'}, default: 'gzip' Type of compression to use. Set to None to disable all compression. To use the `configure_backend` function, you should set this to None. compression_opts: int, optional, default: 4 Only applies to compression="gzip". Controls the level of the GZIP. iterator_type: {"v2", "v1", None} The type of DataChunkIterator to use. 'v1' is the original DataChunkIterator of the hdmf data_utils. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the RecordingExtractorDataChunkIterator (iterator_type='v2'). Valid options are: buffer_gb : float, default: 1.0 In units of GB. Recommended to be as much free RAM as available. Automatically calculates suitable buffer shape. buffer_shape : tuple, optional Manual specification of buffer shape to return on each iteration. Must be a multiple of chunk_shape along each axis. Cannot be set if `buffer_gb` is specified. chunk_mb : float. default: 1.0 Should be below 1 MB. Automatically calculates suitable chunk shape. chunk_shape : tuple, optional Manual specification of the internal chunk shape for the HDF5 dataset. Cannot be set if `chunk_mb` is also specified. display_progress : bool, default: False Display a progress bar with iteration rate and estimated completion time. progress_bar_options : dict, optional Dictionary of keyword arguments to be passed directly to tqdm. See https://github.com/tqdm/tqdm#parameters for options. """ with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: add_recording( recording=recording, nwbfile=nwbfile, starting_time=starting_time, metadata=metadata, write_as=write_as, es_key=es_key, write_electrical_series=write_electrical_series, write_scaled=write_scaled, compression=compression, compression_opts=compression_opts, iterator_type=iterator_type, iterator_opts=iterator_opts, ) return nwbfile_out
[docs]def get_nspikes(units_table: pynwb.misc.Units, unit_id: int) -> int: """Return the number of spikes for chosen unit.""" ids = np.array(units_table.id[:]) indexes = np.where(ids == unit_id)[0] if not len(indexes): raise ValueError(f"{unit_id} is an invalid unit_id. Valid ids: {ids}.") index = indexes[0] if index == 0: return units_table["spike_times_index"].data[index] else: return units_table["spike_times_index"].data[index] - units_table["spike_times_index"].data[index - 1]
[docs]def add_units_table( sorting: BaseSorting, nwbfile: pynwb.NWBFile, unit_ids: Optional[List[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[List[str]] = None, skip_features: Optional[List[str]] = None, units_table_name: str = "units", unit_table_description: str = "Autogenerated by neuroconv.", write_in_processing_module: bool = False, write_waveforms: bool = False, waveform_means: Optional[np.ndarray] = None, waveform_sds: Optional[np.ndarray] = None, unit_electrode_indices=None, ): """ Primary method for writing a SortingExtractor object to an NWBFile. Parameters ---------- sorting : spikeinterface.BaseSorting nwbfile : NWBFile unit_ids : list of int or list of str, optional Controls the unit_ids that will be written to the nwb file. If None, all units are written. property_descriptions : dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties : list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. skip_features : list of str, optional Each string in this list that matches a spike feature will not be written to the NWBFile. write_in_processing_module : bool, default: False How to save the units table in the nwb file. - True will save it to the processing module to serve as a historical provenance for the official table. - False will save it to the official NWBFile.Units position; recommended only for the final form of the data. units_table_name : str, default: 'units' The name of the units table. If write_as=='units', then units_table_name must also be 'units'. unit_table_description : str, optional Text description of the units table; it is recommended to include information such as the sorting method, curation steps, etc. write_waveforms : bool, default: False if True and either sorting is a spikeextractors SortingExtractor object with "template" property or waveform_means (and optionally waveform_sd) are given, then waveforms are added to the units table after writing. waveform_means : np.ndarray, optional Waveform mean (template) for each unit (num_units, num_samples, num_channels) waveform_sds : np.ndarray, optional Waveform standard deviation for each unit (num_units, num_samples, num_channels) unit_electrode_indices : list of lists or arrays, optional For each unit, the indices of electrodes that each waveform_mean/sd correspond to. """ if not write_in_processing_module and units_table_name != "units": raise ValueError("When writing to the nwbfile.units table, the name of the table must be 'units'!") if not isinstance(nwbfile, pynwb.NWBFile): raise TypeError(f"nwbfile type should be an instance of pynwb.NWBFile but got {type(nwbfile)}") if write_in_processing_module: ecephys_mod = get_module( nwbfile=nwbfile, name="ecephys", description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.", ) write_table_first_time = units_table_name not in ecephys_mod.data_interfaces if write_table_first_time: units_table = pynwb.misc.Units(name=units_table_name, description=unit_table_description) ecephys_mod.add(units_table) units_table = ecephys_mod[units_table_name] else: write_table_first_time = nwbfile.units is None if write_table_first_time: nwbfile.units = pynwb.misc.Units(name="units", description=unit_table_description) units_table = nwbfile.units default_descriptions = dict( isi_violation="Quality metric that measures the ISI violation ratio as a proxy for the purity of the unit.", firing_rate="Number of spikes per unit of time.", template="The extracellular average waveform.", max_channel="The recording channel id with the largest amplitude.", halfwidth="The full-width half maximum of the negative peak computed on the maximum channel.", peak_to_valley="The duration between the negative and the positive peaks computed on the maximum channel.", snr="The signal-to-noise ratio of the unit.", quality="Quality of the unit as defined by phy (good, mua, noise).", spike_amplitude="Average amplitude of peaks detected on the channel.", spike_rate="Average rate of peaks detected on the channel.", unit_name="Unique reference for each unit.", ) if property_descriptions is None: property_descriptions = dict() if skip_properties is None: skip_properties = list() property_descriptions = dict(default_descriptions, **property_descriptions) data_to_add = defaultdict(dict) sorting_properties = sorting.get_property_keys() excluded_properties = list(skip_properties) + ["contact_vector"] properties_to_extract = [property for property in sorting_properties if property not in excluded_properties] if unit_ids is not None: sorting = sorting.select_units(unit_ids=unit_ids) if unit_electrode_indices is not None: unit_electrode_indices = np.array(unit_electrode_indices)[sorting.ids_to_indices(unit_ids)] unit_ids = sorting.unit_ids # Extract properties for property in properties_to_extract: data = sorting.get_property(property) if isinstance(data[0], (bool, np.bool_)): data = data.astype(str) index = isinstance(data[0], (list, np.ndarray, tuple)) description = property_descriptions.get(property, "No description.") data_to_add[property].update(description=description, data=data, index=index) if property in ["max_channel", "max_electrode"] and nwbfile.electrodes is not None: data_to_add[property].update(table=nwbfile.electrodes) # Unit name logic if "unit_name" in data_to_add: # if 'unit_name' is set as a property, it is used to override default unit_ids (and "id") unit_name_array = data_to_add["unit_name"]["data"] else: unit_name_array = unit_ids.astype("str", copy=False) data_to_add["unit_name"].update(description="Unique reference for each unit.", data=unit_name_array) units_table_previous_properties = set(units_table.colnames) - {"spike_times"} extracted_properties = set(data_to_add) properties_to_add_by_rows = units_table_previous_properties | {"id"} properties_to_add_by_columns = extracted_properties - properties_to_add_by_rows # Find default values for properties / columns already in the table type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", Real: np.nan} property_to_default_values = {"id": None} for property in units_table_previous_properties: # Find a matching data type and get the default value sample_data = units_table[property].data[0] matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type)) default_value = type_to_default_value[matching_type] property_to_default_values.update({property: default_value}) # Add data by rows excluding the rows with previously added unit names unit_names_used_previously = [] if "unit_name" in units_table_previous_properties: unit_names_used_previously = units_table["unit_name"].data has_electrodes_column = "electrodes" in units_table.colnames properties_with_data = {property for property in properties_to_add_by_rows if "data" in data_to_add[property]} rows_in_data = [index for index in range(sorting.get_num_units())] if not has_electrodes_column: rows_to_add = [index for index in rows_in_data if unit_name_array[index] not in unit_names_used_previously] else: rows_to_add = [] for index in rows_in_data: if unit_name_array[index] not in unit_names_used_previously: rows_to_add.append(index) else: unit_name = unit_name_array[index] previous_electrodes = units_table[np.where(units_table["unit_name"][:] == unit_name)[0]].electrodes if list(previous_electrodes.values[0]) != list(unit_electrode_indices[index]): rows_to_add.append(index) for row in rows_to_add: unit_kwargs = dict(property_to_default_values) for property in properties_with_data: unit_kwargs[property] = data_to_add[property]["data"][row] spike_times = [] # Extract and concatenate the spike times from multiple segments for segment_index in range(sorting.get_num_segments()): segment_spike_times = sorting.get_unit_spike_train( unit_id=unit_ids[row], segment_index=segment_index, return_times=True ) spike_times.append(segment_spike_times) spike_times = np.concatenate(spike_times) if waveform_means is not None: unit_kwargs["waveform_mean"] = waveform_means[row] if waveform_sds is not None: unit_kwargs["waveform_sd"] = waveform_sds[row] if unit_electrode_indices is not None: unit_kwargs["electrodes"] = unit_electrode_indices[row] units_table.add_unit(spike_times=spike_times, **unit_kwargs, enforce_unique_id=True) # added_unit_table_ids = units_table.id[-len(rows_to_add) :] # TODO - this line is unused? # Add unit_name as a column and fill previously existing rows with unit_name equal to str(ids) previous_table_size = len(units_table.id[:]) - len(unit_name_array) if "unit_name" in properties_to_add_by_columns: cols_args = data_to_add["unit_name"] data = cols_args["data"] previous_ids = units_table.id[:previous_table_size] default_value = np.array(previous_ids).astype("str") extended_data = np.hstack([default_value, data]) cols_args["data"] = extended_data units_table.add_column("unit_name", **cols_args) # Build a channel name to electrode table index map table_df = units_table.to_dataframe().reset_index() unit_name_to_electrode_index = { unit_name: table_df.query(f"unit_name=='{unit_name}'").index[0] for unit_name in unit_name_array } indexes_for_new_data = [unit_name_to_electrode_index[unit_name] for unit_name in unit_name_array] indexes_for_default_values = table_df.index.difference(indexes_for_new_data).values # Add properties as columns for property in properties_to_add_by_columns - {"unit_name"}: cols_args = data_to_add[property] data = cols_args["data"] if np.issubdtype(data.dtype, np.integer): data = data.astype("float") # Find first matching data-type sample_data = data[0] matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type)) default_value = type_to_default_value[matching_type] extended_data = np.empty(shape=len(units_table.id[:]), dtype=data.dtype) extended_data[indexes_for_new_data] = data extended_data[indexes_for_default_values] = default_value # Always store numpy objects as strings if np.issubdtype(extended_data.dtype, np.object_): extended_data = extended_data.astype("str", copy=False) cols_args["data"] = extended_data units_table.add_column(property, **cols_args)
def add_sorting( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, unit_ids: Optional[List[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[List[str]] = None, skip_features: Optional[List[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): assert write_as in [ "units", "processing", ], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!" write_in_processing_module = False if write_as == "units" else True add_units_table( sorting=sorting, unit_ids=unit_ids, nwbfile=nwbfile, property_descriptions=property_descriptions, skip_properties=skip_properties, skip_features=skip_features, write_in_processing_module=write_in_processing_module, units_table_name=units_name, unit_table_description=units_description, write_waveforms=False, )
[docs]def write_sorting( sorting: BaseSorting, nwbfile_path: Optional[FilePathType] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, unit_ids: Optional[List[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[List[str]] = None, skip_features: Optional[List[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): """ Primary method for writing a SortingExtractor object to an NWBFile. Parameters ---------- sorting : spikeinterface.BaseSorting nwbfile_path : FilePathType, optional Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. The default is False (append mode). verbose : bool, default: True If 'nwbfile_path' is specified, informs user after a successful write operation. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. property_descriptions : dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties : list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. skip_features : list of str Each string in this list that matches a spike feature will not be written to the NWBFile. write_as : {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' """ with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: add_sorting( sorting=sorting, nwbfile=nwbfile_out, unit_ids=unit_ids, property_descriptions=property_descriptions, skip_properties=skip_properties, skip_features=skip_features, write_as=write_as, units_name=units_name, units_description=units_description, )
[docs]def add_waveforms( waveform_extractor: WaveformExtractor, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, unit_ids: Optional[List[Union[str, int]]] = None, skip_properties: Optional[List[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): """ Primary method for writing a WaveformExtractor object to an NWBFile. Parameters ---------- waveform_extractor : WaveformExtractor nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. The "Ecephys" section of metadata is also used to create electrodes and electrical series fields. recording : BaseRecording, optional If the waveform_extractor is 'recordingless', this argument needs to be passed to save electrode info. Otherwise, electrodes info is not added to the nwb file. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. property_descriptions : dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties : list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. write_as : {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, optional, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' """ # TODO: move into add_units assert write_as in [ "units", "processing", ], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!" if write_as == "units": assert units_name == "units", "When writing to the nwbfile.units table, the name of the table must be 'units'!" write_in_processing_module = False if write_as == "units" else True # retrieve templates and stds template_means = waveform_extractor.get_all_templates() template_stds = waveform_extractor.get_all_templates(mode="std") sorting = waveform_extractor.sorting if unit_ids is not None: unit_indices = sorting.ids_to_indices(unit_ids) template_means = template_means[unit_indices] template_stds = template_stds[unit_indices] # metrics properties (quality, template) are added as properties to the sorting copy sorting_copy = sorting.select_units(unit_ids=sorting.unit_ids) if waveform_extractor.is_extension("quality_metrics"): qm = waveform_extractor.load_extension("quality_metrics").get_data() for prop in qm.columns: if prop not in sorting_copy.get_property_keys(): sorting_copy.set_property(prop, qm[prop]) if waveform_extractor.is_extension("template_metrics"): tm = waveform_extractor.load_extension("template_metrics").get_data() for prop in tm.columns: if prop not in sorting_copy.get_property_keys(): sorting_copy.set_property(prop, tm[prop]) add_electrodes_info(recording, nwbfile=nwbfile, metadata=metadata) electrode_group_indices = get_electrode_group_indices(recording, nwbfile=nwbfile) unit_electrode_indices = [electrode_group_indices] * len(sorting.unit_ids) add_units_table( sorting=sorting_copy, nwbfile=nwbfile, unit_ids=unit_ids, property_descriptions=property_descriptions, skip_properties=skip_properties, write_in_processing_module=write_in_processing_module, units_table_name=units_name, unit_table_description=units_description, write_waveforms=False, waveform_means=template_means, waveform_sds=template_stds, unit_electrode_indices=unit_electrode_indices, )
[docs]def write_waveforms( waveform_extractor: WaveformExtractor, nwbfile_path: Optional[FilePathType] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, unit_ids: Optional[List[Union[str, int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, skip_properties: Optional[List[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): """ Primary method for writing a WaveformExtractor object to an NWBFile. Parameters ---------- waveform_extractor : WaveformExtractor nwbfile_path : FilePathType Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. The "Ecephys" section of metadata is also used to create electrodes and electrical series fields. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. recording : BaseRecording, optional If the waveform_extractor is 'recordingless', this argument needs to be passed to save electrode info. Otherwise, electrodes info is not added to the nwb file. verbose : bool, default: True If 'nwbfile_path' is specified, informs user after a successful write operation. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. write_electrical_series : bool, default: False If True, the recording object associated to the WaveformExtractor is written as an electrical series. add_electrical_series_kwargs: dict, optional Keyword arguments to control the `add_electrical_series()` function in case write_electrical_series=True property_descriptions: dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties: list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. write_as: {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' """ metadata = metadata if metadata is not None else dict() # try: with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: if waveform_extractor_has_recording(waveform_extractor): recording = waveform_extractor.recording assert recording is not None, ( "recording not found. To add the electrode table, the waveform_extractor " "needs to have a recording attached or the 'recording' argument needs to be used." ) if write_electrical_series: add_electrical_series_kwargs = add_electrical_series_kwargs or dict() add_electrical_series( recording=recording, nwbfile=nwbfile_out, metadata=metadata, **add_electrical_series_kwargs ) add_waveforms( waveform_extractor=waveform_extractor, nwbfile=nwbfile, metadata=metadata, recording=recording, unit_ids=unit_ids, skip_properties=skip_properties, property_descriptions=property_descriptions, write_as=write_as, units_name=units_name, units_description=units_description, )
def get_electrode_group_indices(recording, nwbfile): if "group_name" in recording.get_property_keys(): group_names = list(np.unique(recording.get_property("group_name"))) elif "group" in recording.get_property_keys(): group_names = list(np.unique(recording.get_property("group").astype(str))) else: group_names = None if group_names is None: electrode_group_indices = None else: electrode_group_indices = nwbfile.electrodes.to_dataframe().query(f"group_name in {group_names}").index.values return electrode_group_indices
[docs]def waveform_extractor_has_recording(waveform_extractor) -> bool: # TODO - this can probably be replaced now """ Temporary helper function to substitute unreleased built-in waveform_extractor.has_recording(). Parameters ---------- waveform_extractor : si.WaveformExtractor The waveform extractor Returns ------- bool True if the waveform_extractor has an attached recording, False otherwise """ return waveform_extractor._recording is not None