Source code for enchanter.tasks.ts_unsupervised

from typing import List, Dict, Optional, Union, Tuple

import numpy as np
import torch
from torch.cuda import amp
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split

from enchanter.engine import modules
from enchanter.engine import BaseRunner
from enchanter.engine.typehint import ScikitModel
from enchanter.addons.criterions.ts_triplet_loss import (
    generate_anchor_positive_input,
    generate_negative_input,
    generate_sample_indices,
    positive_criterion_for_triplet_loss,
    negative_criterion_for_triplet_loss,
    calculate_triplet_loss,
)
from enchanter.callbacks import Callback
from enchanter.utils.datasets import TimeSeriesUnlabeledDataset


[docs]class TimeSeriesUnsupervisedRunner(BaseRunner): """ Runner for unsupervised time series representation learning. Unsupervised representation learning for time series uses the the Unsupervised Triplet Loss proposed in NeurIPS 2019. Paper: `Unsupervised Scalable Representation Learning for Multivariate Time Series \ <https://papers.nips.cc/paper/8713-unsupervised-scalable-representation-learning-for-multivariate-time-series>`_ Examples: >>> experiment = ... >>> model: torch.nn.Module = ... >>> optimizer: torch.optim.Optimizer = ... >>> runner = TimeSeriesUnsupervisedRunner(model, optimizer, experiment) >>> runner.add_loader("train", ...) >>> runner.train_config(...) >>> runner.run() """ def __init__( self, model: torch.nn.Module, optimizer: torch.optim.Optimizer, experiment, n_negative_samples: int = 1, negative_penalty: int = 1, compared_len: Optional[int] = None, evaluator: ScikitModel = SVC(), save_memory: bool = False, scheduler: List = None, callbacks: Optional[List[Callback]] = None, ): """ Initializer Args: model: PyTorch model which outputting a fixed-length vector regardless of the length of the input series. optimizer: PyTorch Optimizer experiment: ``comet_ml.BaseExperiment`` or ``enchanter.callbacks.BaseLogger`` n_negative_samples: Parameter K in the paper. The number of negative samples to be sampled during training. negative_penalty: Coefficients that control how much negative values are valued. compared_len: Maximum length of randomly chosen time series. (default None). save_memory: If True, enables to save GPU memory. scheduler: lr scheduler. use ``torch.optim.lr_scheduler`` callbacks: List of callback. """ super(TimeSeriesUnsupervisedRunner, self).__init__() self.model = model self.optimizer = optimizer self.experiment = experiment if compared_len is None: self.compared_len = np.inf else: self.compared_len = compared_len self.n_negative_samples = n_negative_samples self.negative_penalty = negative_penalty self.save_memory = save_memory self.train_ds = None if scheduler is None: self.scheduler = list() else: self.scheduler = scheduler self.callbacks = callbacks self.evaluator = evaluator self.evaluator_params = None
[docs] def initialize(self) -> None: super(TimeSeriesUnsupervisedRunner, self).initialize() if isinstance(self.loaders["train"].dataset, TimeSeriesUnlabeledDataset): self.train_ds = self.loaders["train"].dataset # type: ignore else: raise ValueError( "You should use the `enchanter.utils.datasets.TimeSeriesUnlabeledDataset` or" " `TimeSeriesLabeledDataset` for the dataset class." )
[docs] def calculate_negative_loss_per_negative_sample( self, begin_neg: torch.Tensor, len_pos_neg: int, batch_size: int, negative_sample_step: int, anchor_representation: torch.Tensor, data: torch.Tensor, ) -> torch.Tensor: """ calculate negative loss per negative sample Args: begin_neg: len_pos_neg: batch_size: negative_sample_step: anchor_representation: data: Returns: negative loss (torch.Tensor) """ representation_size: int = anchor_representation.shape[2] negative_data: torch.Tensor = generate_negative_input( begin_neg, len_pos_neg, batch_size, negative_sample_step, self.train_ds.data, data # type: ignore ).to( self.device ) # [batch_size, features, seq_len] negative_representation: torch.Tensor = self.model(negative_data).view(batch_size, representation_size, 1) negative_loss: torch.Tensor = negative_criterion_for_triplet_loss( anchor_representation, negative_representation ) return negative_loss
[docs] def calculate_negative_loss(self, positive_loss, anchor_representation): """ calculate negative loss using all negative samples. Args: positive_loss: anchor_representation: Returns: loss (torch.Tensor) """ train_size: int = len(self.train_ds) multiplicative_ration: float = self.negative_penalty / self.n_negative_samples batch_size: int = anchor_representation.shape[0] length: int = min(self.compared_len, self.train_ds.data.shape[2]) samples: torch.Tensor = torch.tensor( np.random.choice(train_size, size=(self.n_negative_samples, batch_size)), dtype=torch.long ) _, _, _, len_pos_neg, begin_neg_samples = generate_sample_indices(self.n_negative_samples, batch_size, length) for i in range(self.n_negative_samples): negative_loss = self.calculate_negative_loss_per_negative_sample( begin_neg_samples, len_pos_neg, batch_size, i, anchor_representation, samples ) positive_loss = calculate_triplet_loss(positive_loss, negative_loss, multiplicative_ration) if self.save_memory and i != self.n_negative_samples - 1: positive_loss.backward(retain_graph=True) positive_loss = torch.tensor(0.0, device=self.device) torch.cuda.empty_cache() return positive_loss
[docs] def train_step(self, batch) -> Dict[str, torch.Tensor]: if len(batch) == 2: x_train, _ = batch else: x_train = batch[0] batch_size: int = x_train.shape[0] length: int = min(self.compared_len, self.train_ds.data.shape[2]) # type: ignore anchor_data, positive_data = generate_anchor_positive_input( self.n_negative_samples, batch_size, length, x_train ) with amp.autocast(enabled=isinstance(self.scaler, amp.GradScaler)): anchor_representation: torch.Tensor = self.model(anchor_data) positive_representation: torch.Tensor = self.model(positive_data) representation_size: int = anchor_representation.shape[1] anchor_representation = anchor_representation.view(batch_size, 1, representation_size) positive_representation = positive_representation.view(batch_size, representation_size, 1) loss = positive_criterion_for_triplet_loss(anchor_representation, positive_representation) if self.save_memory: loss.backward(retain_graph=True) loss = torch.tensor(0.0, device=self.device) del positive_representation torch.cuda.empty_cache() loss = self.calculate_negative_loss(loss, anchor_representation) return {"loss": loss}
[docs] def train_end(self, outputs: List) -> Dict[str, torch.Tensor]: avg_loss = torch.stack([x["loss"] for x in outputs]).mean() return {"avg_loss": avg_loss}
[docs] def val_step(self, batch: Tuple) -> Dict[str, torch.Tensor]: return self.train_step(batch)
[docs] def val_end(self, outputs: List) -> Dict[str, torch.Tensor]: return self.train_end(outputs)
[docs] def test_step(self, batch: Tuple) -> Dict[str, torch.Tensor]: x, y = batch with amp.autocast(enabled=isinstance(self.scaler, amp.GradScaler)): encoded = self.model(x) return {"encoded": encoded, "targets": y}
def _generate_train_features(self) -> Tuple[torch.Tensor, torch.Tensor]: features = [] targets = [] loader = self.loaders["train"] loader = modules.tfds_to_numpy(loader) if modules.is_tfds(loader) else loader self.model.eval() with torch.no_grad(), amp.autocast(enabled=isinstance(self.scaler, amp.GradScaler)): for batch in loader: x, y = batch features.append(self.model(x.to(self.device))) targets.append(y) return torch.cat(features).cpu().numpy(), torch.cat(targets).cpu().numpy()
[docs] def test_end(self, outputs: List) -> Dict[str, torch.Tensor]: x_train, y_train = self._generate_train_features() if "grid_search" in self.manager.params.keys(): self.evaluator.set_params(**self.manager.params["grid_search"]) if len(y_train) <= 10000: self.evaluator.fit(x_train, y_train) # type: ignore else: split = train_test_split(x_train, y_train, train_size=10000, random_state=0, stratify=y_train) self.evaluator.fit(split[0], split[2]) # type: ignore # self.experiment.log_parameters(search.best_params_, prefix="grid_search_best_params") else: self.evaluator.fit(x_train, y_train) # type: ignore x_test = torch.cat([output["encoded"] for output in outputs]).cpu().numpy() y_test = torch.cat([output["targets"] for output in outputs]).cpu().numpy() return {"evaluator_score": torch.tensor(self.evaluator.score(x_test, y_test))}
[docs] def encode(self, data: Union[np.ndarray, torch.Tensor]) -> Union[np.ndarray, torch.Tensor]: """ Output encoded data. The output data has the same data type as the input. Args: data: data Returns: encoded data """ if isinstance(data, np.ndarray): data = torch.tensor(data, device=self.device) ndarray = True elif isinstance(data, torch.Tensor): data = data.to(self.device) ndarray = False else: raise ValueError("Unexpected data type.") self.model.eval() with torch.no_grad(), amp.autocast(enabled=isinstance(self.scaler, amp.GradScaler)): out: torch.Tensor = self.model(data) if ndarray: out = out.cpu().numpy() # type: ignore return out
[docs] def predict(self, x: Union[torch.Tensor, np.ndarray]) -> np.ndarray: """ See Also ``self.encode`` """ out = self.encode(x) if isinstance(out, torch.Tensor): out = out.numpy() return out