forked from kubapok/kleister-nda-clone
338 lines
9.1 KiB
Python
338 lines
9.1 KiB
Python
![]() |
#!/usr/bin/env python
|
||
|
# coding: utf-8
|
||
|
|
||
|
# # Extract key information from Edgar NDA documents
|
||
|
|
||
|
# In[1]:
|
||
|
|
||
|
|
||
|
import pathlib
|
||
|
from collections import Counter
|
||
|
from sklearn.metrics import *
|
||
|
|
||
|
|
||
|
# In[2]:
|
||
|
|
||
|
|
||
|
# Root of the local kleister-nda dataset clone (machine-specific path; adjust per environment).
KLEISTER_PATH = pathlib.Path('C:/Users/Fijka/Documents/kleister-nda-clone')
# Which dataset split to process; the blind 'test-A' split ships no expected.tsv,
# so all evaluation below is skipped for it.
file_name = 'train'
|
||
|
|
||
|
|
||
|
# ## Read expected train data
|
||
|
|
||
|
# In[3]:
|
||
|
|
||
|
|
||
|
def get_expected_data(filepath, data_key):
    """Read one key's expected values from a kleister expected.tsv file.

    Each line of the file holds space-separated ``key=value`` pairs.
    Returns a list with one entry per line: the value recorded for
    ``data_key`` (the last one, if repeated), or the string 'NONE' when
    the key is absent from that line.
    """
    dataset_expected_key = []
    with open(filepath, 'r', encoding='utf-8') as train_expected_file:
        for line in train_expected_file:
            data_value = 'NONE'  # default when data_key is missing on this line
            for key_value in line.rstrip('\n').split(' '):
                # partition() keeps values that themselves contain '=' intact and
                # never raises; the original two-way split()/unpack crashed on
                # such values and on empty tokens from doubled spaces.
                key, _, value = key_value.partition('=')
                if key == data_key:
                    data_value = value
            dataset_expected_key.append(data_value)
    return dataset_expected_key
|
||
|
|
||
|
|
||
|
# In[4]:
|
||
|
|
||
|
|
||
|
# The four NDA attributes extracted by this script, in the index order used
# throughout: 0=effective_date, 1=jurisdiction, 2=party, 3=term.
KEYS = ['effective_date', 'jurisdiction', 'party', 'term']
|
||
|
|
||
|
|
||
|
# In[5]:
|
||
|
|
||
|
|
||
|
def read_expected_data(filepath):
    """Collect the expected values for every key in KEYS.

    Returns a list parallel to KEYS: for each key, the list of that
    key's per-document gold values from *filepath*.
    """
    return [get_expected_data(filepath, key) for key in KEYS]
|
||
|
|
||
|
# Load the gold answers only for splits that ship them; the blind 'test-A'
# split has no expected.tsv.
if file_name != 'test-A':
    train_expected_data = read_expected_data(KLEISTER_PATH/file_name/'expected.tsv')


# In[6]:


# Notebook inspection cell: peeks at the first expected value per key.
# The comprehension's result is discarded, so this is a no-op when run
# as a plain script.
if file_name != 'test-A':
    [i[:1] for i in train_expected_data]
|
||
|
|
||
|
|
||
|
# ## Read train dataset
|
||
|
|
||
|
# In[7]:
|
||
|
|
||
|
|
||
|
import lzma
|
||
|
import csv
|
||
|
|
||
|
def read_data(filename):
    """Load a kleister in.tsv.xz split into a list of rows.

    Each row is the list of tab-separated fields of one line. The
    trailing ``[:-1]`` drops the empty row produced by splitting on the
    file's final newline.
    """
    # Context manager guarantees the xz handle is closed; the original
    # leaked it (lzma.open(...) with no close()).
    with lzma.open(filename) as compressed:
        all_data = compressed.read().decode('UTF-8').split('\n')
    return [line.split('\t') for line in all_data][:-1]
|
||
|
|
||
|
# Rows of the current split. Code below slices document[2:] for text, so rows
# are presumably [filename, keys, text_1, ...] — confirm against dataset docs.
train_data = read_data(KLEISTER_PATH/file_name/'in.tsv.xz')
|
||
|
|
||
|
|
||
|
# ## JURISDICTION
|
||
|
|
||
|
# In[8]:
|
||
|
|
||
|
|
||
|
# The 50 U.S. state names scanned for by the jurisdiction heuristic below.
STATES = ['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado', 'Connecticut', 'Delaware','Florida',
          'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'Maine',
          'Maryland', 'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada',
          'New Hampshire', 'New Jersey', 'New Mexico', 'New York', 'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma',
          'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota', 'Tennessee', 'Texas', 'Utah',
          'Vermont', 'Virginia', 'Washington', 'West Virginia', 'Wisconsin', 'Wyoming']
