Source code for octopus_sensing.preprocessing.shimmer3

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import os
import heartpy as hp
from scipy import signal
import numpy as np
import pathlib

from octopus_sensing.preprocessing.utils import load_all_trials, resample, load_all_samples
from octopus_sensing.devices.common import SavingModeEnum


def _ensure_output_dir(output_path: str, channel: str) -> str:
    '''Create the `channel` sub-directory of `output_path` if needed and return its path.'''
    channel_path = os.path.join(output_path, channel)
    # exist_ok=True already tolerates an existing directory, so no prior
    # os.path.exists() check is needed (and none can race with another process).
    pathlib.Path(channel_path).mkdir(parents=True, exist_ok=True)
    return channel_path


def _clean_and_save(resampled_data, sampling_rate: int, signal_preprocess: bool,
                    gsr_file_path: str, ppg_file_path: str) -> None:
    '''Optionally clean the GSR (column 0) and PPG (column 1) signals and save each one.'''
    gsr = resampled_data[:, 0]
    ppg = resampled_data[:, 1]
    # Kept as an identity test: only the literal True enables cleaning,
    # exactly as before.
    if signal_preprocess is True:
        gsr = clean_gsr(gsr, sampling_rate)
        ppg = clean_ppg(ppg, sampling_rate)
    np.savetxt(gsr_file_path, gsr)
    np.savetxt(ppg_file_path, ppg)


def shimmer3_preprocess(input_path: str, file_name: str, output_path: str,
                        saving_mode: int = SavingModeEnum.CONTINIOUS_SAVING_MODE,
                        sampling_rate: int = 128,
                        signal_preprocess: bool = True):
    '''
    Preprocess a recorded Shimmer3 file to prepare it for visualizing and analysis

    It resamples the data (according to `sampling_rate`), optionally cleans it
    (according to `signal_preprocess`), and, if the data has been recorded
    continuously, splits it into trials. GSR and PPG are saved in separate
    files under `<output_path>/gsr` and `<output_path>/ppg`.

    Parameters
    ----------
    input_path: str
        The path to the recorded Shimmer3 data
    file_name: str
        The file name of the recorded Shimmer3 data. The first 7 characters
        (the "shimmer" prefix) are dropped when the output file names are built.
    output_path: str
        The path in which the preprocessed files are saved
    saving_mode: int, default: SavingModeEnum.CONTINIOUS_SAVING_MODE
        The saving mode of the recorded data. If it is CONTINIOUS_SAVING_MODE,
        data is split into trials and each trial is saved in its own file,
        named `<name>-<trigger>.csv` with the trigger zero-padded to 2 characters.
        If it is SEPARATED_SAVING_MODE, the whole file is processed as one recording.
    sampling_rate: int, default: 128
        The desired sampling rate. Data is resampled to this rate.
    signal_preprocess: bool, default: True
        If True, `clean_gsr` and `clean_ppg` are applied before saving

    Raises
    ------
    ValueError
        If `saving_mode` is neither SEPARATED_SAVING_MODE nor CONTINIOUS_SAVING_MODE

    Note
    -----
    Sometimes the number of samples recorded in one second with Shimmer3 is
    less or more than the specified sampling rate. So, data is resampled by
    replicating the last samples or removing some samples to achieve the
    desired sampling rate.
    '''
    source_path = os.path.join(input_path, file_name)
    time_format = '%Y-%m-%d %H:%M:%S.%f'

    if saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE:
        data, times = load_all_samples(source_path,
                                       (5, 7),  # channel columns
                                       7,       # timestamp column
                                       time_format)
        resampled_data = resample(data, times, sampling_rate)

        # Removing "shimmer" from file_name (assumes the name starts with it)
        name_suffix = file_name[7:]
        gsr_file_path = \
            "{0}/gsr{1}".format(_ensure_output_dir(output_path, "gsr"), name_suffix)
        ppg_file_path = \
            "{0}/ppg{1}".format(_ensure_output_dir(output_path, "ppg"), name_suffix)

        _clean_and_save(resampled_data, sampling_rate, signal_preprocess,
                        gsr_file_path, ppg_file_path)

    elif saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE:
        # NOTE(review): console output kept for backward compatibility;
        # consider switching to the logging module.
        print("shimmer input_path", input_path)

        # First, data needs to be split based on markers
        trials_data, trials_times, trigger_list = \
            load_all_trials(source_path,   # File path
                            (5, 7),        # channel columns
                            7,             # timestamp column
                            8,             # trigger column
                            time_format)   # timestamp format

        # The output directories do not depend on the trial, so they are
        # prepared once instead of on every iteration.
        gsr_output_path = _ensure_output_dir(output_path, "gsr")
        ppg_output_path = _ensure_output_dir(output_path, "ppg")

        # Removing .csv and shimmer from file_name
        base_name = file_name[7:-4]

        for trial, trial_times, trigger in zip(trials_data, trials_times, trigger_list):
            trial_tag = str(trigger).zfill(2)
            gsr_file_path = \
                "{0}/gsr{1}-{2}.csv".format(gsr_output_path, base_name, trial_tag)
            ppg_file_path = \
                "{0}/ppg{1}-{2}.csv".format(ppg_output_path, base_name, trial_tag)

            resampled_data = resample(trial, trial_times, sampling_rate)
            if signal_preprocess is True:
                print("shape", resampled_data.shape)

            _clean_and_save(resampled_data, sampling_rate, signal_preprocess,
                            gsr_file_path, ppg_file_path)

    else:
        # ValueError is a subclass of Exception, so existing handlers still work
        raise ValueError("Saving mode is incorrect")
