UGP/t5.py

126 lines
4.0 KiB
Python

import torch
import copy
from torch import nn
from transformers import T5PreTrainedModel, T5Config
from transformers.models.t5.modeling_t5 import T5Stack
from transformers.modeling_outputs import SequenceClassifierOutput
class T5ClassificationHead(nn.Module):
def __init__(self, config: T5Config):
super().__init__()
self.dense_in = nn.Linear(config.d_model, 768)
self.dense = nn.Linear(768, 768)
self.dense_out = nn.Linear(768, config.num_labels)
self.dropout = nn.Dropout(0.1)
def forward(self, features, **kwargs):
x = features[:, 0, :]
x = self.dropout(x)
x = self.dense_in(x)
x = torch.relu(x)
x = self.dropout(x)
x = self.dense(x)
x = torch.relu(x)
x = self.dropout(x)
x = self.dense_out(x)
return x
class T5ForClassification(T5PreTrainedModel):
def __init__(self, config: T5Config):
super().__init__(config)
self.model_dim = config.d_model
self.shared = nn.Embedding(config.vocab_size, config.d_model)
encoder_config = copy.deepcopy(config)
encoder_config.is_decoder = False
encoder_config.use_cache = False
encoder_config.is_encoder_decoder = False
self.encoder = T5Stack(encoder_config, self.shared)
decoder_config = copy.deepcopy(config)
decoder_config.is_decoder = True
decoder_config.is_encoder_decoder = False
decoder_config.num_layers = config.num_decoder_layers
self.decoder = T5Stack(decoder_config, self.shared)
modules_to_freeze = [self.encoder.block[i].layer[0] for i in range(len(self.encoder.block))]
modules_to_freeze.extend([self.decoder.block[i].layer[0] for i in range(len(self.decoder.block))])
modules_to_freeze.extend([self.decoder.block[i].layer[1] for i in range(len(self.decoder.block))])
for module in modules_to_freeze:
for param in module.parameters():
param.requires_grad = False
self.lm_head = T5ClassificationHead(config)
# Initialize weights and apply final processing
self.post_init()
# Model parallel
self.model_parallel = False
self.device_map = None
def forward(
self,
input_ids=None,
attention_mask=None,
head_mask=None,
cross_attn_head_mask=None,
past_key_values=None,
inputs_embeds=None,
decoder_inputs_embeds=None,
use_cache=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
labels=None
):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.encoder(
input_ids,
attention_mask=attention_mask,
head_mask=head_mask,
cross_attn_head_mask=cross_attn_head_mask,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
outputs = self.decoder(
input_ids,
attention_mask=attention_mask,
head_mask=head_mask,
cross_attn_head_mask=cross_attn_head_mask,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
logits = self.lm_head(outputs[0])
loss = None
if labels is not None:
loss_fct = nn.CrossEntropyLoss()
loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1))
return SequenceClassifierOutput(
loss=loss,
logits=logits,
)