# ***************************************************
# _____ _ _
# | ____|_ __ ___| |__ __ _ _ __ | |_ ___ _ __
# | _| | '_ \ / __| '_ \ / _` | '_ \| __/ _ \ '__|
# | |___| | | | (__| | | | (_| | | | | || __/ |
# |_____|_| |_|\___|_| |_|\__,_|_| |_|\__\___|_|
#
# ***************************************************
from collections import Counter
from typing import Union, List, Type, Optional, Callable
from numpy import array, stack, ndarray, max as np_max, zeros, nan, float32, dstack
from pandas import DataFrame
from tqdm.auto import tqdm
# Names exported by ``from <this module> import *``.
__all__ = ["FixedSlidingWindow", "adjust_sequences"]
# Type alias for a plain Python numeric scalar (``int`` or ``float``).
_Numerical = Union[int, float]
class FixedSlidingWindow:
    """Fixed sliding window.

    Calling an instance applies ``self.transform`` to both the data and the
    target and then reduces every label window to a single label with
    :meth:`clean`.

    Examples::

        >>> import numpy as np
        >>> from enchanter.preprocessing import signal
        >>> x = np.random.randn(1024, 23)
        >>> y = np.random.randint(0, 9, 1024)
        >>> sw = signal.FixedSlidingWindow(256, overlap_rate=0.5)
        >>> x, y = sw(x, y)
        >>> x.shape # [6, 256, 23]
        >>> y.shape # [6, ]

    Args:
        window_size (int): Window size.
        overlap_rate (Optional[float]): Overlap rate. Must be a ``float`` with
            ``0.0 < overlap_rate <= 1.0``. Pass ``None`` to configure the
            window through ``step_size`` instead. When a float is given,
            ``step_size`` is ignored.
        step_size (Optional[int]): Step size. Only used when ``overlap_rate``
            is ``None``; must be greater than zero.

    Attributes:
        window_size: The window size passed to the constructor.
        overlap: ``int(step_size)`` when configured through ``step_size``,
            otherwise ``int(window_size * overlap_rate)``.

    Raises:
        AssertionError: If ``overlap_rate`` is a float outside
            ``0.0 < overlap_rate <= 1.0``.
        ValueError: If ``overlap_rate`` is ``None`` and ``step_size`` is
            missing or not positive, or if ``overlap_rate`` is neither
            ``None`` nor a ``float``.
    """

    def __init__(self, window_size: int, overlap_rate: Union[float, None], step_size: Optional[int] = None) -> None:
        self.window_size: int = window_size

        if overlap_rate is None and step_size is not None:
            # Previously a non-positive step left ``self.overlap`` unset and
            # the problem only showed up later as an AttributeError.
            if step_size <= 0:
                raise ValueError("step_size must be a positive integer")
            self.overlap = int(step_size)
        elif isinstance(overlap_rate, float):
            # AssertionError is kept because it is the documented exception type.
            if not 0.0 < overlap_rate <= 1.0:
                raise AssertionError("overlap_rate ranges from 0.0 to 1.0")
            self.overlap = int(window_size * overlap_rate)
        else:
            raise ValueError("overlap_rate must be a float, or None together with a step_size")

    @staticmethod
    def clean(labels: ndarray) -> ndarray:
        """Reduce every window of labels to a single representative label.

        For each window the most frequent label is chosen. The exception is a
        window whose most frequent label is ``0`` while the first label seen in
        the window occurs exactly ``len(window) // 2`` times; in that case the
        second most frequent label is chosen instead.

        Args:
            labels: Iterable of label windows (e.g. a 2D array with one window
                per row).

        Returns:
            A 1D array holding one label per window.
        """
        picked = []
        for lbl in labels:
            window_size = len(lbl)
            counter: Counter = Counter(lbl)
            # Sorted by count (descending); ties keep first-seen order.
            common = counter.most_common()
            # Counts in first-seen order, so ``values[0]`` belongs to the first
            # label of the window.
            # NOTE(review): this is not necessarily the count of the most
            # common label -- confirm whether ``common[0][1]`` was intended.
            values = list(counter.values())
            if common[0][0] == 0 and values[0] == window_size // 2:
                label = common[1][0]
            else:
                label = common[0][0]
            picked.append(label)
        return array(picked)

    def __call__(self, data, target):
        """Window ``data`` and ``target`` and collapse each label window.

        Args:
            data: Input passed to ``self.transform``.
            target: Labels passed to ``self.transform`` and then to :meth:`clean`.

        Returns:
            Tuple ``(data, label)`` of the transformed data and the cleaned labels.
        """
        # NOTE(review): ``transform`` is not defined in the visible source of
        # this class; it must be provided elsewhere for this call to work.
        data = self.transform(data)
        label = self.transform(target)
        label = self.clean(label)
        return data, label
