Source code for yasa.hypno

"""
Hypnogram-related functions and class.
"""
import mne
import logging

# import warnings
import numpy as np
import pandas as pd
from yasa.io import set_log_level
from yasa.plotting import plot_hypnogram
from yasa.sleepstats import transition_matrix
from pandas.api.types import CategoricalDtype

__all__ = [
    "Hypnogram",
    "hypno_str_to_int",
    "hypno_int_to_str",
    "hypno_upsample_to_sf",
    "hypno_upsample_to_data",
    "hypno_find_periods",
    "load_profusion_hypno",
    "simulate_hypnogram",
]


logger = logging.getLogger("yasa")


[docs]class Hypnogram: """ Experimental class for manipulating hypnogram in YASA (dev). Starting with v0.7, YASA will take a more object-oriented approach to hypnograms. That is, hypnograms are now stored as a class (aka object), which comes with its own attributes and functions. Furthermore, YASA does not allow integer values to define the stages anymore. Instead, users must pass an array of strings with the actual stage names (e.g. ["WAKE", "WAKE", "N1", ..., "REM", "REM"]). .. versionadded:: 0.7.0 Parameters ---------- values : array_like A vector of stage values, represented as strings. See some examples below: * 2-stages hypnogram (Wake/Sleep): ``["W", "S", "S", "W", "S"]`` * 3-stages (Wake/NREM/REM): ``pd.Series(["WAKE", "NREM", "NREM", "REM", "REM"])`` * 4-stages (Wake/Light/Deep/REM): ``np.array(["Wake", "Light", "Deep", "Deep"])`` * 5-stages (default): ``["N1", "N1", "N2", "N3", "N2", "REM", "W"]`` Artefacts ("Art") and unscored ("Uns") epochs are always allowed regardless of the number of stages in the hypnogram. .. note:: Abbreviated or full spellings for the stages are allowed, as well as lower/upper/mixed case. Internally, YASA will convert the stages to to full spelling and uppercase (e.g. "w" -> "WAKE"). n_stages : int Whether ``values`` comes from a 2, 3, 4 or 5-stages hypnogram. Default is 5 stages, meaning that the following sleep stages are allowed: N1, N2, N3, REM, WAKE. freq : str A pandas frequency string indicating the frequency resolution of the hypnogram. Default is "30s" meaning that each value in the hypnogram represents a 30-seconds epoch. Examples: "1min", "10s", "15min". A full list of accepted values can be found at https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases ``freq`` will be passed to the :py:func:`pandas.date_range` function to create the time index of the hypnogram. start : str or datetime An optional string indicating the starting datetime of the hypnogram (e.g. "2022-12-15 22:30:00"). If ``start`` is specified and valid, the index of the hypnogram will be a :py:class:`pandas.DatetimeIndex`. Otherwise it will be a :py:class:`pandas.RangeIndex`, indicating the epoch number. scorer : str An optional string indicating the scorer name. If specified, this will be set as the name of the :py:class:`pandas.Series`, otherwise the name will be set to "Stage". Examples -------- Create a 2-stages hypnogram >>> from yasa import Hypnogram >>> values = ["W", "W", "W", "S", "S", "S", "S", "S", "W", "S", "S", "S"] >>> hyp = Hypnogram(values, n_stages=2) >>> hyp <Hypnogram | 12 epochs x 30s (6.00 minutes), 2 stages> - Use `.hypno` to get the string values as a pandas.Series - Use `.as_int()` to get the integer values as a pandas.Series - Use `.plot_hypnogram()` to plot the hypnogram See the online documentation for more details. We can access the actual values, which are stored as a :py:class:`pandas.Series`, with: >>> hyp.hypno Epoch 0 WAKE 1 WAKE 2 WAKE 3 SLEEP 4 SLEEP 5 SLEEP 6 SLEEP 7 SLEEP 8 WAKE 9 SLEEP 10 SLEEP 11 SLEEP Name: Stage, dtype: category Categories (4, object): ['WAKE', 'SLEEP', 'ART', 'UNS'] >>> # Number of epochs in the hypnogram >>> hyp.n_epochs 12 >>> # Total duration of the hypnogram, in minutes (12 epochs * 30 seconds = 6 minutes) >>> hyp.duration 6.0 >>> # Default mapping from strings to integers. Can be changed with `hyp.mapping = {}` >>> hyp.mapping {'WAKE': 0, 'SLEEP': 1, 'ART': -1, 'UNS': -2} >>> # Get the hypnogram Series integer values >>> hyp.as_int() Epoch 0 0 1 0 2 0 3 1 4 1 5 1 6 1 7 1 8 0 9 1 10 1 11 1 Name: Stage, dtype: int16 >>> # Calculate the summary sleep statistics >>> hyp.sleep_statistics() {'TIB': 6.0, 'SPT': 4.5, 'WASO': 0.5, 'TST': 4.0, 'SE': 66.6667, 'SME': 88.8889, 'SFI': 7.5, 'SOL': 1.5, 'SOL_5min': nan, 'WAKE': 2.0} >>> # Get the state-transition matrix >>> counts, probs = hyp.transition_matrix() >>> counts To Stage WAKE SLEEP From Stage WAKE 2 2 SLEEP 1 6 All these methods and properties are also valid with a 5-stages hypnogram. In the example below, we use the :py:func:`yasa.simulate_hypnogram` to generate a plausible 5-stages hypnogram with a 30-seconds resolution. A random seed is specified to ensure that we get reproducible results. Lastly, we set an actual start time to the hypnogram. As a result, the index of the resulting hypnogram is a :py:class:`pandas.DatetimeIndex`. >>> from yasa import simulate_hypnogram >>> hyp = simulate_hypnogram( ... tib=500, n_stages=5, start="2022-12-15 22:30:00", scorer="S1", seed=42) >>> hyp <Hypnogram | 1000 epochs x 30s (500.00 minutes), 5 stages, scored by S1> - Use `.hypno` to get the string values as a pandas.Series - Use `.as_int()` to get the integer values as a pandas.Series - Use `.plot_hypnogram()` to plot the hypnogram See the online documentation for more details. >>> hyp.hypno Time 2022-12-15 22:30:00 WAKE 2022-12-15 22:30:30 WAKE 2022-12-15 22:31:00 WAKE 2022-12-15 22:31:30 WAKE 2022-12-15 22:32:00 WAKE ... 2022-12-16 06:47:30 N2 2022-12-16 06:48:00 N2 2022-12-16 06:48:30 N2 2022-12-16 06:49:00 N2 2022-12-16 06:49:30 N2 Freq: 30S, Name: S1, Length: 1000, dtype: category Categories (7, object): ['WAKE', 'N1', 'N2', 'N3', 'REM', 'ART', 'UNS'] The summary sleep statistics will include more items with a 5-stages hypnogram than a 2-stages hypnogram, i.e. the amount and percentage of each sleep stage, the REM latency, etc. >>> hyp.sleep_statistics() {'TIB': 500.0, 'SPT': 497.5, 'WASO': 79.5, 'TST': 418.0, 'SE': 83.6, 'SME': 84.0201, 'SFI': 0.7177, 'SOL': 2.5, 'SOL_5min': 2.5, 'Lat_REM': 67.0, 'WAKE': 82.0, 'N1': 69.0, 'N2': 247.0, 'N3': 64.5, 'REM': 37.5, '%N1': 16.5072, '%N2': 59.0909, '%N3': 15.4306, '%REM': 8.9713} """
[docs] def __init__(self, values, n_stages=5, *, freq="30s", start=None, scorer=None): assert isinstance( values, (list, np.ndarray, pd.Series) ), "`values` must be a list, numpy.array or pandas.Series" assert isinstance(n_stages, int), "`n_stages` must be an integer between 2 and 5." assert n_stages in [2, 3, 4, 5], "`n_stages` must be an integer between 2 and 5." assert isinstance(freq, str), "`freq` must be a pandas frequency string." assert isinstance( start, (type(None), str, pd.Timestamp) ), "`start` must be either None, a string or a pandas.Timestamp." assert isinstance( scorer, (type(None), str, int) ), "`scorer` must be either None, or a string or an integer." if n_stages == 2: accepted = ["W", "WAKE", "S", "SLEEP", "ART", "UNS"] mapping = {"WAKE": 0, "SLEEP": 1, "ART": -1, "UNS": -2} elif n_stages == 3: accepted = ["WAKE", "W", "NREM", "REM", "R", "ART", "UNS"] mapping = {"WAKE": 0, "NREM": 2, "REM": 4, "ART": -1, "UNS": -2} elif n_stages == 4: accepted = ["WAKE", "W", "LIGHT", "DEEP", "REM", "R", "ART", "UNS"] mapping = {"WAKE": 0, "LIGHT": 2, "DEEP": 3, "REM": 4, "ART": -1, "UNS": -2} else: accepted = ["WAKE", "W", "N1", "N2", "N3", "REM", "R", "ART", "UNS"] mapping = {"WAKE": 0, "N1": 1, "N2": 2, "N3": 3, "REM": 4, "ART": -1, "UNS": -2} assert all([val.upper() in accepted for val in values]), ( f"{np.unique(values)} do not match the accepted values for a {n_stages} stages " f"hypnogram: {accepted}" ) if isinstance(values, pd.Series): # Make sure to remove index if the input is a pandas.Series values = values.to_numpy(copy=True) hypno = pd.Series(values).str.upper() if scorer is None: hypno.name = "Stage" else: hypno.name = scorer # Combine accepted values map_accepted = {"S": "SLEEP", "W": "WAKE", "R": "REM"} hypno = hypno.replace(map_accepted) labels = pd.Series(accepted).replace(map_accepted).unique().tolist() # Change dtype of series to "categorical" (reduces memory) cat_dtype = CategoricalDtype(labels, ordered=False) hypno = hypno.astype(cat_dtype) # Create Index if start is not None: hypno.index = pd.date_range(start=start, freq=freq, periods=hypno.shape[0]) hypno.index.name = "Time" timedelta = hypno.index - hypno.index[0] else: fake_dt = pd.date_range(start="2022-12-03 00:00:00", freq=freq, periods=hypno.shape[0]) hypno.index.name = "Epoch" timedelta = fake_dt - fake_dt[0] # Set attributes self._hypno = hypno self._n_epochs = hypno.shape[0] self._freq = freq self._sampling_frequency = 1 / pd.Timedelta(freq).total_seconds() self._start = start self._timedelta = timedelta self._duration = self._n_epochs / (60 * self._sampling_frequency) self._n_stages = n_stages self._labels = labels self._mapping = mapping self._scorer = scorer
def __repr__(self): # TODO v0.8: Keep only the text between < and > text_scorer = f", scored by {self.scorer}" if self.scorer is not None else "" return ( f"<Hypnogram | {self.n_epochs} epochs x {self.freq} ({self.duration:.2f} minutes), " f"{self.n_stages} stages{text_scorer}>\n" " - Use `.hypno` to get the string values as a pandas.Series\n" " - Use `.as_int()` to get the integer values as a pandas.Series\n" " - Use `.plot_hypnogram()` to plot the hypnogram\n" "See the online documentation for more details." ) def __str__(self): text_scorer = f", scored by {self.scorer}" if self.scorer is not None else "" return ( f"<Hypnogram | {self.n_epochs} epochs x {self.freq} ({self.duration:.2f} minutes), " f"{self.n_stages} stages{text_scorer}>\n" " - Use `.hypno` to get the string values as a pandas.Series\n" " - Use `.as_int()` to get the integer values as a pandas.Series\n" " - Use `.plot_hypnogram()` to plot the hypnogram\n" "See the online documentation for more details." ) @property def hypno(self): """ The hypnogram values, stored in a :py:class:`pandas.Series`. To reduce memory usage, the stages are stored as categories (:py:class:`pandas.Categorical`). ``hypno`` inherits all the methods of a standard :py:class:`pandas.Series`, e.g. ``.describe()``, ``.unique()``, ``.to_csv()``, and more. .. note:: ``print(Hypnogram)`` is a shortcut to ``print(Hypnogram.hypno)``. """ return self._hypno @property def n_epochs(self): """The number of epochs in the hypnogram.""" return self._n_epochs @property def freq(self): """The frequency resolution of the hypnogram. Default is '30s'""" return self._freq @property def sampling_frequency(self): """The sampling frequency (Hz) of the hypnogram.""" return self._sampling_frequency @property def start(self): """The start date/time of the hypnogram. Default is None.""" return self._start @property def timedelta(self): """ A :py:class:`pandas.TimedeltaIndex` vector with the accumulated time difference of each epoch compared to the first epoch. """ return self._timedelta @property def duration(self): """Total duration of the hypnogram, expressed in minutes. AKA Time in Bed.""" return self._duration @property def n_stages(self): """ The number of allowed stages in the hypnogram. This is not the number of unique stages in the current hypnogram. This does not include Artefact and Unscored which are always allowed. """ return self._n_stages @property def labels(self): """The allowed stage labels.""" return self._labels @property def mapping(self): """A dictionary with the mapping from string to integer values.""" return self._mapping @mapping.setter def mapping(self, map_dict): assert isinstance(map_dict, dict), "`mapping` must be a dictionary, e.g. {'WAKE': 0, ...}" assert all([val in map_dict.keys() for val in self.hypno.unique()]), ( f"Some values in `hypno` ({self.hypno.unique()}) are not in `map_dict` " f"({map_dict.keys()})" ) if "ART" not in map_dict.keys(): map_dict["ART"] = -1 if "UNS" not in map_dict.keys(): map_dict["UNS"] = -2 self._mapping = map_dict @property def mapping_int(self): """A dictionary with the mapping from integer to string values.""" return {v: k for k, v in self.mapping.items()} @property def scorer(self): """The scorer name.""" return self._scorer # CLASS METHODS BELOW
[docs] def as_annotations(self): """ Return a pandas DataFrame summarizing epoch-level information. Column order and names are compliant with BIDS `events files <https://bids-specification.readthedocs.io/en/stable/04-modality-specific-files/05-task-events.html>`_ and MNE `events/annotations dataframes <https://mne.tools/stable/glossary.html#term-annotations>`_. Returns ------- annotations : :py:class:`pandas.DataFrame` A dataframe containing epoch onset, duration, stage, etc. Examples -------- >>> from yasa import Hypnogram >>> hyp = Hypnogram(["W", "W", "LIGHT", "LIGHT", "DEEP", "REM", "WAKE"], n_stages=4) >>> hyp.as_annotations() onset duration value description epoch 0 0.0 30.0 0 WAKE 1 30.0 30.0 0 WAKE 2 60.0 30.0 2 LIGHT 3 90.0 30.0 2 LIGHT 4 120.0 30.0 3 DEEP 5 150.0 30.0 4 REM 6 180.0 30.0 0 WAKE """ data = { "onset": self.timedelta.total_seconds(), "duration": 1 / self.sampling_frequency, "value": self.as_int().to_numpy(), "description": self.hypno.to_numpy(), "epoch": np.arange(self.n_epochs), } if self.scorer is not None: data["scorer"] = self.scorer return pd.DataFrame(data).set_index("epoch")
[docs] def as_int(self): """Return hypnogram values as integers. The default mapping from string to integer is: * 2 stages: {"WAKE": 0, "SLEEP": 1, "ART": -1, "UNS": -2} * 3 stages: {"WAKE": 0, "NREM": 2, "REM": 4, "ART": -1, "UNS": -2} * 4 stages: {"WAKE": 0, "LIGHT": 2, "DEEP": 3, "REM": 4, "ART": -1, "UNS": -2} * 5 stages: {"WAKE": 0, "N1": 1, "N2": 2, "N3": 3, "REM": 4, "ART": -1, "UNS": -2} Users can define a custom mapping: >>> hyp.mapping = {"WAKE": 0, "NREM": 1, "REM": 2} Examples -------- Convert a 2-stages hypnogram to a pandas.Series of integers >>> from yasa import Hypnogram >>> hyp = Hypnogram(["W", "W", "S", "S", "W", "S"], n_stages=2) >>> hyp.as_int() Epoch 0 0 1 0 2 1 3 1 4 0 5 1 Name: Stage, dtype: int16 Same with a 4-stages hypnogram >>> from yasa import Hypnogram >>> hyp = Hypnogram(["W", "W", "LIGHT", "LIGHT", "DEEP", "REM", "WAKE"], n_stages=4) >>> hyp.as_int() Epoch 0 0 1 0 2 2 3 2 4 3 5 4 6 0 Name: Stage, dtype: int16 """ # Return as int16 (-32768 to 32767) to reduce memory usage return self.hypno.replace(self.mapping).astype(np.int16)
[docs] def consolidate_stages(self, new_n_stages): """Reduce the number of stages in a hypnogram to match actigraphy or wearables. For example, a standard 5-stage hypnogram (W, N1, N2, N3, REM) could be consolidated to a hypnogram more common with actigraphy (e.g. 2-stages: [Wake, Sleep] or 4-stages: [W, Light, Deep, REM]). Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). new_n_stages : int Desired number of sleep stages. Must be lower than the current number of stages. - 5 stages - Wake, N1, N2, N3, REM - 4 stages - Wake, Light, Deep, REM - 3 stages - Wake, NREM, REM - 2 stages - Wake, Sleep .. note:: Unscored and Artefact are always allowed. Returns ------- hyp : :py:class:`yasa.Hypnogram` The consolidated Hypnogram object. This function returns a copy, i.e. the original hypnogram is not modified in place. Examples -------- >>> from yasa import Hypnogram >>> hyp = Hypnogram(["W", "W", "N1", "N2", "N2", "N2", "N2", "W"], n_stages=5) >>> hyp_2s = hyp.consolidate_stages(2) >>> print(hyp_2s) Epoch 0 WAKE 1 WAKE 2 SLEEP 3 SLEEP 4 SLEEP 5 SLEEP 6 SLEEP 7 WAKE Name: Stage, dtype: category Categories (4, object): ['WAKE', 'SLEEP', 'ART', 'UNS'] """ assert self.n_stages in [3, 4, 5], "`self.n_stages` must be 3, 4, or 5" assert new_n_stages in [2, 3, 4], "`new_n_stages` must be 2, 3, or 4" assert new_n_stages < self.n_stages, "`new_n_stages` must be lower than `self.n_stages`" # Change sleep codes where applicable. if new_n_stages == 2: # Consolidate all Sleep mapping = { "N1": "S", "N2": "S", "N3": "S", "REM": "S", "LIGHT": "S", "DEEP": "S", "NREM": "S", } elif new_n_stages == 3: # Consolidate N1/N2/N3 or Light/Deep into NREM mapping = {"N1": "NREM", "N2": "NREM", "N3": "NREM", "LIGHT": "NREM", "DEEP": "NREM"} elif new_n_stages == 4: # Consolidate N1/N2 into Light mapping = {"N1": "LIGHT", "N2": "LIGHT", "N3": "DEEP"} new_hyp = self.hypno.replace(mapping).to_numpy() return Hypnogram( values=new_hyp, n_stages=new_n_stages, freq=self.freq, start=self.start, scorer=self.scorer, )
[docs] def copy(self): """Return a new copy of the current Hypnogram.""" return type(self)( values=self.hypno.to_numpy(), n_stages=self.n_stages, freq=self.freq, start=self.start, scorer=self.scorer, )
[docs] def find_periods(self, threshold="5min", equal_length=False): """Find sequences of consecutive values exceeding a certain duration in hypnogram. Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). threshold : str This function will only keep periods that exceed a certain duration (default '5min'), e.g. '5min', '15min', '30sec', '1hour'. To disable thresholding, use '0sec'. equal_length : bool If True, the periods will all have the exact duration defined in threshold. That is, periods that are longer than the duration threshold will be divided into sub-periods of exactly the length of ``threshold``. Returns ------- periods : :py:class:`pandas.DataFrame` Output dataframe * ``values`` : The value in hypno of the current period * ``start`` : The index of the start of the period in hypno * ``length`` : The duration of the period, in number of epochs Examples -------- Let's assume that we have an hypnogram where sleep = 1 and wake = 0, with one value per minute. >>> from yasa import Hypnogram >>> val = 11 * ["W"] + 3 * ["S"] + 2 * ["W"] + 9 * ["S"] + ["W", "W"] >>> hyp = Hypnogram(val, n_stages=2, freq="1min") >>> hyp.find_periods(threshold="0min") values start length 0 WAKE 0 11 1 SLEEP 11 3 2 WAKE 14 2 3 SLEEP 16 9 4 WAKE 25 2 This gives us the start and duration of each sequence of consecutive values in the hypnogram. For example, the first row tells us that there is a sequence of 11 consecutive WAKE starting at the first index of hypno. Now, we may want to keep only periods that are longer than a specific threshold, for example 5 minutes: >>> hyp.find_periods(threshold="5min") values start length 0 WAKE 0 11 1 SLEEP 16 9 Only the two sequences that are longer than 5 minutes (11 minutes and 9 minutes respectively) are kept. Feel free to play around with different values of threshold! This function is not limited to binary arrays, e.g. a 5-stages hypnogram at 30-sec resolution: >>> from yasa import simulate_hypnogram >>> hyp = simulate_hypnogram(tib=30, seed=42) >>> hyp.find_periods(threshold="2min") values start length 0 WAKE 0 5 1 N1 5 6 2 N2 11 49 Lastly, using ``equal_length=True`` will further divide the periods into segments of the same duration, i.e. the duration defined in ``threshold``: >>> hyp.find_periods(threshold="5min", equal_length=True) values start length 0 N2 11 10 1 N2 21 10 2 N2 31 10 3 N2 41 10 Here, the 24.5 minutes of consecutive N2 sleep (= 49 epochs) are divided into 4 periods of exactly 5 minute each. The remaining 4.5 minutes at the end of the hypnogram are removed because it is less than 5 minutes. In other words, the remainder of the division of a given segment by the desired duration is discarded. """ return hypno_find_periods( self.hypno, self.sampling_frequency, threshold=threshold, equal_length=equal_length )
[docs] def plot_hypnogram(self, **kwargs): """Plot the hypnogram. .. seealso:: :py:func:`yasa.plot_hypnogram` Parameters ---------- **kwargs : dict Optional keyword arguments passed to :py:func:`yasa.plot_hypnogram`. Returns ------- ax : :py:class:`matplotlib.axes.Axes` Matplotlib Axes Examples -------- .. plot:: >>> from yasa import simulate_hypnogram >>> ax = simulate_hypnogram(tib=480, seed=88).plot_hypnogram(highlight="REM") """ return plot_hypnogram(self, **kwargs)
[docs] def simulate_similar(self, **kwargs): """Simulate a new hypnogram based on properties of the current hypnogram. .. seealso:: :py:func:`yasa.simulate_hypnogram` Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB). **kwargs : dict Optional keyword arguments passed to :py:func:`yasa.simulate_hypnogram`. Returns ------- hyp : :py:class:`yasa.Hypnogram` A simulated hypnogram. Examples -------- >>> import pandas as pd >>> from yasa import Hypnogram >>> hyp = Hypnogram( ... ["W", "S", "W"], n_stages=2, freq="2min", scorer="Human").upsample("30s") >>> shyp = hyp.simulate_similar(scorer="Simulated", seed=6) >>> df = pd.concat([hyp.hypno, shyp.hypno], axis=1) >>> print(df) Human Simulated Epoch 0 WAKE WAKE 1 WAKE WAKE 2 WAKE WAKE 3 WAKE WAKE 4 SLEEP SLEEP 5 SLEEP SLEEP 6 SLEEP SLEEP 7 SLEEP SLEEP 8 WAKE SLEEP 9 WAKE SLEEP 10 WAKE SLEEP 11 WAKE WAKE """ assert "n_stages" not in kwargs and "freq" not in kwargs, ( "`n_stages` and `freq` cannot be included as additional `**kwargs` " "because they must match properties of the current Hypnogram." ) simulate_hypnogram_kwargs = { "tib": self.duration, "n_stages": self.n_stages, "freq": self.freq, "trans_probas": self.transition_matrix()[1], "start": self.start, "scorer": self.scorer, } simulate_hypnogram_kwargs.update(kwargs) return simulate_hypnogram(**simulate_hypnogram_kwargs)
[docs] def sleep_statistics(self): """ Compute standard sleep statistics from an hypnogram. This function supports a 2, 3, 4 or 5-stages hypnogram. Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). Returns ------- stats : dict Summary sleep statistics. Notes ----- All values except SE, SME, SFI and the percentage of each stage are expressed in minutes. YASA follows the AASM guidelines to calculate these parameters: * Time in Bed (TIB): total duration of the hypnogram. * Sleep Period Time (SPT): duration from first to last period of sleep. * Wake After Sleep Onset (WASO): duration of wake periods within SPT. * Total Sleep Time (TST): total sleep duration in SPT. * Sleep Onset Latency (SOL): Latency to first epoch of any sleep. * SOL 5min: Latency to 5 minutes of persistent sleep (any stage). * REM latency: latency to first REM sleep. * Sleep Efficiency (SE): TST / TIB * 100 (%). * Sleep Maintenance Efficiency (SME): TST / SPT * 100 (%). * Sleep Fragmentation Index: number of transitions from sleep to wake / hours of TST * Sleep stages amount and proportion of TST .. warning:: Artefact and Unscored epochs are excluded from the calculation of the total sleep time (TST). TST is calculated as the sum of all REM and NREM sleep in SPT. .. warning:: The definition of REM latency in the AASM scoring manual differs from the REM latency reported here. The former uses the time from first epoch of sleep, while YASA uses the time from the beginning of the recording. The AASM definition of the REM latency can be found with `SOL - Lat_REM`. References ---------- * Iber (2007). The AASM manual for the scoring of sleep and associated events: rules, terminology and technical specifications. American Academy of Sleep Medicine. * Silber et al. (2007). `The visual scoring of sleep in adults <https://www.ncbi.nlm.nih.gov/pubmed/17557422>`_. Journal of Clinical Sleep Medicine, 3(2), 121-131. Examples -------- Sleep statistics for a 2-stage hypnogram with a resolution of 15-seconds >>> from yasa import Hypnogram >>> # Generate a fake hypnogram, where "S" = Sleep, "W" = Wake >>> values = 10 * ["W"] + 40 * ["S"] + 5 * ["W"] + 40 * ["S"] + 9 * ["W"] >>> hyp = Hypnogram(values, freq="15s", n_stages=2) >>> hyp.sleep_statistics() {'TIB': 26.0, 'SPT': 21.25, 'WASO': 1.25, 'TST': 20.0, 'SE': 76.9231, 'SME': 94.1176, 'SFI': 1.5, 'SOL': 2.5, 'SOL_5min': 2.5, 'WAKE': 6.0} Sleep statistics for a 5-stages hypnogram >>> from yasa import simulate_hypnogram >>> # Generate a 8 hr (= 480 minutes) 5-stages hypnogram with a 30-seconds resolution >>> hyp = simulate_hypnogram(tib=480, seed=42) >>> hyp.sleep_statistics() {'TIB': 480.0, 'SPT': 477.5, 'WASO': 79.5, 'TST': 398.0, 'SE': 82.9167, 'SME': 83.3508, 'SFI': 0.7538, 'SOL': 2.5, 'SOL_5min': 2.5, 'Lat_REM': 67.0, 'WAKE': 82.0, 'N1': 67.0, 'N2': 240.5, 'N3': 53.0, 'REM': 37.5, '%N1': 16.8342, '%N2': 60.4271, '%N3': 13.3166, '%REM': 9.4221} """ hypno = self.hypno.to_numpy() assert self.n_epochs > 0, "Hypnogram is empty!" all_sleep = ["SLEEP", "N1", "N2", "N3", "NREM", "REM", "LIGHT", "DEEP"] all_non_sleep = ["WAKE", "ART", "UNS"] stats = {} # TIB, first and last sleep stats["TIB"] = self.n_epochs idx_sleep = np.where(~np.isin(hypno, all_non_sleep))[0] if not len(idx_sleep): first_sleep, last_sleep = 0, self.n_epochs else: first_sleep = idx_sleep[0] last_sleep = idx_sleep[-1] # Crop to SPT hypno_s = hypno[first_sleep : (last_sleep + 1)] stats["SPT"] = hypno_s.size if len(idx_sleep) else 0 stats["WASO"] = hypno_s[hypno_s == "WAKE"].size if len(idx_sleep) else np.nan # Before YASA v0.5.0, TST was calculated as SPT - WASO, meaning that Art # and Unscored epochs were included. TST is now restrained to sleep stages. stats["TST"] = hypno_s[np.isin(hypno_s, all_sleep)].shape[0] # Sleep efficiency and fragmentation stats["SE"] = 100 * stats["TST"] / stats["TIB"] if stats["SPT"] == 0: stats["SME"] = np.nan stats["SFI"] = np.nan else: # Sleep maintenance efficiency stats["SME"] = 100 * stats["TST"] / stats["SPT"] # SFI is the ratio of the number of transitions from sleep into Wake to TST (hours) # The original definition included transitions into Wake or N1. counts, _ = self.transition_matrix() n_trans_to_wake = np.sum( counts.loc[ np.intersect1d(counts.index, all_sleep), np.intersect1d(counts.index, ["WAKE"]) ].to_numpy() ) stats["SFI"] = n_trans_to_wake / (stats["TST"] / (3600 * self.sampling_frequency)) # Sleep stage latencies -- only relevant if hypno is cropped to TIB stats["SOL"] = first_sleep if stats["TST"] > 0 else np.nan sleep_periods = hypno_find_periods( np.isin(hypno, all_sleep), self.sampling_frequency, threshold="5min" ).query("values == True") if sleep_periods.shape[0]: stats["SOL_5min"] = sleep_periods["start"].iloc[0] else: stats["SOL_5min"] = np.nan if "REM" in self.labels: # Question: should we add latencies for other stage too? stats["Lat_REM"] = np.where(hypno == "REM")[0].min() if "REM" in hypno else np.nan # Duration of each stage for st in self.labels: if st == "SLEEP": # SLEEP == TST continue stats[st] = hypno[hypno == st].size # Remove ART and UNS if they are empty if stats["ART"] == 0: stats.pop("ART") if stats["UNS"] == 0: stats.pop("UNS") # Convert to minutes for key, value in stats.items(): if key in ["SE", "SME"]: continue stats[key] = value / (60 * self.sampling_frequency) # Proportion of each sleep stages for st in all_sleep: if st in stats.keys(): if stats["TST"] == 0: stats[f"%{st}"] = np.nan else: stats[f"%{st}"] = 100 * stats[st] / stats["TST"] # Round to 4 decimals stats = {key: np.round(val, 4) for key, val in stats.items()} return stats
[docs] def transition_matrix(self): """Create a state-transition matrix from an hypnogram. Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). For best results, the hypnogram should not contain any artefact or unscored epochs. Returns ------- counts : :py:class:`pandas.DataFrame` Counts transition matrix (number of transitions from stage A to stage B). The pre-transition states are the rows and the post-transition states are the columns. probs : :py:class:`pandas.DataFrame` Conditional probability transition matrix, i.e. given that current state is A, what is the probability that the next state is B. ``probs`` is a `right stochastic matrix <https://en.wikipedia.org/wiki/Stochastic_matrix>`_, i.e. each row sums to 1. Examples -------- >>> from yasa import Hypnogram, simulate_hypnogram >>> # Generate a 8 hr (= 480 minutes) 5-stages hypnogram with a 30-seconds resolution >>> hyp = simulate_hypnogram(tib=480, seed=42) >>> counts, probs = hyp.transition_matrix() >>> counts To Stage WAKE N1 N2 N3 REM From Stage WAKE 153 11 0 0 0 N1 6 99 29 0 0 N2 3 16 447 10 5 N3 1 3 4 96 1 REM 0 5 1 0 69 >>> probs.round(3) To Stage WAKE N1 N2 N3 REM From Stage WAKE 0.933 0.067 0.000 0.000 0.00 N1 0.045 0.739 0.216 0.000 0.00 N2 0.006 0.033 0.929 0.021 0.01 N3 0.010 0.029 0.038 0.914 0.01 REM 0.000 0.067 0.013 0.000 0.92 """ counts, probs = transition_matrix(self.as_int()) counts.index = counts.index.map(self.mapping_int) counts.columns = counts.columns.map(self.mapping_int) probs.index = probs.index.map(self.mapping_int) probs.columns = probs.columns.map(self.mapping_int) return counts, probs
[docs] def upsample(self, new_freq, **kwargs): """Upsample hypnogram to a higher frequency. Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). For best results, the hypnogram should not contain any artefact or unscored epochs. new_freq : str Frequency is defined with a pandas frequency string, e.g. "10s" or "1min". Returns ------- hyp : :py:class:`yasa.Hypnogram` The upsampled Hypnogram object. This function returns a copy, i.e. the original hypnogram is not modified in place. Examples -------- Create a 30-sec hypnogram >>> from yasa import Hypnogram >>> hyp = Hypnogram(["W", "W", "S", "S", "W"], n_stages=2, start="2022-12-23 23:00") >>> hyp.hypno Time 2022-12-23 23:00:00 WAKE 2022-12-23 23:00:30 WAKE 2022-12-23 23:01:00 SLEEP 2022-12-23 23:01:30 SLEEP 2022-12-23 23:02:00 WAKE Freq: 30S, Name: Stage, dtype: category Categories (4, object): ['WAKE', 'SLEEP', 'ART', 'UNS'] Upsample to a 15-seconds resolution >>> hyp_up = hyp.upsample("15s") >>> hyp_up.hypno Time 2022-12-23 23:00:00 WAKE 2022-12-23 23:00:15 WAKE 2022-12-23 23:00:30 WAKE 2022-12-23 23:00:45 WAKE 2022-12-23 23:01:00 SLEEP 2022-12-23 23:01:15 SLEEP 2022-12-23 23:01:30 SLEEP 2022-12-23 23:01:45 SLEEP 2022-12-23 23:02:00 WAKE 2022-12-23 23:02:15 WAKE Freq: 15S, Name: Stage, dtype: category Categories (4, object): ['WAKE', 'SLEEP', 'ART', 'UNS'] """ assert pd.Timedelta(new_freq) < pd.Timedelta(self.freq), ( f"The upsampling `new_freq` ({new_freq}) must be higher than the current frequency of " f"hypnogram {self.freq}" ) if isinstance(self.hypno.index, pd.DatetimeIndex): # Upsampling should extend the last epoch, e.g. # - 30-sec: last epoch at 07:20:30 # - 10-sec: last epoch should be 07:20:50 and not 07:20:30 otherwise we're losing 20 sec hyp_extend = self.hypno.copy() hyp_extend = hyp_extend.reindex( hyp_extend.index.union([hyp_extend.index[-1] + pd.Timedelta(self.freq)]) ).ffill() new_hyp = hyp_extend.resample(new_freq, origin="start").ffill().iloc[:-1] else: hyp_extend = self.hypno.copy() hyp_extend.index = self.timedelta hyp_extend = hyp_extend.reindex( hyp_extend.index.union([hyp_extend.index[-1] + pd.Timedelta(self.freq)]) ).ffill() new_hyp = ( hyp_extend.resample(new_freq, origin="start") .ffill() .reset_index(drop=True) .iloc[:-1] ) new_hyp.index.name = "Epoch" return Hypnogram( values=new_hyp, n_stages=self.n_stages, freq=new_freq, start=self.start, scorer=self.scorer, )
[docs] def upsample_to_data(self, data, sf=None, verbose=True): """ Upsample an hypnogram to a given sampling frequency and fit the resulting hypnogram to corresponding EEG data, such that the hypnogram and EEG data have the exact same number of samples. Parameters ---------- self : :py:class:`yasa.Hypnogram` Hypnogram, assumed to be already cropped to time in bed (TIB, also referred to as Total Recording Time, i.e. "lights out" to "lights on"). For best results, the hypnogram should not contain any artefact or unscored epochs. data : array_like or :py:class:`mne.io.BaseRaw` 1D or 2D EEG data. Can also be a :py:class:`mne.io.BaseRaw`, in which case ``data`` and ``sf_data`` will be automatically extracted. sf_data : float The sampling frequency of ``data``, in Hz (e.g. 100 Hz, 256 Hz, ...). Can be omitted if ``data`` is a :py:class:`mne.io.BaseRaw`. verbose : bool or str Verbose level. Default (False) will only print warning and error messages. The logging levels are 'debug', 'info', 'warning', 'error', and 'critical'. For most users the choice is between 'info' (or ``verbose=True``) and warning (``verbose=False``). Returns ------- hypno : array_like The hypnogram values, upsampled to ``sf_data`` and cropped/padded to ``max(data.shape)``. For compatibility with most YASA functions, the returned hypnogram is an array with integer values, and not a :py:class:`yasa.Hypnogram` object. Warns ----- UserWarning If the upsampled ``hypno`` is shorter / longer than ``max(data.shape)`` and therefore needs to be padded/cropped respectively. This output can be disabled by passing ``verbose='ERROR'``. """ hypno_up = hypno_upsample_to_data( self.as_int(), self.sampling_frequency, data=data, sf_data=sf, verbose=verbose ) return hypno_up
############################################################################# # STR <--> INT CONVERSION #############################################################################
[docs]def hypno_str_to_int( hypno, mapping_dict={ "w": 0, "wake": 0, "n1": 1, "s1": 1, "n2": 2, "s2": 2, "n3": 3, "s3": 3, "s4": 3, "r": 4, "rem": 4, "art": -1, "mt": -1, "uns": -2, "nd": -2, }, ): """Convert a string hypnogram array to integer. ['W', 'N2', 'N2', 'N3', 'R'] ==> [0, 2, 2, 3, 4] .. versionadded:: 0.1.5 Parameters ---------- hypno : array_like The sleep staging (hypnogram) 1D array. mapping_dict : dict The mapping dictionnary, in lowercase. Note that this function is essentially a wrapper around :py:meth:`pandas.Series.map`. Returns ------- hypno : array_like The corresponding integer hypnogram. """ # warnings.warn( # "The `yasa.hypno_str_to_int` function is deprecated and will be removed in v0.8. " # "Please use the `yasa.Hypnogram.as_int` method instead.", # FutureWarning, # ) assert isinstance(hypno, (list, np.ndarray, pd.Series)), "Not an array." hypno = pd.Series(np.asarray(hypno, dtype=str)) assert not hypno.str.isnumeric().any(), "Hypno contains numeric values." return hypno.str.lower().map(mapping_dict).values
[docs]def hypno_int_to_str( hypno, mapping_dict={0: "W", 1: "N1", 2: "N2", 3: "N3", 4: "R", -1: "Art", -2: "Uns"} ): """Convert an integer hypnogram array to a string array. [0, 2, 2, 3, 4] ==> ['W', 'N2', 'N2', 'N3', 'R'] .. versionadded:: 0.1.5 Parameters ---------- hypno : array_like The sleep staging (hypnogram) 1D array. mapping_dict : dict The mapping dictionnary. Note that this function is essentially a wrapper around :py:meth:`pandas.Series.map`. Returns ------- hypno : array_like The corresponding integer hypnogram. """ # warnings.warn( # "The `yasa.hypno_int_to_str` function is deprecated and will be removed in v0.8. " # "Please use the `yasa.Hypnogram` class to create an hypnogram instead.", # FutureWarning, # ) assert isinstance(hypno, (list, np.ndarray, pd.Series)), "Not an array." hypno = pd.Series(np.asarray(hypno, dtype=int)) return hypno.map(mapping_dict).values
############################################################################# # UPSAMPLING #############################################################################
[docs]def hypno_upsample_to_sf(hypno, sf_hypno, sf_data): """Upsample the hypnogram to a given sampling frequency. .. versionadded:: 0.1.5 Parameters ---------- hypno : array_like The sleep staging (hypnogram) 1D array. sf_hypno : float The current sampling frequency of the hypnogram, in Hz, e.g. * 1/30 = 1 value per each 30 seconds of EEG data, * 1 = 1 value per second of EEG data sf_data : float The desired sampling frequency of the hypnogram, in Hz (e.g. 100 Hz, 256 Hz, ...) Returns ------- hypno : array_like The hypnogram, upsampled to ``sf_data``. """ # warnings.warn( # "The `yasa.hypno_upsample_to_sf` function is deprecated and will be removed in v0.8. " # "Please use the `yasa.Hypnogram.upsample` method instead.", # FutureWarning, # ) repeats = sf_data / sf_hypno assert sf_hypno <= sf_data, "sf_hypno must be less than sf_data." assert repeats.is_integer(), "sf_hypno / sf_data must be a whole number." assert isinstance(hypno, (list, np.ndarray, pd.Series)) return np.repeat(np.asarray(hypno), repeats)
def hypno_fit_to_data(hypno, data, sf=None): """Crop or pad the hypnogram to fit the length of data. Hypnogram and data MUST have the SAME sampling frequency. This is an internal function. Parameters ---------- hypno : array_like The sleep staging (hypnogram) 1D array. data : np.array_like or mne.io.Raw 1D or 2D EEG data. Can also be a MNE Raw object, in which case data and sf will be automatically extracted. sf : float, optional The sampling frequency of data AND the hypnogram. Returns ------- hypno : array_like Hypnogram, with the same number of samples as data. """ # Check if data is an MNE raw object if isinstance(data, mne.io.BaseRaw): sf = data.info["sfreq"] data = data.times # 1D array and does not require to preload data data = np.asarray(data) hypno = np.asarray(hypno) assert hypno.ndim == 1, "Hypno must be 1D." npts_hyp = hypno.size npts_data = max(data.shape) # Support for 2D data if npts_hyp < npts_data: # Hypnogram is shorter than data npts_diff = npts_data - npts_hyp if sf is not None: dur_diff = npts_diff / sf logger.warning( "Hypnogram is SHORTER than data by %.2f seconds. " "Padding hypnogram with last value to match data.size." % dur_diff ) else: logger.warning( "Hypnogram is SHORTER than data by %i samples. " "Padding hypnogram with last value to match data.size." % npts_diff ) hypno = np.pad(hypno, (0, npts_diff), mode="edge") elif npts_hyp > npts_data: # Hypnogram is longer than data npts_diff = npts_hyp - npts_data if sf is not None: dur_diff = npts_diff / sf logger.warning( "Hypnogram is LONGER than data by %.2f seconds. " "Cropping hypnogram to match data.size." % dur_diff ) else: logger.warning( "Hypnogram is LONGER than data by %i samples. " "Cropping hypnogram to match data.size." % npts_diff ) hypno = hypno[0:npts_data] return hypno
[docs]def hypno_upsample_to_data(hypno, sf_hypno, data, sf_data=None, verbose=True): """Upsample an hypnogram to a given sampling frequency and fit the resulting hypnogram to corresponding EEG data, such that the hypnogram and EEG data have the exact same number of samples. .. versionadded:: 0.1.5 Parameters ---------- hypno : array_like The sleep staging (hypnogram) 1D array. sf_hypno : float The current sampling frequency of the hypnogram, in Hz, e.g. * 1/30 = 1 value per each 30 seconds of EEG data, * 1 = 1 value per second of EEG data data : array_like or :py:class:`mne.io.BaseRaw` 1D or 2D EEG data. Can also be a :py:class:`mne.io.BaseRaw`, in which case ``data`` and ``sf_data`` will be automatically extracted. sf_data : float The sampling frequency of ``data``, in Hz (e.g. 100 Hz, 256 Hz, ...). Can be omitted if ``data`` is a :py:class:`mne.io.BaseRaw`. verbose : bool or str Verbose level. Default (False) will only print warning and error messages. The logging levels are 'debug', 'info', 'warning', 'error', and 'critical'. For most users the choice is between 'info' (or ``verbose=True``) and warning (``verbose=False``). Returns ------- hypno : array_like The hypnogram, upsampled to ``sf_data`` and cropped/padded to ``max(data.shape)``. Warns ----- UserWarning If the upsampled ``hypno`` is shorter / longer than ``max(data.shape)`` and therefore needs to be padded/cropped respectively. This output can be disabled by passing ``verbose='ERROR'``. """ # NOTE: FutureWarning not added here otherwise it would also be shown when calling # yasa.Hypnogram.upsample_to_data. set_log_level(verbose) if isinstance(data, mne.io.BaseRaw): sf_data = data.info["sfreq"] data = data.times hypno_up = hypno_upsample_to_sf(hypno=hypno, sf_hypno=sf_hypno, sf_data=sf_data) return hypno_fit_to_data(hypno=hypno_up, data=data, sf=sf_data)
############################################################################# # HYPNO LOADING #############################################################################
[docs]def load_profusion_hypno(fname, replace=True): # pragma: no cover """ Load a Compumedics Profusion hypnogram (.xml). The Compumedics Profusion hypnogram format is one of the two hypnogram formats found in the `National Sleep Research Resource (NSRR) <https://sleepdata.org/>`_ website. For more details on the format, please refer to https://github.com/nsrr/edf-editor-translator/wiki/Compumedics-Annotation-Format Parameters ---------- fname : str Filename with full path. replace : bool If True, the integer values will be mapped to YASA default, i.e. 0 for Wake, 1 for N1, 2 for N2, 3 for N3 / S4 and 4 for REM. Note that the native profusion format is identical except for REM sleep which is marked as 5. Returns ------- hypno : 1D array (n_epochs, ) Hypnogram, with one value per 30 second epochs. sf_hyp : float Sampling frequency of the hypnogram. Default is 1 / 30 Hz. """ # Note that an alternative is to use the `xmltodict` library: # >>> with open(fname) as in_file: # >>> xml = in_file.read() # >>> epoch_length = xml['EpochLength'] # >>> hypno = np.array(xml['SleepStages']['SleepStage'], dtype='int') # >>> xml = xmltodict.parse(xml, process_namespaces=True)['CMPStudyConfig'] # >>> annotations = pd.DataFrame(xml['ScoredEvents']['ScoredEvent']) # >>> annotations["Start"] = annotations["Start"].astype(float) # >>> annotations["Duration"] = annotations["Duration"].astype(float) import xml.etree.ElementTree as ET tree = ET.parse(fname) root = tree.getroot() epoch_length = float(root[0].text) sf_hyp = 1 / epoch_length hypno = [] for s in root[4]: hypno.append(s.text) # TODO: This should return a yasa.Hypnogram object hypno = np.array(hypno).astype(int) if replace: # Stage 4 --> 3 and REM --> 4 hypno = pd.Series(hypno).replace({4: 3, 5: 4}).to_numpy() return hypno, sf_hyp
############################################################################# # PERIODS & CYCLES #############################################################################
[docs]def hypno_find_periods(hypno, sf_hypno, threshold="5min", equal_length=False): """Find sequences of consecutive values exceeding a certain duration in hypnogram. .. versionadded:: 0.6.2 Parameters ---------- hypno : array_like A 1D array with the sleep stages (= hypnogram). The dtype can be anything (int, bool, str). More generally, this can be any vector for which you wish to find runs of consecutive items. sf_hypno : float The current sampling frequency of ``hypno``, in Hz, e.g. 1/30 = 1 value per each 30 seconds of EEG data, 1 = 1 value per second of EEG data. threshold : str This function will only keep periods that exceed a certain duration (default '5min'), e.g. '5min', '15min', '30sec', '1hour'. To disable thresholding, use '0sec'. equal_length : bool If True, the periods will all have the exact duration defined in threshold. That is, periods that are longer than the duration threshold will be divided into sub-periods of exactly the length of ``threshold``. Returns ------- periods : :py:class:`pandas.DataFrame` Output dataframe * ``values`` : The value in hypno of the current period * ``start`` : The index of the start of the period in hypno * ``length`` : The duration of the period, in number of samples Examples -------- Let's assume that we have an hypnogram where sleep = 1 and wake = 0. There is one value per minute, and therefore the sampling frequency of the hypnogram is 1 / 60 sec (~0.016 Hz). >>> import yasa >>> hypno = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0] >>> yasa.hypno_find_periods(hypno, sf_hypno=1/60, threshold="0min") values start length 0 0 0 11 1 1 11 3 2 0 14 2 3 1 16 9 4 0 25 2 This gives us the start and duration of each sequence of consecutive values in the hypnogram. For example, the first row tells us that there is a sequence of 11 consecutive 0 starting at the first index of hypno. Now, we may want to keep only periods that are longer than a specific threshold, for example 5 minutes: >>> yasa.hypno_find_periods(hypno, sf_hypno=1/60, threshold="5min") values start length 0 0 0 11 1 1 16 9 Only the two sequences that are longer than 5 minutes (11 minutes and 9 minutes respectively) are kept. Feel free to play around with different values of threshold! This function is not limited to binary arrays, e.g. >>> hypno = [0, 0, 0, 0, 1, 2, 2, 2, 2, 2, 2, 0, 0, 0, 1, 0, 1] >>> yasa.hypno_find_periods(hypno, sf_hypno=1/60, threshold="2min") values start length 0 0 0 4 1 2 5 6 2 0 11 3 Lastly, using ``equal_length=True`` will further divide the periods into segments of the same duration, i.e. the duration defined in ``threshold``: >>> hypno = [0, 0, 0, 0, 1, 2, 2, 2, 2, 2, 2, 0, 0, 0, 1, 0, 1] >>> yasa.hypno_find_periods(hypno, sf_hypno=1/60, threshold="2min", equal_length=True) values start length 0 0 0 2 1 0 2 2 2 2 5 2 3 2 7 2 4 2 9 2 5 0 11 2 Here, the first period of 4 minutes of consecutive 0 is further divided into 2 periods of exactly 2 minutes. Next, the sequence of 6 consecutive 2 is further divided into 3 periods of 2 minutes. Lastly, the last value in the sequence of 3 consecutive 0 at the end of the array is removed to keep only a segment of 2 exactly minutes. In other words, the remainder of the division of a given segment by the desired duration is discarded. """ # NOTE: FutureWarning not added here otherwise it would also be shown when calling # yasa.Hypnogram.find_periods # Convert the threshold to number of samples assert isinstance(threshold, str), "Threshold must be a string, e.g. '5min', '30sec', '15min'" thr_sec = pd.Timedelta(threshold).total_seconds() thr_samp = sf_hypno * thr_sec if float(thr_samp).is_integer(): thr_samp = int(thr_samp) else: raise ValueError( f"The selected threshold does not result in an whole number of samples (" f"{thr_sec:.3f} seconds * {sf_hypno:.3f} Hz = {thr_samp:.3f} samples)" ) # Find run starts # https://gist.github.com/alimanfoo/c5977e87111abe8127453b21204c1065 assert isinstance(hypno, (list, np.ndarray, pd.Series)), "hypno must be an array." x = np.asarray(hypno) n = x.shape[0] loc_run_start = np.empty(n, dtype=bool) loc_run_start[0] = True loc_run_start[1:] = x[:-1] != x[1:] run_starts = np.nonzero(loc_run_start)[0] # Find run values run_values = x[loc_run_start] # Find run lengths run_lengths = np.diff(np.append(run_starts, n)) seq = pd.DataFrame({"values": run_values, "start": run_starts, "length": run_lengths}) # Remove runs that are shorter than threshold seq = seq[seq["length"] >= thr_samp].reset_index(drop=True) if not equal_length: return seq # Divide into epochs of equal length assert thr_samp > 0, "Threshold must be non-zero if using equal_length=True." new_seq = {"values": [], "start": [], "length": []} for i, row in seq.iterrows(): quotient, remainder = np.divmod(row["length"], thr_samp) new_start = row["start"] if quotient > 0: while quotient != 0: new_seq["values"].append(row["values"]) new_seq["start"].append(new_start) new_seq["length"].append(thr_samp) new_start += thr_samp quotient -= 1 else: new_seq["values"].append(row["values"]) new_seq["start"].append(row["start"]) new_seq["length"].append(row["length"]) new_seq = pd.DataFrame(new_seq) return new_seq
############################################################################# # SIMULATION #############################################################################
[docs]def simulate_hypnogram( tib=480, trans_probas=None, init_probas=None, seed=None, **kwargs, ): """Simulate a hypnogram based on transition probabilities. Current implentation is a naive Markov model. The initial stage of a hypnogram is generated using probabilites from ``init_probas`` and then subsequent stages are generated from a Markov sequence based on ``trans_probas``. .. important:: The Markov simulation model is not meant to accurately portray sleep macroarchitecture and should only be used for testing or other unique purposes. .. seealso:: :py:meth:`yasa.Hypnogram.simulate_similar` .. versionadded:: 0.6.3 Parameters ---------- tib : int, float Total duration of the hypnogram (i.e., time in bed), expressed in minutes. Returned hypnogram will be slightly shorter if ``tib`` is not evenly divisible by ``freq``. Default is 480 minutes (= 8 hours). .. seealso:: :py:func:`yasa.sleep_statistics` trans_probas : :py:class:`pandas.DataFrame` or None Transition probability matrix where each cell is a transition probability between sleep stages of consecutive *epochs*. ``trans_probas`` is a `right stochastic matrix <https://en.wikipedia.org/wiki/Stochastic_matrix>`_, i.e. each row sums to 1. If None (default), transition probabilities come from Metzner et al., 2021 [Metzner2021]_. If :py:class:`pandas.DataFrame`, must have "from"-stages as indices and "to"-stages as columns. Indices and columns must follow YASA string hypnogram convention (e.g., WAKE, N1). Unscored/Artefact stages are not allowed. .. note:: Transition probability matrices should indicate the transition probability between *epochs* (i.e., probability of the next epoch) and not simply stage (i.e., probability of non-similar stage). .. seealso:: Return value from :py:func:`yasa.transition_matrix` init_probas : :py:class:`pandas.Series` or None Probabilites of each stage to initialize random walk. If None (default), initialize with "from"-WAKE row of ``trans_probas``. If :py:class:`pandas.Series`, indices must be stages following YASA string hypnogram convention and identical to those of ``trans_probas``. seed : int or None Random seed for generating Markov sequence. If an integer number is provided, the random hypnogram will be predictable. This argument is required if reproducible results are desired. **kwargs : dict Other arguments that are passed to :py:class:`yasa.Hypnogram`. .. note:: `n_stages` and `freq` must be consistent with a user-specificied `trans_probas`. Returns ------- hyp : :py:class:`yasa.Hypnogram` Hypnogram containing simulated sleep stages. Notes ----- Default transition probabilities are based on 30-second epochs and can be found in the ``traMat_Epoch.npy`` file of Supplementary Information for Metzner et al., 2021 [Metzner2021]_ (rounded values are viewable in Figure 5b). Please cite this work if these probabilites are used for publication. References ---------- .. [Metzner2021] Metzner, C., Schilling, A., Traxdorf, M., Schulze, H., & Krausse, P. (2021). Sleep as a random walk: a super-statistical analysis of EEG data across sleep stages. Communications Biology, 4. https://doi.org/10.1038/s42003-021-02912-6 Examples -------- >>> from yasa import simulate_hypnogram >>> hyp = simulate_hypnogram(tib=5, seed=1) >>> hyp <Hypnogram | 10 epochs x 30s (5.00 minutes), 5 stages> - Use `.hypno` to get the string values as a pandas.Series - Use `.as_int()` to get the integer values as a pandas.Series - Use `.plot_hypnogram()` to plot the hypnogram See the online documentation for more details. >>> hyp.hypno Epoch 0 WAKE 1 N1 2 N1 3 N2 4 N2 5 N2 6 N2 7 N2 8 N2 9 N2 Name: Stage, dtype: object >>> hyp = simulate_hypnogram(tib=5, n_stages=2, seed=1) >>> hyp.hypno Epoch 0 WAKE 1 SLEEP 2 SLEEP 3 SLEEP 4 SLEEP 5 SLEEP 6 SLEEP 7 SLEEP 8 SLEEP 9 SLEEP Name: Stage, dtype: object Add some Unscored epochs. >>> hyp = simulate_hypnogram(tib=5, n_stages=2, seed=1) >>> hyp.hypno.iloc[-2:] = "UNS" >>> hyp.hypno Epoch 0 WAKE 1 SLEEP 2 SLEEP 3 SLEEP 4 SLEEP 5 SLEEP 6 SLEEP 7 SLEEP 8 UNS 9 UNS Name: Stage, dtype: object Base the data off a real subject's transition matrix. .. plot:: >>> import numpy as np >>> import matplotlib.pyplot as plt >>> from yasa import Hypnogram, hypno_int_to_str >>> url = ( >>> "https://github.com/raphaelvallat/yasa/raw/master/" >>> "notebooks/data_full_6hrs_100Hz_hypno_30s.txt" >>> ) >>> values_str = hypno_int_to_str(np.loadtxt(url)) >>> real_hyp = Hypnogram(values_str) >>> fake_hyp = real_hyp.simulate_similar(seed=2) >>> fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=(7, 5)) >>> real_hyp.plot_hypnogram(ax=ax1).set_title("Real hypnogram") >>> fake_hyp.plot_hypnogram(ax=ax2).set_title("Fake hypnogram") >>> plt.tight_layout() """ # Extract yasa.Hypnogram defaults, which will be assumed later but need throughout if "n_stages" not in kwargs: kwargs["n_stages"] = 5 if "freq" not in kwargs: kwargs["freq"] = "30s" # Validate input assert isinstance(tib, (int, float)) and tib > 0, "`tib` must be a number > 0" if trans_probas is not None: assert isinstance(trans_probas, pd.DataFrame), "`trans_probas` must be a pandas DataFrame" assert np.all( np.less_equal(trans_probas.shape, kwargs["n_stages"]) ), "user-specified `trans_probas` must not include more stages than `n_stages`" if init_probas is not None: assert isinstance(init_probas, pd.Series), "`init_probas` must be a pandas Series" if seed is not None: assert isinstance(seed, int) and seed >= 0, "`seed` must be an integer >= 0" if trans_probas is None: # Check this here, rather than letting hyp.upsample catch it, to be clear about reason assert pd.Timedelta(kwargs["freq"]) <= pd.Timedelta( "30s" ), "`freq` must be <= 30s when using default `trans_probas`" # Initialize random number generator rng = np.random.default_rng(seed) def _markov_sequence(p_init, p_transition, sequence_length): """Generate a Markov sequence based on p_init and p_transition. https://ericmjl.github.io/essays-on-data-science/machine-learning/markov-models """ initial_state = list(rng.multinomial(1, p_init)).index(1) states = [initial_state] while len(states) < sequence_length: p_tr = p_transition[states[-1]] new_state = list(rng.multinomial(1, p_tr)).index(1) states.append(new_state) return np.asarray(states) if trans_probas is None: # Generate transition probability DataFrame trans_freqs = np.array( [ [11737, 571, 84, 2, 2], [281, 6697, 1661, 11, 59], [253, 1070, 26259, 505, 272], [49, 176, 279, 9630, 12], [57, 189, 84, 2, 10071], ] ) trans_probas = trans_freqs / trans_freqs.sum(axis=1, keepdims=True) trans_probas = pd.DataFrame( trans_probas, index=["WAKE", "N1", "N2", "N3", "REM"], columns=["WAKE", "N1", "N2", "N3", "REM"], ) trans_probas.attrs = {"is_default": True} else: trans_probas.attrs = {"is_default": False} if init_probas is None: # Extract Wake row of initial probabilities as a Series for w in ["w", "W", "wake", "WAKE"]: if w in trans_probas.index: init_probas = trans_probas.loc[w, :].copy() assert init_probas is not None, "`trans_probas` must include 'WAKE' in the index" stage_order = init_probas.index.tolist() assert ( stage_order == trans_probas.index.tolist() == trans_probas.columns.tolist() ), "`init_probas` and `trans_probas` must have all matching indices" # Extract probabilities as arrays trans_arr = trans_probas.to_numpy() init_arr = init_probas.to_numpy() # Make sure all rows sum to 1 assert np.allclose(trans_arr.sum(axis=1), 1), "All rows of `trans_probas` must sum to 1" assert np.isclose(init_arr.sum(), 1), "`init_probas` must sum to 1" # Find number of *complete* epochs within TIB duration to simulate if trans_probas.attrs.get("is_default"): freq_sec = 30 else: freq_sec = pd.Timedelta(kwargs["freq"]).total_seconds() n_epochs = np.floor(tib * 60 / freq_sec).astype(int) # Generate hypnogram integer values values_int = _markov_sequence(init_arr, trans_arr, n_epochs) # Convert to hypnogram string values (based on indices) values_str = [stage_order[x] for x in values_int] # Create YASA hypnogram instance if trans_probas.attrs.get("is_default"): # If using default trans_probas, hyp *must* be initialized with 5 stages and 30s epochs n_stages = kwargs.pop("n_stages") freq = kwargs.pop("freq") hyp = Hypnogram(values_str, n_stages=5, freq="30s", **kwargs) if pd.Timedelta(freq) < pd.Timedelta("30s"): hyp = hyp.upsample(freq) if n_stages != 5: hyp = hyp.consolidate_stages(n_stages) else: hyp = Hypnogram(values_str, **kwargs) return hyp