From 4ad7aa62e9f3ec5dc83b1b261f93fcb267aeb54f Mon Sep 17 00:00:00 2001 From: Krystian Wasilewski Date: Mon, 9 Jan 2023 19:50:09 +0100 Subject: [PATCH] multiprocessing --- engine.py | 2 +- main.py | 43 ++++++++++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/engine.py b/engine.py index 43289c1..9fe7495 100644 --- a/engine.py +++ b/engine.py @@ -1,6 +1,6 @@ from simpful import * -FS = FuzzySystem() +FS = FuzzySystem(show_banner=False) # Define fuzzy sets for the variable # RELEASE_YEAR diff --git a/main.py b/main.py index 5fa6e27..89e83c0 100644 --- a/main.py +++ b/main.py @@ -5,9 +5,13 @@ !pip install "uvicorn[standard]" !uvicorn main:app --reload """ +import multiprocessing +import time +from multiprocessing import Pool import numpy as np import pandas as pd +import pandas.core.series from fastapi import FastAPI from scipy.spatial.distance import cosine from sklearn.preprocessing import MultiLabelBinarizer @@ -18,9 +22,11 @@ app = FastAPI() data = pd.DataFrame() -def inference(first_id: str, second_id: str): - first = data.loc[first_id] - second = data.loc[second_id] +def inference(first: pandas.core.series.Series, second_id: str, df=None): + if df is not None: + second = df.loc[second_id] + else: + second = data.loc[second_id] year_diff = int(first['release_year'] - second['release_year']) FS.set_variable('RELEASE_YEAR', year_diff) @@ -40,9 +46,15 @@ def inference(first_id: str, second_id: str): emotion_diff = 1 - cosine(first['emotions'], second['emotions']) FS.set_variable('EMOTIONS', emotion_diff) - return FS.inference(['RECOMMENDATION']) + return second_id, FS.inference(['RECOMMENDATION'])['RECOMMENDATION'] +def process_dataframe(df, production): + scores = [] + for index, row in df.iterrows(): + scores.append(inference(production, str(index), df)) + return scores + @app.on_event('startup') async def startup_event(): @@ -66,7 +78,7 @@ def rec_score(first_id: str, second_id: str): except KeyError: return {'error': f'{second_id} is not a valid id'} - return inference(first_id, second_id) + return inference(first, second_id) @app.get('/recs/{production_id}') @@ -77,12 +89,17 @@ async def recs(production_id: str, count: int | None): return {'error': f'{production_id} is not a valid id'} scores = [] + time_start = time.time() + cpus = multiprocessing.cpu_count() + df_list = np.array_split(data, cpus) + pool = Pool(cpus) + results = [pool.apply_async(process_dataframe, [df, first]) for df in df_list] - for index, row in data.iterrows(): - if str(index) == production_id: - continue - scores.append((index, inference(production_id, str(index))['RECOMMENDATION'])) - - scores = [idx[0] for idx in sorted(scores, key=lambda x: x[1], reverse=True)[:count]] - - return list(scores) + for r in results: + r.wait() + for r in results: + scores += r.get() + print(f'time elapsed = {time.time() - time_start}') + scores = [idx[0] for idx in sorted(scores, key=lambda x: x[1], reverse=True)[:count+1]] + scores.remove(production_id) + return scores