import re
import string

import nltk
import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from pandas import DataFrame
from prefect import Flow, context, task
from scipy.sparse import csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')  # needed by the WordNet lemmatizer on newer NLTK releases
nltk.download('punkt')
# on NLTK >= 3.9 you may also need: nltk.download('punkt_tab')


# remove URLs, handles, and the '#' from hashtags (adapted from
# https://stackoverflow.com/questions/8376691/how-to-remove-hashtag-user-link-of-a-tweet-using-regular-expression)
def remove_urls(text):
    return ' '.join(re.sub(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+://\S+)", " ", text).split())


# make all text lowercase
def text_lowercase(text):
    return text.lower()


# remove numbers
def remove_numbers(text):
    return re.sub(r'\d+', '', text)


# remove punctuation
def remove_punctuation(text):
    translator = str.maketrans('', '', string.punctuation)
    return text.translate(translator)


# tokenize
def tokenize(text):
    return word_tokenize(text)


# remove stopwords
stop_words = set(stopwords.words('english'))

def remove_stopwords(text):
    return [token for token in text if token not in stop_words]


# lemmatize
lemmatizer = WordNetLemmatizer()

def lemmatize(text):
    return [lemmatizer.lemmatize(token) for token in text]


def preprocessing(text):
    text = text_lowercase(text)
    text = remove_urls(text)
    text = remove_numbers(text)
    text = remove_punctuation(text)
    text = tokenize(text)
    text = remove_stopwords(text)
    text = lemmatize(text)
    return ' '.join(text)


@task
def get_train_set() -> DataFrame:
    logger = context.get("logger")
    train = pd.read_csv('train.csv')
    train = train.drop(['keyword', 'location'], axis=1)
    logger.info(f"Train set: {len(train)} elements")
    return train


@task
def get_test_set() -> DataFrame:
    logger = context.get("logger")
    test = pd.read_csv('test.csv')
    logger.info(f"Test set: {len(test)} elements")
    return test


@task
def preprocess_train(train: DataFrame) -> DataFrame:
    train['pp_text'] = train['text'].apply(preprocessing)
    return train


@task
def preprocess_test(test: DataFrame) -> DataFrame:
    test['pp_text'] = test['text'].apply(preprocessing)
    return test


@task
def prepare_vectorizer(train_data: DataFrame, test_data: DataFrame) -> TfidfVectorizer:
    # fit the vectorizer on the combined vocabulary of both sets
    corpus = list(train_data['pp_text']) + list(test_data['pp_text'])
    tf = TfidfVectorizer()
    return tf.fit(corpus)


@task
def transform_train(vectorizer: TfidfVectorizer, train_text: pd.Series) -> csr_matrix:
    # returns a sparse TF-IDF matrix, one row per document
    return vectorizer.transform(train_text)


@task
def transform_test(vectorizer: TfidfVectorizer, test_text: pd.Series) -> csr_matrix:
    return vectorizer.transform(test_text)


@task
def split_test_set(X: csr_matrix, Y: pd.Series) -> dict:
    X_train, X_test, y_train, y_test = train_test_split(X, Y)
    return {'X_train': X_train, 'X_test': X_test,
            'y_train': y_train, 'y_test': y_test}


@task
def train_model(X: csr_matrix, Y: pd.Series) -> LogisticRegression:
    scikit_log_reg = LogisticRegression()
    return scikit_log_reg.fit(X, Y)


@task
def evaluate(model: LogisticRegression, X: csr_matrix, Y: pd.Series) -> None:
    logger = context.get("logger")
    predictions = model.predict(X)
    correct = sum(1 for guess, answer in zip(predictions, Y) if guess == answer)
    score = correct / len(Y)
    logger.info(f"model score: {score}")


if __name__ == "__main__":
    with Flow("My First Prefect Flow!") as flow:
        train_data = get_train_set()
        test_data = get_test_set()
        train_data = preprocess_train(train_data)
        test_data = preprocess_test(test_data)
        vectorizer = prepare_vectorizer(train_data, test_data)
        vectorized_train_data = transform_train(vectorizer, train_data['pp_text'])
        # vectorize the unlabeled test set too (unused below)
        vectorized_test_data = transform_test(vectorizer, test_data['pp_text'])
        split_data = split_test_set(vectorized_train_data, train_data['target'])
        model = train_model(split_data['X_train'], split_data['y_train'])
        evaluate(model, split_data['X_test'], split_data['y_test'])

    flow.validate()
    # flow.visualize()
    flow.run()
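
# A quick sanity check of the full preprocessing chain on a made-up tweet
# (a sketch; the exact output depends on the installed NLTK stopword list
# and lemmatizer data):
#
#   preprocessing("Check out https://example.com, it's 100% AMAZING!!! @someuser #wildfire")
#   # -> roughly 'check amazing wildfire'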