126 lines
4.0 KiB
Python
126 lines
4.0 KiB
Python
import torch
|
|
import copy
|
|
from torch import nn
|
|
from transformers import T5PreTrainedModel, T5Config
|
|
from transformers.models.t5.modeling_t5 import T5Stack
|
|
from transformers.modeling_outputs import SequenceClassifierOutput
|
|
|
|
|
|
class T5ClassificationHead(nn.Module):
    """Feed-forward head that maps the first sequence position to class logits.

    The head is three linear layers (``dense_in`` -> ``dense`` -> ``dense_out``)
    with ReLU after the first two and dropout applied before every linear layer.

    Args:
        config: Model configuration; ``config.d_model`` is the input width and
            ``config.num_labels`` the number of output classes.
        hidden_size: Width of the two hidden layers. Defaults to 768, the value
            that was previously hard-coded.
        dropout: Dropout probability used before each linear layer. Defaults
            to 0.1, the value that was previously hard-coded.
    """

    def __init__(self, config: "T5Config", hidden_size: int = 768, dropout: float = 0.1):
        super().__init__()

        self.dense_in = nn.Linear(config.d_model, hidden_size)
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dense_out = nn.Linear(hidden_size, config.num_labels)
        # A single dropout module is reused at all three points in forward().
        self.dropout = nn.Dropout(dropout)

    def forward(self, features, **kwargs):
        """Compute class logits from a 3-D feature tensor.

        Args:
            features: Tensor indexed as ``[batch, position, hidden]``; only
                position 0 along the second axis is used.
            **kwargs: Accepted and ignored.

        Returns:
            Tensor of shape ``(batch, config.num_labels)`` with unnormalised
            class scores.
        """
        # Pool by taking the representation at the first sequence position.
        x = features[:, 0, :]
        x = self.dropout(x)
        x = torch.relu(self.dense_in(x))
        x = self.dropout(x)
        x = torch.relu(self.dense(x))
        x = self.dropout(x)
        return self.dense_out(x)
|
|
|
|
|
|
class T5ForClassification(T5PreTrainedModel):
    """T5 encoder/decoder stacks topped with a feed-forward classification head.

    The same token sequence is fed to the encoder and to the decoder; the
    decoder attends to the encoder output, and the decoder state at the first
    position is passed to ``lm_head`` to produce class logits.

    ``layer[0]`` of every encoder block and ``layer[0]`` / ``layer[1]`` of every
    decoder block are frozen (``requires_grad = False``). In the Hugging Face
    ``T5Block`` layout these are the self-attention sub-layer and, for decoder
    blocks, the cross-attention sub-layer.

    Args:
        config: T5 configuration. ``d_model``, ``vocab_size``,
            ``num_decoder_layers`` and ``num_labels`` are read from it.
    """

    def __init__(self, config: T5Config):
        super().__init__(config)
        self.model_dim = config.d_model

        # One embedding table shared by both stacks.
        self.shared = nn.Embedding(config.vocab_size, config.d_model)

        encoder_config = copy.deepcopy(config)
        encoder_config.is_decoder = False
        encoder_config.use_cache = False
        encoder_config.is_encoder_decoder = False
        self.encoder = T5Stack(encoder_config, self.shared)

        decoder_config = copy.deepcopy(config)
        decoder_config.is_decoder = True
        decoder_config.is_encoder_decoder = False
        decoder_config.num_layers = config.num_decoder_layers
        self.decoder = T5Stack(decoder_config, self.shared)

        # Freeze the attention sub-layers; everything else stays trainable.
        modules_to_freeze = [block.layer[0] for block in self.encoder.block]
        modules_to_freeze.extend(block.layer[0] for block in self.decoder.block)
        modules_to_freeze.extend(block.layer[1] for block in self.decoder.block)
        for module in modules_to_freeze:
            module.requires_grad_(False)

        # Kept under the name ``lm_head`` so existing attribute/state-dict
        # names do not change.
        self.lm_head = T5ClassificationHead(config)

        # Initialize weights and apply final processing
        self.post_init()

        # Model parallel
        self.model_parallel = False
        self.device_map = None

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        head_mask=None,
        cross_attn_head_mask=None,
        past_key_values=None,
        inputs_embeds=None,
        decoder_inputs_embeds=None,
        use_cache=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
    ):
        """Run encoder and decoder over the input and classify the sequence.

        Args:
            input_ids: Token ids, given to both the encoder and the decoder.
            attention_mask: Mask for ``input_ids``; used as the encoder mask,
                the decoder self-attention mask and the cross-attention mask.
            head_mask: Head mask forwarded to both stacks.
            cross_attn_head_mask: Cross-attention head mask, decoder only.
            past_key_values: Cached decoder states, decoder only.
            inputs_embeds: Pre-computed embeddings forwarded to both stacks as
                an alternative to ``input_ids``.
            decoder_inputs_embeds: Accepted for signature compatibility;
                currently not used.
            use_cache: Forwarded to the decoder only.
            output_attentions: Forwarded to both stacks.
            output_hidden_states: Forwarded to both stacks.
            return_dict: When falsy, a plain tuple is returned instead of a
                ``SequenceClassifierOutput``. Defaults to
                ``config.use_return_dict``.
            labels: Optional class indices; when given, a cross-entropy loss
                over the flattened logits and labels is computed.

        Returns:
            ``SequenceClassifierOutput(loss, logits)``, or ``(logits,)`` /
            ``(loss, logits)`` when ``return_dict`` is falsy.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Cache-related and cross-attention arguments are decoder concepts, so
        # they are deliberately not passed to the encoder stack.
        encoder_outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Feed the encoder states to the decoder's cross-attention. Without
        # them the encoder pass is wasted and the decoder state at position 0
        # (the one the head reads) cannot see the rest of the sequence.
        decoder_outputs = self.decoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            encoder_hidden_states=encoder_outputs[0],
            encoder_attention_mask=attention_mask,
            head_mask=head_mask,
            cross_attn_head_mask=cross_attn_head_mask,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Index 0 is the last hidden state for both tuple and dict outputs.
        logits = self.lm_head(decoder_outputs[0])

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(
                logits.view(-1, self.config.num_labels),
                # Keep labels on the same device as the logits.
                labels.to(logits.device).view(-1),
            )

        if not return_dict:
            return (logits,) if loss is None else (loss, logits)

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
        )
|