diff --git a/extensions/wikidata/src/org/openrefine/wikidata/commands/PerformWikibaseEditsCommand.java b/extensions/wikidata/src/org/openrefine/wikidata/commands/PerformWikibaseEditsCommand.java index fe57c61e3..59f297982 100644 --- a/extensions/wikidata/src/org/openrefine/wikidata/commands/PerformWikibaseEditsCommand.java +++ b/extensions/wikidata/src/org/openrefine/wikidata/commands/PerformWikibaseEditsCommand.java @@ -4,8 +4,6 @@ import javax.servlet.http.HttpServletRequest; import org.json.JSONObject; import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation; -import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation.DuplicateDetectionStrategy; -import org.openrefine.wikidata.operations.PerformWikibaseEditsOperation.OnDuplicateAction; import com.google.refine.commands.EngineDependentCommand; import com.google.refine.model.AbstractOperation; @@ -16,12 +14,8 @@ public class PerformWikibaseEditsCommand extends EngineDependentCommand { @Override protected AbstractOperation createOperation(Project project, HttpServletRequest request, JSONObject engineConfig) throws Exception { - String strategy = request.getParameter("strategy"); - String action = request.getParameter("action"); String summary = request.getParameter("summary"); return new PerformWikibaseEditsOperation(engineConfig, - DuplicateDetectionStrategy.valueOf(strategy), - OnDuplicateAction.valueOf(action), summary); } diff --git a/extensions/wikidata/src/org/openrefine/wikidata/editing/EditBatchProcessor.java b/extensions/wikidata/src/org/openrefine/wikidata/editing/EditBatchProcessor.java new file mode 100644 index 000000000..97f687263 --- /dev/null +++ b/extensions/wikidata/src/org/openrefine/wikidata/editing/EditBatchProcessor.java @@ -0,0 +1,211 @@ +package org.openrefine.wikidata.editing; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.openrefine.wikidata.schema.entityvalues.ReconEntityIdValue; +import org.openrefine.wikidata.updates.ItemUpdate; +import org.openrefine.wikidata.updates.scheduler.WikibaseAPIUpdateScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wikidata.wdtk.datamodel.helpers.Datamodel; +import org.wikidata.wdtk.datamodel.interfaces.EntityDocument; +import org.wikidata.wdtk.datamodel.interfaces.ItemDocument; +import org.wikidata.wdtk.datamodel.interfaces.ItemIdValue; +import org.wikidata.wdtk.datamodel.interfaces.MonolingualTextValue; +import org.wikidata.wdtk.wikibaseapi.WikibaseDataEditor; +import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher; +import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException; + +/** + * Schedules and performs a list of updates to items via the API. + * + * @author Antonin Delpeuch + * + */ +public class EditBatchProcessor { + + static final Logger logger = LoggerFactory + .getLogger(EditBatchProcessor.class); + + private WikibaseDataFetcher fetcher; + private WikibaseDataEditor editor; + private NewItemLibrary library; + private List scheduled; + private String summary; + + private List remainingUpdates; + private List currentBatch; + private int batchCursor; + private int globalCursor; + private Map currentDocs; + private int batchSize; + + + /** + * Initiates the process of pushing a batch of updates + * to Wikibase. This schedules the updates and is a + * prerequisite for calling {@link performOneEdit}. + * + * @param fetcher + * the fetcher to use to retrieve the current state of items + * @param editor + * the object to use to perform the edits + * @param updates + * the list of item updates to perform + * @param library + * the library to use to keep track of new item creation + * @param summary + * the summary to append to all edits + * @param batchSize + * the number of items that should be retrieved in one go from the API + */ + public EditBatchProcessor(WikibaseDataFetcher fetcher, + WikibaseDataEditor editor, + List updates, + NewItemLibrary library, + String summary, + int batchSize) { + this.fetcher = fetcher; + this.editor = editor; + editor.setEditAsBot(true); // this will not do anything if the user does not + // have a bot flag, and this is generally wanted if they have one. + this.library = library; + this.summary = summary; + this.batchSize = batchSize; + + // Schedule the edit batch + WikibaseAPIUpdateScheduler scheduler = new WikibaseAPIUpdateScheduler(); + this.scheduled = scheduler.schedule(updates); + this.globalCursor = 0; + + this.batchCursor = 0; + this.remainingUpdates = new ArrayList<>(scheduled); + this.currentBatch = Collections.emptyList(); + this.currentDocs = Collections.emptyMap(); + } + + + /** + * Performs the next edit in the batch. + * + * @throws InterruptedException + */ + public void performEdit() throws InterruptedException { + if (remainingEdits() == 0) { + return; + } + if (batchCursor == currentBatch.size()) { + prepareNewBatch(); + } + ItemUpdate update = currentBatch.get(batchCursor); + + // Rewrite mentions to new items + ReconEntityRewriter rewriter = new ReconEntityRewriter(library, update.getItemId()); + update = rewriter.rewrite(update); + + try { + // New item + if (update.isNew()) { + ReconEntityIdValue newCell = (ReconEntityIdValue)update.getItemId(); + update.normalizeLabelsAndAliases(); + + ItemDocument itemDocument = Datamodel.makeItemDocument(ItemIdValue.NULL, + update.getLabels().stream().collect(Collectors.toList()), + update.getDescriptions().stream().collect(Collectors.toList()), + update.getAliases().stream().collect(Collectors.toList()), + update.getAddedStatementGroups(), + Collections.emptyMap()); + + ItemDocument createdDoc = editor.createItemDocument(itemDocument, summary); + library.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId()); + } else { + // Existing item + ItemDocument currentDocument = (ItemDocument)currentDocs.get(update.getItemId().getId()); + /* + TermStatementUpdate tsUpdate = new TermStatementUpdate( + currentDocument, + update.getAddedStatements().stream().collect(Collectors.toList()), + update.getDeletedStatements().stream().collect(Collectors.toList()), + update.getLabels().stream().collect(Collectors.toList()), + update.getDescriptions().stream().collect(Collectors.toList()), + update.getAliases().stream().collect(Collectors.toList()), + new ArrayList() + ); + ObjectMapper mapper = new ObjectMapper(); + logger.info(mapper.writeValueAsString(update)); + logger.info(update.toString()); + logger.info(tsUpdate.getJsonUpdateString()); */ + editor.updateTermsStatements(currentDocument, + update.getLabels().stream().collect(Collectors.toList()), + update.getDescriptions().stream().collect(Collectors.toList()), + update.getAliases().stream().collect(Collectors.toList()), + new ArrayList(), + update.getAddedStatements().stream().collect(Collectors.toList()), + update.getDeletedStatements().stream().collect(Collectors.toList()), + summary); + } + } catch (MediaWikiApiErrorException e) { + // TODO find a way to report these errors to the user in a nice way + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + batchCursor++; + } + + /** + * @return the number of edits that remain to be done in the current batch + */ + public int remainingEdits() { + return scheduled.size() - (globalCursor + batchCursor); + } + + /** + * @return the progress, measured as a percentage + */ + public int progress() { + return (100*(globalCursor + batchCursor)) / scheduled.size(); + } + + protected void prepareNewBatch() throws InterruptedException { + // remove the previous batch from the remainingUpdates + globalCursor += currentBatch.size(); + currentBatch.clear(); + + if(remainingUpdates.size() < batchSize) { + currentBatch = remainingUpdates; + remainingUpdates = Collections.emptyList(); + } else { + currentBatch = remainingUpdates.subList(0, batchSize); + } + List qidsToFetch = currentBatch.stream() + .filter(u -> !u.isNew()) + .map(u -> u.getItemId().getId()) + .collect(Collectors.toList()); + + // Get the current documents for this batch of updates + logger.info("Requesting documents"); + currentDocs = null; + int retries = 3; + while (currentDocs == null && retries > 0) { + try { + currentDocs = fetcher.getEntityDocuments(qidsToFetch); + } catch (MediaWikiApiErrorException e) { + e.printStackTrace(); + Thread.sleep(5000); + } + retries--; + } + if (currentDocs == null) { + throw new InterruptedException("Fetching current documents failed."); + } + batchCursor = 0; + } + +} diff --git a/extensions/wikidata/src/org/openrefine/wikidata/operations/PerformWikibaseEditsOperation.java b/extensions/wikidata/src/org/openrefine/wikidata/operations/PerformWikibaseEditsOperation.java index 48d95bc8b..cd0521fe2 100644 --- a/extensions/wikidata/src/org/openrefine/wikidata/operations/PerformWikibaseEditsOperation.java +++ b/extensions/wikidata/src/org/openrefine/wikidata/operations/PerformWikibaseEditsOperation.java @@ -15,6 +15,7 @@ import org.json.JSONObject; import org.json.JSONWriter; import org.openrefine.wikidata.editing.ConnectionManager; +import org.openrefine.wikidata.editing.EditBatchProcessor; import org.openrefine.wikidata.editing.NewItemLibrary; import org.openrefine.wikidata.editing.ReconEntityRewriter; import org.openrefine.wikidata.updates.ItemUpdate; @@ -57,26 +58,12 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation { static final Logger logger = LoggerFactory .getLogger(PerformWikibaseEditsOperation.class); - public enum DuplicateDetectionStrategy { - PROPERTY, SNAK, SNAK_QUALIFIERS - } - - public enum OnDuplicateAction { - SKIP, MERGE - } - - private DuplicateDetectionStrategy strategy; - private OnDuplicateAction duplicateAction; private String summary; public PerformWikibaseEditsOperation( JSONObject engineConfig, - DuplicateDetectionStrategy strategy, - OnDuplicateAction duplicateAction, String summary) { super(engineConfig); - this.strategy = strategy; - this.duplicateAction = duplicateAction; this.summary = summary; // getEngine(request, project); @@ -85,16 +72,12 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation { static public AbstractOperation reconstruct(Project project, JSONObject obj) throws Exception { JSONObject engineConfig = obj.getJSONObject("engineConfig"); - String strategy = obj.getString("duplicate_strategy"); - String action = obj.getString("duplicate_action"); String summary = null; if (obj.has("summary")) { summary = obj.getString("summary"); } return new PerformWikibaseEditsOperation( engineConfig, - DuplicateDetectionStrategy.valueOf(strategy), - OnDuplicateAction.valueOf(action), summary); } @@ -107,10 +90,6 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation { writer.value(OperationRegistry.s_opClassToName.get(this.getClass())); writer.key("description"); writer.value("Perform Wikibase edits"); - writer.key("duplicate_strategy"); - writer.value(strategy.name()); - writer.key("duplicate_action"); - writer.value(duplicateAction.name()); writer.key("summary"); writer.value(summary); writer.key("engineConfig"); @@ -214,137 +193,29 @@ public class PerformWikibaseEditsOperation extends EngineDependentOperation { WikibaseDataFetcher wbdf = new WikibaseDataFetcher(connection, _schema.getBaseIri()); WikibaseDataEditor wbde = new WikibaseDataEditor(connection, _schema.getBaseIri()); - wbde.setEditAsBot(true); - //wbde.disableEditing(); // Evaluate the schema List itemDocuments = _schema.evaluate(_project, _engine); - // Schedule the edit batch - WikibaseAPIUpdateScheduler scheduler = new WikibaseAPIUpdateScheduler(); - List updates = null; - updates = scheduler.schedule(itemDocuments); - - /** - * TODO: - * - support for new items - * - support for duplicate strategy and action - */ + // Prepare the edits + NewItemLibrary newItemLibrary = new NewItemLibrary(); + EditBatchProcessor processor = new EditBatchProcessor(wbdf, + wbde, itemDocuments, newItemLibrary, _summary, 50); // Perform edits - NewItemLibrary newItemLibrary = new NewItemLibrary(); - DataObjectFactory factory = new DataObjectFactoryImpl(); - List remainingItemUpdates = new ArrayList<>(); - remainingItemUpdates.addAll(updates); - int totalItemUpdates = updates.size(); - int updatesDone = 0; - int batchSize = 50; - while(updatesDone < totalItemUpdates) { - // Split the remaining updates in batches - List batch = null; - if(totalItemUpdates - updatesDone < batchSize) { - batch = remainingItemUpdates; - } else { - batch = remainingItemUpdates.subList(0, batchSize); + logger.info("Performing edits"); + while(processor.remainingEdits() > 0) { + try { + processor.performEdit(); + } catch(InterruptedException e) { + _canceled = true; } - List qids = new ArrayList<>(batch.size()); - for(ItemUpdate update : batch) { - String qid = update.getItemId().getId(); - if (!update.isNew()) { - qids.add(qid); - } - } - - // Get the current documents for this batch of updates - logger.info("Requesting documents"); - Map currentDocs = null; - int retries = 3; - while (currentDocs == null && retries > 0) { - try { - currentDocs = wbdf.getEntityDocuments(qids); - } catch (MediaWikiApiErrorException e) { - e.printStackTrace(); - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - _canceled = true; - break; - } - } - retries--; - } - + _progress = processor.progress(); if (_canceled) { break; } - logger.info("Performing edits"); - - for(ItemUpdate update : batch) { - // Rewrite the update - ReconEntityRewriter rewriter = new ReconEntityRewriter(newItemLibrary, update.getItemId()); - update = rewriter.rewrite(update); - - try { - // New item - if (update.getItemId().getId().equals("Q0")) { - ReconEntityIdValue newCell = (ReconEntityIdValue)update.getItemId(); - update.normalizeLabelsAndAliases(); - - - ItemDocument itemDocument = factory.getItemDocument( - update.getItemId(), - update.getLabels().stream().collect(Collectors.toList()), - update.getDescriptions().stream().collect(Collectors.toList()), - update.getAliases().stream().collect(Collectors.toList()), - update.getAddedStatementGroups(), - new HashMap(), - 0L); - - ItemDocument createdDoc = wbde.createItemDocument(itemDocument, _summary); - newItemLibrary.setQid(newCell.getReconInternalId(), createdDoc.getItemId().getId()); - } else { - // Existing item - ItemDocument currentDocument = (ItemDocument)currentDocs.get(update.getItemId().getId()); - /* - TermStatementUpdate tsUpdate = new TermStatementUpdate( - currentDocument, - update.getAddedStatements().stream().collect(Collectors.toList()), - update.getDeletedStatements().stream().collect(Collectors.toList()), - update.getLabels().stream().collect(Collectors.toList()), - update.getDescriptions().stream().collect(Collectors.toList()), - update.getAliases().stream().collect(Collectors.toList()), - new ArrayList() - ); - ObjectMapper mapper = new ObjectMapper(); - logger.info(mapper.writeValueAsString(update)); - logger.info(update.toString()); - logger.info(tsUpdate.getJsonUpdateString()); */ - wbde.updateTermsStatements(currentDocument, - update.getLabels().stream().collect(Collectors.toList()), - update.getDescriptions().stream().collect(Collectors.toList()), - update.getAliases().stream().collect(Collectors.toList()), - new ArrayList(), - update.getAddedStatements().stream().collect(Collectors.toList()), - update.getDeletedStatements().stream().collect(Collectors.toList()), - _summary); - } - } catch (MediaWikiApiErrorException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - updatesDone++; - _progress = (100*updatesDone) / totalItemUpdates; - if(_canceled) { - break; - } - } - - batch.clear(); } + _progress = 100; if (!_canceled) { diff --git a/extensions/wikidata/src/org/openrefine/wikidata/qa/scrutinizers/StatementGroupScrutinizer.java b/extensions/wikidata/src/org/openrefine/wikidata/qa/scrutinizers/StatementGroupScrutinizer.java deleted file mode 100644 index 0e553041c..000000000 --- a/extensions/wikidata/src/org/openrefine/wikidata/qa/scrutinizers/StatementGroupScrutinizer.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.openrefine.wikidata.qa.scrutinizers; - -import org.openrefine.wikidata.updates.ItemUpdate; -import org.wikidata.wdtk.datamodel.interfaces.StatementGroup; - -public abstract class StatementGroupScrutinizer extends ItemUpdateScrutinizer { - - @Override - public void scrutinize(ItemUpdate update) { - for(StatementGroup statementGroup : update.getAddedStatementGroups()) { - scrutinizeAdded(statementGroup); - } - - } - - public abstract void scrutinizeAdded(StatementGroup statementGroup); - - public abstract void scrutinizeDeleted(StatementGroup statementGroup); -} diff --git a/extensions/wikidata/tests/src/org/openrefine/wikidata/commands/CommandTest.java b/extensions/wikidata/tests/src/org/openrefine/wikidata/commands/CommandTest.java index cd39dd163..7eefb1d26 100644 --- a/extensions/wikidata/tests/src/org/openrefine/wikidata/commands/CommandTest.java +++ b/extensions/wikidata/tests/src/org/openrefine/wikidata/commands/CommandTest.java @@ -13,7 +13,6 @@ import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; diff --git a/extensions/wikidata/tests/src/org/openrefine/wikidata/editing/EditBatchProcessorTest.java b/extensions/wikidata/tests/src/org/openrefine/wikidata/editing/EditBatchProcessorTest.java new file mode 100644 index 000000000..a94ef5b64 --- /dev/null +++ b/extensions/wikidata/tests/src/org/openrefine/wikidata/editing/EditBatchProcessorTest.java @@ -0,0 +1,152 @@ +package org.openrefine.wikidata.editing; + +import org.openrefine.wikidata.testing.TestingData; +import org.openrefine.wikidata.updates.ItemUpdate; +import org.openrefine.wikidata.updates.ItemUpdateBuilder; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.wikidata.wdtk.datamodel.helpers.Datamodel; +import org.wikidata.wdtk.datamodel.helpers.ItemDocumentBuilder; +import org.wikidata.wdtk.datamodel.interfaces.EntityDocument; +import org.wikidata.wdtk.datamodel.interfaces.ItemDocument; +import org.wikidata.wdtk.datamodel.interfaces.ItemIdValue; +import org.wikidata.wdtk.datamodel.interfaces.MonolingualTextValue; +import org.wikidata.wdtk.wikibaseapi.WikibaseDataEditor; +import org.wikidata.wdtk.wikibaseapi.WikibaseDataFetcher; +import org.wikidata.wdtk.wikibaseapi.apierrors.MediaWikiApiErrorException; + +import com.google.refine.tests.RefineTest; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + + +public class EditBatchProcessorTest extends RefineTest { + + private WikibaseDataFetcher fetcher = null; + private WikibaseDataEditor editor = null; + private NewItemLibrary library = null; + private String summary = "my fantastic edits"; + + @BeforeMethod + public void setUp() { + fetcher = mock(WikibaseDataFetcher.class); + editor = mock(WikibaseDataEditor.class); + editor.disableEditing(); // just in case we got mocking wrong… + library = new NewItemLibrary(); + } + + @Test + public void testNewItem() throws InterruptedException, MediaWikiApiErrorException, IOException { + List batch = new ArrayList<>(); + batch.add(new ItemUpdateBuilder(TestingData.existingId) + .addAlias(Datamodel.makeMonolingualTextValue("my new alias", "en")) + .addStatement(TestingData.generateStatement(TestingData.existingId, TestingData.newIdA)) + .build()); + MonolingualTextValue label = Datamodel.makeMonolingualTextValue("better label", "en"); + batch.add(new ItemUpdateBuilder(TestingData.newIdA) + .addLabel(label) + .build()); + + // Plan expected edits + ItemDocument existingItem = ItemDocumentBuilder.forItemId(TestingData.existingId) + .withLabel(Datamodel.makeMonolingualTextValue("pomme", "fr")) + .withDescription(Datamodel.makeMonolingualTextValue("fruit délicieux", "fr")) + .build(); + when(fetcher.getEntityDocuments(Collections.singletonList(TestingData.existingId.getId()))) + .thenReturn(Collections.singletonMap(TestingData.existingId.getId(), existingItem)); + + ItemDocument expectedNewItem = ItemDocumentBuilder.forItemId(ItemIdValue.NULL) + .withLabel(label).build(); + ItemDocument createdNewItem = ItemDocumentBuilder.forItemId(Datamodel.makeWikidataItemIdValue("Q1234")) + .withLabel(label).withRevisionId(37828L).build(); + when(editor.createItemDocument(expectedNewItem, summary)).thenReturn(createdNewItem); + + EditBatchProcessor processor = new EditBatchProcessor(fetcher, editor, batch, library, summary, 50); + assertEquals(2, processor.remainingEdits()); + assertEquals(0, processor.progress()); + processor.performEdit(); + assertEquals(1, processor.remainingEdits()); + assertEquals(50, processor.progress()); + processor.performEdit(); + assertEquals(0, processor.remainingEdits()); + assertEquals(100, processor.progress()); + processor.performEdit(); // does not do anything + assertEquals(0, processor.remainingEdits()); + assertEquals(100, processor.progress()); + + NewItemLibrary expectedLibrary = new NewItemLibrary(); + expectedLibrary.setQid(1234L, "Q1234"); + assertEquals(expectedLibrary, library); + } + + @Test + public void testMultipleBatches() throws MediaWikiApiErrorException, InterruptedException, IOException { + // Prepare test data + MonolingualTextValue description = Datamodel.makeMonolingualTextValue("village in Nepal", "en"); + List ids = new ArrayList<>(); + for(int i = 124; i < 190; i++) { + ids.add("Q"+String.valueOf(i)); + } + List qids = ids.stream() + .map(e -> Datamodel.makeWikidataItemIdValue(e)) + .collect(Collectors.toList()); + List batch = qids.stream() + .map(qid -> new ItemUpdateBuilder(qid) + .addDescription(description) + .build()) + .collect(Collectors.toList()); + + int batchSize = 50; + List fullBatch = qids.stream() + .map(qid -> ItemDocumentBuilder.forItemId(qid) + .withStatement(TestingData.generateStatement(qid, TestingData.existingId)) + .build()) + .collect(Collectors.toList()); + List firstBatch = fullBatch.subList(0, batchSize); + List secondBatch = fullBatch.subList(batchSize, fullBatch.size()); + + when(fetcher.getEntityDocuments(toQids(firstBatch))).thenReturn(toMap(firstBatch)); + when(fetcher.getEntityDocuments(toQids(secondBatch))).thenReturn(toMap(secondBatch)); + + // Run edits + EditBatchProcessor processor = new EditBatchProcessor(fetcher, editor, batch, library, summary, batchSize); + assertEquals(0, processor.progress()); + for(int i = 124; i < 190; i++) { + assertEquals(processor.remainingEdits(), 190-i); + processor.performEdit(); + } + assertEquals(0, processor.remainingEdits()); + assertEquals(100, processor.progress()); + + // Check result + assertEquals(new NewItemLibrary(), library); + verify(fetcher, times(1)).getEntityDocuments(toQids(firstBatch)); + verify(fetcher, times(1)).getEntityDocuments(toQids(secondBatch)); + for(ItemDocument doc : fullBatch) { + verify(editor, times(1)).updateTermsStatements(doc, Collections.emptyList(), + Collections.singletonList(description), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), summary); + } + } + + private Map toMap(List docs) { + return docs.stream() + .collect(Collectors.toMap(doc -> doc.getItemId().getId(), doc -> doc)); + } + + private List toQids(List docs) { + return docs.stream().map(doc -> doc.getItemId().getId()).collect(Collectors.toList()); + } +}