| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from transformers import ( |
| AutoModel, AutoConfig, AutoTokenizer, |
| T5ForConditionalGeneration, T5Config, |
| AutoModelForSequenceClassification, |
| PreTrainedModel, PretrainedConfig |
| ) |
| from transformers.modeling_utils import ( |
| load_state_dict, |
| WEIGHTS_NAME, |
| SAFE_WEIGHTS_NAME, |
| SAFE_WEIGHTS_INDEX_NAME, |
| WEIGHTS_INDEX_NAME |
| ) |
| from transformers.utils import ( |
| is_safetensors_available, |
| is_torch_available, |
| logging, |
| EntryNotFoundError, |
| PushToHubMixin |
| ) |
| import os |
| import json |
| import numpy as np |
|
|
| logger = logging.get_logger(__name__) |
|
|
| class BaseHateSpeechModel(nn.Module): |
| """Base class cho tất cả các mô hình hate speech detection""" |
| def __init__(self, model_name: str, num_labels: int = 3): |
| super().__init__() |
| self.num_labels = num_labels |
| self.model_name = model_name |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| raise NotImplementedError |
| |
| def load_state_dict(self, state_dict, strict=True): |
| """ |
| Override load_state_dict để bypass transformers' key renaming. |
| Load trực tiếp state_dict vào model mà không qua key mapping. |
| """ |
| |
| missing_keys, unexpected_keys = super().load_state_dict(state_dict, strict=False) |
| if missing_keys and strict: |
| logger.warning(f"Missing keys when loading state_dict: {missing_keys}") |
| if unexpected_keys: |
| logger.warning(f"Unexpected keys when loading state_dict: {unexpected_keys}") |
| return missing_keys, unexpected_keys |
| |
| @classmethod |
| def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs): |
| """ |
| Load model từ pretrained checkpoint. |
| Transformers sẽ tự động load state_dict sau khi khởi tạo model. |
| """ |
| |
| config = kwargs.pop("config", None) |
| |
| |
| if config is None: |
| try: |
| config = AutoConfig.from_pretrained(pretrained_model_name_or_path) |
| except Exception: |
| config = {} |
| |
| |
| num_labels = kwargs.pop("num_labels", None) |
| if num_labels is None: |
| if hasattr(config, "num_labels"): |
| num_labels = config.num_labels |
| elif isinstance(config, dict) and "num_labels" in config: |
| num_labels = config["num_labels"] |
| else: |
| num_labels = 3 |
| |
| |
| base_model_name = None |
| if hasattr(config, "_name_or_path"): |
| base_model_name = config._name_or_path |
| elif isinstance(config, dict) and "_name_or_path" in config: |
| base_model_name = config["_name_or_path"] |
| |
| |
| if base_model_name: |
| model = cls(model_name=base_model_name, num_labels=num_labels, **kwargs) |
| else: |
| |
| model = cls(num_labels=num_labels, **kwargs) |
| |
| return model |
|
|
| class PhoBERTV2Model(BaseHateSpeechModel): |
| """PhoBERT-V2 cho hate speech detection""" |
| def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| pooled_output = outputs.pooler_output |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class BartPhoModel(BaseHateSpeechModel): |
| """BART Pho cho hate speech detection""" |
| def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.d_model, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| |
| last_hidden_states = outputs.last_hidden_state |
| pooled_output = last_hidden_states.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class ViSoBERTModel(BaseHateSpeechModel): |
| """ViSoBERT cho hate speech detection""" |
| def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| |
| |
| if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: |
| pooled_output = outputs.pooler_output |
| else: |
| |
| pooled_output = outputs.last_hidden_state.mean(dim=1) |
| |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class PhoBERTV1Model(BaseHateSpeechModel): |
| """PhoBERT-V1 cho hate speech detection""" |
| def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| |
| if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: |
| pooled_output = outputs.pooler_output |
| else: |
| pooled_output = outputs.last_hidden_state.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class MBERTModel(BaseHateSpeechModel): |
| """mBERT (bert-base-multilingual-cased) cho hate speech detection""" |
| def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: |
| pooled_output = outputs.pooler_output |
| else: |
| pooled_output = outputs.last_hidden_state.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class SPhoBERTModel(BaseHateSpeechModel): |
| """SPhoBERT (biến thể PhoBERT syllable-level) cho hate speech detection""" |
| def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None: |
| pooled_output = outputs.pooler_output |
| else: |
| pooled_output = outputs.last_hidden_state.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class ViHateT5Model(BaseHateSpeechModel): |
| """ViHateT5 cho hate speech detection""" |
| def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = T5Config.from_pretrained(model_name) |
| self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=self.config) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.d_model, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| |
| last_hidden_states = outputs.last_hidden_state |
| pooled_output = last_hidden_states.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class XLMRModel(BaseHateSpeechModel): |
| """XLM-R Large cho hate speech detection""" |
| def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(self.config.hidden_size, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| pooled_output = outputs.pooler_output |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class RoBERTaGRUModel(BaseHateSpeechModel): |
| """RoBERTa + GRU Hybrid model""" |
| def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256): |
| super().__init__(model_name, num_labels) |
| self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True) |
| self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True) |
| self.gru = nn.GRU( |
| input_size=self.config.hidden_size, |
| hidden_size=hidden_size, |
| num_layers=2, |
| batch_first=True, |
| dropout=0.1, |
| bidirectional=True |
| ) |
| self.dropout = nn.Dropout(0.1) |
| self.classifier = nn.Linear(hidden_size * 2, num_labels) |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask) |
| hidden_states = outputs.last_hidden_state |
| |
| |
| gru_output, _ = self.gru(hidden_states) |
| |
| |
| pooled_output = gru_output.mean(dim=1) |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class TextCNNModel(BaseHateSpeechModel): |
| """TextCNN cho hate speech detection""" |
| def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3, |
| num_filters: int = 100, filter_sizes: list = [3, 4, 5], dropout: float = 0.5): |
| super().__init__("textcnn", num_labels) |
| self.embedding = nn.Embedding(vocab_size, embedding_dim) |
| self.convs = nn.ModuleList([ |
| nn.Conv2d(1, num_filters, (filter_size, embedding_dim)) |
| for filter_size in filter_sizes |
| ]) |
| self.dropout = nn.Dropout(dropout) |
| self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels) |
| |
| @classmethod |
| def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs): |
| """Override để detect vocab_size từ state_dict hoặc checkpoint file""" |
| |
| vocab_size = kwargs.pop("vocab_size", None) |
| config = kwargs.pop("config", None) |
| |
| |
| if vocab_size is None: |
| import os |
| state_dict = None |
| |
| if os.path.isdir(pretrained_model_name_or_path): |
| if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)): |
| try: |
| from safetensors.torch import load_file |
| state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)) |
| except Exception: |
| pass |
| elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)): |
| try: |
| state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu") |
| except Exception: |
| pass |
| |
| |
| if state_dict is not None and "embedding.weight" in state_dict: |
| vocab_size = state_dict["embedding.weight"].shape[0] |
| else: |
| vocab_size = 30000 |
| |
| |
| num_labels = kwargs.pop("num_labels", None) |
| if num_labels is None: |
| if config and hasattr(config, "num_labels"): |
| num_labels = config.num_labels |
| elif config and isinstance(config, dict) and "num_labels" in config: |
| num_labels = config["num_labels"] |
| else: |
| num_labels = 3 |
| |
| |
| model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs) |
| |
| return model |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| |
| embedded = self.embedding(input_ids) |
| |
| |
| embedded = embedded.unsqueeze(1) |
| |
| |
| conv_outputs = [] |
| for conv in self.convs: |
| conv_out = F.relu(conv(embedded)) |
| conv_out = conv_out.squeeze(3) |
| pooled = F.max_pool1d(conv_out, conv_out.size(2)) |
| pooled = pooled.squeeze(2) |
| conv_outputs.append(pooled) |
| |
| |
| concatenated = torch.cat(conv_outputs, dim=1) |
| |
| |
| concatenated = self.dropout(concatenated) |
| logits = self.classifier(concatenated) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class BiLSTMModel(BaseHateSpeechModel): |
| """BiLSTM cho hate speech detection""" |
| def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256, |
| num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5): |
| super().__init__("bilstm", num_labels) |
| self.embedding = nn.Embedding(vocab_size, embedding_dim) |
| self.lstm = nn.LSTM( |
| input_size=embedding_dim, |
| hidden_size=hidden_size, |
| num_layers=num_layers, |
| batch_first=True, |
| dropout=dropout if num_layers > 1 else 0, |
| bidirectional=True |
| ) |
| self.dropout = nn.Dropout(dropout) |
| self.classifier = nn.Linear(hidden_size * 2, num_labels) |
| |
| @classmethod |
| def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs): |
| """Override để detect vocab_size từ state_dict hoặc checkpoint file""" |
| |
| vocab_size = kwargs.pop("vocab_size", None) |
| config = kwargs.pop("config", None) |
| |
| |
| if vocab_size is None: |
| import os |
| state_dict = None |
| |
| if os.path.isdir(pretrained_model_name_or_path): |
| if os.path.isfile(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)): |
| try: |
| from safetensors.torch import load_file |
| state_dict = load_file(os.path.join(pretrained_model_name_or_path, SAFE_WEIGHTS_NAME)) |
| except Exception: |
| pass |
| elif os.path.isfile(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME)): |
| try: |
| state_dict = torch.load(os.path.join(pretrained_model_name_or_path, WEIGHTS_NAME), map_location="cpu") |
| except Exception: |
| pass |
| |
| |
| if state_dict is not None and "embedding.weight" in state_dict: |
| vocab_size = state_dict["embedding.weight"].shape[0] |
| else: |
| vocab_size = 30000 |
| |
| |
| num_labels = kwargs.pop("num_labels", None) |
| if num_labels is None: |
| if config and hasattr(config, "num_labels"): |
| num_labels = config.num_labels |
| elif config and isinstance(config, dict) and "num_labels" in config: |
| num_labels = config["num_labels"] |
| else: |
| num_labels = 3 |
| |
| |
| model = cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs) |
| |
| return model |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| |
| embedded = self.embedding(input_ids) |
| |
| |
| lstm_output, (hidden, cell) = self.lstm(embedded) |
| |
| |
| |
| pooled_output = lstm_output.mean(dim=1) |
| |
| |
| |
| |
| |
| |
| |
| pooled_output = self.dropout(pooled_output) |
| logits = self.classifier(pooled_output) |
| |
| loss = None |
| if labels is not None: |
| loss_fn = nn.CrossEntropyLoss() |
| loss = loss_fn(logits, labels) |
| return {"loss": loss, "logits": logits} |
|
|
| class EnsembleModel(BaseHateSpeechModel): |
| """Ensemble model kết hợp các mô hình deep learning""" |
| def __init__(self, models: list, num_labels: int = 3, weights: list = None): |
| super().__init__("ensemble", num_labels) |
| self.models = nn.ModuleList(models) |
| self.num_models = len(models) |
| self.weights = weights if weights else [1.0] * self.num_models |
| self.weights = torch.tensor(self.weights, dtype=torch.float32) |
| self.weights = self.weights / self.weights.sum() |
| |
| def forward(self, input_ids, attention_mask, labels=None): |
| all_logits = [] |
| total_loss = 0 |
| |
| for i, model in enumerate(self.models): |
| model_output = model(input_ids, attention_mask, labels) |
| all_logits.append(model_output["logits"]) |
| |
| if model_output["loss"] is not None: |
| total_loss += self.weights[i] * model_output["loss"] |
| |
| |
| ensemble_logits = torch.zeros_like(all_logits[0]) |
| for i, logits in enumerate(all_logits): |
| ensemble_logits += self.weights[i] * logits |
| |
| return { |
| "loss": total_loss if total_loss > 0 else None, |
| "logits": ensemble_logits |
| } |
|
|
| def get_model(model_name: str, num_labels: int = 3, **kwargs): |
| """ |
| Factory function để tạo model dựa trên tên |
| |
| Args: |
| model_name: Tên model ("phobert-v2", "bartpho", "visobert", "vihate-t5", |
| "xlm-r", "roberta-gru", "textcnn", "bilstm", "bilstm-crf", "ensemble") |
| num_labels: Số lượng nhãn (3 cho hate speech: CLEAN, OFFENSIVE, HATE) |
| **kwargs: Các tham số bổ sung cho model |
| |
| Returns: |
| Model instance |
| """ |
| model_mapping = { |
| "phobert-v1": PhoBERTV1Model, |
| "phobert-v2": PhoBERTV2Model, |
| "bartpho": BartPhoModel, |
| "visobert": ViSoBERTModel, |
| "vihate-t5": ViHateT5Model, |
| "xlm-r": XLMRModel, |
| "mbert": MBERTModel, |
| "sphobert": SPhoBERTModel, |
| "roberta-gru": RoBERTaGRUModel, |
| "textcnn": TextCNNModel, |
| "bilstm": BiLSTMModel, |
| "ensemble": EnsembleModel |
| } |
| |
| if model_name not in model_mapping: |
| raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}") |
| |
| model_class = model_mapping[model_name] |
| return model_class(num_labels=num_labels, **kwargs) |