create text classification
parent 1b74870dd0
commit fb24ad5bca
.gitignore
@ -1 +1,3 @@
.vscode
__pycache__
data/
README.md
@ -4,3 +4,4 @@
### Filip Patyk
### 424714

[https://git.wmi.amu.edu.pl/AITech/aitech-ium](https://git.wmi.amu.edu.pl/AITech/aitech-ium)
datasets/__init__.py
@ -0,0 +1,4 @@
__all__ = ["Dataset", "NewsDataset"]

from .news_dataset import NewsDataset
from .dataset import Dataset
datasets/dataset.py
@ -0,0 +1,36 @@
import torch
import pandas as pd
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-cased")


class Dataset(torch.utils.data.Dataset):
    def __init__(self, data: pd.DataFrame) -> None:
        self.labels = data["label"].to_list()
        # Tokenization happens lazily in __getitem__; pre-tokenizing every
        # text here would trade memory for per-batch speed.
        self.texts = data["text"].to_list()

    def __getitem__(self, idx):
        label = self.labels[idx]
        text = tokenizer(
            self.texts[idx],
            padding="max_length",
            max_length=512,
            truncation=True,
            return_tensors="pt",
        )

        return text, label

    def __len__(self) -> int:
        return len(self.labels)
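For reference, a minimal smoke test for this Dataset (not part of the commit; the two-row frame is a made-up fixture): because the tokenizer is called with return_tensors="pt", each sample comes back as a dict of tensors of shape [1, 512], which is why the loops in train.py and evaluate.py later call .squeeze(1) on input_ids.

import pandas as pd

from datasets import Dataset

df = pd.DataFrame({"text": ["some news text", "other news text"], "label": [1, 0]})
sample_text, sample_label = Dataset(df)[0]
print(sample_text["input_ids"].shape)       # torch.Size([1, 512])
print(sample_text["attention_mask"].shape)  # torch.Size([1, 512])
print(sample_label)                         # 1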
datasets/news_dataset.py
@ -0,0 +1,34 @@
from pathlib import Path
from typing import Optional

import pandas as pd


class NewsDataset:
    def __init__(self, data_dir_path: str = "data", data_length: Optional[int] = None) -> None:
        self.data_dir_path = Path(data_dir_path)
        self.true_news_path = self.data_dir_path / "True.csv"
        self.fake_news_path = self.data_dir_path / "Fake.csv"

        self.true_news = self.load_news(self.true_news_path, data_length)
        self.fake_news = self.load_news(self.fake_news_path, data_length)

        # real news -> 1, fake news -> 0
        self.true_news["label"] = 1
        self.fake_news["label"] = 0

    def load_news(self, file_path: Path, trim: Optional[int] = None) -> pd.DataFrame:
        news = pd.read_csv(file_path)
        news = news.drop(columns=["title", "subject", "date"])

        return news if not trim else news.head(trim)

    @property
    def data(self) -> pd.DataFrame:
        dataset = pd.concat([self.true_news, self.fake_news], axis=0)
        dataset["text"] = dataset["text"].str.strip()
        # dropna returns a new frame; assign it back so empty rows are dropped
        dataset = dataset.dropna(axis=0, how="any", subset=["text"])
        return dataset


if __name__ == "__main__":
    dataset = NewsDataset()
    print(dataset.data.head(5))
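NewsDataset appears to assume the Kaggle fake-and-real-news layout under data/: a True.csv and a Fake.csv whose columns include title, text, subject, and date (load_news drops all but text). A hedged fixture sketch for exercising the module without the real download (the single row is dummy data):

from pathlib import Path

import pandas as pd

# Hypothetical dummy files matching the columns load_news expects.
data_dir = Path("data")
data_dir.mkdir(exist_ok=True)
row = {"title": ["t"], "text": ["article body"], "subject": ["news"], "date": ["2021-01-01"]}
pd.DataFrame(row).to_csv(data_dir / "True.csv", index=False)
pd.DataFrame(row).to_csv(data_dir / "Fake.csv", index=False)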
evaluate.py
@ -0,0 +1,47 @@
import os

import pandas as pd
import torch
import torch.nn as nn

from datasets import Dataset

NUM_WORKERS = os.cpu_count()
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
PIN_MEMORY = DEVICE == "cuda"


def evaluate(
    model: nn.Module,
    test_data: pd.DataFrame,
    batch_size: int,
) -> None:
    test_dataset = Dataset(test_data)

    test_dataloader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=False,  # order does not matter for evaluation
    )

    model.to(DEVICE)
    model.eval()  # disable dropout for evaluation
    total_acc_test = 0

    with torch.no_grad():
        for test_input, test_label in test_dataloader:
            test_label = test_label.to(DEVICE)
            mask = test_input["attention_mask"].to(DEVICE)
            input_id = test_input["input_ids"].squeeze(1).to(DEVICE)

            output = model(input_id, mask)

            acc = (output.argmax(dim=1) == test_label).sum().item()
            total_acc_test += acc

    print(f"Test Accuracy: {total_acc_test / len(test_data):.3f}")


if __name__ == "__main__":
    pass
main.py
@ -0,0 +1,48 @@
from sklearn.model_selection import train_test_split

from models import BertClassifier
from datasets import NewsDataset
from train import train
from evaluate import evaluate

SEED = 2137

# Hyperparameters
INITIAL_LR = 1e-6
NUM_EPOCHS = 5
BATCH_SIZE = 2

if __name__ == "__main__":
    # loading & splitting data
    news_dataset = NewsDataset(data_dir_path="data", data_length=2000)

    train_val_data, test_data = train_test_split(
        news_dataset.data,
        test_size=0.8,
        shuffle=True,
        random_state=SEED,
    )
    train_data, val_data = train_test_split(
        train_val_data,
        test_size=0.2,
        shuffle=True,
        random_state=SEED,
    )
    # training model
    trained_model = train(
        model=BertClassifier(),
        train_data=train_data,
        val_data=val_data,
        learning_rate=INITIAL_LR,
        epochs=NUM_EPOCHS,
        batch_size=BATCH_SIZE,
    )

    # evaluating model
    evaluate(
        model=trained_model,
        test_data=test_data,
        batch_size=BATCH_SIZE,
    )
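Passing random_state=SEED makes both splits reproducible (the original random.seed(SEED) returned None, so the splits were unseeded), but the torch side (weight init, dropout, shuffling) has its own RNGs. A sketch of a seeding helper, assuming it would be called at the top of the main block; seed_everything is a hypothetical name, not part of the commit:

import random

import numpy as np
import torch


def seed_everything(seed: int = 2137) -> None:
    random.seed(seed)                 # Python RNG
    np.random.seed(seed)              # NumPy RNG (used under the hood by sklearn)
    torch.manual_seed(seed)           # torch RNGs on all devices
    torch.cuda.manual_seed_all(seed)  # explicit for all GPUs; no-op without CUDA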
models/__init__.py
@ -0,0 +1,3 @@
__all__ = ["BertClassifier"]

from .bert_model import BertClassifier
models/bert_model.py
@ -0,0 +1,22 @@
from torch import nn
from transformers import BertModel


class BertClassifier(nn.Module):
    def __init__(self, dropout: float = 0.5, num_classes: int = 2):
        super(BertClassifier, self).__init__()

        self.bert = BertModel.from_pretrained("bert-base-cased")
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(768, num_classes)  # 768 = BERT-base hidden size
        self.relu = nn.ReLU()

    def forward(self, input_id, mask):
        # pooled_output is the pooled [CLS] representation, shape (batch, 768)
        _, pooled_output = self.bert(
            input_ids=input_id, attention_mask=mask, return_dict=False
        )
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        final_layer = self.relu(linear_output)

        return final_layer
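A minimal shape check for the classifier (not part of the commit; it assumes the same bert-base-cased tokenizer used in datasets/dataset.py): BERT's pooled output is 768-dimensional, and the linear head maps it to num_classes=2 logits per sample.

import torch
from transformers import BertTokenizer

from models import BertClassifier

tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
model = BertClassifier()
enc = tokenizer(
    "example headline",
    padding="max_length",
    max_length=512,
    truncation=True,
    return_tensors="pt",
)
with torch.no_grad():
    logits = model(enc["input_ids"], enc["attention_mask"])
print(logits.shape)  # torch.Size([1, 2])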
train.py
@ -0,0 +1,88 @@
import os

import pandas as pd
import torch
import torch.nn as nn
from tqdm import tqdm

from datasets import Dataset

NUM_WORKERS = os.cpu_count()
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
PIN_MEMORY = DEVICE == "cuda"


def train(
    model: nn.Module,
    train_data: pd.DataFrame,
    val_data: pd.DataFrame,
    learning_rate: float,
    epochs: int,
    batch_size: int,
) -> nn.Module:
    train_dataset, val_dataset = Dataset(train_data), Dataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=True,
    )
    val_dataloader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=batch_size,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        shuffle=False,  # no need to shuffle validation data
    )

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    model.to(DEVICE)
    for epoch in range(epochs):
        model.train()  # enable dropout for training
        total_acc_train = 0
        total_loss_train = 0
        for train_input, train_label in tqdm(train_dataloader):
            train_label = train_label.to(DEVICE)
            mask = train_input["attention_mask"].to(DEVICE)
            input_id = train_input["input_ids"].squeeze(1).to(DEVICE)

            output = model(input_id, mask)

            batch_loss = criterion(output, train_label.long())
            total_loss_train += batch_loss.item()

            acc = (output.argmax(dim=1) == train_label).sum().item()
            total_acc_train += acc

            model.zero_grad()
            batch_loss.backward()
            optimizer.step()

        model.eval()  # disable dropout for validation
        total_acc_val = 0
        total_loss_val = 0

        with torch.no_grad():
            for val_input, val_label in val_dataloader:
                val_label = val_label.to(DEVICE)
                mask = val_input["attention_mask"].to(DEVICE)
                input_id = val_input["input_ids"].squeeze(1).to(DEVICE)

                output = model(input_id, mask)

                batch_loss = criterion(output, val_label.long())
                total_loss_val += batch_loss.item()

                acc = (output.argmax(dim=1) == val_label).sum().item()
                total_acc_val += acc

        print(
            f"Epoch: {epoch + 1} "
            f"| Train Loss: {total_loss_train / len(train_data):.3f} "
            f"| Train Accuracy: {total_acc_train / len(train_data):.3f} "
            f"| Val Loss: {total_loss_val / len(val_data):.3f} "
            f"| Val Accuracy: {total_acc_val / len(val_data):.3f}"
        )

    return model