aitech-sd-lab/dialogue_system.py
2022-06-15 11:45:20 +02:00

305 lines
16 KiB
Python

from modules.NLU import NLU
from modules.DST import DST
# from modules.DP import DP
from modules.NLG import NLG
import json
import re
import time
value_dict = json.load(open('modules/value_dict.json'))
def format_prediction(prediction, intent):
out_list = []
for idx, tup in enumerate(prediction):
if tup[1][0] == 'B':
slot_list = [intent, 'Cinema', tup[1][2:], tup[0]]
for tup in prediction[idx + 1:]:
if tup[1][0] != 'I':
break
else:
slot_list[3] += ' ' + tup[0]
out_list.append(slot_list)
for slot in out_list:
slot[3] = re.sub("^[!\"#$%&\'()*+,.;:<=>?\[\]^_`{|}~]+", '', slot[3])
slot[3] = re.sub("[!\"#$%&\'()*+,.;:<=>?\[\]^_`{|}~]+$", '', slot[3])
return out_list
def main():
nlu = NLU()
dst = DST()
# dp = DP()
nlg = NLG(dst)
#nlu.train_slot_model('data/train+test-pl.conllu', 'data/train+test-pl.conllu')
#nlu.train_intent_model('data/NLU_data_intent')
nlu.load_slot_model('slot-model-pl')
nlu.load_intent_model('intent-model-pl')
start = time.time()
turns = 0
print('===========================================')
print('### By otrzymać pomoc, wpisz /pomoc ###')
print('### By zakończyć rozmowę, wpisz /koniec ###')
print('### By rozpocząć rozmowę od początku, wpisz /reset ###')
print('### By podejrzeć stan dialogu, wpisz /dst ###')
print('Witaj, jestem Usher - system do rezerwacji biletów kinowych. W czym mogę Ci pomóc?')
# WIP
while True:
turns += 1
user_input = input('> ')
dst.state['history'].append(f"user\t{user_input}")
user_input_lr = user_input.lower()
flag = False
slots = nlu.predict_slots(user_input)
intent = nlu.predict_intent(user_input)
formatted_prediction = format_prediction(slots, intent)
for slot in formatted_prediction:
if len(formatted_prediction)<2:
if dst.state['system_action'][-1] != slot:
formatted_prediction.remove(slot)
formatted_prediction.append([slot[0],slot[1],dst.state['system_action'][-1][0][2],slot[3]])
if slot[2]=='seat':
if ',' in slot[3]:
seat,row = slot[3].split(',')
formatted_prediction.remove(['inform', 'Cinema', 'seat', slot[3]])
formatted_prediction.append(['inform', 'Cinema', 'seat', seat])
formatted_prediction.append(['inform', 'Cinema', 'row', f' {row}'])
if 'rzęd' in user_input_lr:
formatted_prediction_tmp = formatted_prediction.copy()
user_input_splited = user_input_lr.split()
for slots in formatted_prediction:
if slots[2] =='row' or slots[2]=='seat' or slots[2] =='quantity':
formatted_prediction_tmp.remove(slots)
formatted_prediction = formatted_prediction_tmp.copy()
if 'miejsc' not in user_input_lr and 'bilet' not in user_input_lr:
for i in range(len(user_input_splited)-1):
if 'rzęd' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int)and i==0:
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
elif 'miejsc' in user_input_lr and 'bilet' not in user_input_lr:
for i in range(len(user_input_splited)-1):
print(user_input_splited[i],user_input_splited[i+1])
if 'rzęd' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int)and i==0:
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
if 'miejsc' in user_input_splited[i]:
try:
if i==0 and isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i+1]])
elif 'miejsc' not in user_input_lr and 'bilet' in user_input_lr:
for i in range(len(user_input_splited)-1):
print(user_input_splited[i],user_input_splited[i+1])
if 'rzęd' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int)and i==0:
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
if 'miejsc' in user_input_splited[i]:
try:
if i==0 and isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i+1]])
elif 'miejsc' in user_input_lr and 'bilet' in user_input_lr:
for i in range(len(user_input_splited)-1):
if 'rzęd' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int)and i==0:
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'row', user_input_splited[i+1]])
if 'miejsc' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int) and i==0:
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'seat', user_input_splited[i+1]])
if 'bilet' in user_input_splited[i]:
try:
if isinstance(int(user_input_splited[i+1]),int)and i==0:
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i+1]])
elif isinstance(int(user_input_splited[i-1]),int):
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i-1]])
except:
if isinstance(int(user_input_splited[i+1]),int):
formatted_prediction.append(['inform', 'Cinema', 'quantity', user_input_splited[i+1]])
if user_input_lr == '/pomoc':
print('System Usher pozwala na:\n1. Rezerwację biletu\n2. Anulowanie rezerwacji biletu\n3. Zakup biletu\n4. Anulowanie zakupu biletu\n5. Sprawdzenie repertuaru\n6. Sprawdzenie dostępności miejsc')
elif user_input_lr == '/koniec':
print('Dziękuję za skorzystanie z moich usług. Miłego dnia!')
break
elif user_input_lr == '/dst':
print(dst.state)
elif user_input_lr == '/reset':
with open('history.txt', 'a+', encoding='utf-8') as f:
f.write(str(dst.state))
f.write('\n')
dst = DST()
# dp = DP() ?
print('Witaj, jestem Usher - system do rezerwacji biletów kinowych. W czym mogę Ci pomóc?')
elif 'rezerw' in user_input_lr:
if 'anulo' in user_input_lr:
flag=False
for slot in slots:
if slot == 'B-e-mail' or '@' in slot[0]:
print(nlg.update([['cinema','inform','cancel_book',slot[0]]]))
dst.update([['inform', 'Cinema', 'task', 'cancel_book'],['inform', 'Cinema', 'cancel_book_status',True]])
flag = True
if not flag:
dst.update([['inform', 'Cinema', 'task', 'cancel_book'],['inform', 'Cinema', 'cancel_book_status',False]])
print(nlg.update([['cinema','inform','cancel_book','']]))
else:
dst.update([['inform', 'Cinema', 'task', 'book']])
dst.update(formatted_prediction)
for slot,value in dst.state['belief_state']['cinema']['book'].items():
if value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot=="row" and value != '':
for slot,value in dst.state['belief_state']['cinema']['semi'].items():
if value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot in ["cancel_book_status","cancel_buy_status"] and value =='':
continue
elif 'kup' in user_input_lr or ('zwr' and 'bilet') in user_input_lr:
if 'anulo' in user_input_lr or 'zwr' in user_input_lr:
flag=False
for slot in slots:
if slot == 'B-e-mail' or '@' in slot[0]:
print(nlg.update([['cinema','inform','cancel_buy',slot[0]]]))
dst.update([['inform', 'Cinema', 'task', 'cancel_buy'],['inform', 'Cinema', 'cancel_buy_status',True]])
flag = True
if not flag:
dst.update([['inform', 'Cinema', 'task', 'cancel_buy'],['inform', 'Cinema', 'cancel_buy_status',False]])
print(nlg.update([['cinema','inform','cancel_buy','']]))
else:
dst.update([['inform', 'Cinema', 'task', 'buy']])
dst.update(formatted_prediction)
for slot,value in dst.state['belief_state']['cinema']['book'].items():
if value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot=="row" and value != '':
for slot,value in dst.state['belief_state']['cinema']['semi'].items():
if value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot in ["cancel_book_status","cancel_buy_status"] and value =='':
continue
elif (('jak' or 'któr') and 'film') in user_input_lr or 'repertuar' in user_input_lr:
dst.update([['inform', 'Cinema', 'task', 'show_movies']])
flag=False
for slot in slots:
if slot[0] in value_dict['train']['date'] or slot[0] in value_dict['train']['day'] or slot[0] in ['dziś','jutro','pojutrze']:
print(nlg.update([['cinema','offer','closestscreening',slot[0]]]))
flag = True
if not flag:
print(nlg.update([['cinema','offer','closestscreening','']]))
elif ('czy' or 'jakie' or 'które') and ('dostęp' or 'woln' or 'zajęt') in user_input_lr:
if 'miejsce' in user_input_lr:
dst.update([['offer', 'Cinema', 'task', 'show_seats']])
flag=False
for slot in slots:
if slot[0] in value_dict['train']['seat']:
print(nlg.update([['cinema','offer','seat',slot[0]]]))
flag = True
if not flag:
print(nlg.update([['cinema','offer','seat','']]))
elif len(formatted_prediction)<1:
print('Nie do końca zrozumiałem mógłbyś/mogłabyś powtórzyć? ')
else:
# print(formatted_prediction) # NLU output
dst.update(formatted_prediction)
#DP
fl = False
for action in dst.state['system_action']:
if 'cancel_book' == action[0][2]:
for slot in formatted_prediction:
if slot[2] == 'e-mail' or slot[2] == 'cancel_book' or slot[2] == 'cancel_buy':
fl = True
if fl:
print("Anulowano. Czy w czymś jeszcze mogę Ci pomóc?")
else:
for slot,value in dst.state['belief_state']['cinema']['book'].items():
if value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot=="row" and value != '':
for slot,value in dst.state['belief_state']['cinema']['semi'].items():
if slot == 'payments' and value == '':
print('Czy chciałbyś dokonać płatności online?')
input_user = input('> ')
if input_user.lower() == 'tak':
print(nlg.update([['cinema','request',slot,'online']]))
dst.update([['inform', 'Cinema', 'payments', 'online']])
else:
print(nlg.update([['cinema','request',slot,'']]))
dst.update([['inform', 'Cinema', 'payments', 'in_cinema']])
elif value == '':
print(nlg.update([['cinema','request',slot,'']]))
break
elif slot in ["cancel_book_status","cancel_buy_status"] and value =='':
continue
end = time.time()
print(f'### Czas konwersacji: {round(end - start, 2)}s ###')
print(f'### Tury konwersacji: {turns} ###')
with open('history.txt', 'a+', encoding='utf-8') as f:
f.write(str(dst.state))
f.write('\n')
if __name__ == '__main__':
main()