|
||
|
|
||
|
|
||
|
# In[9]:
|
||
|
|
||
|
|
||
|
import spacy
# NOTE(review): the spaCy pipeline is loaded but never used by any code
# visible in this file — candidate for removal after confirming.
nlp = spacy.load("en_core_web_sm")
from operator import itemgetter

# Predicted jurisdiction per document, filled as a side effect of
# check_jurisdiction() ('New_York'-style underscored names, or None).
jurisdiction = []
|
||
|
|
||
|
def normalize(text):
    """Lower-case *text*, first turning the dataset's literal
    backslash-n escape sequences into single spaces."""
    flattened = text.replace('\\n', ' ')
    return flattened.lower()
|
||
|
# nlp(text) -> tokenization
|
||
|
|
||
|
def check_jurisdiction(document):
    """Guess the governing-law state for one document.

    Scans every text column (document[2:]) for U.S. state names and
    scores each state as [total occurrence count across texts, index of
    its first occurrence in the first text that mentioned it].

    Returns (best_state_name, sorted_scores) when any state was found,
    else None. Side effect: appends the prediction (spaces replaced by
    underscores), or None, to the module-level `jurisdiction` list.
    """
    states = {}
    for text in document[2:]:
        text = normalize(text)
        for state in STATES:
            # NOTE(review): plain substring test, so 'Virginia' also fires
            # inside 'West Virginia' mentions — confirm this is acceptable.
            if state.lower() in text:
                if state in states:
                    # accumulate occurrences across later text columns
                    states[state][0] += text.count(state.lower())
                else:
                    # [occurrence count, first-occurrence index in this text]
                    states[state] = [text.count(state.lower()), text.index(state.lower())]
    if states != {}:
        # Sort by [count, first-index] descending: highest count wins.
        # NOTE(review): with reverse=True, count ties are broken by the
        # LARGER first-occurrence index (a later mention) — confirm intended.
        states = sorted(states.items(), key=itemgetter(1), reverse=True)
        jurisdiction.append(states[0][0].replace(' ', '_'))
        return states[0][0], states
    else:
        jurisdiction.append(None)
        return None
|
||
|
|
||
|
# Evaluate the jurisdiction heuristic against the gold answers (and, as a
# side effect of check_jurisdiction, fill the `jurisdiction` list).
tmp = 0
for i in range(len(train_data)):
    tt = check_jurisdiction(train_data[i])
    if file_name != 'test-A':
        if tt == None:
            # NOTE(review): gold values are strings ('NONE' marks a missing
            # key, never None), so this comparison is always true.
            if train_expected_data[1][i] != None:
                # print(i, train_expected_data[1][i], tt)
                tmp += 1
        else:
            # Gold values use '_' for spaces; undo that before comparing.
            if tt[0] != train_expected_data[1][i].replace('_', ' '):
                # print(i, train_expected_data[1][i], tt[0])
                tmp += 1
print('false jurisdiction:', tmp)
|
||
|
|
||
|
|
||
|
# ## EFFECTIVE DATE
|
||
|
|
||
|
# In[10]:
|
||
|
|
||
|
|
||
|
import re
import datetime
# NOTE(review): `date` is imported but shadowed by parse_date's parameter
# and otherwise unused — candidate for removal after confirming.
from datetime import date

# Predicted effective date per document ('YYYY-MM-DD' strings or None).
effective_date = []
|
||
|
|
||
|
def parse_date(date):
    """Format a datetime.date as zero-padded ISO 'YYYY-MM-DD'.

    The parameter name shadows the ``datetime.date`` class imported at
    module level; kept for interface compatibility.
    """
    # isoformat() yields exactly the zero-padded 'YYYY-MM-DD' string the
    # original assembled by hand with manual '0'-prefixing.
    return date.isoformat()
