Request items by batches instead of one-by-one
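
The current documents for all items in a batch are now retrieved with a single
WikibaseDataFetcher.getEntityDocuments call instead of one getEntityDocument
call per item. Batches hold up to 50 updates, matching the per-request limit of
the wbgetentities API for regular accounts, and a failed batch request is
retried up to three times with a five-second pause between attempts.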

Antonin Delpeuch 2018-01-14 14:54:14 +00:00
parent 54acac491d
commit 0a18d61372


@@ -18,13 +18,17 @@ import org.openrefine.wikidata.editing.NewItemLibrary;
 import org.openrefine.wikidata.schema.ItemUpdate;
 import org.openrefine.wikidata.schema.WikibaseSchema;
 import org.openrefine.wikidata.schema.entityvalues.ReconEntityIdValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.wikidata.wdtk.datamodel.implementation.DataObjectFactoryImpl;
 import org.wikidata.wdtk.datamodel.interfaces.DataObjectFactory;
+import org.wikidata.wdtk.datamodel.interfaces.EntityDocument;
 import org.wikidata.wdtk.datamodel.interfaces.EntityIdValue;
 import org.wikidata.wdtk.datamodel.interfaces.ItemDocument;
 import org.wikidata.wdtk.datamodel.interfaces.MonolingualTextValue;
 import org.wikidata.wdtk.util.WebResourceFetcherImpl;
 import org.wikidata.wdtk.wikibaseapi.ApiConnection;
+import org.wikidata.wdtk.wikibaseapi.TermStatementUpdate;
 import org.wikidata.wdtk.wikibaseapi.WikibaseDataEditor;
 import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher;
 import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException;
@@ -45,6 +49,9 @@ import com.google.refine.util.Pool;
 public class PerformWikibaseEditsOperation extends EngineDependentOperation {
 
+    static final Logger logger = LoggerFactory
+            .getLogger(PerformWikibaseEditsOperation.class);
+
     public enum DuplicateDetectionStrategy {
         PROPERTY, SNAK, SNAK_QUALIFIERS
     }
@@ -221,52 +228,104 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation {
         // Perform edits
         NewItemLibrary newItemLibrary = new NewItemLibrary();
         DataObjectFactory factory = new DataObjectFactoryImpl();
+        List<ItemUpdate> remainingItemUpdates = new ArrayList<>();
+        remainingItemUpdates.addAll(updates.values());
         int totalItemUpdates = updates.size();
         int updatesDone = 0;
-        for(ItemUpdate update : updates.values()) {
-            try {
-                // New item
-                if (update.getItemId().getId() == "Q0") {
-                    ReconEntityIdValue newCell = (ReconEntityIdValue) update.getItemId();
-                    update.normalizeLabelsAndAliases();
-                    ItemDocument itemDocument = factory.getItemDocument(
-                            update.getItemId(),
-                            update.getLabels(),
-                            update.getDescriptions(),
-                            update.getAliases(),
-                            update.getAddedStatementGroups(),
-                            new HashMap<String, SiteLink>(),
-                            0L);
-                    ItemDocument createdDoc = wbde.createItemDocument(itemDocument, _summary);
-                    newItemLibrary.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId());
-                } else {
-                    // Existing item
-                    ItemDocument currentDocument = (ItemDocument) wbdf.getEntityDocument(update.getItemId().getId());
-                    wbde.updateTermsStatements(currentDocument,
-                            update.getLabels(),
-                            update.getDescriptions(),
-                            update.getAliases(),
-                            new ArrayList<MonolingualTextValue>(),
-                            update.getAddedStatements(),
-                            update.getDeletedStatements(), _summary);
+        int batchSize = 50;
+        while (updatesDone < totalItemUpdates) {
+            // Split the remaining updates into batches
+            List<ItemUpdate> batch = null;
+            if (totalItemUpdates - updatesDone < batchSize) {
+                batch = remainingItemUpdates;
+            } else {
+                // A view on the backing list: clearing it at the end of the
+                // loop removes the processed updates from remainingItemUpdates
+                batch = remainingItemUpdates.subList(0, batchSize);
+            }
+            // Collect the qids of the existing items in this batch
+            List<String> qids = new ArrayList<>(batch.size());
+            for (ItemUpdate update : batch) {
+                String qid = update.getItemId().getId();
+                if (!update.isNew()) {
+                    qids.add(qid);
+                }
+            }
-                }
-            } catch (MediaWikiApiErrorException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-            updatesDone++;
-            _progress = (100*updatesDone) / totalItemUpdates;
+
+            // Get the current documents for this batch of updates
+            logger.info("Requesting documents");
+            Map<String, EntityDocument> currentDocs = null;
+            int retries = 3;
+            while (currentDocs == null && retries > 0) {
+                try {
+                    currentDocs = wbdf.getEntityDocuments(qids);
+                } catch (MediaWikiApiErrorException e) {
+                    e.printStackTrace();
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e1) {
+                        _canceled = true;
+                        break;
+                    }
+                }
+                retries--;
+            }
-            if(_canceled) {
+            // Also stop when the current documents could not be fetched,
+            // otherwise currentDocs.get() below would throw a NullPointerException
+            if (_canceled || currentDocs == null) {
                 break;
             }
logger.info("Performing edits");
for(ItemUpdate update : batch) {
try {
// New item
if (update.getItemId().getId() == "Q0") {
ReconEntityIdValue newCell = (ReconEntityIdValue)update.getItemId();
update.normalizeLabelsAndAliases();
ItemDocument itemDocument = factory.getItemDocument(
update.getItemId(),
update.getLabels(),
update.getDescriptions(),
update.getAliases(),
update.getAddedStatementGroups(),
new HashMap<String,SiteLink>(),
0L);
ItemDocument createdDoc = wbde.createItemDocument(itemDocument, _summary);
newItemLibrary.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId());
} else {
// Existing item
ItemDocument currentDocument = (ItemDocument)currentDocs.get(update.getItemId().getId());
TermStatementUpdate itemUpdate = new TermStatementUpdate( currentDocument, update.getAddedStatements(),
update.getDeletedStatements(), update.getLabels(),
update.getDescriptions(),
update.getAliases(),
new ArrayList<MonolingualTextValue>()
);
System.out.println(itemUpdate.getJsonUpdateString());
wbde.updateTermsStatements(currentDocument,
update.getLabels(),
update.getDescriptions(),
update.getAliases(),
new ArrayList<MonolingualTextValue>(),
update.getAddedStatements(),
update.getDeletedStatements(), _summary);
}
+                } catch (MediaWikiApiErrorException e) {
+                    // Report the API error and continue with the next update
+                    e.printStackTrace();
+                } catch (IOException e) {
+                    // Report the network error and continue with the next update
+                    e.printStackTrace();
+                }
+                updatesDone++;
+                if (_canceled) {
+                    break;
+                }
+            }
+            // Drops the processed updates from remainingItemUpdates (see above)
+            batch.clear();
         }
         _progress = 100;
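
For reference, the batched-fetch pattern adopted above can be sketched in
isolation as follows. This is a minimal illustration built on Wikidata-Toolkit's
WikibaseDataFetcher, not code from this commit: the class, the helper
fetchInBatches and the example QIDs are invented for the sketch, the retry loop
is omitted, and depending on the Wikidata-Toolkit version getEntityDocuments may
declare IOException in addition to MediaWikiApiErrorException (the sketch
declares both).

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.wikidata.wdtk.datamodel.interfaces.EntityDocument;
import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher;
import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException;

public class BatchedFetchSketch {

    // wbgetentities accepts at most 50 ids per request for regular
    // accounts, hence the batch size used in the commit above
    static final int BATCH_SIZE = 50;

    // Fetch the documents for the given ids with one API call per
    // batch of at most BATCH_SIZE ids, instead of one call per id
    static Map<String, EntityDocument> fetchInBatches(WikibaseDataFetcher fetcher,
            List<String> qids) throws MediaWikiApiErrorException, IOException {
        Map<String, EntityDocument> results = new HashMap<>();
        for (int start = 0; start < qids.size(); start += BATCH_SIZE) {
            List<String> batch = qids.subList(start,
                    Math.min(start + BATCH_SIZE, qids.size()));
            results.putAll(fetcher.getEntityDocuments(batch));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        WikibaseDataFetcher fetcher = WikibaseDataFetcher.getWikidataDataFetcher();
        Map<String, EntityDocument> docs =
                fetchInBatches(fetcher, Arrays.asList("Q42", "Q64", "Q90"));
        System.out.println("Fetched " + docs.size() + " documents");
    }
}

Grouping ids this way turns N item lookups into ceil(N/50) HTTP round trips,
which is where the speed-up of this commit comes from.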