import torch
import torch.utils.data
import numpy as np

epsilon = 1e-8  # Small constant used to avoid division by zero during normalization


class TimeRCDDataset(torch.utils.data.Dataset):
    def __init__(self, data, window_size, stride=1, normalize=False, pad_to_multiple=True):
        super().__init__()
        self.window_size = window_size
        self.stride = stride

        # Ensure numpy array and a consistent 2D shape (N, C)
        data = np.asarray(data)
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        self.original_length = data.shape[0]
        self.pad_to_multiple = pad_to_multiple

        # Normalize data if requested (datasets other than UCR)
        self.data = self._normalize_data(data) if normalize else data

        # Handle padding if requested
        if self.pad_to_multiple:
            self.data, self.padding_mask = self._pad_data_to_multiple()
        else:
            self.padding_mask = np.ones(self.data.shape[0], dtype=bool)  # All data is real

    def _normalize_data(self, data, epsilon=1e-8):
        """Normalize data using per-channel mean and standard deviation."""
        mean, std = np.mean(data, axis=0), np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)  # Avoid division by zero
        return (data - mean) / std

    def _pad_data_to_multiple(self):
        """Pad data so its length is a multiple of window_size and return a padding mask."""
        data_length = self.data.shape[0]
        remainder = data_length % self.window_size

        if remainder == 0:
            # No padding needed - all data is real
            padding_mask = np.ones(data_length, dtype=bool)
            return self.data, padding_mask

        # Calculate padding needed
        padding_length = self.window_size - remainder
        print(f"Padding TimeRCDDataset: original length {data_length}, "
              f"window_size {self.window_size}, adding {padding_length} samples")

        # Pad by repeating the last row, keeping 2D shape (1, C)
        last_row = self.data[-1:, :]
        padding_data = np.repeat(last_row, padding_length, axis=0)
        padded_data = np.vstack([self.data, padding_data])

        # Create padding mask: True for real data, False for padded data
        padding_mask = np.ones(data_length + padding_length, dtype=bool)
        padding_mask[data_length:] = False  # Mark padded samples as False

        return padded_data, padding_mask

    def __getitem__(self, index):
        start = index * self.stride
        end = start + self.window_size
        if end > self.data.shape[0]:
            raise IndexError("Index out of bounds for the dataset.")

        # Always return (window_size, num_features)
        sample = torch.tensor(self.data[start:end, :], dtype=torch.float32)
        mask = torch.tensor(self.padding_mask[start:end], dtype=torch.bool)
        return sample, mask

    def __len__(self):
        return max(0, (self.data.shape[0] - self.window_size) // self.stride + 1)


class ReconstructDataset(torch.utils.data.Dataset):
    def __init__(self, data, window_size, stride=1, normalize=True):
        super().__init__()
        self.window_size = window_size
        self.stride = stride

        # Ensure numpy array and a consistent 2D shape (N, C) before normalizing
        data = np.asarray(data)
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        self.univariate = data.shape[1] == 1

        self.data = self._normalize_data(data) if normalize else data
        self.sample_num = max(0, (self.data.shape[0] - window_size) // stride + 1)
        self.samples, self.targets = self._generate_samples()

    def _normalize_data(self, data, epsilon=1e-8):
        mean, std = np.mean(data, axis=0), np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)  # Avoid division by zero
        return (data - mean) / std

    def _generate_samples(self):
        data = torch.tensor(self.data, dtype=torch.float32)

        if self.univariate:
            data = data.squeeze()
            X = torch.stack([data[i * self.stride : i * self.stride + self.window_size]
                             for i in range(self.sample_num)])
            X = X.unsqueeze(-1)  # Restore the channel dimension: (num, window, 1)
        else:
            X = torch.stack([data[i * self.stride : i * self.stride + self.window_size, :]
                             for i in range(self.sample_num)])

        # Reconstruction task: the target is the input window itself
        return X, X

    def __len__(self):
        return self.sample_num

    def __getitem__(self, index):
        return self.samples[index], self.targets[index]
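
# Illustrative sketch (not part of the original module): how the reconstruction
# windows above are typically consumed through a DataLoader. The synthetic series,
# window size, and batch size are assumptions for demonstration only.
def _demo_reconstruct_windows():
    rng = np.random.default_rng(0)
    series = rng.normal(size=(1000, 3))  # hypothetical (N, C) multivariate series

    dataset = ReconstructDataset(series, window_size=100, stride=1, normalize=True)
    loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=False)

    x, target = next(iter(loader))
    # For a reconstruction task the target equals the input window:
    # x.shape == target.shape == (batch, window_size, num_features)
    assert x.shape == (32, 100, 3) and torch.equal(x, target)
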
class ForecastDataset(torch.utils.data.Dataset):
    def __init__(self, data, window_size, pred_len, stride=1, normalize=True):
        super().__init__()
        self.window_size = window_size
        self.pred_len = pred_len
        self.stride = stride

        # Ensure numpy array and a consistent 2D shape (N, C) before normalizing
        data = np.asarray(data)
        if data.ndim == 1:
            data = data.reshape(-1, 1)

        self.data = self._normalize_data(data) if normalize else data
        self.sample_num = max((self.data.shape[0] - window_size - pred_len) // stride + 1, 0)

        # Generate samples efficiently
        self.samples, self.targets = self._generate_samples()

    def _normalize_data(self, data, epsilon=1e-8):
        """Normalize data using per-channel mean and standard deviation."""
        mean, std = np.mean(data, axis=0), np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)  # Avoid division by zero
        return (data - mean) / std

    def _generate_samples(self):
        """Generate input/target windows in a single pass over the start indices."""
        data = torch.tensor(self.data, dtype=torch.float32)
        indices = np.arange(0, self.sample_num * self.stride, self.stride)

        X = torch.stack([data[i : i + self.window_size] for i in indices])
        Y = torch.stack([data[i + self.window_size : i + self.window_size + self.pred_len]
                         for i in indices])
        return X, Y  # Inputs & targets

    def __len__(self):
        return self.sample_num

    def __getitem__(self, index):
        return self.samples[index], self.targets[index]


class TSDataset(torch.utils.data.Dataset):
    def __init__(self, X, y=None, mean=None, std=None):
        super(TSDataset, self).__init__()
        self.X = X
        self.mean = mean
        self.std = std

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        sample = self.X[idx, :]
        if self.mean is not None and self.std is not None:
            sample = (sample - self.mean) / self.std
        return torch.from_numpy(sample), idx
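
# Illustrative sketch (an assumption, not part of the original module): the shape
# contract of ForecastDataset, using a hypothetical univariate sine series.
def _demo_forecast_windows():
    series = np.sin(np.linspace(0, 20 * np.pi, 500))  # hypothetical 1D series

    dataset = ForecastDataset(series, window_size=96, pred_len=24, stride=1, normalize=True)
    x, y = dataset[0]
    # Inputs cover the lookback window, targets the horizon that follows it:
    # x.shape == (window_size, C), y.shape == (pred_len, C)
    assert x.shape == (96, 1) and y.shape == (24, 1)
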
class ReconstructDataset_Moment(torch.utils.data.Dataset):
    def __init__(self, data, window_size, stride=1, normalize=True):
        super().__init__()
        self.window_size = window_size
        self.stride = stride

        # Ensure numpy array and a consistent 2D shape (N, C) before normalizing
        data = np.asarray(data)
        if data.ndim == 1:
            data = data.reshape(-1, 1)
        self.univariate = data.shape[1] == 1

        self.data = self._normalize_data(data) if normalize else data
        self.sample_num = max((self.data.shape[0] - window_size) // stride + 1, 0)
        self.samples = self._generate_samples()
        self.input_mask = np.ones(self.window_size, dtype=np.float32)  # Constant mask: every timestep observed

    def _normalize_data(self, data, epsilon=1e-8):
        mean, std = np.mean(data, axis=0), np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)  # Avoid division by zero
        return (data - mean) / std

    def _generate_samples(self):
        data = torch.tensor(self.data, dtype=torch.float32)
        indices = np.arange(0, self.sample_num * self.stride, self.stride)

        if self.univariate:
            X = torch.stack([data[i : i + self.window_size] for i in indices])
        else:
            X = torch.stack([data[i : i + self.window_size, :] for i in indices])
        return X

    def __len__(self):
        return self.sample_num

    def __getitem__(self, index):
        return self.samples[index], self.input_mask


class TACLipDataset(torch.utils.data.Dataset):
    def __init__(self, data, win_size, step=1, flag="test"):
        self.flag = flag
        self.step = step
        self.win_size = win_size

        # Only the test split is populated here; the "train"/"val" branches below
        # assume self.train / self.val are assigned elsewhere and will otherwise fail.
        self.test = self._normalize_data(np.asarray(data))
        self.test_labels = np.zeros(self.test.shape[0])

    def _normalize_data(self, data, epsilon=1e-8):
        mean, std = np.mean(data, axis=0), np.std(data, axis=0)
        std = np.where(std == 0, epsilon, std)  # Avoid division by zero
        return (data - mean) / std

    def __len__(self):
        """Number of sliding windows in the selected split."""
        if self.flag == "train":
            return (self.train.shape[0] - self.win_size) // self.step + 1
        elif self.flag == "val":
            return (self.val.shape[0] - self.win_size) // self.step + 1
        elif self.flag == "test":
            return (self.test.shape[0] - self.win_size) // self.step + 1
        else:
            # Threshold mode: non-overlapping windows
            return (self.test.shape[0] - self.win_size) // self.win_size + 1

    def __getitem__(self, index):
        index = index * self.step
        if self.flag == "train":
            return np.float32(self.train[index:index + self.win_size]), \
                   np.float32(self.test_labels[0:self.win_size])
        elif self.flag == "val":
            return np.float32(self.val[index:index + self.win_size]), \
                   np.float32(self.test_labels[0:self.win_size])
        elif self.flag == "test":
            return np.float32(self.test[index:index + self.win_size]), \
                   np.float32(self.test_labels[index:index + self.win_size])
        else:
            # Non-overlapping windows for thresholding
            start = index // self.step * self.win_size
            return np.float32(self.test[start:start + self.win_size]), \
                   np.float32(self.test_labels[start:start + self.win_size])
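
# Illustrative sketch (assumed usage, not part of the original module): iterating
# the test split of TACLipDataset with non-overlapping windows via step=win_size.
def _demo_taclip_windows():
    rng = np.random.default_rng(0)
    series = rng.normal(size=(256, 2))  # hypothetical (N, C) test series

    dataset = TACLipDataset(series, win_size=64, step=64, flag="test")
    window, labels = dataset[0]
    # Each item pairs a window with per-timestep labels (all zeros here, since
    # this class only builds placeholder labels for the test split):
    assert window.shape == (64, 2) and labels.shape == (64,)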