|
||
|
|
||
|
def find_dates(text):
    """Find date mentions in *text*, returned as ISO 'YYYY-MM-DD' strings.

    Tries each regex below in order and takes at most one (the leftmost)
    match per pattern, parsing it with the paired strptime format.
    Matches the regex accepts but strptime cannot parse (e.g. a truncated
    month name) are skipped. Returns a list, possibly empty.
    """
    text = text.replace('\\n', ' ')  # dataset encodes newlines as literal '\n'

    # regex -> strptime format describing that regex's shape
    # (raw strings avoid the invalid-escape warnings of the original)
    patterns = {
        r'\d{1,2}\/\d{1,2}\/\d{2}': '%m/%d/%y',
        r'[01]*[0-9]\/[01]*[0-9]\/\d{4}': '%m/%d/%Y',
        r'\w{3,9}?\s\d{1,2}?,\s\d{4}?': '%B %d, %Y',
        r'\w{3,9}?\s\d{1,2}?,\d{4}?': '%B %d,%Y',
        r'\d{1,2}?th\sday\sof\s\w{3,9}?\s\d{4}?': '%dth day of %B %Y',
        r'\d{1,2}?th\sday\sof\s\w{3,9}?,\s\d{4}?': '%dth day of %B, %Y',
        r'\d{1,2}?ND\sday\sof\s\w{3,9}?\s\d{4}?': '%dND day of %B %Y',
        r'\w{3,9}?\s\d{1,2}?th\s,\s\d{4}?': '%B %dth , %Y',
        r'\w{3,9}?\s\d{1,2}?th,\s\d{4}?': '%B %dth, %Y',
        r'\d{1,2}?\sday\sof\s\w{3,9}?,\s\d{4}?': '%d day of %B, %Y',
        r'\w{3,9}?\.\s\d{1,2}?,\s\d{4}?': '%b. %d, %Y',
        r'\d{1,2}?\s\w{3,9}?,\s\d{4}?': '%d %B, %Y',
        r'\d{1,2}?st\sday\sof\s\w{3,9}?\s,\s\d{4}?': '%dst day of %B , %Y',
        r'\d{1,2}?st\sday\sof\s\w{3,9}?,\s\d{4}?': '%dst day of %B, %Y',
        r'\d{1,2}?nd\sday\sof\s\w{3,9}?,\s\d{4}?': '%dnd day of %B, %Y',
        r'\d{1,2}\.\d{1,2}\.\d{2,4}': '%m.%d.%y',
    }

    all_dates = []
    for pattern, fmt in patterns.items():
        match = re.search(pattern, text)
        if match is not None:
            try:
                parsed = datetime.datetime.strptime(match.group(), fmt).date()
                # isoformat() equals the zero-padded form parse_date() builds.
                all_dates.append(parsed.isoformat())
            except ValueError:
                # regex over-matched (e.g. 'anuary 5, 2015') — not a real date;
                # narrowed from the original bare `except:` which also hid bugs.
                pass
    return all_dates
|
||
|
|
||
|
def check_effective_date(text):
    """Return find_dates(text) wrapped in a one-element list, or []
    when no dates were found."""
    found = find_dates(text)
    return [found] if found else []
|
||
|
|
||
|
# Evaluate the effective-date heuristic on the first text column and
# record one prediction per document.
test = 0
for i in range(len(train_data)):
    xx = check_effective_date(train_data[i][2])
    if file_name != 'test-A':
        if train_expected_data[0][i] == 'NONE':
            if xx != []:
                # predicted a date where none was expected
                # print(i, train_expected_data[0][i], xx[-1][0])
                test += 1
        else:
            if xx != []:
                # xx is [[d1, d2, ...]]; this compares the LAST date found.
                if xx[0][-1] != train_expected_data[0][i]:
                    # print(i, train_expected_data[0][i], xx[-1][0])
                    test +=1
            else:
                # expected a date but found none
                # print(i, train_expected_data[0][i], xx)
                test += 1
    if xx != []:
        # NOTE(review): records xx[-1][0] (the FIRST date found) while the
        # accuracy check above compares xx[0][-1] (the LAST) — confirm
        # which is intended; they differ when several patterns match.
        effective_date.append(xx[-1][0])
    else:
        effective_date.append(None)