def adjust_sequences(
    sequences: List[ndarray],
    max_len: Optional[Union[int, Callable[[List[int]], int]]] = None,
    fill: Union[str, int, float] = "ffill",
    dtype: Type = float32,
) -> ndarray:
    """Adjust variable-length series to a common length.

    For each sample, if the series is longer than ``max_len`` it is truncated to
    ``max_len`` rows and the rest is ignored. If it is shorter than ``max_len``,
    the missing rows are filled according to ``fill``.

    Args:
        sequences: A non-empty Python list of 2D ``np.ndarray`` objects whose
            lengths may differ. The 0th dimension is the length of the series,
            the 1st dimension is the number of features, and all samples must
            have the same number of features.
        max_len: Target length for every sample.
            If not specified, the largest sequence length among the samples is used.
            If a callable such as ``np.max``, ``np.min`` or ``np.mean`` is given,
            it is called with the list of sequence lengths and its result
            (converted with ``int``) is used as the target length.
        fill: How to fill the missing rows of samples shorter than ``max_len``.
            ``'ffill'`` forward-fills with the last valid value; a Python
            ``int`` or ``float`` is used as a constant fill value.
            Note that the fill is applied to every NaN of a padded sample,
            including NaNs already present in the input rows.
        dtype: NumPy data type of the output. ``dtype`` must be a float type,
            because NaN is used internally as the padding marker.

    Examples:
        >>> import numpy as np
        >>> x = [
        >>>     np.array([[i] for i in [1, 2, 3, 4, 5]]),
        >>>     np.array([[i] for i in [1, 2, 3, 4, 5, 6, 7, 8]]),
        >>>     np.array([[i] for i in [1, 2, 3]]),
        >>> ]
        >>> out = adjust_sequences(x)
        >>> out[-1]
        >>> # array([[1.],
        >>> #        [2.],
        >>> #        [3.],
        >>> #        [3.],
        >>> #        [3.],
        >>> #        [3.],
        >>> #        [3.],
        >>> #        [3.]], dtype=float32)
        >>> out = adjust_sequences(x, np.min)
        >>> out
        >>> # array([[[1.],
        >>> #         [2.],
        >>> #         [3.]],
        >>> #
        >>> #        [[1.],
        >>> #         [2.],
        >>> #         [3.]],
        >>> #
        >>> #        [[1.],
        >>> #         [2.],
        >>> #         [3.]]], dtype=float32)

    Returns:
        A 3D array of shape ``[Sample, Seq_len, Features]``.

    Raises:
        ValueError: If ``sequences`` is empty, if a sample is not 2D or has a
            different number of features than the first one, or if the
            resolved ``max_len`` is negative.
        TypeError: If padding is required and ``fill`` is neither ``'ffill'``
            nor an ``int``/``float``.
    """
    if len(sequences) == 0:
        raise ValueError("sequences must contain at least one array")

    first = sequences[0]
    if first.ndim != 2:
        raise ValueError("each sequence must be a 2D array of shape [Seq_len, Features]")
    features = first.shape[1]

    for seq in sequences:
        if seq.ndim != 2 or seq.shape[1] != features:
            raise ValueError("all sequences must be 2D arrays with the same number of features")

    lengths = [seq.shape[0] for seq in sequences]

    if max_len is None:
        maximum_len = max(lengths)
    elif callable(max_len):
        maximum_len = int(max_len(lengths))
    else:
        maximum_len = max_len

    # A negative length would silently be accepted by the slicing below.
    if maximum_len < 0:
        raise ValueError("max_len must be non-negative")

    new_seqs = []
    for seq in sequences:
        if seq.dtype != dtype:
            seq = seq.astype(dtype)

        length = seq.shape[0]
        if maximum_len > length:
            # NaN marks the rows that still have to be filled.
            padded = zeros((maximum_len, features), dtype=dtype)
            padded[:, :] = nan
            padded[:length] = seq

            if fill == "ffill":
                frame = DataFrame(padded).ffill()
            elif isinstance(fill, (int, float)):
                frame = DataFrame(padded).fillna(fill)
            else:
                raise TypeError("fill must be 'ffill' or an int/float value")

            # fillna may upcast the frame; cast back so the output honours ``dtype``.
            new_seqs.append(frame.values.astype(dtype, copy=False))
        else:
            # Long enough already: truncation needs no pandas round trip.
            new_seqs.append(seq[:maximum_len])

    # Stacking along a new leading axis yields [Sample, Seq_len, Features].
    return stack(new_seqs)