"""Download the Kaggle credit-card-fraud dataset and write train/test CSV splits.

The script scales the ``Amount`` column, builds a class-balanced
(undersampled) subset, splits both the subset and the full data 70/30 and
writes every resulting frame as a CSV file under ``/data``.
"""

import os

from kaggle.api.kaggle_api_extended import KaggleApi
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Directory that receives every CSV written by the save_* functions.
DATA_DIR = "/data"
# Name of the label column (1 = fraud, 0 = normal, per the selections below).
TARGET_COLUMN = "Class"


def download_kaggle_dataset():
    """Authenticate against Kaggle and download the fraud dataset.

    The archive ``mlg-ulb/creditcardfraud`` is downloaded into the current
    working directory and unzipped there. Credentials are resolved by
    ``KaggleApi.authenticate()``.
    """
    # Report only whether the key is present; never echo the secret itself,
    # since stdout usually ends up in shared logs.
    print(f'Kaggle API key set: {"KAGGLE_KEY" in os.environ}')
    kaggle = KaggleApi()
    kaggle.authenticate()
    kaggle.dataset_download_files("mlg-ulb/creditcardfraud", path="./", unzip=True)


def load_data(name):
    """Read the CSV file at path ``name`` and return it as a DataFrame."""
    return pd.read_csv(name)


def normalize_data(df):
    """Standardize the ``Amount`` column of ``df``.

    The column is replaced in place with its ``StandardScaler`` transform
    (fitted on the same column), so the caller's frame is modified.

    Args:
        df: DataFrame containing an ``Amount`` column.

    Returns:
        The same DataFrame object, with ``Amount`` rescaled.
    """
    scaler = StandardScaler()
    scaled = scaler.fit_transform(df["Amount"].values.reshape(-1, 1))
    # fit_transform returns shape (n, 1); flatten it so the assignment is an
    # unambiguous one-dimensional column.
    df["Amount"] = scaled.ravel()
    return df


def _split_features_target(df):
    """Return ``(X, y)``: all non-target columns, and the target as a one-column frame."""
    is_target = df.columns == TARGET_COLUMN
    return df.loc[:, ~is_target], df.loc[:, is_target]


def _train_test_split(X, y):
    """Split ``X``/``y`` 70/30 with a fixed seed so the split is reproducible."""
    return train_test_split(X, y, test_size=0.3, random_state=0)


def _write_csvs(output_dir, frames):
    """Write each ``{file name: DataFrame}`` entry to ``output_dir`` without the index."""
    os.makedirs(output_dir, exist_ok=True)
    for file_name, frame in frames.items():
        frame.to_csv(os.path.join(output_dir, file_name), index=False)


def create_undersample_data(df, random_state=None):
    """Build a class-balanced subset by undersampling the majority class.

    Every row with ``Class == 1`` is kept, and an equal number of rows with
    ``Class == 0`` is drawn at random without replacement.

    Args:
        df: DataFrame with a ``Class`` column.
        random_state: Optional seed for the random draw. When ``None``
            (the default) NumPy's global random state is used, so results
            differ between runs unless the caller seeded it.

    Returns:
        A tuple ``(undersample_data, X_undersample, y_undersample)``: the
        balanced frame, its feature columns, and its ``Class`` column as a
        one-column DataFrame.

    Raises:
        ValueError: If there are fewer ``Class == 0`` rows than
            ``Class == 1`` rows (sampling without replacement is impossible).
    """
    labels = df[TARGET_COLUMN]
    fraud_indices = df.index[(labels == 1).to_numpy()].to_numpy()
    normal_indices = df.index[(labels == 0).to_numpy()].to_numpy()

    # Draw as many majority-class rows as there are minority-class rows.
    rng = np.random if random_state is None else np.random.RandomState(random_state)
    random_normal_indices = rng.choice(
        normal_indices, len(fraud_indices), replace=False
    )

    undersample_indice = np.concatenate([fraud_indices, random_normal_indices])

    # The collected values are index labels, not positions, so select with
    # .loc; .iloc is only equivalent for a default RangeIndex.
    undersample_data = df.loc[undersample_indice, :]

    X_undersample, y_undersample = _split_features_target(undersample_data)
    return undersample_data, X_undersample, y_undersample


def split_undersample_data(X_undersample, y_undersample):
    """Split the undersampled features/labels into 70% train and 30% test.

    Returns:
        ``(X_train_undersample, X_test_undersample, y_train_undersample,
        y_test_undersample)``.
    """
    (
        X_train_undersample,
        X_test_undersample,
        y_train_undersample,
        y_test_undersample,
    ) = _train_test_split(X_undersample, y_undersample)
    return (
        X_train_undersample,
        X_test_undersample,
        y_train_undersample,
        y_test_undersample,
    )


def save_undersample_data(
    undersample_data,
    X_train_undersample,
    X_test_undersample,
    y_train_undersample,
    y_test_undersample,
    output_dir=DATA_DIR,
):
    """Write the undersampled frame and its train/test splits as CSV files.

    Args:
        undersample_data: The balanced DataFrame.
        X_train_undersample: Training features.
        X_test_undersample: Test features.
        y_train_undersample: Training labels.
        y_test_undersample: Test labels.
        output_dir: Destination directory; created if missing. Defaults to
            ``/data``.
    """
    _write_csvs(
        output_dir,
        {
            "undersample_data.csv": undersample_data,
            "X_train_undersample.csv": X_train_undersample,
            "X_test_undersample.csv": X_test_undersample,
            "y_train_undersample.csv": y_train_undersample,
            "y_test_undersample.csv": y_test_undersample,
        },
    )


def split_whole_data(df):
    """Split the full dataset into 70% train and 30% test.

    Args:
        df: DataFrame with a ``Class`` column.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` where the ``y`` parts are
        one-column DataFrames holding ``Class``.
    """
    X, y = _split_features_target(df)
    X_train, X_test, y_train, y_test = _train_test_split(X, y)
    return X_train, X_test, y_train, y_test


def save_whole_data(df, X_train, X_test, y_train, y_test, output_dir=DATA_DIR):
    """Write the full frame and its train/test splits as CSV files.

    Args:
        df: The full DataFrame (written as ``creditcard.csv``).
        X_train: Training features.
        X_test: Test features.
        y_train: Training labels.
        y_test: Test labels.
        output_dir: Destination directory; created if missing. Defaults to
            ``/data``.
    """
    _write_csvs(
        output_dir,
        {
            "creditcard.csv": df,
            "X_train.csv": X_train,
            "X_test.csv": X_test,
            "y_train.csv": y_train,
            "y_test.csv": y_test,
        },
    )


def main():
    """Run the pipeline: download, normalize, undersample, split and save."""
    download_kaggle_dataset()
    # Create the output directory up front so a permissions problem surfaces
    # before the dataset is loaded and processed.
    os.makedirs(DATA_DIR, exist_ok=True)

    df = load_data("creditcard.csv")
    df = normalize_data(df)

    undersample_data, X_undersample, y_undersample = create_undersample_data(df)
    (
        X_train_undersample,
        X_test_undersample,
        y_train_undersample,
        y_test_undersample,
    ) = split_undersample_data(X_undersample, y_undersample)
    save_undersample_data(
        undersample_data,
        X_train_undersample,
        X_test_undersample,
        y_train_undersample,
        y_test_undersample,
    )

    X_train, X_test, y_train, y_test = split_whole_data(df)
    # save_whole_data takes the full frame as its first argument; omitting it
    # raised TypeError at the very end of the run.
    save_whole_data(df, X_train, X_test, y_train, y_test)


if __name__ == "__main__":
    main()