multiprocessing

This commit is contained in:
Krystian Wasilewski 2023-01-09 19:50:09 +01:00
parent 38578e007f
commit 4ad7aa62e9
2 changed files with 31 additions and 14 deletions

View File

@ -1,6 +1,6 @@
from simpful import *
FS = FuzzySystem()
FS = FuzzySystem(show_banner=False)
# Define fuzzy sets for the variable
# RELEASE_YEAR

43
main.py
View File

@ -5,9 +5,13 @@
!pip install "uvicorn[standard]"
!uvicorn main:app --reload
"""
import multiprocessing
import time
from multiprocessing import Pool
import numpy as np
import pandas as pd
import pandas.core.series
from fastapi import FastAPI
from scipy.spatial.distance import cosine
from sklearn.preprocessing import MultiLabelBinarizer
@ -18,9 +22,11 @@ app = FastAPI()
data = pd.DataFrame()
def inference(first_id: str, second_id: str):
first = data.loc[first_id]
second = data.loc[second_id]
def inference(first: pandas.core.series.Series, second_id: str, df=None):
if df is not None:
second = df.loc[second_id]
else:
second = data.loc[second_id]
year_diff = int(first['release_year'] - second['release_year'])
FS.set_variable('RELEASE_YEAR', year_diff)
@ -40,9 +46,15 @@ def inference(first_id: str, second_id: str):
emotion_diff = 1 - cosine(first['emotions'], second['emotions'])
FS.set_variable('EMOTIONS', emotion_diff)
return FS.inference(['RECOMMENDATION'])
return second_id, FS.inference(['RECOMMENDATION'])['RECOMMENDATION']
def process_dataframe(df, production):
scores = []
for index, row in df.iterrows():
scores.append(inference(production, str(index), df))
return scores
@app.on_event('startup')
async def startup_event():
@ -66,7 +78,7 @@ def rec_score(first_id: str, second_id: str):
except KeyError:
return {'error': f'{second_id} is not a valid id'}
return inference(first_id, second_id)
return inference(first, second_id)
@app.get('/recs/{production_id}')
@ -77,12 +89,17 @@ async def recs(production_id: str, count: int | None):
return {'error': f'{production_id} is not a valid id'}
scores = []
time_start = time.time()
cpus = multiprocessing.cpu_count()
df_list = np.array_split(data, cpus)
pool = Pool(cpus)
results = [pool.apply_async(process_dataframe, [df, first]) for df in df_list]
for index, row in data.iterrows():
if str(index) == production_id:
continue
scores.append((index, inference(production_id, str(index))['RECOMMENDATION']))
scores = [idx[0] for idx in sorted(scores, key=lambda x: x[1], reverse=True)[:count]]
return list(scores)
for r in results:
r.wait()
for r in results:
scores += r.get()
print(f'time elapsed = {time.time() - time_start}')
scores = [idx[0] for idx in sorted(scores, key=lambda x: x[1], reverse=True)[:count+1]]
scores.remove(production_id)
return scores