# UGP/custom_model.py


import logging
from typing import Optional, Tuple, Union
import torch
from torch import nn
from torch.nn import MSELoss, CrossEntropyLoss, BCEWithLogitsLoss
from transformers import GPT2Model, GPT2ForSequenceClassification, RobertaForSequenceClassification, RobertaModel
from transformers.modeling_outputs import SequenceClassifierOutputWithPast, SequenceClassifierOutput
LOGGER = logging.getLogger(__name__)
# RoBERTa - simple example
class RobertaClassificationHeadCustomSimple(nn.Module):
    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense_1 = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_2 = nn.Linear(2 * hidden_size, hidden_size)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features[:, 0, :]  # take <s> token (equiv. to [CLS])
        x = self.dense_1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.dense_2(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class RobertaForSequenceClassificationCustomSimple(RobertaForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = RobertaClassificationHeadCustomSimple(config)

        # Initialize weights and apply final processing
        self.post_init()
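

# Illustrative usage sketch (not part of the original model code): shows how the
# simple custom model could be loaded from a pretrained checkpoint. The
# "roberta-base" checkpoint name, label count and example sentence are arbitrary
# placeholders; the custom head weights are freshly initialized and untrained.
def _example_usage_roberta_simple():
    from transformers import AutoTokenizer

    model = RobertaForSequenceClassificationCustomSimple.from_pretrained("roberta-base", num_labels=2)
    tokenizer = AutoTokenizer.from_pretrained("roberta-base")

    batch = tokenizer(["an example sentence"], return_tensors="pt")
    with torch.no_grad():
        logits = model(**batch).logits
    print(logits.shape)  # -> torch.Size([1, 2])
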
# RoBERTa - Example 1
class RobertaClassificationHeadCustom(nn.Module):
    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size * 2
        self.dense_1 = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_2 = nn.Linear(2 * hidden_size, hidden_size)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        if "hidden_states" in kwargs and kwargs["hidden_states"] is not None:
            x = torch.cat(
                (
                    features[:, 0, :],
                    # take <s> token (equiv. to [CLS]) from the second-to-last layer's hidden states
                    kwargs["hidden_states"][-2][:, 0, :],
                ),
                dim=1,
            )
        else:
            raise RuntimeError("Missing hidden state to process forward")

        x = self.dense_1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.dense_2(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class RobertaForSequenceClassificationCustom(RobertaForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = RobertaClassificationHeadCustom(config)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            # hidden states are requested via the custom `use_hidden_states` config flag
            output_hidden_states=self.config.use_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        logits = self.classifier(sequence_output, hidden_states=outputs.hidden_states)
        # the hidden-state tuple is only needed by the classifier head above
        del outputs.hidden_states

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
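

# Illustrative usage sketch (not part of the original model code): the forward()
# above reads `use_hidden_states` from the config, which is not a standard
# RobertaConfig field, so it is assumed here that the caller sets it before the
# first forward pass. Checkpoint name and label count are placeholders.
def _example_usage_roberta_custom():
    from transformers import AutoTokenizer

    model = RobertaForSequenceClassificationCustom.from_pretrained("roberta-base", num_labels=2)
    model.config.use_hidden_states = True  # assumed flag consumed by forward() above
    tokenizer = AutoTokenizer.from_pretrained("roberta-base")

    batch = tokenizer(["an example sentence"], return_tensors="pt")
    labels = torch.tensor([1])
    outputs = model(**batch, labels=labels)
    print(outputs.loss.item(), outputs.logits.shape)
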
# RoBERTa - Example 2
class RobertaClassificationHeadCustomAlternative(nn.Module):
    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense_1_input = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_1_hidden = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_2 = nn.Linear(4 * hidden_size, hidden_size)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = features[:, 0, :]  # take <s> token (equiv. to [CLS])
        if "hidden_states" in kwargs and kwargs["hidden_states"] is not None:
            # take <s> token (equiv. to [CLS]) from the last layer's hidden states
            hidden = kwargs["hidden_states"][-1][:, 0, :]
        else:
            raise RuntimeError("Missing hidden state to process forward")

        x = self.dense_1_input(x)
        x = torch.relu(x)
        x = self.dropout(x)

        hidden = self.dense_1_hidden(hidden)
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)

        x = torch.cat((x, hidden), dim=1)
        x = self.dense_2(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class RobertaForSequenceClassificationCustomAlternative(RobertaForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = RobertaClassificationHeadCustomAlternative(config)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            # hidden states are requested via the custom `use_hidden_states` config flag
            output_hidden_states=self.config.use_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        logits = self.classifier(sequence_output, hidden_states=outputs.hidden_states)
        # the hidden-state tuple is only needed by the classifier head above
        del outputs.hidden_states

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
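

# Illustrative shape check (not part of the original code): runs the alternative
# head on random tensors to show the expected inputs, i.e. the sequence output
# plus a tuple of per-layer hidden states passed via the `hidden_states` keyword.
# Batch size, sequence length and label count below are arbitrary.
def _check_alternative_head_shapes():
    from transformers import RobertaConfig

    config = RobertaConfig(num_labels=3)
    head = RobertaClassificationHeadCustomAlternative(config)

    batch_size, seq_len = 2, 8
    sequence_output = torch.randn(batch_size, seq_len, config.hidden_size)
    # a single-element tuple stands in for the full per-layer hidden-state tuple
    hidden_states = (torch.randn(batch_size, seq_len, config.hidden_size),)

    logits = head(sequence_output, hidden_states=hidden_states)
    assert logits.shape == (batch_size, config.num_labels)
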
# GPT-2 - simple example #
class GPT2ClassificationHeadCustomSimple(nn.Module):
    def __init__(self, config):
        super().__init__()
        hidden_size = config.n_embd
        self.dense_1 = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_2 = nn.Linear(2 * hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.resid_pdrop)
        self.out_proj = nn.Linear(hidden_size, config.num_labels, bias=False)

    def forward(self, x):
        x = self.dense_1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.dense_2(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class GPT2ForSequenceClassificationCustomSimple(GPT2ForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.transformer = GPT2Model(config)
        self.score = GPT2ClassificationHeadCustomSimple(config)

        # Model parallel
        self.model_parallel = False
        self.device_map = None

        # Initialize weights and apply final processing
        self.post_init()
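

# Illustrative usage sketch (not part of the original model code): GPT-2 defines
# no padding token by default, so the EOS token is reused here before batched
# sequence classification. Checkpoint name, label count and texts are placeholders.
def _example_usage_gpt2_simple():
    from transformers import AutoTokenizer

    model = GPT2ForSequenceClassificationCustomSimple.from_pretrained("gpt2", num_labels=2)
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id

    batch = tokenizer(["first example", "a second, longer example"], padding=True, return_tensors="pt")
    with torch.no_grad():
        logits = model(**batch).logits
    print(logits.shape)  # -> torch.Size([2, 2])
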
# GPT-2 - Example 1 #
class GPT2ClassificationHeadCustom(nn.Module):
    def __init__(self, config):
        super().__init__()
        hidden_size = config.n_embd
        self.dense_1_input = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_1_hidden = nn.Linear(hidden_size, 2 * hidden_size)
        self.dense_2 = nn.Linear(4 * hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.resid_pdrop)
        self.out_proj = nn.Linear(hidden_size, config.num_labels, bias=False)

    def forward(self, x, **kwargs):
        if "hidden_states" in kwargs and kwargs["hidden_states"] is not None:
            # Get hidden states from last layer
            hidden = kwargs["hidden_states"][-1]
        else:
            raise RuntimeError("Missing hidden state to process forward")

        x = self.dense_1_input(x)
        x = torch.relu(x)
        x = self.dropout(x)

        hidden = self.dense_1_hidden(hidden)
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)

        x = torch.cat((x, hidden), dim=2)
        x = self.dense_2(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x


class GPT2ForSequenceClassificationCustom(GPT2ForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.transformer = GPT2Model(config)
        self.score = GPT2ClassificationHeadCustom(config)

        # Model parallel
        self.model_parallel = False
        self.device_map = None

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, SequenceClassifierOutputWithPast]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        transformer_outputs = self.transformer(
            input_ids,
            past_key_values=past_key_values,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            # hidden states are requested via the custom `use_hidden_states` config flag
            output_hidden_states=self.config.use_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = transformer_outputs[0]
        logits = self.score(hidden_states, hidden_states=transformer_outputs.hidden_states)
        # the hidden-state tuple is only needed by the classification head above
        del transformer_outputs.hidden_states

        if input_ids is not None:
            batch_size, sequence_length = input_ids.shape[:2]
        else:
            batch_size, sequence_length = inputs_embeds.shape[:2]

        assert (
            self.config.pad_token_id is not None or batch_size == 1
        ), "Cannot handle batch sizes > 1 if no padding token is defined."
        if self.config.pad_token_id is None:
            sequence_lengths = -1
        else:
            if input_ids is not None:
                # index of the last non-padding token in each sequence
                sequence_lengths = (torch.eq(input_ids, self.config.pad_token_id).long().argmax(-1) - 1).to(
                    logits.device
                )
            else:
                sequence_lengths = -1
                LOGGER.warning(
                    f"{self.__class__.__name__} will not detect padding tokens in `inputs_embeds`. Results may be "
                    "unexpected if using padding tokens in conjunction with `inputs_embeds.`"
                )

        pooled_logits = logits[torch.arange(batch_size, device=logits.device), sequence_lengths]

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(pooled_logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(pooled_logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(pooled_logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(pooled_logits, labels)

        if not return_dict:
            output = (pooled_logits,) + transformer_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutputWithPast(
            loss=loss,
            logits=pooled_logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )
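

# Illustrative usage sketch (not part of the original model code): as with the
# RoBERTa variant, the custom forward() above reads the non-standard
# `use_hidden_states` config flag, so it is assumed the caller sets it, along
# with a padding token, before running the model. Names below are placeholders.
def _example_usage_gpt2_custom():
    from transformers import AutoTokenizer

    model = GPT2ForSequenceClassificationCustom.from_pretrained("gpt2", num_labels=2)
    model.config.use_hidden_states = True  # assumed flag consumed by forward() above
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id

    batch = tokenizer(["an example"], return_tensors="pt")
    labels = torch.tensor([0])
    outputs = model(**batch, labels=labels)
    print(outputs.loss.item(), outputs.logits.shape)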