Source code for octopus_sensing.preprocessing.openbci

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.
import os
from typing import List, Optional
import pandas as pd
import numpy as np
import mne

from octopus_sensing.preprocessing.utils import load_all_trials, resample, load_all_samples
from octopus_sensing.devices.common import SavingModeEnum


[docs]def openbci_preprocess(input_path: str, file_name: str, output_path: str, channels: List[str], saving_mode: int = SavingModeEnum.CONTINIOUS_SAVING_MODE, sampling_rate: int = 128, signal_preprocess: bool = True): ''' Preprocess openbci recorded files to prepare them for visualizing and analysis It applys data cleaning (according to signal_preprocess), resampling (according to sampling_rate), and splits data if data has been recorded continuously. This method uses `mne library <https://mne.tools/stable/index.html>`_ for EEG data processing Parameters ---------- input_path: str The path to recorded openbci data file_name: str The file name of recorded openbci data output_path: str preprocessed file path channels: List(str) a list of recorded channels name saving_mode: int, default: SavingModeEnum.CONTINIOUS_SAVING_MODE The saving mode of recorded data. If it is CONTINIOUS_SAVING_MODE, data will be splitted according to markers and will be recorded in the separated files sampling_rate: int, default: 128 The desired sampling_rate. Data will be resampled according to this sampling rate signal_preprocess: bool, default: True If True will apply preliminary preprocessing steps to clean line noises Note ----- Sometimes recorded data in one second with Openbci are less or more than the specified sampling rate. So, we resample data by replicating the last samples or removing some samples to achieve the desired sampling_rate ''' if saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE: if len(channels) == 8: data, times = \ load_all_samples(os.path.join(input_path, file_name), (0, 8), # channel columns 12, '%H:%M:%S.%f') # timestamp column elif len(channels) == 16: data, times = \ load_all_samples(os.path.join(input_path, file_name), (0, 16), # channel columns 20, '%H:%M:%S.%f') # timestamp column output_file_path = \ "{0}/{1}".format(output_path, file_name) resampled_data = \ resample(data, times, sampling_rate) if signal_preprocess is True: preprocessed_data = \ clean_eeg(resampled_data/1e6, channel_names=channels, sampling_rate=sampling_rate) else: preprocessed_data = resampled_data # convert array into dataframe data_frame = pd.DataFrame(preprocessed_data, columns=channels) data_frame.to_csv(output_file_path, index=False) elif saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE: if len(channels) == 8: trials_data, trials_times, triger_list = \ load_all_trials(os.path.join(input_path, file_name), # File path (0, 8), # channel columns 12, # timestamp column 13, # triger column '%H:%M:%S.%f') # timestamp format elif len(channels) == 16: trials_data, trials_times, triger_list = \ load_all_trials(os.path.join(input_path, file_name), # File path (0, 16), # channel columns 20, # timestamp column 21, # triger column '%H:%M:%S.%f') # timestamp format print("len trials ***************", len(trials_data)) i = 0 for trial in trials_data: output_file_path = \ "{0}/{1}-{2}.csv".format(output_path, file_name[:-4], # Removing .csv from file_name str(triger_list[i]).zfill(2)) print("output_file_path", output_file_path) resampled_data = \ resample(trial, trials_times[i], sampling_rate) if signal_preprocess is True: preprocessed_data = \ clean_eeg(resampled_data, channel_names=channels, sampling_rate=sampling_rate) else: preprocessed_data = resampled_data # convert array into dataframe data_frame = pd.DataFrame(preprocessed_data, columns=channels) # save the dataframe as a csv file data_frame.to_csv(output_file_path, index=False) i += 1 else: raise Exception("Saving mode is incorrect")
[docs]class EegPreprocessing(): ''' Converts EEG data to mne raw data format for furthur analysis Parameters ---------- data: numpy.ndarray The channels’ time series (n_samples*n_channels) channel_names: List[str], default: None A list of channels' names sampling_rate: int, default: 128 Sampling rate of data ''' def __init__(self, data: np.ndarray, channel_names: List[str]=None, sampling_rate: int=128): if channel_names is None: self._channel_names = \ ["Fp1", "Fp2", "F7", "F3", "F4", "F8", "T3", "C3", "C4", "T4", "T5", "P3", "P4", "T6", "O1", "O2"] else: self._channel_names = channel_names channel_data = np.transpose(data) channel_types = ["eeg"]*len(self._channel_names) self.__info = mne.create_info(self._channel_names, sampling_rate, channel_types) montage = mne.channels.make_standard_montage('standard_1020') self._mne_raw = mne.io.RawArray(channel_data, self.__info) self._mne_raw.set_montage(montage, match_case=False) self._mne_raw.load_data()
[docs] def get_data(self): ''' Gets EEG data as a raw mne data Returns -------- mne_raw_data: mne.io.RawArray EEG data as a mne.io.RawArray ''' return self._mne_raw.get_data()
[docs] def filter_data(self, low_frequency: float=1, high_frequency:float =45, notch_frequencies: List[int]=[60]): ''' Apply notch filter, low pass and high pass (bandpass) filter on mne data Parameters ----------- low_frequency: float, default: 1 The low cut frequency for filtering high_frequency: float, default: 45 The high cut frequency for filtering notch_frequencies: List[int] default: [60] the frequencies to be used in the notch filter ''' # Band pass filter self._mne_raw.filter(l_freq=low_frequency, h_freq=high_frequency) # Notch filter if notch_frequencies not in (None, []): self._mne_raw.notch_filter(notch_frequencies)
[docs]def clean_eeg(data, channel_names: List[str] = None, low_frequency: float = 1, high_frequency: float = 45, sampling_rate: int = 128): ''' Cleans EEG data Parameters ----------- low_frequency: float, default: 1 The low cut frequency for filtering high_frequency: float, default: 45 The high cut frequency for filtering smpling_rate: int, default: 128 sampling rate ''' if channel_names is None: channel_names = ["Fp1", "Fp2", "F7", "F3", "F4", "F8", "T3", "C3", "C4", "T4", "T5", "P3", "P4", "T6", "O1", "O2"] # The length of data is 5 minutes preprocessing = \ EegPreprocessing(data, channel_names=channel_names, sampling_rate=sampling_rate) preprocessing.filter_data(low_frequency=low_frequency, high_frequency=high_frequency) preprocessed_data = preprocessing.get_data() return np.transpose(preprocessed_data)