Better testing of the editing process

Antonin Delpeuch 2018-03-03 12:44:20 +00:00
parent 773be2e161
commit a002468e7d
6 changed files with 376 additions and 168 deletions

View File: PerformWikibaseEditsCommand.java

@@ -4,8 +4,6 @@ import javax.servlet.http.HttpServletRequest;
import org.json.JSONObject;
import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation;
import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation.DuplicateDetectionStrategy;
import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation.OnDuplicateAction;
import com.google.refine.commands.EngineDependentCommand;
import com.google.refine.model.AbstractOperation;
@@ -16,12 +14,8 @@ public class PerformWikibaseEditsCommand extends EngineDependentCommand {
@Override
protected AbstractOperation createOperation(Project project, HttpServletRequest request, JSONObject engineConfig)
throws Exception {
String strategy = request.getParameter("strategy");
String action = request.getParameter("action");
String summary = request.getParameter("summary");
return new PerformWikibaseEditsOperation(engineConfig,
DuplicateDetectionStrategy.valueOf(strategy),
OnDuplicateAction.valueOf(action),
summary);
}

View File: EditBatchProcessor.java (new file)

@@ -0,0 +1,211 @@
package org.openrefine.wikidata.editing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.openrefine.wikidata.schema.entityvalues.ReconEntityIdValue;
import org.openrefine.wikidata.updates.ItemUpdate;
import org.openrefine.wikidata.updates.scheduler.WikibaseAPIUpdateScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.wdtk.datamodel.helpers.Datamodel;
import org.wikidata.wdtk.datamodel.interfaces.EntityDocument;
import org.wikidata.wdtk.datamodel.interfaces.ItemDocument;
import org.wikidata.wdtk.datamodel.interfaces.ItemIdValue;
import org.wikidata.wdtk.datamodel.interfaces.MonolingualTextValue;
import org.wikidata.wdtk.wikibaseapi.WikibaseDataEditor;
import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher;
import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException;
/**
* Schedules and performs a list of updates to items via the API.
*
* @author Antonin Delpeuch
*
*/
public class EditBatchProcessor {
static final Logger logger = LoggerFactory
.getLogger(EditBatchProcessor.class);
private WikibaseDataFetcher fetcher;
private WikibaseDataEditor editor;
private NewItemLibrary library;
private List<ItemUpdate> scheduled;
private String summary;
private List<ItemUpdate> remainingUpdates;
private List<ItemUpdate> currentBatch;
private int batchCursor;
private int globalCursor;
private Map<String, EntityDocument> currentDocs;
private int batchSize;
/**
* Initiates the process of pushing a batch of updates
* to Wikibase. This schedules the updates and is a
* prerequisite for calling {@link #performEdit()}.
*
* @param fetcher
* the fetcher to use to retrieve the current state of items
* @param editor
* the object to use to perform the edits
* @param updates
* the list of item updates to perform
* @param library
* the library to use to keep track of new item creation
* @param summary
* the summary to append to all edits
* @param batchSize
* the number of items that should be retrieved in one go from the API
*/
public EditBatchProcessor(WikibaseDataFetcher fetcher,
WikibaseDataEditor editor,
List<ItemUpdate> updates,
NewItemLibrary library,
String summary,
int batchSize) {
this.fetcher = fetcher;
this.editor = editor;
editor.setEditAsBot(true); // this has no effect if the user does not have a bot flag,
// and marking edits as bot edits is generally wanted when they do have one.
this.library = library;
this.summary = summary;
this.batchSize = batchSize;
// Schedule the edit batch
WikibaseAPIUpdateScheduler scheduler = new WikibaseAPIUpdateScheduler();
this.scheduled = scheduler.schedule(updates);
this.globalCursor = 0;
this.batchCursor = 0;
this.remainingUpdates = new ArrayList<>(scheduled);
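// start with an empty batch and no cached documents, so that the first call
// to performEdit() triggers prepareNewBatch()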
this.currentBatch = Collections.emptyList();
this.currentDocs = Collections.emptyMap();
}
/**
* Performs the next edit in the batch.
*
* @throws InterruptedException
* when fetching the current state of the items failed repeatedly, or the thread was interrupted while waiting to retry a fetch
*/
public void performEdit() throws InterruptedException {
if (remainingEdits() == 0) {
return;
}
if (batchCursor == currentBatch.size()) {
prepareNewBatch();
}
ItemUpdate update = currentBatch.get(batchCursor);
// Rewrite mentions to new items
ReconEntityRewriter rewriter = new ReconEntityRewriter(library, update.getItemId());
update = rewriter.rewrite(update);
try {
// New item
if (update.isNew()) {
ReconEntityIdValue newCell = (ReconEntityIdValue)update.getItemId();
update.normalizeLabelsAndAliases();
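// build the document with a null item id so that the API allocates a fresh QID,
// which is recorded in the library below for use in subsequent edits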
ItemDocument itemDocument = Datamodel.makeItemDocument(ItemIdValue.NULL,
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
update.getAddedStatementGroups(),
Collections.emptyMap());
ItemDocument createdDoc = editor.createItemDocument(itemDocument, summary);
library.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId());
} else {
// Existing item
ItemDocument currentDocument = (ItemDocument)currentDocs.get(update.getItemId().getId());
/*
TermStatementUpdate tsUpdate = new TermStatementUpdate(
currentDocument,
update.getAddedStatements().stream().collect(Collectors.toList()),
update.getDeletedStatements().stream().collect(Collectors.toList()),
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
new ArrayList<MonolingualTextValue>()
);
ObjectMapper mapper = new ObjectMapper();
logger.info(mapper.writeValueAsString(update));
logger.info(update.toString());
logger.info(tsUpdate.getJsonUpdateString()); */
editor.updateTermsStatements(currentDocument,
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
new ArrayList<MonolingualTextValue>(),
update.getAddedStatements().stream().collect(Collectors.toList()),
update.getDeletedStatements().stream().collect(Collectors.toList()),
summary);
}
} catch (MediaWikiApiErrorException e) {
// TODO find a way to report these errors to the user in a nice way
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
batchCursor++;
}
/**
* @return the number of edits that remain to be done in the whole batch
*/
public int remainingEdits() {
return scheduled.size() - (globalCursor + batchCursor);
}
/**
* @return the progress, measured as a percentage
*/
public int progress() {
return (100*(globalCursor + batchCursor)) / scheduled.size();
}
protected void prepareNewBatch() throws InterruptedException {
// remove the previous batch from remainingUpdates: currentBatch is normally a
// subList view of remainingUpdates, so clearing it below drops those updates from the list
globalCursor += currentBatch.size();
currentBatch.clear();
if(remainingUpdates.size() < batchSize) {
currentBatch = remainingUpdates;
remainingUpdates = Collections.emptyList();
} else {
currentBatch = remainingUpdates.subList(0, batchSize);
}
List<String> qidsToFetch = currentBatch.stream()
.filter(u -> !u.isNew())
.map(u -> u.getItemId().getId())
.collect(Collectors.toList());
// Get the current documents for this batch of updates
logger.info("Requesting documents");
currentDocs = null;
int retries = 3;
while (currentDocs == null && retries > 0) {
try {
currentDocs = fetcher.getEntityDocuments(qidsToFetch);
} catch (MediaWikiApiErrorException e) {
e.printStackTrace();
Thread.sleep(5000);
}
retries--;
}
if (currentDocs == null) {
throw new InterruptedException("Fetching current documents failed.");
}
batchCursor = 0;
}
}
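
A minimal usage sketch for the class above, mirroring how PerformWikibaseEditsOperation drives it further down in this commit; the fetcher, editor, updates and summary variables are assumed to be already available, for instance set up as in EditBatchProcessorTest:

NewItemLibrary library = new NewItemLibrary();
EditBatchProcessor processor = new EditBatchProcessor(fetcher, editor, updates, library, summary, 50);
while (processor.remainingEdits() > 0) {
    try {
        processor.performEdit();
    } catch (InterruptedException e) {
        break; // fetching the current item documents failed repeatedly
    }
    int percentDone = processor.progress(); // integer percentage, e.g. for a progress bar
}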

View File: PerformWikibaseEditsOperation.java

@@ -15,6 +15,7 @@ import org.json.JSONObject;
import org.json.JSONWriter;
import org.openrefine.wikidata.editing.ConnectionManager;
import org.openrefine.wikidata.editing.EditBatchProcessor;
import org.openrefine.wikidata.editing.NewItemLibrary;
import org.openrefine.wikidata.editing.ReconEntityRewriter;
import org.openrefine.wikidata.updates.ItemUpdate;
@@ -57,26 +58,12 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation {
static final Logger logger = LoggerFactory
.getLogger(PerformWikibaseEditsOperation.class);
public enum DuplicateDetectionStrategy {
PROPERTY, SNAK, SNAK_QUALIFIERS
}
public enum OnDuplicateAction {
SKIP, MERGE
}
private DuplicateDetectionStrategy strategy;
private OnDuplicateAction duplicateAction;
private String summary;
public PerformWikibaseEditsOperation(
JSONObject engineConfig,
DuplicateDetectionStrategy strategy,
OnDuplicateAction duplicateAction,
String summary) {
super(engineConfig);
this.strategy = strategy;
this.duplicateAction = duplicateAction;
this.summary = summary;
// getEngine(request, project);
@@ -85,16 +72,12 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation {
static public AbstractOperation reconstruct(Project project, JSONObject obj)
throws Exception {
JSONObject engineConfig = obj.getJSONObject("engineConfig");
String strategy = obj.getString("duplicate_strategy");
String action = obj.getString("duplicate_action");
String summary = null;
if (obj.has("summary")) {
summary = obj.getString("summary");
}
return new PerformWikibaseEditsOperation(
engineConfig,
DuplicateDetectionStrategy.valueOf(strategy),
OnDuplicateAction.valueOf(action),
summary);
}
@@ -107,10 +90,6 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation {
writer.value(OperationRegistry.s_opClassToName.get(this.getClass()));
writer.key("description");
writer.value("Perform Wikibase edits");
writer.key("duplicate_strategy");
writer.value(strategy.name());
writer.key("duplicate_action");
writer.value(duplicateAction.name());
writer.key("summary");
writer.value(summary);
writer.key("engineConfig");
@@ -214,137 +193,29 @@
WikibaseDataFetcher wbdf = new WikibaseDataFetcher(connection, _schema.getBaseIri());
WikibaseDataEditor wbde = new WikibaseDataEditor(connection, _schema.getBaseIri());
wbde.setEditAsBot(true);
//wbde.disableEditing();
// Evaluate the schema
List<ItemUpdate> itemDocuments = _schema.evaluate(_project, _engine);
// Schedule the edit batch
WikibaseAPIUpdateScheduler scheduler = new WikibaseAPIUpdateScheduler();
List<ItemUpdate> updates = null;
updates = scheduler.schedule(itemDocuments);
/**
* TODO:
* - support for new items
* - support for duplicate strategy and action
*/
// Prepare the edits
NewItemLibrary newItemLibrary = new NewItemLibrary();
EditBatchProcessor processor = new EditBatchProcessor(wbdf,
wbde, itemDocuments, newItemLibrary, _summary, 50);
// Perform edits
NewItemLibrary newItemLibrary = new NewItemLibrary();
DataObjectFactory factory = new DataObjectFactoryImpl();
List<ItemUpdate> remainingItemUpdates = new ArrayList<>();
remainingItemUpdates.addAll(updates);
int totalItemUpdates = updates.size();
int updatesDone = 0;
int batchSize = 50;
while(updatesDone < totalItemUpdates) {
// Split the remaining updates in batches
List<ItemUpdate> batch = null;
if(totalItemUpdates - updatesDone < batchSize) {
batch = remainingItemUpdates;
} else {
batch = remainingItemUpdates.subList(0, batchSize);
logger.info("Performing edits");
while(processor.remainingEdits() > 0) {
try {
processor.performEdit();
} catch(InterruptedException e) {
_canceled = true;
}
List<String> qids = new ArrayList<>(batch.size());
for(ItemUpdate update : batch) {
String qid = update.getItemId().getId();
if (!update.isNew()) {
qids.add(qid);
}
}
// Get the current documents for this batch of updates
logger.info("Requesting documents");
Map<String, EntityDocument> currentDocs = null;
int retries = 3;
while (currentDocs == null && retries > 0) {
try {
currentDocs = wbdf.getEntityDocuments(qids);
} catch (MediaWikiApiErrorException e) {
e.printStackTrace();
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
_canceled = true;
break;
}
}
retries--;
}
_progress = processor.progress();
if (_canceled) {
break;
}
logger.info("Performing edits");
for(ItemUpdate update : batch) {
// Rewrite the update
ReconEntityRewriter rewriter = new ReconEntityRewriter(newItemLibrary, update.getItemId());
update = rewriter.rewrite(update);
try {
// New item
if (update.getItemId().getId().equals("Q0")) {
ReconEntityIdValue newCell = (ReconEntityIdValue)update.getItemId();
update.normalizeLabelsAndAliases();
ItemDocument itemDocument = factory.getItemDocument(
update.getItemId(),
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
update.getAddedStatementGroups(),
new HashMap<String,SiteLink>(),
0L);
ItemDocument createdDoc = wbde.createItemDocument(itemDocument, _summary);
newItemLibrary.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId());
} else {
// Existing item
ItemDocument currentDocument = (ItemDocument)currentDocs.get(update.getItemId().getId());
/*
TermStatementUpdate tsUpdate = new TermStatementUpdate(
currentDocument,
update.getAddedStatements().stream().collect(Collectors.toList()),
update.getDeletedStatements().stream().collect(Collectors.toList()),
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
new ArrayList<MonolingualTextValue>()
);
ObjectMapper mapper = new ObjectMapper();
logger.info(mapper.writeValueAsString(update));
logger.info(update.toString());
logger.info(tsUpdate.getJsonUpdateString()); */
wbde.updateTermsStatements(currentDocument,
update.getLabels().stream().collect(Collectors.toList()),
update.getDescriptions().stream().collect(Collectors.toList()),
update.getAliases().stream().collect(Collectors.toList()),
new ArrayList<MonolingualTextValue>(),
update.getAddedStatements().stream().collect(Collectors.toList()),
update.getDeletedStatements().stream().collect(Collectors.toList()),
_summary);
}
} catch (MediaWikiApiErrorException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
updatesDone++;
_progress = (100*updatesDone) / totalItemUpdates;
if(_canceled) {
break;
}
}
batch.clear();
}
_progress = 100;
if (!_canceled) {

View File: StatementGroupScrutinizer.java (deleted file)

@@ -1,19 +0,0 @@
package org.openrefine.wikidata.qa.scrutinizers;
import org.openrefine.wikidata.updates.ItemUpdate;
import org.wikidata.wdtk.datamodel.interfaces.StatementGroup;
public abstract class StatementGroupScrutinizer extends ItemUpdateScrutinizer {
@Override
public void scrutinize(ItemUpdate update) {
for(StatementGroup statementGroup : update.getAddedStatementGroups()) {
scrutinizeAdded(statementGroup);
}
}
public abstract void scrutinizeAdded(StatementGroup statementGroup);
public abstract void scrutinizeDeleted(StatementGroup statementGroup);
}

View File

@@ -13,7 +13,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

View File: EditBatchProcessorTest.java (new file)

@@ -0,0 +1,152 @@
package org.openrefine.wikidata.editing;
import org.openrefine.wikidata.testing.TestingData;
import org.openrefine.wikidata.updates.ItemUpdate;
import org.openrefine.wikidata.updates.ItemUpdateBuilder;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.wikidata.wdtk.datamodel.helpers.Datamodel;
import org.wikidata.wdtk.datamodel.helpers.ItemDocumentBuilder;
import org.wikidata.wdtk.datamodel.interfaces.EntityDocument;
import org.wikidata.wdtk.datamodel.interfaces.ItemDocument;
import org.wikidata.wdtk.datamodel.interfaces.ItemIdValue;
import org.wikidata.wdtk.datamodel.interfaces.MonolingualTextValue;
import org.wikidata.wdtk.wikibaseapi.WikibaseDataEditor;
import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher;
import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException;
import com.google.refine.tests.RefineTest;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
public class EditBatchProcessorTest extends RefineTest {
private WikibaseDataFetcher fetcher = null;
private WikibaseDataEditor editor = null;
private NewItemLibrary library = null;
private String summary = "my fantastic edits";
@BeforeMethod
public void setUp() {
fetcher = mock(WikibaseDataFetcher.class);
editor = mock(WikibaseDataEditor.class);
editor.disableEditing(); // just in case we got mocking wrong
library = new NewItemLibrary();
}
@Test
public void testNewItem() throws InterruptedException, MediaWikiApiErrorException, IOException {
List<ItemUpdate> batch = new ArrayList<>();
batch.add(new ItemUpdateBuilder(TestingData.existingId)
.addAlias(Datamodel.makeMonolingualTextValue("my new alias", "en"))
.addStatement(TestingData.generateStatement(TestingData.existingId, TestingData.newIdA))
.build());
MonolingualTextValue label = Datamodel.makeMonolingualTextValue("better label", "en");
batch.add(new ItemUpdateBuilder(TestingData.newIdA)
.addLabel(label)
.build());
// Plan expected edits
ItemDocument existingItem = ItemDocumentBuilder.forItemId(TestingData.existingId)
.withLabel(Datamodel.makeMonolingualTextValue("pomme", "fr"))
.withDescription(Datamodel.makeMonolingualTextValue("fruit délicieux", "fr"))
.build();
when(fetcher.getEntityDocuments(Collections.singletonList(TestingData.existingId.getId())))
.thenReturn(Collections.singletonMap(TestingData.existingId.getId(), existingItem));
ItemDocument expectedNewItem = ItemDocumentBuilder.forItemId(ItemIdValue.NULL)
.withLabel(label).build();
ItemDocument createdNewItem = ItemDocumentBuilder.forItemId(Datamodel.makeWikidataItemIdValue("Q1234"))
.withLabel(label).withRevisionId(37828L).build();
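// Mockito matches the stubbed arguments with equals(), so expectedNewItem must be
// value-equal to the document the processor builds (null item id, same label)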
when(editor.createItemDocument(expectedNewItem, summary)).thenReturn(createdNewItem);
EditBatchProcessor processor = new EditBatchProcessor(fetcher, editor, batch, library, summary, 50);
assertEquals(2, processor.remainingEdits());
assertEquals(0, processor.progress());
processor.performEdit();
assertEquals(1, processor.remainingEdits());
assertEquals(50, processor.progress());
processor.performEdit();
assertEquals(0, processor.remainingEdits());
assertEquals(100, processor.progress());
processor.performEdit(); // does not do anything
assertEquals(0, processor.remainingEdits());
assertEquals(100, processor.progress());
NewItemLibrary expectedLibrary = new NewItemLibrary();
expectedLibrary.setQid(1234L, "Q1234");
assertEquals(expectedLibrary, library);
}
@Test
public void testMultipleBatches() throws MediaWikiApiErrorException, InterruptedException, IOException {
// Prepare test data
MonolingualTextValue description = Datamodel.makeMonolingualTextValue("village in Nepal", "en");
List<String> ids = new ArrayList<>();
for(int i = 124; i < 190; i++) {
ids.add("Q"+String.valueOf(i));
}
List<ItemIdValue> qids = ids.stream()
.map(e -> Datamodel.makeWikidataItemIdValue(e))
.collect(Collectors.toList());
List<ItemUpdate> batch = qids.stream()
.map(qid -> new ItemUpdateBuilder(qid)
.addDescription(description)
.build())
.collect(Collectors.toList());
int batchSize = 50;
List<ItemDocument> fullBatch = qids.stream()
.map(qid -> ItemDocumentBuilder.forItemId(qid)
.withStatement(TestingData.generateStatement(qid, TestingData.existingId))
.build())
.collect(Collectors.toList());
List<ItemDocument> firstBatch = fullBatch.subList(0, batchSize);
List<ItemDocument> secondBatch = fullBatch.subList(batchSize, fullBatch.size());
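// 66 updates with a batch size of 50: the documents should be fetched in one
// batch of 50 followed by one batch of 16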
when(fetcher.getEntityDocuments(toQids(firstBatch))).thenReturn(toMap(firstBatch));
when(fetcher.getEntityDocuments(toQids(secondBatch))).thenReturn(toMap(secondBatch));
// Run edits
EditBatchProcessor processor = new EditBatchProcessor(fetcher, editor, batch, library, summary, batchSize);
assertEquals(0, processor.progress());
for(int i = 124; i < 190; i++) {
assertEquals(processor.remainingEdits(), 190-i);
processor.performEdit();
}
assertEquals(0, processor.remainingEdits());
assertEquals(100, processor.progress());
// Check result
assertEquals(new NewItemLibrary(), library);
verify(fetcher, times(1)).getEntityDocuments(toQids(firstBatch));
verify(fetcher, times(1)).getEntityDocuments(toQids(secondBatch));
for(ItemDocument doc : fullBatch) {
verify(editor, times(1)).updateTermsStatements(doc, Collections.emptyList(),
Collections.singletonList(description), Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), summary);
}
}
private Map<String, EntityDocument> toMap(List<ItemDocument> docs) {
return docs.stream()
.collect(Collectors.toMap(doc -> doc.getItemId().getId(), doc -> doc));
}
private List<String> toQids(List<ItemDocument> docs) {
return docs.stream().map(doc -> doc.getItemId().getId()).collect(Collectors.toList());
}
}