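"""Crawler for proofread page scans on Polish Wikisource (pl.wikisource.org).

Walks one of the proofreading categories (green = Uwierzytelniona / validated,
yellow = Skorygowana / proofread, red = Przepisana / transcribed) and appends
each page's title, URL, scan image URL and text to a tab-separated output
file. If the output file already exists, crawling resumes from its last row.
"""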
from bs4 import BeautifulSoup
import requests
import time
import os
from tqdm import tqdm
import csv
import argparse
import re
import pandas as pd

# Base URL without a trailing slash, so it joins cleanly with hrefs such as
# "/wiki/Strona:..." taken from the category listing.
MAIN_URL = "https://pl.wikisource.org"


class WikiCrawler:
    def __init__(self, wiki_type: str, output_file_name: str):
        self.wiki_type = wiki_type
        self.output_file_name = output_file_name
        self.page_number = 1
        self.index = 1
        self.load_last_checkpoint()

    def load_last_checkpoint(self):
        # Resume from the last row of the output file, if it already exists.
        self.start_file_name = None
        if os.path.exists(self.output_file_name):
            # The output file is written without a header row, so read it as plain data.
            df = pd.read_csv(self.output_file_name, encoding='utf-8', sep='\t', header=None)
            last_line = df.iloc[-1]
            self.start_file_name = last_line.iloc[0]
            self.page_number = int(last_line.iloc[-1])
            self.index = int(last_line.iloc[-2])
            del df
        print(f"Starting from index: {self.index}, page: {self.page_number}")

    def _init_crawl(self):
        category_dict = {"green": "Uwierzytelniona", "yellow": "Skorygowana", "red": "Przepisana"}
        category_url = f"{MAIN_URL}/wiki/Kategoria:{category_dict[self.wiki_type]}"
        # Resume listing from the last saved page title instead of the first category page.
        if self.start_file_name:
            category_url = f"{MAIN_URL}/w/index.php?title=Kategoria:{category_dict[self.wiki_type]}&pagefrom={self.start_file_name}"
        request = requests.get(category_url)
        assert request.status_code == 200, f"Unexpected status on main request, status: {request.status_code}"

        soup = BeautifulSoup(request.text, 'lxml')
        # The "mw-pages" paragraph states how many pages are shown out of the category total;
        # strip non-breaking spaces, keep the digits and drop the first three (the per-page
        # count of 200) to recover the total number of pages in the category.
        pages_info = re.sub("\xa0", '', soup.find("div", {"id": "mw-pages"}).find("p").text)
        self.max_len = int("".join(re.findall(r"\d", pages_info)[3:]))
        self.pbar = tqdm(total=self.max_len)

        if self.start_file_name:
            self.pbar.update(self.index)
            self.pbar.refresh()

        return soup, request

    def save_page_data(self, page_element):
        time.sleep(0.3)  # be polite between requests
        doc_request = requests.get(MAIN_URL + page_element['href'])
        assert doc_request.status_code == 200, f"Wrong status on requesting doc link: {MAIN_URL + page_element['href']}"
        doc_soup = BeautifulSoup(doc_request.text, 'lxml')
        text = doc_soup.find("div", {"class": "pagetext"}).next_element
        image_url = doc_soup.find("div", {"class": "prp-page-image"}).find("img")['src']

        # Append one tab-separated row per page; no header row is written.
        with open(self.output_file_name, 'a', newline='', encoding='utf-8') as output_csv:
            row_dict = {
                "title": page_element['title'],
                "href": MAIN_URL + page_element['href'],
                "image_url": image_url,
                "text": text.text,
                "index": self.index,
                "page_number": self.page_number
            }
            writer = csv.DictWriter(output_csv, fieldnames=list(row_dict.keys()), delimiter='\t')
            writer.writerow(row_dict)

    def crawl(self):
        soup, r = self._init_crawl()
        first_search = True
        while self.index < self.max_len:
            time.sleep(0.3)
            self.pbar.set_description(f"Page number: {self.page_number}")
            # "następna strona" is the "next page" link of the category listing.
            next_page = soup.find("a", {"href": re.compile(r"/w/index\.php.*")}, string="następna strona")
            if next_page:
                next_page = next_page.get('href', None)
            if next_page and not first_search:
                r = requests.get(MAIN_URL + next_page)
            elif not next_page:
                print(soup)
                print("\n\n\n", soup.text)
                print("End of pages, or next page not found")
                break

            # Handle a failed request: wait a minute, then retry once.
            if r.status_code != 200:
                print('Retrying request, request data: ', r.__dict__)
                time.sleep(60)
                r = requests.get(MAIN_URL + next_page)
                assert r.status_code == 200, f"Retry failed, request status: {r.status_code}, full_info: {r.__dict__}"

            soup = BeautifulSoup(r.text, 'lxml')
            # Every proofread page on the listing links to "/wiki/Strona:...".
            links = soup.find_all("a", {"href": re.compile(r"/wiki/Strona:.*")})
            for link in links:
                self.save_page_data(link)
                self.index += 1
                self.pbar.update(1)
            self.page_number += 1
            first_search = False
        print("Finished")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--wiki_type", type=str, choices=["green", "yellow", "red"], required=True)
    parser.add_argument("--output_file_name", type=str, required=True)
    args, left_argv = parser.parse_known_args()

    crawler = WikiCrawler(**vars(args))
    crawler.crawl()
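
# Example invocation (the script file name is illustrative, not part of the source):
#   python wiki_crawler.py --wiki_type green --output_file_name green_pages.tsv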