import uuid
import warnings
from collections import defaultdict
from numbers import Real
from pathlib import Path
from typing import List, Literal, Optional, Union
import numpy as np
import psutil
import pynwb
from hdmf.backends.hdf5.h5_utils import H5DataIO
from hdmf.data_utils import AbstractDataChunkIterator, DataChunkIterator
from packaging.version import Version
from spikeinterface import BaseRecording, BaseSorting, WaveformExtractor
from .spikeinterfacerecordingdatachunkiterator import (
SpikeInterfaceRecordingDataChunkIterator,
)
from ..importing import get_package_version
from ..nwb_helpers import get_module, make_or_load_nwbfile
from ...utils import (
DeepDict,
FilePathType,
calculate_regular_series_rate,
dict_deep_update,
)
[docs]def add_devices(nwbfile: pynwb.NWBFile, metadata: Optional[DeepDict] = None):
"""
Add device information to nwbfile object.
Will always ensure nwbfile has at least one device, but multiple
devices within the metadata list will also be created.
Parameters
----------
nwbfile: NWBFile
nwb file to which the recording information is to be added
metadata: DeepDict
metadata info for constructing the nwb file (optional).
Should be of the format
metadata['Ecephys']['Device'] = [
{
'name': my_name,
'description': my_description
},
...
]
Missing keys in an element of metadata['Ecephys']['Device'] will be auto-populated with defaults.
"""
if nwbfile is not None:
assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile"
# Default Device metadata
defaults = dict(name="Device", description="Ecephys probe. Automatically generated.")
if metadata is None:
metadata = dict()
if "Ecephys" not in metadata:
metadata["Ecephys"] = dict()
if "Device" not in metadata["Ecephys"]:
metadata["Ecephys"]["Device"] = [defaults]
for dev in metadata["Ecephys"]["Device"]:
if dev.get("name", defaults["name"]) not in nwbfile.devices:
nwbfile.create_device(**dict(defaults, **dev))
[docs]def add_electrode_groups(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None):
"""
Add electrode group information to nwbfile object.
Will always ensure nwbfile has at least one electrode group.
Will auto-generate a linked device if the specified name does not exist in the nwbfile.
Parameters
----------
recording: spikeinterface.BaseRecording
nwbfile: pynwb.NWBFile
nwb file to which the recording information is to be added
metadata: dict
metadata info for constructing the nwb file (optional).
Should be of the format
metadata['Ecephys']['ElectrodeGroup'] = [
{
'name': my_name,
'description': my_description,
'location': electrode_location,
'device': my_device_name
},
...
]
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults.
Group names set by RecordingExtractor channel properties will also be included with passed metadata,
but will only use default description and location.
"""
assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile"
if len(nwbfile.devices) == 0:
warnings.warn("When adding ElectrodeGroup, no Devices were found on nwbfile. Creating a Device now...")
add_devices(nwbfile=nwbfile, metadata=metadata)
if metadata is None:
metadata = dict()
if "Ecephys" not in metadata:
metadata["Ecephys"] = dict()
if "group_name" in recording.get_property_keys():
group_names = np.unique(recording.get_property("group_name"))
else:
group_names = np.unique(recording.get_channel_groups()).astype("str", copy=False)
defaults = [
dict(
name=group_name,
description="no description",
location="unknown",
device=[i.name for i in nwbfile.devices.values()][0],
)
for group_name in group_names
]
if "ElectrodeGroup" not in metadata["Ecephys"]:
metadata["Ecephys"]["ElectrodeGroup"] = defaults
assert all(
[isinstance(x, dict) for x in metadata["Ecephys"]["ElectrodeGroup"]]
), "Expected metadata['Ecephys']['ElectrodeGroup'] to be a list of dictionaries!"
for grp in metadata["Ecephys"]["ElectrodeGroup"]:
if grp.get("name", defaults[0]["name"]) not in nwbfile.electrode_groups:
device_name = grp.get("device", defaults[0]["device"])
if device_name not in nwbfile.devices:
new_device_metadata = dict(Ecephys=dict(Device=[dict(name=device_name)]))
add_devices(nwbfile=nwbfile, metadata=new_device_metadata)
warnings.warn(
f"Device '{device_name}' not detected in "
"attempted link to electrode group! Automatically generating."
)
electrode_group_kwargs = dict(defaults[0], **grp)
electrode_group_kwargs.update(device=nwbfile.devices[device_name])
nwbfile.create_electrode_group(**electrode_group_kwargs)
if not nwbfile.electrode_groups:
device_name = list(nwbfile.devices.keys())[0]
device = nwbfile.devices[device_name]
if len(nwbfile.devices) > 1:
warnings.warn(
"More than one device found when adding electrode group "
f"via channel properties: using device '{device_name}'. To use a "
"different device, indicate it the metadata argument."
)
electrode_group_kwargs = dict(defaults[0])
electrode_group_kwargs.update(device=device)
for group_name in np.unique(recording.get_channel_groups()).tolist():
electrode_group_kwargs.update(name=str(group_name))
nwbfile.create_electrode_group(**electrode_group_kwargs)
[docs]def add_electrodes(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None, exclude: tuple = ()):
"""
Add channels from recording object as electrodes to nwbfile object.
Parameters
----------
recording: spikeinterface.BaseRecording
nwbfile: NWBFile
nwb file to which the recording information is to be added
metadata: dict
metadata info for constructing the nwb file (optional).
Should be of the format
metadata['Ecephys']['Electrodes'] = [
{
'name': my_name,
'description': my_description
},
...
]
Note that data intended to be added to the electrodes table of the NWBFile should be set as channel
properties in the RecordingExtractor object.
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults
whenever possible.
If 'my_name' is set to one of the required fields for nwbfile
electrodes (id, x, y, z, imp, location, filtering, group_name),
then the metadata will override their default values.
Setting 'my_name' to metadata field 'group' is not supported as the linking to
nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case.
If no group information is passed via metadata, automatic linking to existing electrode groups,
possibly including the default, will occur.
exclude: tuple
An iterable containing the string names of channel properties in the RecordingExtractor
object to ignore when writing to the NWBFile.
"""
assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile"
# Test that metadata has the expected structure
electrodes_metadata = list()
if metadata is not None:
electrodes_metadata = metadata.get("Ecephys", dict()).get("Electrodes", list())
required_keys = {"name", "description"}
assert all(
[isinstance(property, dict) and set(property.keys()) == required_keys for property in electrodes_metadata]
), (
"Expected metadata['Ecephys']['Electrodes'] to be a list of dictionaries, "
"containing the keys 'name' and 'description'"
)
assert all(
[property["name"] != "group" for property in electrodes_metadata]
), "Passing metadata field 'group' is deprecated; pass group_name instead!"
# Transform to a dict that maps property name to its description
property_descriptions = dict()
for property in electrodes_metadata:
property_descriptions[property["name"]] = property["description"]
# 1. Build columns details from extractor properties: dict(name: dict(description='',data=data, index=False))
data_to_add = defaultdict(dict)
recorder_properties = recording.get_property_keys()
excluded_properties = list(exclude) + ["contact_vector"]
properties_to_extract = [property for property in recorder_properties if property not in excluded_properties]
for property in properties_to_extract:
data = recording.get_property(property)
index = isinstance(data[0], (list, np.ndarray, tuple))
# booleans are parsed as strings
if isinstance(data[0], (bool, np.bool_)):
data = data.astype(str)
# Fill with provided custom descriptions
description = property_descriptions.get(property, "no description")
data_to_add[property].update(description=description, data=data, index=index)
# Channel name logic
channel_ids = recording.get_channel_ids()
if "channel_name" in data_to_add:
# if 'channel_name' is set as a property, it is used to override default channel_ids (and "id")
channel_name_array = data_to_add["channel_name"]["data"]
else:
channel_name_array = channel_ids.astype("str", copy=False)
data_to_add["channel_name"].update(description="unique channel reference", data=channel_name_array, index=False)
# Location in spikeinterface is equivalent to rel_x, rel_y, rel_z in the nwb standard
if "location" in data_to_add:
data = data_to_add["location"]["data"]
column_number_to_property = {0: "rel_x", 1: "rel_y", 2: "rel_z"}
for column_number in range(data.shape[1]):
property = column_number_to_property[column_number]
data_to_add[property].update(description=property, data=data[:, column_number], index=False)
data_to_add.pop("location")
# Brain area is the meaning of location in the nwb standard
if "brain_area" in data_to_add:
data_to_add["location"] = data_to_add["brain_area"]
data_to_add["location"].update(description="location")
data_to_add.pop("brain_area")
# If no group_names are provided, use information from groups or default values
if "group_name" in data_to_add:
group_name_array = data_to_add["group_name"]["data"].astype("str", copy=False)
else:
if "group" in data_to_add:
group_name_array = data_to_add["group"]["data"].astype("str", copy=False)
else:
default_group_name = "ElectrodeGroup"
group_name_array = np.full(channel_name_array.size, fill_value=default_group_name)
group_name_array[group_name_array == ""] = "ElectrodeGroup"
data_to_add["group_name"].update(description="group_name", data=group_name_array, index=False)
# Add missing groups to the nwb file
groupless_names = [group_name for group_name in group_name_array if group_name not in nwbfile.electrode_groups]
if len(groupless_names) > 0:
electrode_group_list = [dict(name=group_name) for group_name in groupless_names]
missing_group_metadata = dict(Ecephys=dict(ElectrodeGroup=electrode_group_list))
add_electrode_groups(recording=recording, nwbfile=nwbfile, metadata=missing_group_metadata)
group_list = [nwbfile.electrode_groups[group_name] for group_name in group_name_array]
data_to_add["group"].update(description="the ElectrodeGroup object", data=group_list, index=False)
# 2 Divide properties to those that will be added as rows (default plus previous) and columns (new properties)
# This mapping contains all the defaults that might be required by pre-defined columns on the NWB schema
# https://nwb-schema.readthedocs.io/en/latest/format.html#groups-general-extracellular-ephys-electrodes
required_schema_property_to_default_value = dict(
id=None,
group=None,
group_name="default",
location="unknown",
)
optional_schema_property_to_default_value = dict(
x=np.nan,
y=np.nan,
z=np.nan,
# There doesn't seem to be a canonical default for impedance, if missing.
# The NwbRecordingExtractor follows the -1.0 convention, other scripts sometimes use np.nan
imp=-1.0,
filtering="none",
)
required_schema_properties = set(required_schema_property_to_default_value)
optional_schema_properties = set(optional_schema_property_to_default_value)
schema_properties = required_schema_properties | optional_schema_properties
electrode_table_previous_properties = set(nwbfile.electrodes.colnames) if nwbfile.electrodes else set()
extracted_properties = set(data_to_add)
properties_to_add_by_rows = required_schema_properties | electrode_table_previous_properties
properties_to_add_by_columns = extracted_properties - properties_to_add_by_rows
# Find default values for a subset of optional schema defined properties already in the electrode table
all_properties_to_default_value = dict(required_schema_property_to_default_value)
for optional_property in optional_schema_properties.intersection(electrode_table_previous_properties):
all_properties_to_default_value.update(
{optional_property: optional_schema_property_to_default_value[optional_property]}
)
# Find default values for custom (not schema defined) properties / columns already in the electrode table
type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", Real: np.nan}
for property in electrode_table_previous_properties - schema_properties:
# Find a matching data type and get the default value
sample_data = nwbfile.electrodes[property].data[0]
matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type))
default_value = type_to_default_value[matching_type]
all_properties_to_default_value.update({property: default_value})
# Add data by rows excluding the rows containing channel_names and group_names that were previously added
# The same channel_name can be added provided that it belongs to a different group
channel_group_names_used_previously = []
if "channel_name" in electrode_table_previous_properties and "group_name" in electrode_table_previous_properties:
channel_group_names_used_previously = [
(ch_name, gr_name)
for ch_name, gr_name in zip(nwbfile.electrodes["channel_name"].data, nwbfile.electrodes["group_name"].data)
]
properties_with_data = [property for property in properties_to_add_by_rows if "data" in data_to_add[property]]
rows_in_data = [index for index in range(recording.get_num_channels())]
rows_to_add = [
index
for index in rows_in_data
if (channel_name_array[index], group_name_array[index]) not in channel_group_names_used_previously
]
for row in rows_to_add:
electrode_kwargs = dict(all_properties_to_default_value)
for property in properties_with_data:
electrode_kwargs[property] = data_to_add[property]["data"][row]
nwbfile.add_electrode(**electrode_kwargs, enforce_unique_id=True)
# Add channel_name as a column and fill previously existing rows with channel_name equal to str(ids)
previous_table_size = len(nwbfile.electrodes.id[:]) - len(channel_name_array)
if "channel_name" in properties_to_add_by_columns:
cols_args = data_to_add["channel_name"]
data = cols_args["data"]
previous_ids = nwbfile.electrodes.id[:previous_table_size]
default_value = np.array(previous_ids).astype("str")
extended_data = np.hstack([default_value, data])
cols_args["data"] = extended_data
nwbfile.add_electrode_column("channel_name", **cols_args)
# Build a channel name to electrode table index map
electrodes_df = nwbfile.electrodes.to_dataframe().reset_index()
channel_name_to_electrode_index = {
channel_name: electrodes_df.query(f"channel_name=='{channel_name}'").index[0]
for channel_name in channel_name_array
}
indexes_for_new_data = [channel_name_to_electrode_index[channel_name] for channel_name in channel_name_array]
indexes_for_default_values = electrodes_df.index.difference(indexes_for_new_data).values
# Add properties as columns
for property in properties_to_add_by_columns - {"channel_name"}:
cols_args = data_to_add[property]
data = cols_args["data"]
if np.issubdtype(data.dtype, np.integer):
data = data.astype("float")
if property in optional_schema_property_to_default_value:
default_value = optional_schema_property_to_default_value[property]
else: # Find first matching data-type for custom column
sample_data = data[0]
matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type))
default_value = type_to_default_value[matching_type]
extended_data = np.empty(shape=len(nwbfile.electrodes.id[:]), dtype=data.dtype)
extended_data[indexes_for_new_data] = data
extended_data[indexes_for_default_values] = default_value
cols_args["data"] = extended_data
nwbfile.add_electrode_column(property, **cols_args)
[docs]def check_if_recording_traces_fit_into_memory(recording: BaseRecording, segment_index: int = 0) -> None:
"""
Raises an error if the full traces of a recording extractor are larger than psutil.virtual_memory().available.
Parameters
----------
recording : spikeinterface.BaseRecording
A recording extractor object from spikeinterface.
segment_index : int, optional
The segment index of the recording extractor object, by default 0
Raises
------
MemoryError
"""
element_size_in_bytes = recording.get_dtype().itemsize
num_channels = recording.get_num_channels()
num_frames = recording.get_num_frames(segment_index=segment_index)
traces_size_in_bytes = element_size_in_bytes * num_channels * num_frames
available_memory_in_bytes = psutil.virtual_memory().available
if traces_size_in_bytes > available_memory_in_bytes:
message = (
f"Memory error, full electrical series is {round(traces_size_in_bytes/1e9, 2)} GB) but only"
f"({round(available_memory_in_bytes/1e9, 2)} GB are available. Use iterator_type='V2'"
)
raise MemoryError(message)
[docs]def _recording_traces_to_hdmf_iterator(
recording: BaseRecording,
segment_index: int = None,
return_scaled: bool = False,
iterator_type: str = "v2",
iterator_opts: dict = None,
) -> AbstractDataChunkIterator:
"""Function to wrap traces of spikeinterface recording into an AbstractDataChunkIterator.
Parameters
----------
recording : spikeinterface.BaseRecording
A recording extractor from spikeinterface
segment_index : int, optional
The recording segment to add to the NWBFile.
return_scaled : bool, defaults to False
When True recording extractor objects from spikeinterface return their traces in microvolts.
iterator_type: {"v2", "v1", None}, default: 'v2'
The type of DataChunkIterator to use.
'v1' is the original DataChunkIterator of the hdmf data_utils.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the iterator.
See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator
for the full list of options.
Returns
-------
traces_as_iterator: AbstractDataChunkIterator
The traces of the recording extractor wrapped in an iterator object.
Raises
------
ValueError
If the iterator_type is not 'v1', 'v2' or None.
"""
supported_iterator_types = ["v1", "v2", None]
if iterator_type not in supported_iterator_types:
message = f"iterator_type {iterator_type} should be either 'v1', 'v2' (recommended) or None"
raise ValueError(message)
iterator_opts = dict() if iterator_opts is None else iterator_opts
if iterator_type is None:
check_if_recording_traces_fit_into_memory(recording=recording, segment_index=segment_index)
traces_as_iterator = recording.get_traces(return_scaled=return_scaled, segment_index=segment_index)
elif iterator_type == "v2":
traces_as_iterator = SpikeInterfaceRecordingDataChunkIterator(
recording=recording,
segment_index=segment_index,
return_scaled=return_scaled,
**iterator_opts,
)
elif iterator_type == "v1":
traces_as_iterator = DataChunkIterator(
data=recording.get_traces(return_scaled=return_scaled, segment_index=segment_index), **iterator_opts
)
else:
raise ValueError("iterator_type must be None, 'v1', or 'v2'.")
return traces_as_iterator
[docs]def add_electrical_series(
recording: BaseRecording,
nwbfile: pynwb.NWBFile,
metadata: dict = None,
segment_index: int = 0,
starting_time: Optional[float] = None,
write_as: Literal["raw", "processed", "lfp"] = "raw",
es_key: str = None,
write_scaled: bool = False,
compression: Optional[str] = "gzip",
compression_opts: Optional[int] = None,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
):
"""
Adds traces from recording object as ElectricalSeries to an NWBFile object.
Parameters
----------
recording : SpikeInterfaceRecording
A recording extractor from spikeinterface
nwbfile : NWBFile
nwb file to which the recording information is to be added
metadata : dict, optional
metadata info for constructing the nwb file.
Should be of the format
metadata['Ecephys']['ElectricalSeries'] = dict(
name=my_name,
description=my_description
)
segment_index : int, default: 0
The recording segment to add to the NWBFile.
starting_time : float, optional
Sets the starting time of the ElectricalSeries to a manually set value.
write_as : {'raw', 'processed', 'lfp'}
How to save the traces data in the nwb file. Options:
- 'raw': save it in acquisition
- 'processed': save it as FilteredEphys, in a processing module
- 'lfp': save it as LFP, in a processing module
es_key : str, optional
Key in metadata dictionary containing metadata info for the specific electrical series
write_scaled : bool, default: False
If True, writes the traces in uV with the right conversion.
If False , the data is stored as it is and the right conversions factors are added to the nwbfile.
compression: {None, 'gzip', 'lzf'}, default: 'gzip'
Type of compression to use. Set to None to disable all compression.
To use the `configure_backend` function, you should set this to None.
compression_opts: int, default: 4
Only applies to compression="gzip". Controls the level of the GZIP.
iterator_type: {"v2", "v1", None}, default: 'v2'
The type of DataChunkIterator to use.
'v1' is the original DataChunkIterator of the hdmf data_utils.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the iterator.
See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator
for the full list of options.
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults
whenever possible.
"""
assert write_as in [
"raw",
"processed",
"lfp",
], f"'write_as' should be 'raw', 'processed' or 'lfp', but instead received value {write_as}"
modality_signature = write_as.upper() if write_as == "lfp" else write_as.capitalize()
default_name = f"ElectricalSeries{modality_signature}"
default_description = dict(raw="Raw acquired data", lfp="Processed data - LFP", processed="Processed data")
eseries_kwargs = dict(name=default_name, description=default_description[write_as])
# Select and/or create module if lfp or processed data is to be stored.
if write_as in ["lfp", "processed"]:
ecephys_mod = get_module(
nwbfile=nwbfile,
name="ecephys",
description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.",
)
if write_as == "lfp" and "LFP" not in ecephys_mod.data_interfaces:
ecephys_mod.add(pynwb.ecephys.LFP(name="LFP"))
if write_as == "processed" and "Processed" not in ecephys_mod.data_interfaces:
ecephys_mod.add(pynwb.ecephys.FilteredEphys(name="Processed"))
if metadata is not None and "Ecephys" in metadata and es_key is not None:
assert es_key in metadata["Ecephys"], f"metadata['Ecephys'] dictionary does not contain key '{es_key}'"
eseries_kwargs.update(metadata["Ecephys"][es_key])
# If the recording extractor has more than 1 segment, append numbers to the names so that the names are unique.
# 0-pad these names based on the number of segments.
# If there are 10 segments use 2 digits, if there are 100 segments use 3 digits, etc.
if recording.get_num_segments() > 1:
width = int(np.ceil(np.log10((recording.get_num_segments()))))
eseries_kwargs["name"] += f"{segment_index:0{width}}"
# The add_electrodes adds a column with channel name to the electrode table.
add_electrodes(recording=recording, nwbfile=nwbfile, metadata=metadata)
# That uses either the `channel_name` property or the channel ids as string otherwise.
channel_names = recording.get_property("channel_name")
if channel_names is None:
channel_names = recording.get_channel_ids().astype("str")
# We use those channels to select the electrodes to be added to the ElectricalSeries
channel_name_column = nwbfile.electrodes["channel_name"][:]
mask = np.isin(channel_name_column, channel_names)
table_ids = np.nonzero(mask)[0]
electrode_table_region = nwbfile.create_electrode_table_region(
region=table_ids.tolist(), description="electrode_table_region"
)
eseries_kwargs.update(electrodes=electrode_table_region)
# Spikeinterface guarantees data in micro volts when return_scaled=True. This multiplies by gain and adds offsets
# In nwb to get traces in Volts we take data*channel_conversion*conversion + offset
channel_conversion = recording.get_channel_gains()
channel_offset = recording.get_channel_offsets()
unique_channel_conversion = np.unique(channel_conversion)
unique_channel_conversion = unique_channel_conversion[0] if len(unique_channel_conversion) == 1 else None
unique_offset = np.unique(channel_offset)
if unique_offset.size > 1:
raise ValueError("Recording extractors with heterogeneous offsets are not supported")
unique_offset = unique_offset[0] if unique_offset[0] is not None else 0
micro_to_volts_conversion_factor = 1e-6
if not write_scaled and unique_channel_conversion is None:
eseries_kwargs.update(conversion=micro_to_volts_conversion_factor)
eseries_kwargs.update(channel_conversion=channel_conversion)
elif not write_scaled and unique_channel_conversion is not None:
eseries_kwargs.update(conversion=unique_channel_conversion * micro_to_volts_conversion_factor)
if not write_scaled:
eseries_kwargs.update(offset=unique_offset * micro_to_volts_conversion_factor)
# Iterator
ephys_data_iterator = _recording_traces_to_hdmf_iterator(
recording=recording,
segment_index=segment_index,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)
if compression is not None:
# in this case we assume HDF5 backend and compression
eseries_kwargs.update(
data=H5DataIO(data=ephys_data_iterator, compression=compression, compression_opts=compression_opts)
)
else:
eseries_kwargs.update(data=ephys_data_iterator)
# Now we decide whether to store the timestamps as a regular series or as an irregular series.
if recording.has_time_vector(segment_index=segment_index):
# First we check if the recording has a time vector to avoid creating artificial timestamps
timestamps = recording.get_times(segment_index=segment_index)
rate = calculate_regular_series_rate(series=timestamps) # Returns None if it is not regular
recording_t_start = timestamps[0]
else:
rate = recording.get_sampling_frequency()
recording_t_start = recording._recording_segments[segment_index].t_start or 0
starting_time = starting_time if starting_time is not None else 0
if rate:
starting_time = float(starting_time + recording_t_start)
# Note that we call the sampling frequency again because the estimated rate might be different from the
# sampling frequency of the recording extractor by some epsilon.
eseries_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency())
else:
shifted_timestamps = starting_time + timestamps
if compression is not None:
# in this case we assume HDF5 backend and compression
timestamps_iterator = H5DataIO(
data=shifted_timestamps, compression=compression, compression_opts=compression_opts
)
else:
timestamps_iterator = shifted_timestamps
eseries_kwargs.update(timestamps=timestamps_iterator)
# Create ElectricalSeries object and add it to nwbfile
es = pynwb.ecephys.ElectricalSeries(**eseries_kwargs)
if write_as == "raw":
nwbfile.add_acquisition(es)
elif write_as == "processed":
ecephys_mod.data_interfaces["Processed"].add_electrical_series(es)
elif write_as == "lfp":
ecephys_mod.data_interfaces["LFP"].add_electrical_series(es)
[docs]def add_electrodes_info(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None):
"""
Add device, electrode_groups, and electrodes info to the nwbfile.
Parameters
----------
recording : SpikeInterfaceRecording
nwbfile : NWBFile
NWB file to which the recording information is to be added
metadata : dict, optional
metadata info for constructing the nwb file.
Should be of the format
metadata['Ecephys']['Electrodes'] = [
{
'name': my_name,
'description': my_description
},
...
]
Note that data intended to be added to the electrodes table of the NWBFile should be set as channel
properties in the RecordingExtractor object.
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults
whenever possible.
If 'my_name' is set to one of the required fields for nwbfile
electrodes (id, x, y, z, imp, location, filtering, group_name),
then the metadata will override their default values.
Setting 'my_name' to metadata field 'group' is not supported as the linking to
nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case.
If no group information is passed via metadata, automatic linking to existing electrode groups,
possibly including the default, will occur.
"""
add_devices(nwbfile=nwbfile, metadata=metadata)
add_electrode_groups(recording=recording, nwbfile=nwbfile, metadata=metadata)
add_electrodes(recording=recording, nwbfile=nwbfile, metadata=metadata)
def add_recording(
recording: BaseRecording,
nwbfile: pynwb.NWBFile,
metadata: Optional[dict] = None,
starting_time: Optional[float] = None,
write_as: Literal["raw", "processed", "lfp"] = "raw",
es_key: Optional[str] = None,
write_electrical_series: bool = True,
write_scaled: bool = False,
compression: Optional[str] = "gzip",
compression_opts: Optional[int] = None,
iterator_type: str = "v2",
iterator_opts: Optional[dict] = None,
):
if hasattr(recording, "nwb_metadata"):
metadata = dict_deep_update(recording.nwb_metadata, metadata)
elif metadata is None:
metadata = get_nwb_metadata(recording=recording)
# Convenience function to add device, electrode groups and electrodes info
add_electrodes_info(recording=recording, nwbfile=nwbfile, metadata=metadata)
if write_electrical_series:
number_of_segments = recording.get_num_segments()
for segment_index in range(number_of_segments):
add_electrical_series(
recording=recording,
nwbfile=nwbfile,
segment_index=segment_index,
starting_time=starting_time,
metadata=metadata,
write_as=write_as,
es_key=es_key,
write_scaled=write_scaled,
compression=compression,
compression_opts=compression_opts,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)
[docs]def write_recording(
recording: BaseRecording,
nwbfile_path: Optional[FilePathType] = None,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
overwrite: bool = False,
verbose: bool = True,
starting_time: Optional[float] = None,
write_as: Optional[str] = "raw",
es_key: Optional[str] = None,
write_electrical_series: bool = True,
write_scaled: bool = False,
compression: Optional[str] = "gzip",
compression_opts: Optional[int] = None,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
) -> pynwb.NWBFile:
"""
Primary method for writing a RecordingExtractor object to an NWBFile.
Parameters
----------
recording : spikeinterface.BaseRecording
nwbfile_path : FilePathType, optional
Path for where to write or load (if overwrite=False) the NWBFile.
If specified, the context will always write to this location.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling
write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated
and returned by the function.
metadata : dict, optional
metadata info for constructing the nwb file (optional). Should be
of the format
metadata['Ecephys'] = {
'Device': [
{
'name': my_name,
'description': my_description
},
...
]
'ElectrodeGroup': [
{
'name': my_name,
'description': my_description,
'location': electrode_location,
'device': my_device_name
},
...
]
'Electrodes': [
{
'name': my_name,
'description': my_description
},
...
]
'ElectricalSeries' = {
'name': my_name,
'description': my_description
}
Note that data intended to be added to the electrodes table of the NWBFile should be set as channel
properties in the RecordingExtractor object.
overwrite : bool, default: False
Whether to overwrite the NWBFile if one exists at the nwbfile_path.
verbose : bool, default: True
If 'nwbfile_path' is specified, informs user after a successful write operation.
starting_time : float, optional
Sets the starting time of the ElectricalSeries to a manually set value.
write_as: {'raw', 'processed', 'lfp'}, optional
How to save the traces data in the nwb file.
- 'raw' will save it in acquisition
- 'processed' will save it as FilteredEphys, in a processing module
- 'lfp' will save it as LFP, in a processing module
es_key: str, optional
Key in metadata dictionary containing metadata info for the specific electrical series
write_electrical_series: bool, default: True
If True, electrical series are written in acquisition. If False, only device, electrode_groups,
and electrodes are written to NWB.
write_scaled: bool, default: True
If True, writes the scaled traces (return_scaled=True)
compression: {None, 'gzip', 'lzp'}, default: 'gzip'
Type of compression to use. Set to None to disable all compression.
To use the `configure_backend` function, you should set this to None.
compression_opts: int, optional, default: 4
Only applies to compression="gzip". Controls the level of the GZIP.
iterator_type: {"v2", "v1", None}
The type of DataChunkIterator to use.
'v1' is the original DataChunkIterator of the hdmf data_utils.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the RecordingExtractorDataChunkIterator (iterator_type='v2').
Valid options are:
buffer_gb : float, default: 1.0
In units of GB. Recommended to be as much free RAM as available. Automatically calculates suitable
buffer shape.
buffer_shape : tuple, optional
Manual specification of buffer shape to return on each iteration.
Must be a multiple of chunk_shape along each axis.
Cannot be set if `buffer_gb` is specified.
chunk_mb : float. default: 1.0
Should be below 1 MB. Automatically calculates suitable chunk shape.
chunk_shape : tuple, optional
Manual specification of the internal chunk shape for the HDF5 dataset.
Cannot be set if `chunk_mb` is also specified.
display_progress : bool, default: False
Display a progress bar with iteration rate and estimated completion time.
progress_bar_options : dict, optional
Dictionary of keyword arguments to be passed directly to tqdm.
See https://github.com/tqdm/tqdm#parameters for options.
"""
with make_or_load_nwbfile(
nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose
) as nwbfile_out:
add_recording(
recording=recording,
nwbfile=nwbfile,
starting_time=starting_time,
metadata=metadata,
write_as=write_as,
es_key=es_key,
write_electrical_series=write_electrical_series,
write_scaled=write_scaled,
compression=compression,
compression_opts=compression_opts,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)
return nwbfile_out
[docs]def get_nspikes(units_table: pynwb.misc.Units, unit_id: int) -> int:
"""Return the number of spikes for chosen unit."""
ids = np.array(units_table.id[:])
indexes = np.where(ids == unit_id)[0]
if not len(indexes):
raise ValueError(f"{unit_id} is an invalid unit_id. Valid ids: {ids}.")
index = indexes[0]
if index == 0:
return units_table["spike_times_index"].data[index]
else:
return units_table["spike_times_index"].data[index] - units_table["spike_times_index"].data[index - 1]
[docs]def add_units_table(
sorting: BaseSorting,
nwbfile: pynwb.NWBFile,
unit_ids: Optional[List[Union[str, int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[List[str]] = None,
skip_features: Optional[List[str]] = None,
units_table_name: str = "units",
unit_table_description: str = "Autogenerated by neuroconv.",
write_in_processing_module: bool = False,
write_waveforms: bool = False,
waveform_means: Optional[np.ndarray] = None,
waveform_sds: Optional[np.ndarray] = None,
unit_electrode_indices=None,
):
"""
Primary method for writing a SortingExtractor object to an NWBFile.
Parameters
----------
sorting : spikeinterface.BaseSorting
nwbfile : NWBFile
unit_ids : list of int or list of str, optional
Controls the unit_ids that will be written to the nwb file. If None, all
units are written.
property_descriptions : dict, optional
For each key in this dictionary which matches the name of a unit
property in sorting, adds the value as a description to that
custom unit column.
skip_properties : list of str, optional
Each string in this list that matches a unit property will not be written to the NWBFile.
skip_features : list of str, optional
Each string in this list that matches a spike feature will not be written to the NWBFile.
write_in_processing_module : bool, default: False
How to save the units table in the nwb file.
- True will save it to the processing module to serve as a historical provenance for the official table.
- False will save it to the official NWBFile.Units position; recommended only for the final form of the data.
units_table_name : str, default: 'units'
The name of the units table. If write_as=='units', then units_table_name must also be 'units'.
unit_table_description : str, optional
Text description of the units table; it is recommended to include information such as the sorting method,
curation steps, etc.
write_waveforms : bool, default: False
if True and either sorting is a spikeextractors SortingExtractor object with "template" property or
waveform_means (and optionally waveform_sd) are given, then waveforms are added to the units table
after writing.
waveform_means : np.ndarray, optional
Waveform mean (template) for each unit (num_units, num_samples, num_channels)
waveform_sds : np.ndarray, optional
Waveform standard deviation for each unit (num_units, num_samples, num_channels)
unit_electrode_indices : list of lists or arrays, optional
For each unit, the indices of electrodes that each waveform_mean/sd correspond to.
"""
if not write_in_processing_module and units_table_name != "units":
raise ValueError("When writing to the nwbfile.units table, the name of the table must be 'units'!")
if not isinstance(nwbfile, pynwb.NWBFile):
raise TypeError(f"nwbfile type should be an instance of pynwb.NWBFile but got {type(nwbfile)}")
if write_in_processing_module:
ecephys_mod = get_module(
nwbfile=nwbfile,
name="ecephys",
description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.",
)
write_table_first_time = units_table_name not in ecephys_mod.data_interfaces
if write_table_first_time:
units_table = pynwb.misc.Units(name=units_table_name, description=unit_table_description)
ecephys_mod.add(units_table)
units_table = ecephys_mod[units_table_name]
else:
write_table_first_time = nwbfile.units is None
if write_table_first_time:
nwbfile.units = pynwb.misc.Units(name="units", description=unit_table_description)
units_table = nwbfile.units
default_descriptions = dict(
isi_violation="Quality metric that measures the ISI violation ratio as a proxy for the purity of the unit.",
firing_rate="Number of spikes per unit of time.",
template="The extracellular average waveform.",
max_channel="The recording channel id with the largest amplitude.",
halfwidth="The full-width half maximum of the negative peak computed on the maximum channel.",
peak_to_valley="The duration between the negative and the positive peaks computed on the maximum channel.",
snr="The signal-to-noise ratio of the unit.",
quality="Quality of the unit as defined by phy (good, mua, noise).",
spike_amplitude="Average amplitude of peaks detected on the channel.",
spike_rate="Average rate of peaks detected on the channel.",
unit_name="Unique reference for each unit.",
)
if property_descriptions is None:
property_descriptions = dict()
if skip_properties is None:
skip_properties = list()
property_descriptions = dict(default_descriptions, **property_descriptions)
data_to_add = defaultdict(dict)
sorting_properties = sorting.get_property_keys()
excluded_properties = list(skip_properties) + ["contact_vector"]
properties_to_extract = [property for property in sorting_properties if property not in excluded_properties]
if unit_ids is not None:
sorting = sorting.select_units(unit_ids=unit_ids)
if unit_electrode_indices is not None:
unit_electrode_indices = np.array(unit_electrode_indices)[sorting.ids_to_indices(unit_ids)]
unit_ids = sorting.unit_ids
# Extract properties
for property in properties_to_extract:
data = sorting.get_property(property)
if isinstance(data[0], (bool, np.bool_)):
data = data.astype(str)
index = isinstance(data[0], (list, np.ndarray, tuple))
description = property_descriptions.get(property, "No description.")
data_to_add[property].update(description=description, data=data, index=index)
if property in ["max_channel", "max_electrode"] and nwbfile.electrodes is not None:
data_to_add[property].update(table=nwbfile.electrodes)
# Unit name logic
if "unit_name" in data_to_add:
# if 'unit_name' is set as a property, it is used to override default unit_ids (and "id")
unit_name_array = data_to_add["unit_name"]["data"]
else:
unit_name_array = unit_ids.astype("str", copy=False)
data_to_add["unit_name"].update(description="Unique reference for each unit.", data=unit_name_array)
units_table_previous_properties = set(units_table.colnames) - {"spike_times"}
extracted_properties = set(data_to_add)
properties_to_add_by_rows = units_table_previous_properties | {"id"}
properties_to_add_by_columns = extracted_properties - properties_to_add_by_rows
# Find default values for properties / columns already in the table
type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", Real: np.nan}
property_to_default_values = {"id": None}
for property in units_table_previous_properties:
# Find a matching data type and get the default value
sample_data = units_table[property].data[0]
matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type))
default_value = type_to_default_value[matching_type]
property_to_default_values.update({property: default_value})
# Add data by rows excluding the rows with previously added unit names
unit_names_used_previously = []
if "unit_name" in units_table_previous_properties:
unit_names_used_previously = units_table["unit_name"].data
has_electrodes_column = "electrodes" in units_table.colnames
properties_with_data = {property for property in properties_to_add_by_rows if "data" in data_to_add[property]}
rows_in_data = [index for index in range(sorting.get_num_units())]
if not has_electrodes_column:
rows_to_add = [index for index in rows_in_data if unit_name_array[index] not in unit_names_used_previously]
else:
rows_to_add = []
for index in rows_in_data:
if unit_name_array[index] not in unit_names_used_previously:
rows_to_add.append(index)
else:
unit_name = unit_name_array[index]
previous_electrodes = units_table[np.where(units_table["unit_name"][:] == unit_name)[0]].electrodes
if list(previous_electrodes.values[0]) != list(unit_electrode_indices[index]):
rows_to_add.append(index)
for row in rows_to_add:
unit_kwargs = dict(property_to_default_values)
for property in properties_with_data:
unit_kwargs[property] = data_to_add[property]["data"][row]
spike_times = []
# Extract and concatenate the spike times from multiple segments
for segment_index in range(sorting.get_num_segments()):
segment_spike_times = sorting.get_unit_spike_train(
unit_id=unit_ids[row], segment_index=segment_index, return_times=True
)
spike_times.append(segment_spike_times)
spike_times = np.concatenate(spike_times)
if waveform_means is not None:
unit_kwargs["waveform_mean"] = waveform_means[row]
if waveform_sds is not None:
unit_kwargs["waveform_sd"] = waveform_sds[row]
if unit_electrode_indices is not None:
unit_kwargs["electrodes"] = unit_electrode_indices[row]
units_table.add_unit(spike_times=spike_times, **unit_kwargs, enforce_unique_id=True)
# added_unit_table_ids = units_table.id[-len(rows_to_add) :] # TODO - this line is unused?
# Add unit_name as a column and fill previously existing rows with unit_name equal to str(ids)
previous_table_size = len(units_table.id[:]) - len(unit_name_array)
if "unit_name" in properties_to_add_by_columns:
cols_args = data_to_add["unit_name"]
data = cols_args["data"]
previous_ids = units_table.id[:previous_table_size]
default_value = np.array(previous_ids).astype("str")
extended_data = np.hstack([default_value, data])
cols_args["data"] = extended_data
units_table.add_column("unit_name", **cols_args)
# Build a channel name to electrode table index map
table_df = units_table.to_dataframe().reset_index()
unit_name_to_electrode_index = {
unit_name: table_df.query(f"unit_name=='{unit_name}'").index[0] for unit_name in unit_name_array
}
indexes_for_new_data = [unit_name_to_electrode_index[unit_name] for unit_name in unit_name_array]
indexes_for_default_values = table_df.index.difference(indexes_for_new_data).values
# Add properties as columns
for property in properties_to_add_by_columns - {"unit_name"}:
cols_args = data_to_add[property]
data = cols_args["data"]
if np.issubdtype(data.dtype, np.integer):
data = data.astype("float")
# Find first matching data-type
sample_data = data[0]
matching_type = next(type for type in type_to_default_value if isinstance(sample_data, type))
default_value = type_to_default_value[matching_type]
extended_data = np.empty(shape=len(units_table.id[:]), dtype=data.dtype)
extended_data[indexes_for_new_data] = data
extended_data[indexes_for_default_values] = default_value
# Always store numpy objects as strings
if np.issubdtype(extended_data.dtype, np.object_):
extended_data = extended_data.astype("str", copy=False)
cols_args["data"] = extended_data
units_table.add_column(property, **cols_args)
def add_sorting(
sorting: BaseSorting,
nwbfile: Optional[pynwb.NWBFile] = None,
unit_ids: Optional[List[Union[str, int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[List[str]] = None,
skip_features: Optional[List[str]] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
):
assert write_as in [
"units",
"processing",
], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!"
write_in_processing_module = False if write_as == "units" else True
add_units_table(
sorting=sorting,
unit_ids=unit_ids,
nwbfile=nwbfile,
property_descriptions=property_descriptions,
skip_properties=skip_properties,
skip_features=skip_features,
write_in_processing_module=write_in_processing_module,
units_table_name=units_name,
unit_table_description=units_description,
write_waveforms=False,
)
[docs]def write_sorting(
sorting: BaseSorting,
nwbfile_path: Optional[FilePathType] = None,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
overwrite: bool = False,
verbose: bool = True,
unit_ids: Optional[List[Union[str, int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[List[str]] = None,
skip_features: Optional[List[str]] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
):
"""
Primary method for writing a SortingExtractor object to an NWBFile.
Parameters
----------
sorting : spikeinterface.BaseSorting
nwbfile_path : FilePathType, optional
Path for where to write or load (if overwrite=False) the NWBFile.
If specified, the context will always write to this location.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling
write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated
and returned by the function.
metadata : dict, optional
Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True.
overwrite : bool, default: False
Whether to overwrite the NWBFile if one exists at the nwbfile_path.
The default is False (append mode).
verbose : bool, default: True
If 'nwbfile_path' is specified, informs user after a successful write operation.
unit_ids : list, optional
Controls the unit_ids that will be written to the nwb file. If None (default), all
units are written.
property_descriptions : dict, optional
For each key in this dictionary which matches the name of a unit
property in sorting, adds the value as a description to that
custom unit column.
skip_properties : list of str, optional
Each string in this list that matches a unit property will not be written to the NWBFile.
skip_features : list of str
Each string in this list that matches a spike feature will not be written to the NWBFile.
write_as : {'units', 'processing'}
How to save the units table in the nwb file. Options:
- 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data.
- 'processing' will save it to the processing module to serve as a historical provenance for the official table.
units_name : str, default: 'units'
The name of the units table. If write_as=='units', then units_name must also be 'units'.
units_description : str, default: 'Autogenerated by neuroconv.'
"""
with make_or_load_nwbfile(
nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose
) as nwbfile_out:
add_sorting(
sorting=sorting,
nwbfile=nwbfile_out,
unit_ids=unit_ids,
property_descriptions=property_descriptions,
skip_properties=skip_properties,
skip_features=skip_features,
write_as=write_as,
units_name=units_name,
units_description=units_description,
)
def get_electrode_group_indices(recording, nwbfile):
if "group_name" in recording.get_property_keys():
group_names = list(np.unique(recording.get_property("group_name")))
elif "group" in recording.get_property_keys():
group_names = list(np.unique(recording.get_property("group").astype(str)))
else:
group_names = None
if group_names is None:
electrode_group_indices = None
else:
electrode_group_indices = nwbfile.electrodes.to_dataframe().query(f"group_name in {group_names}").index.values
return electrode_group_indices