Source code for neuroconv.datainterfaces.icephys.abf.abfdatainterface

import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import List
from warnings import warn

from ..baseicephysinterface import BaseIcephysInterface


def get_start_datetime(neo_reader):
    """
    Get start datetime for Abf file.

    The start is assembled from two entries of ``neo_reader._axon_info``:
    ``uFileStartDate`` (parsed with the ``%Y%m%d`` format) and
    ``uFileStartTimeMS`` (divided by 1000 and rounded to whole seconds, then
    added to the date as an offset).

    Parameters
    ----------
    neo_reader
        Reader object exposing an ``_axon_info`` mapping and a ``filename``
        attribute (string or path-like).

    Returns
    -------
    datetime
        The computed start datetime when both header entries are present.
        Otherwise a warning is issued and ``_axon_info["rec_datetime"]`` is
        returned unchanged.
    """
    axon_info = neo_reader._axon_info

    if "uFileStartDate" in axon_info and "uFileStartTimeMS" in axon_info:
        start_date = datetime.strptime(str(axon_info["uFileStartDate"]), "%Y%m%d")
        start_offset = timedelta(seconds=round(axon_info["uFileStartTimeMS"] / 1000))
        return start_date + start_offset

    # Path(...).name works for both str and Path filenames and honours the
    # platform's path separators, unlike a manual split on "/".
    file_name = Path(neo_reader.filename).name
    warn(
        f"uFileStartDate or uFileStartTimeMS not found in {file_name}, datetime for "
        "recordings might be wrongly stored."
    )
    return axon_info["rec_datetime"]