def clean_gsr(data, sampling_rate: int, low_pass: float = 0.1, high_pass: float = 15):
    '''
    Removes high frequency and rapid transient noises

    The signal is band-pass filtered between `low_pass` and `high_pass` with a
    5th-order zero-phase Butterworth filter, and then smoothed with a
    5-sample median filter.

    Parameters
    -----------
    data: numpy.array
        An 1D array of GSR data
    sampling_rate: int
        The sampling rate of `data` in Hz
    low_pass: float, default: 0.1
        The low cut frequency of the pass band in Hz
    high_pass: float, default: 15
        The high cut frequency of the pass band in Hz

    Returns
    -------
    cleaned_data: numpy.array
        An 1D array of cleaned GSR data
    '''
    nyquist = sampling_rate * 0.5
    samples = np.asarray(data, dtype=np.float64)

    # Removing out-of-band (including high frequency) noises.
    # The band type is spelled out as 'bandpass': the previously used 'bands'
    # is a SciPy alias for band-STOP, which removed the band of interest
    # instead of keeping it.
    # Second-order sections are used because the (b, a) form of a high-order
    # band-pass filter with an edge this close to 0 Hz is numerically fragile.
    sos = signal.butter(5,
                        [low_pass / nyquist, high_pass / nyquist],
                        btype='bandpass',
                        output='sos')
    # Forward-backward filtering gives zero phase shift
    filtered = signal.sosfiltfilt(sos, samples)

    # Removing rapid transient artifacts
    return signal.medfilt(filtered, kernel_size=5)
def clean_ppg(data: np.ndarray, sampling_rate: int, low_pass: float=0.7, high_pass: float=2.5):
    '''
    Band-pass filters PPG data to remove out-of-band noises

    The filtering is delegated to the
    `heartpy <https://github.com/paulvangentcom/heartrate_analysis_python>`
    library (`heartpy.filter_signal` with a 3rd-order band-pass filter).

    Parameters
    -----------
    data: numpy.ndarray
        An 1D array of PPG data
    sampling_rate: int
        The sampling rate of `data`
    low_pass: float, default: 0.7
        The low cut frequency for filtering
    high_pass: float, default: 2.5
        The high cut frequency for filtering

    Returns
    -------
    cleaned_data: numpy.array
        The filtered PPG data as returned by `heartpy.filter_signal`
    '''
    pass_band = [low_pass, high_pass]
    cleaned_data = hp.filter_signal(data,
                                    pass_band,
                                    sample_rate=sampling_rate,
                                    order=3,
                                    filtertype='bandpass')
    return cleaned_data