Time-RCD / modeling_time_rcd.py
oliverlevn's picture
update the transformer lib cannot detect
a4b2002 verified
"""
Time-RCD Model for HuggingFace Integration
This file contains a simplified Time_RCD model that:
1. Inherits directly from PreTrainedModel (no extra layers)
2. Matches your original Time_RCD implementation exactly
3. Can load from your local checkpoint
4. Provides HuggingFace compatibility
The structure is:
Time_RCD -> PreTrainedModel (single inheritance, clean & simple)
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import os
import math
from typing import Optional, Tuple, Union, Dict, Any
from dataclasses import dataclass
# Try to import einops, fall back to manual implementation if not available
try:
from einops import rearrange
HAS_EINOPS = True
except ImportError:
HAS_EINOPS = False
def rearrange(tensor, pattern):
# Simple fallback for the specific pattern we use
if pattern == "two num_heads -> two num_heads 1 1":
return tensor.unsqueeze(-1).unsqueeze(-1)
else:
raise NotImplementedError(f"Pattern {pattern} not implemented in fallback")
from transformers import PreTrainedModel
from transformers.modeling_outputs import ModelOutput
from transformers.utils import logging
from .configuration_time_rcd import TimeRCDConfig
logger = logging.get_logger(__name__)
@dataclass
class TimeRCDOutput(ModelOutput):
"""
Output for Time_RCD model.
Args:
anomaly_scores (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
Anomaly scores for each time step.
anomaly_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, 2)`):
Raw logits for anomaly classification.
reconstruction (`torch.FloatTensor` of shape `(batch_size, sequence_length, num_features)`):
Reconstructed time series.
embeddings (`torch.FloatTensor` of shape `(batch_size, sequence_length, num_features, d_proj)`):
Time series embeddings from the encoder.
"""
anomaly_scores: Optional[torch.FloatTensor] = None
anomaly_logits: Optional[torch.FloatTensor] = None
reconstruction: Optional[torch.FloatTensor] = None
embeddings: Optional[torch.FloatTensor] = None
class Time_RCD(PreTrainedModel):
"""
Time-RCD Model for Time Series Anomaly Detection
This is the main model class that directly inherits from PreTrainedModel.
It matches your original Time_RCD implementation structure exactly:
- TimeSeriesEncoder for encoding
- reconstruction_head for reconstruction
- anomaly_head for anomaly detection
No extra inheritance layers - clean and simple!
"""
config_class = TimeRCDConfig
base_model_prefix = "time_rcd"
supports_gradient_checkpointing = True
def __init__(self, config: TimeRCDConfig):
super().__init__(config)
self.config = config
# Time series encoder (matches your original implementation)
self.ts_encoder = TimeSeriesEncoder(
d_model=config.d_model,
d_proj=config.d_proj,
patch_size=config.patch_size,
num_layers=config.num_layers,
num_heads=config.num_heads,
d_ff_dropout=config.d_ff_dropout,
use_rope=config.use_rope,
num_features=config.num_features,
activation=config.activation
)
# Reconstruction head (exactly like your original)
self.reconstruction_head = nn.Sequential(
nn.Linear(config.d_proj, config.d_proj * 4),
nn.GELU(),
nn.Dropout(config.dropout),
nn.Linear(config.d_proj * 4, config.d_proj * 4),
nn.GELU(),
nn.Dropout(config.dropout),
nn.Linear(config.d_proj * 4, 1) # Output: (B, seq_len, num_features, 1)
)
# Anomaly detection head (exactly like your original)
self.anomaly_head = nn.Sequential(
nn.Linear(config.d_proj, config.d_proj // 2),
nn.GELU(),
nn.Dropout(config.dropout),
nn.Linear(config.d_proj // 2, 2) # Binary classification: (B, seq_len, num_features, 2)
)
# Initialize weights
self.post_init()
def _init_weights(self, module):
"""Initialize the weights (standard HuggingFace pattern)"""
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range if hasattr(self.config, 'initializer_range') else 0.02)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
def forward(
self,
time_series: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, TimeRCDOutput]:
"""
Forward pass through Time_RCD model
Args:
time_series (`torch.FloatTensor` of shape `(batch_size, sequence_length, num_features)`):
Input time series data.
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices.
return_dict (`bool`, *optional*):
Whether to return a ModelOutput instead of a plain tuple.
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
batch_size, seq_len, num_features = time_series.shape
# Normalize time series (exactly like your original)
time_series = (time_series - time_series.mean(dim=1, keepdim=True)) / (time_series.std(dim=1, keepdim=True) + 1e-8)
# Get embeddings from encoder
embeddings = self.ts_encoder(time_series, attention_mask) # (B, seq_len, num_features, d_proj)
# Get reconstruction
reconstruction = self.reconstruction_head(embeddings) # (B, seq_len, num_features, 1)
reconstruction = reconstruction.squeeze(-1) # (B, seq_len, num_features)
# Get anomaly predictions
anomaly_logits = self.anomaly_head(embeddings) # (B, seq_len, num_features, 2)
anomaly_logits = torch.mean(anomaly_logits, dim=-2) # Average over features: (B, seq_len, 2)
anomaly_scores = F.softmax(anomaly_logits, dim=-1)[..., 1] # Probability of anomaly: (B, seq_len)
if not return_dict:
return (anomaly_scores, anomaly_logits, reconstruction, embeddings)
return TimeRCDOutput(
anomaly_scores=anomaly_scores,
anomaly_logits=anomaly_logits,
reconstruction=reconstruction,
embeddings=embeddings
)
def zero_shot(self, data: np.ndarray, batch_size: int = 64, win_size: int = 5000) -> tuple:
"""
Zero-shot inference method matching AnomalyCLIP structure.
The model handles normalization internally, so no external processor needed!
This method only handles windowing for long sequences.
Args:
data: Input time series data of shape (n_samples, n_features) or (n_samples,)
batch_size: Batch size for processing
win_size: Window size for processing (only used if data > win_size)
Returns:
tuple: (scores, logits) where:
- scores: list of anomaly score arrays per batch
- logits: list of anomaly logit arrays per batch
"""
import tqdm
from torch.utils.data import DataLoader, TensorDataset
self.eval()
device = next(self.parameters()).device
# Ensure numpy and 2D shape
data = np.asarray(data)
if data.ndim == 1:
data = data.reshape(-1, 1)
# Adjust window size if data is too short
if len(data) <= win_size:
win_size = len(data)
# Create windows if data is longer than win_size
windows = []
masks = []
if len(data) > win_size:
# Create non-overlapping windows
for i in range(0, len(data), win_size):
window = data[i:i + win_size]
if len(window) < win_size:
# Pad last window if needed
padded = np.zeros((win_size, data.shape[1]))
padded[:len(window)] = window
window = padded
mask = np.zeros(win_size, dtype=bool)
mask[:len(window)] = True
else:
mask = np.ones(win_size, dtype=bool)
windows.append(window)
masks.append(mask)
else:
# Single window
windows.append(data)
masks.append(np.ones(len(data), dtype=bool))
# Convert to tensors
time_series_windows = torch.tensor(np.array(windows), dtype=torch.float32)
attention_masks = torch.tensor(np.array(masks), dtype=torch.bool)
# Create dataloader
dataset = TensorDataset(time_series_windows, attention_masks)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
loop = tqdm.tqdm(enumerate(dataloader), total=len(dataloader), leave=True)
scores = []
logits = []
with torch.no_grad():
for i, (batch_ts, batch_mask) in loop:
batch_ts = batch_ts.to(device)
batch_mask = batch_mask.to(device)
# Forward pass (model normalizes internally!)
outputs = self(
time_series=batch_ts,
attention_mask=batch_mask,
return_dict=True
)
# Extract scores and logits
anomaly_probs = outputs.anomaly_scores.cpu().numpy() # (B, seq_len)
anomaly_logits = outputs.anomaly_logits # (B, seq_len, 2)
logit_diff = anomaly_logits[..., 1] - anomaly_logits[..., 0] # (B, seq_len)
scores.append(anomaly_probs)
logits.append(logit_diff.cpu().numpy())
return scores, logits
@classmethod
def from_original_checkpoint(cls, checkpoint_path: str, config: Optional[TimeRCDConfig] = None):
"""
Load model from your original checkpoint format
Args:
checkpoint_path: Path to your .pth checkpoint file
config: Model configuration (optional - will auto-detect from checkpoint if not provided)
Returns:
Loaded Time_RCD model
"""
print(f"Loading Time_RCD from checkpoint: {checkpoint_path}")
# Load checkpoint
if not os.path.exists(checkpoint_path):
raise FileNotFoundError(f"Checkpoint not found: {checkpoint_path}")
checkpoint = torch.load(checkpoint_path, map_location='cpu')
print(f"Checkpoint keys: {list(checkpoint.keys())}")
# Auto-detect config from checkpoint if not provided
if config is None:
print("📋 Auto-detecting config from checkpoint...")
if 'config' in checkpoint:
ckpt_config = checkpoint['config']
ts_config = ckpt_config.get('ts_config', {})
config = TimeRCDConfig(
d_model=ts_config.get('d_model', 512),
d_proj=ts_config.get('d_proj', 256),
patch_size=ts_config.get('patch_size', 4), # Important!
num_layers=ts_config.get('num_layers', 8),
num_heads=ts_config.get('num_heads', 8),
d_ff_dropout=ts_config.get('d_ff_dropout', 0.1),
use_rope=ts_config.get('use_rope', True),
activation=ts_config.get('activation', 'gelu'),
num_features=ts_config.get('num_features', 1),
max_seq_len=ckpt_config.get('max_seq_len', 512),
win_size=ckpt_config.get('win_size', 5000),
batch_size=ckpt_config.get('batch_size', 64),
dropout=0.1
)
print(f"✅ Auto-detected config: patch_size={config.patch_size}, d_model={config.d_model}, d_proj={config.d_proj}")
else:
print("⚠️ No config found in checkpoint, using defaults")
config = TimeRCDConfig()
# Create model
model = cls(config)
checkpoint = torch.load(checkpoint_path, map_location='cpu')
print(f"Checkpoint keys: {list(checkpoint.keys())}")
# Handle different checkpoint formats
if 'model_state_dict' in checkpoint:
state_dict = checkpoint['model_state_dict']
elif 'state_dict' in checkpoint:
state_dict = checkpoint['state_dict']
else:
state_dict = checkpoint
# Remove 'module.' prefix if present (from DDP training)
new_state_dict = {}
for key, value in state_dict.items():
if key.startswith('module.'):
new_key = key[7:] # Remove 'module.' prefix
else:
new_key = key
new_state_dict[new_key] = value
# Load state dict with flexible matching
try:
model.load_state_dict(new_state_dict, strict=False)
print("✅ Successfully loaded checkpoint with flexible matching")
except Exception as e:
print(f"⚠️ Error loading state dict: {e}")
print("Available checkpoint keys:", list(new_state_dict.keys())[:10])
print("Model keys:", list(model.state_dict().keys())[:10])
return model
def save_pretrained(self, save_directory: str, **kwargs):
"""
Save the model in HuggingFace format
This allows you to use .from_pretrained() later
"""
super().save_pretrained(save_directory, **kwargs)
print(f"✅ Model saved to {save_directory}")
print("You can now load it with:")
print(f"model = Time_RCD.from_pretrained('{save_directory}')")
class TimeSeriesEncoder(nn.Module):
"""
Time Series Encoder with PatchTST-like patching, RoPE.
Args:
d_model (int): Model dimension
d_proj (int): Projection dimension
patch_size (int): Size of each patch
num_layers (int): Number of encoder layers
num_heads (int): Number of attention heads
d_ff_dropout (float): Dropout rate
max_total_tokens (int): Maximum sequence length
use_rope (bool): Use RoPE if True
num_features (int): Number of features in the time series
activation (str): "relu" or "gelu"
Inputs:
time_series (Tensor): Shape (batch_size, seq_len, num_features)
mask (Tensor): Shape (batch_size, seq_len)
Outputs:
local_embeddings (Tensor): Shape (batch_size, seq_len, num_features, d_proj)
"""
def __init__(self, d_model=2048, d_proj=512, patch_size=32, num_layers=6, num_heads=8,
d_ff_dropout=0.1, max_total_tokens=8192, use_rope=True, num_features=1,
activation="relu"):
super().__init__()
self.patch_size = patch_size
self.d_model = d_model
self.d_proj = d_proj
self.num_layers = num_layers
self.num_heads = num_heads
self.d_ff_dropout = d_ff_dropout
self.max_total_tokens = max_total_tokens
self.use_rope = use_rope
self.num_features = num_features
self.activation = activation
# Patch embedding layer
self.embedding_layer = nn.Linear(patch_size, d_model)
if use_rope:
# Initialize RoPE and custom encoder
self.rope_embedder = RotaryEmbedding(d_model)
self.transformer_encoder = CustomTransformerEncoder(
d_model=d_model,
nhead=num_heads,
dim_feedforward=d_model * 4,
dropout=d_ff_dropout,
activation=activation,
num_layers=num_layers,
num_features=num_features
)
else:
# Standard encoder without RoPE
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=num_heads,
dim_feedforward=d_model * 4,
dropout=d_ff_dropout,
batch_first=True,
activation=activation
)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
# Output projection layers
self.projection_layer = nn.Linear(d_model, patch_size * d_proj)
self._init_parameters()
def _init_parameters(self):
for name, param in self.named_parameters():
if 'weight' in name and 'linear' in name:
if self.activation == "relu":
nn.init.kaiming_uniform_(param, nonlinearity='relu')
elif self.activation == "gelu":
nn.init.kaiming_uniform_(param, nonlinearity='gelu')
elif 'bias' in name:
nn.init.constant_(param, 0.0)
def forward(self, time_series, mask=None):
"""Forward pass to generate local embeddings."""
if time_series.dim() == 2:
time_series = time_series.unsqueeze(-1)
device = time_series.device
B, seq_len, num_features = time_series.size()
assert num_features == self.num_features, f"Number of features mismatch with data: {num_features} vs param: {self.num_features}"
# Create mask if not provided
if mask is None:
mask = torch.ones(B, seq_len, dtype=torch.bool, device=device)
assert mask.size() == (B, seq_len), f"Mask shape mismatch: expected ({B}, {seq_len}), got {mask.size()}"
# Pad sequence to be divisible by patch_size
padded_length = math.ceil(seq_len / self.patch_size) * self.patch_size
if padded_length > seq_len:
pad_amount = padded_length - seq_len
time_series = F.pad(time_series, (0, 0, 0, pad_amount), value=0)
mask = F.pad(mask, (0, pad_amount), value=0)
# Convert to patches
num_patches = padded_length // self.patch_size
total_length = num_patches * num_features
patches = time_series.view(B, num_patches, self.patch_size, num_features)
patches = patches.permute(0, 3, 1, 2).contiguous() # (B, num_features, num_patches, patch_size)
patches = patches.view(B, num_features * num_patches, self.patch_size) # (B, L, patch_size)
# Create feature IDs for patches
feature_id = torch.arange(num_features, device=device).repeat_interleave(
num_patches) # (num_features * num_patches = L,)
feature_id = feature_id.unsqueeze(0).expand(B, -1) # (B, L)
# Embed patches
embedded_patches = self.embedding_layer(patches) # (B, L, d_model)
# Create patch-level mask
mask = mask.view(B, num_patches, self.patch_size)
patch_mask = mask.sum(dim=-1) > 0 # (B, num_patches)
full_mask = patch_mask.unsqueeze(1).expand(-1, num_features, -1) # (B, num_features, num_patches)
full_mask = full_mask.reshape(B, num_features * num_patches) # (B, L)
# Generate RoPE frequencies if applicable
if self.use_rope:
freqs = self.rope_embedder(total_length).to(device)
else:
freqs = None
# Encode sequence
if num_features > 1:
output = self.transformer_encoder(
embedded_patches,
freqs=freqs,
src_id=feature_id,
attn_mask=full_mask
)
else:
output = self.transformer_encoder(
embedded_patches,
freqs=freqs,
attn_mask=full_mask
)
# Extract and project local embeddings
patch_embeddings = output # (B, L, d_model)
patch_proj = self.projection_layer(patch_embeddings) # (B, L, patch_size * d_proj)
local_embeddings = patch_proj.view(B, num_features, num_patches, self.patch_size, self.d_proj)
local_embeddings = local_embeddings.permute(0, 2, 3, 1, 4) # (B, num_patches, patch_size, num_features, d_proj)
local_embeddings = local_embeddings.view(B, -1, num_features, self.d_proj)[:, :seq_len, :,
:] # (B, seq_len, num_features, d_proj)
return local_embeddings
class CustomTransformerEncoder(nn.Module):
"""Stack of Transformer Encoder Layers."""
def __init__(self, d_model, nhead, dim_feedforward, dropout, activation, num_layers, num_features):
super().__init__()
self.layers = nn.ModuleList([
TransformerEncoderLayerWithRoPE(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
activation=activation,
num_features=num_features
) for _ in range(num_layers)
])
def forward(self, src, freqs, src_id=None, attn_mask=None):
output = src
for layer in self.layers:
output = layer(output, freqs, src_id, attn_mask=attn_mask)
return output
class TransformerEncoderLayerWithRoPE(nn.Module):
"""Transformer Encoder Layer with RoPE and RMSNorm."""
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu", num_features=1):
super().__init__()
self.self_attn = MultiheadAttentionWithRoPE(d_model, nhead, num_features)
self.dropout = nn.Dropout(dropout)
self.input_norm = RMSNorm(d_model)
self.output_norm = RMSNorm(d_model)
self.mlp = LlamaMLP(d_model, dim_feedforward)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
def forward(self, src, freqs, src_id=None, attn_mask=None):
residual = src
src = self.input_norm(src)
src = self.self_attn(src, src, src, freqs, src_id, src_id, attn_mask=attn_mask)
src = src + residual
residual = src
src = self.output_norm(src)
src = self.mlp(src)
src = residual + self.dropout2(src)
return src
class RMSNorm(nn.Module):
"""Root Mean Square Normalization layer."""
def __init__(self, size: int, dim: int = -1, eps: float = 1e-5) -> None:
super().__init__()
self.scale = nn.Parameter(torch.ones(size))
self.eps = eps
self.dim = dim
def forward(self, x: torch.Tensor) -> torch.Tensor:
norm_x = x.to(torch.float32).pow(2).mean(dim=self.dim, keepdim=True)
x_normed = x * torch.rsqrt(norm_x + self.eps)
return (self.scale * x_normed).type_as(x)
class RotaryEmbedding(nn.Module):
"""Rotary Positional Embedding for injecting positional information."""
def __init__(self, dim):
super().__init__()
inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
self.register_buffer("inv_freq", inv_freq)
def forward(self, seq_len):
t = torch.arange(seq_len, device=self.inv_freq.device).type_as(self.inv_freq)
freqs = torch.einsum("i,j->ij", t, self.inv_freq)
return freqs # Shape: (seq_len, dim // 2)
class BinaryAttentionBias(nn.Module):
"""Binary Variate Attention for time series data."""
def __init__(self,
num_heads: int):
super().__init__()
self.num_heads = num_heads
self.emd = nn.Embedding(2, num_heads)
def forward(self,
query_id: torch.Tensor,
kv_id: torch.Tensor,
) -> torch.Tensor:
ind = torch.eq(query_id.unsqueeze(-1), kv_id.unsqueeze(-2))
ind = ind.unsqueeze(1) # (batch_size, 1, q_len, kv_len)
weight = rearrange(self.emd.weight, "two num_heads -> two num_heads 1 1") # (2, num_heads, 1, 1)
bias = ~ind * weight[:1] + ind * weight[1:] # (batch_size, num_heads, q_len, kv_len)
return bias
class MultiheadAttentionWithRoPE(nn.Module):
"""Multi-head Attention with Rotary Positional Encoding (RoPE), non-causal by default."""
"========== NOtice that this applies BinaryAttentionBias ==========="
def __init__(self, embed_dim, num_heads, num_features):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.num_features = num_features
assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"
# Linear projections for Q, K, V, and output
self.q_proj = nn.Linear(embed_dim, embed_dim, bias=False)
self.k_proj = nn.Linear(embed_dim, embed_dim, bias=False)
self.v_proj = nn.Linear(embed_dim, embed_dim, bias=False)
self.out_proj = nn.Linear(embed_dim, embed_dim, bias=False)
# Binary attention bias for time series
if num_features > 1:
self.binary_attention_bias = BinaryAttentionBias(num_heads)
def apply_rope(self, x, freqs):
"""Apply Rotary Positional Encoding to the input tensor."""
B, seq_len, embed_dim = x.shape
assert embed_dim == self.embed_dim, "Embedding dimension mismatch"
assert freqs.shape == (seq_len, embed_dim // 2), "freqs shape mismatch"
# Reshape for rotation: split embed_dim into pairs
x_ = x.view(B, seq_len, embed_dim // 2, 2)
cos = freqs.cos().unsqueeze(0) # (1, seq_len, embed_dim // 2, 1)
sin = freqs.sin().unsqueeze(0) # (1, seq_len, embed_dim // 2, 1)
# Apply rotation to each pair
x_rot = torch.stack(
[
x_[..., 0] * cos - x_[..., 1] * sin,
x_[..., 0] * sin + x_[..., 1] * cos,
],
dim=-1
)
return x_rot.view(B, seq_len, embed_dim)
def forward(self, query, key, value, freqs, query_id=None, kv_id=None, attn_mask=None):
"""
Forward pass for multi-head attention with RoPE.
Args:
query (Tensor): Shape (B, T, C)
key (Tensor): Shape (B, T, C)
value (Tensor): Shape (B, T, C)
freqs (Tensor): RoPE frequencies, shape (T, embed_dim // 2)
query_id (Tensor, optional): Shape (B, q_len), feature IDs for query
kv_id (Tensor, optional): Shape (B, kv_len), feature IDs for key/value
attn_mask (Tensor, optional): Shape (B, T), True for valid positions, False for padding.
Returns:
Tensor: Attention output, shape (B, T, C)
"""
B, T, C = query.shape
assert key.shape == (B, T, C) and value.shape == (B, T, C), "query, key, value shapes must match"
# Project inputs to Q, K, V
Q = self.q_proj(query)
K = self.k_proj(key)
V = self.v_proj(value)
# Apply RoPE to Q and K
Q_rot = self.apply_rope(Q, freqs)
K_rot = self.apply_rope(K, freqs)
# Reshape for multi-head attention
Q_rot = Q_rot.view(B, T, self.num_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
K_rot = K_rot.view(B, T, self.num_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
V = V.view(B, T, self.num_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
# Prepare attention mask for padding
if attn_mask is not None:
attn_mask = attn_mask.unsqueeze(1).unsqueeze(2) # (B, 1, 1, T)
else:
attn_mask = None
if query_id is not None and kv_id is not None:
# Add binary attention bias
attn_bias = self.binary_attention_bias(query_id, kv_id) # (B, num_heads, q_len, kv_len)
scores = torch.matmul(Q_rot, K_rot.transpose(-2, -1)) / math.sqrt(
self.head_dim) # (B, num_heads, q_len, kv_len)
scores += attn_bias
if attn_mask is not None:
scores = scores.masked_fill(~attn_mask, float('-inf'))
attn_weights = F.softmax(scores, dim=-1) # (B, num_heads, q_len, kv_len)
y = torch.matmul(attn_weights, V) # (B, num_heads, q_len, hs)
else:
# Compute scaled dot-product attention (non-causal) without binary bias
# for param in self.binary_attention_bias.parameters():
# param.requires_grad = False
y = F.scaled_dot_product_attention(
Q_rot, K_rot, V,
attn_mask=attn_mask,
is_causal=False # Non-causal attention for encoder
) # (B, nh, T, hs)
# Reshape and project output
y = y.transpose(1, 2).contiguous().view(B, T, C)
y = self.out_proj(y)
return y
class LlamaMLP(nn.Module):
def __init__(self, d_model, dim_feedforward=2048):
super().__init__()
self.hidden_size = d_model
self.intermediate_size = dim_feedforward
self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=True)
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=True)
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=True)
self.act_fn = F.gelu
def forward(self, x):
down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
return down_proj
# For backward compatibility, create aliases
TimeRCDModel = Time_RCD # Alias for consistency
AnomalyCLIPModel = Time_RCD # For existing code that uses this name
# Register model with AutoModel when using trust_remote_code
try:
from transformers import AutoModel
AutoModel.register(TimeRCDConfig, Time_RCD)
except Exception:
pass # Silently fail if already registered or in restricted environment