class AbfInterface(BaseIcephysInterface):
    """Interface for ABF intracellular electrophysiology data."""

    help = "Interface for ABF intracellular electrophysiology data."
    display_name = "ABF Icephys"
    ExtractorName = "AxonIO"

    @classmethod
    def get_source_schema(cls) -> dict:
        """
        Return the parent source schema extended with the ABF-specific entries.

        Adds (or overrides) the ``file_paths``, ``icephys_metadata`` and
        ``icephys_metadata_file_path`` properties.
        """
        source_schema = super().get_source_schema()
        properties = source_schema["properties"]
        properties["file_paths"] = dict(
            type="array",
            minItems=1,
            items={"type": "string", "format": "file"},
            description="Array of paths to ABF files.",
        )
        properties["icephys_metadata"] = dict(type="object", description="Metadata for this experiment.")
        properties["icephys_metadata_file_path"] = dict(
            type="string", format="file", description="Path to JSON file containing metadata for this experiment."
        )
        return source_schema

    def __init__(self, file_paths: list, icephys_metadata: dict = None, icephys_metadata_file_path: str = None):
        """
        ABF IcephysInterface based on Neo AxonIO.

        Parameters
        ----------
        file_paths : list
            List of files to be converted to the same NWB file.
        icephys_metadata : dict, optional
            Dictionary containing the Icephys-specific metadata.
        icephys_metadata_file_path : str, optional
            JSON file containing the Icephys-specific metadata.
        """
        super().__init__(file_paths=file_paths)
        self.source_data.update(
            icephys_metadata=icephys_metadata,
            icephys_metadata_file_path=icephys_metadata_file_path,
        )

    def get_metadata(self) -> dict:
        """
        Assemble the metadata dictionary for all ABF files of this interface.

        Starts from the parent metadata and adds:

        - ``"ndx-dandi-icephys"`` when any of ``cell_id``, ``slice_id``,
          ``targeted_layer`` or ``inferred_layer`` is present in the
          icephys metadata;
        - ``NWBFile.session_start_time`` taken from the first reader;
        - ``Icephys.Sessions``: one entry per reader, each listing one
          recording per (segment, electrode) pair with running table indices.

        The icephys metadata comes from ``source_data["icephys_metadata"]`` if
        set, else from the JSON file at
        ``source_data["icephys_metadata_file_path"]``, else it is empty.

        Returns
        -------
        dict
            The populated metadata dictionary.
        """
        from ....tools.neo import get_number_of_electrodes, get_number_of_segments

        metadata = super().get_metadata()

        if self.source_data["icephys_metadata"]:
            icephys_metadata = self.source_data["icephys_metadata"]
        elif self.source_data["icephys_metadata_file_path"]:
            with open(self.source_data["icephys_metadata_file_path"]) as json_file:
                icephys_metadata = json.load(json_file)
        else:
            icephys_metadata = dict()

        # Recordings sessions metadata (one Session is one abf file / neo reader).
        # Index by file name once; setdefault keeps the first entry for a name,
        # so duplicates resolve to the first match.
        session_info_by_file_name = dict()
        for session_info in icephys_metadata.get("recording_sessions", dict()):
            session_info_by_file_name.setdefault(session_info.get("abf_file_name", ""), session_info)

        # LabMetadata
        if any(x in icephys_metadata for x in ("cell_id", "slice_id", "targeted_layer", "inferred_layer")):
            metadata["ndx-dandi-icephys"] = dict(
                # Required fields for DANDI
                cell_id=icephys_metadata.get("cell_id"),
                slice_id=icephys_metadata.get("slice_id"),
                # Lab specific metadata
                targeted_layer=icephys_metadata.get("targeted_layer", ""),
                inferred_layer=icephys_metadata.get("inferred_layer", ""),
            )

        # Extract start_time info
        first_session_time = get_start_datetime(neo_reader=self.readers_list[0])
        session_start_time = first_session_time.strftime("%Y-%m-%dT%H:%M:%S%z")
        metadata["NWBFile"].update(session_start_time=session_start_time)

        sessions = list()
        metadata["Icephys"]["Sessions"] = sessions

        # Running indices into the three recordings tables: one per recording
        # (segment x electrode), one per segment, and one per reader/file.
        intracellular_index = 0
        simultaneous_index = 0
        sequential_index = 0

        for reader in self.readers_list:
            # Path(...).name accepts str and Path filenames and honours the
            # platform's separators, unlike splitting on "/".
            abf_file_name = Path(reader.filename).name

            # Get extra info from metafile, if present
            extra_info = session_info_by_file_name.get(abf_file_name, dict())

            # Session start relative to the first abf file, in seconds.
            # total_seconds() is required here: timedelta.seconds only holds the
            # sub-day remainder and would drop whole days (and misreport
            # negative offsets).
            abf_start_time = get_start_datetime(neo_reader=reader)
            relative_session_start_time = float((abf_start_time - first_session_time).total_seconds())

            n_segments = get_number_of_segments(reader, block=0)
            n_electrodes = get_number_of_electrodes(reader)

            recordings = list()
            # Loop through segments (sequential recordings table)
            for _ in range(n_segments):
                # Loop through channels (simultaneous recordings table)
                for _ in range(n_electrodes):
                    recordings.append(
                        dict(
                            intracellular_recordings_table_ind=intracellular_index,
                            simultaneous_recordings_table_ind=simultaneous_index,
                            sequential_recordings_table_ind=sequential_index,
                        )
                    )
                    intracellular_index += 1
                simultaneous_index += 1
            sequential_index += 1

            sessions.append(
                dict(
                    name=abf_file_name,
                    relative_session_start_time=relative_session_start_time,
                    icephys_experiment_type=extra_info.get("icephys_experiment_type", None),
                    stimulus_type=extra_info.get("stimulus_type", "not described"),
                    recordings=recordings,
                )
            )

        return metadata

    def set_aligned_starting_time(self, aligned_starting_time: float):
        """
        Shift the starting time of every segment of every reader by a constant.

        Parameters
        ----------
        aligned_starting_time : float
            Offset added in place to each entry of each reader's ``_t_starts``.
        """
        for reader in self.readers_list:
            number_of_segments = reader.header["nb_segment"][0]
            for segment_index in range(number_of_segments):
                reader._t_starts[segment_index] += aligned_starting_time

    def set_aligned_segment_starting_times(
        self, aligned_segment_starting_times: List[List[float]], stub_test: bool = False
    ):
        """
        Align the individual starting time for each recording in this interface relative to the common session start time.

        Must be in units seconds relative to the common 'session_start_time'.

        Parameters
        ----------
        aligned_segment_starting_times : list of list of floats
            The relative starting times of each recording.
            Outer list is over file paths (readers).
            Inner list is over segments of each recording.
        stub_test : bool, default: False
            Not used by this implementation.

        Raises
        ------
        AssertionError
            If the outer list length differs from the number of readers, or an
            inner list length differs from the number of segments of its reader.
        """
        number_of_files_from_starting_times = len(aligned_segment_starting_times)
        assert number_of_files_from_starting_times == len(self.readers_list), (
            f"The length of the outer list of 'starting_times' ({number_of_files_from_starting_times}) "
            f"does not match the number of files ({len(self.readers_list)})!"
        )

        for file_index, (reader, aligned_segment_starting_times_by_file) in enumerate(
            zip(self.readers_list, aligned_segment_starting_times)
        ):
            number_of_segments = reader.header["nb_segment"][0]
            assert number_of_segments == len(
                aligned_segment_starting_times_by_file
            ), f"The length of starting times index {file_index} does not match the number of segments of that reader!"

            reader._t_starts = aligned_segment_starting_times_by_file