print('false effective date', test)
|
||
|
|
||
|
|
||
|
# ## PARTY
|
||
|
|
||
|
# In[11]:
|
||
|
|
||
|
|
||
|
# Predicted party per document (underscore-joined company name or None).
party = []
|
||
|
|
||
|
def check_party(document):
    """Guess the counterparty name from a document's text.

    Looks for a three-word '..., Inc.' (or '..., INC.') company mention,
    title-cases it, joins it with underscores, then strips a leading
    filler word using the offset table below.

    NOTE(review): the final `return result` sits inside the for loop, so
    only the first text column in document[2:] is ever examined — confirm
    this is intended (and that an empty document[2:] cannot occur, which
    would return None implicitly).
    """
    # Filler word -> number of characters to skip (the word plus its
    # trailing underscore in the underscored result string).
    dic = {'And_' : 4,
           'From_' : 5,
           'For' : 4,
           'Between' : 8,
           'With' : 5,
           'Ceo' : 4,
           'To' : 3,
           }

    for text in document[2:]:
        text = text.replace('\\n', ' ')

        result = None
        match = re.search(r'\w*\s\w*\s\w*,\sInc\.', text)
        if match == None:
            # fall back to the all-caps spelling
            match = re.search(r'\w*\s\w*\s\w*,\sINC\.', text)
        if match != None:
            # e.g. 'between ACME Widget, Inc.' -> 'Between_Acme_Widget_Inc.'
            result = match.group().title()
            result = result.replace(',', '').replace(' ', '_')
            # Trim everything up to and past a filler word, if present.
            for d in dic:
                if d in result:
                    result = result[result.index(d) + dic[d]:]
            if result.startswith('_'):
                result = result[1:]
        return result
|
||
|
|
||
|
# Evaluate the party heuristic and record one prediction per document.
tmp = 0
for i in range(len(train_data)):
    tt = check_party(train_data[i])
    party.append(tt)
    if file_name != 'test-A':
        # Gold party values are already underscore-joined, so compare directly.
        if train_expected_data[2][i] != tt:
            tmp += 1
            # print(i, train_expected_data[2][i], tt)
print('false party:', tmp)
|
||
|
|
||
|
|
||
|
# ## TERM
|
||
|
|
||
|
# In[12]:
|
||
|
|
||
|
|
||
|
# Predicted contract term per document (e.g. '2_years', '6_months') or None.
term = []
|
||
|
|
||
|
def check_term(document):
    """Extract the contract term from a document's text columns.

    Scans each text in document[2:] for a parenthesised count followed
    by 'years' or 'months' (e.g. '(2) years') and returns the first hit
    normalised to '2_years' / '6_months' form. Returns None when no
    text column contains a match.
    """
    for text in document[2:]:
        text = text.replace('\\n', ' ')

        match = re.search(r'\(\d*\)\syears', text)
        if match is None:
            # fall back to month-based terms
            match = re.search(r'\(\d*\)\smonths', text)
        if match is not None:
            # '(2) years' -> '2_years'
            return match.group().replace('(', '').replace(')', '').replace(' ', '_')
    # Original threaded a `result` variable through to reach this point;
    # it was always None here, so return it directly.
    return None
|
||
|
|
||
|
# Evaluate the term heuristic and record one prediction per document.
tmp = 0
for i in range(len(train_data)):
    tt = check_term(train_data[i])
    term.append(tt)
    if file_name != 'test-A':
        if train_expected_data[3][i] != tt:
            # 'NONE' in the gold data and None from check_term both mean
            # "no term" — don't count that pairing as an error.
            if train_expected_data[3][i] == 'NONE' and tt == None:
                pass
            else:
                # print(i, train_expected_data[3][i], tt)
                tmp += 1
print('false term:', tmp)
|
||
|
|
||
|
|
||
|
# In[13]:
|
||
|
|
||
|
|
||
|
import os
|
||
|
|
||
|
def write_output(effective_date, jurisdiction, party, term):
    """Write the predictions to <split>/out.tsv, one line per document.

    Each line holds tab-separated key=value pairs for the predictions
    that are not None, in the fixed KEYS order; a document with no
    predictions gets an empty line. Reads KLEISTER_PATH and file_name
    from module scope.
    """
    # 'w' mode truncates any existing file, so the original's explicit
    # os.path.exists()/os.remove() dance is unnecessary. The context
    # manager also guarantees the handle is closed even on error (the
    # original leaked it if a write raised).
    with open(KLEISTER_PATH/file_name/'out.tsv', 'w') as out_file:
        for doc in range(len(effective_date)):
            fields = []
            if effective_date[doc] is not None:
                fields.append('effective_date=' + effective_date[doc])
            if jurisdiction[doc] is not None:
                fields.append('jurisdiction=' + jurisdiction[doc])
            if party[doc] is not None:
                fields.append('party=' + party[doc])
            if term[doc] is not None:
                fields.append('term=' + term[doc])
            # join() replaces the original append-tab-then-strip-last logic
            out_file.write('\t'.join(fields) + '\n')
|
||
|
|
||
|
# Emit out.tsv for the current split from the collected predictions.
write_output(effective_date, jurisdiction, party, term)
|
||
|
|