Change the behaviour of failed reconciliations to not reconcile the cell at all (#4232)
* Change the behaviour of failed reconciliations to not reconcile the cell at all. This makes it easier to tell apart cells which could not be reconciled due to a network error and those which just did not have any reconciliation candidate. This makes it possible to retry reconciling cells which have been left unreconciled after a recon operation. Closes #3369. * Update StandardReconConfigTests with new behaviour for failed recons
This commit is contained in:
parent
f94116148b
commit
235b5957e2
@ -501,14 +501,8 @@ public class StandardReconConfig extends ReconConfig {
|
||||
logger.error("Failed to batch recon with load:\n" + queriesString, e);
|
||||
}
|
||||
|
||||
// TODO: This code prevents the retry mechanism in ReconOperation from working
|
||||
while (recons.size() < jobs.size()) {
|
||||
Recon recon = new Recon(historyEntryID, identifierSpace, schemaSpace);
|
||||
recon.service = service;
|
||||
recon.identifierSpace = identifierSpace;
|
||||
recon.schemaSpace = schemaSpace;
|
||||
|
||||
recons.add(recon);
|
||||
recons.add(null);
|
||||
}
|
||||
|
||||
return recons;
|
||||
|
@ -286,29 +286,21 @@ public class ReconOperation extends EngineDependentOperation {
|
||||
JobGroup group = jobToGroup.get(job);
|
||||
List<ReconEntry> entries = group.entries;
|
||||
|
||||
/*
|
||||
* TODO: Not sure what this retry is meant to handle, but it's currently
|
||||
* non-functional due to the code at the end of StandardReconConfig#batchRecon()
|
||||
* which tops up any missing entries.
|
||||
*/
|
||||
if (recon == null) {
|
||||
group.trials++;
|
||||
if (group.trials < 3) {
|
||||
logger.warn("Re-trying job including cell containing: " + entries.get(0).cell.value);
|
||||
continue; // try again next time
|
||||
}
|
||||
String msg = "Failed after 3 trials for job including cell containing: " + entries.get(0).cell.value;
|
||||
logger.warn(msg);
|
||||
recon = _reconConfig.createNewRecon(_historyEntryID);
|
||||
}
|
||||
|
||||
jobToGroup.remove(job);
|
||||
jobs.remove(j);
|
||||
done++;
|
||||
|
||||
recon.judgmentBatchSize = entries.size();
|
||||
if (recon != null) {
|
||||
recon.judgmentBatchSize = entries.size();
|
||||
}
|
||||
|
||||
for (ReconEntry entry : entries) {
|
||||
if (recon == null) {
|
||||
// TODO add EvalError instead? That is not so convenient
|
||||
// for users because they would lose the cell contents.
|
||||
// Better leave the cell unreconciled so they can be reconciled again later.
|
||||
continue;
|
||||
}
|
||||
Cell oldCell = entry.cell;
|
||||
Cell newCell = new Cell(oldCell.value, recon);
|
||||
|
||||
|
@ -37,11 +37,15 @@ import org.apache.hc.core5.http.io.entity.EntityUtils;
|
||||
import org.apache.hc.core5.http.message.BasicNameValuePair;
|
||||
import org.apache.hc.core5.http.protocol.HttpContext;
|
||||
import org.apache.hc.core5.util.TimeValue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.refine.RefineServlet;
|
||||
|
||||
|
||||
public class HttpClient {
|
||||
final static Logger logger = LoggerFactory.getLogger("http-client");
|
||||
|
||||
final private RequestConfig defaultRequestConfig;
|
||||
private HttpClientBuilder httpClientBuilder;
|
||||
private CloseableHttpClient httpClient;
|
||||
@ -206,8 +210,10 @@ public class HttpClient {
|
||||
if (interval.compareTo(defaultInterval) == 0) {
|
||||
interval = TimeValue.of(((Double) (Math.pow(2, execCount - 1) * defaultInterval.getDuration())).longValue(),
|
||||
defaultInterval.getTimeUnit() );
|
||||
logger.warn("Retrying HTTP request after "+interval.toString());
|
||||
return interval;
|
||||
}
|
||||
logger.warn("Retrying HTTP request after "+interval.toString());
|
||||
return interval;
|
||||
}
|
||||
|
||||
|
@ -72,6 +72,8 @@ import com.google.refine.model.Column;
|
||||
import com.google.refine.model.ModelException;
|
||||
import com.google.refine.model.Project;
|
||||
import com.google.refine.model.Row;
|
||||
import com.google.refine.process.Process;
|
||||
import com.google.refine.process.ProcessManager;
|
||||
import com.google.refine.util.TestUtils;
|
||||
|
||||
import edu.mit.simile.butterfly.ButterflyModule;
|
||||
@ -375,4 +377,19 @@ public class RefineTest extends PowerMockTestCase {
|
||||
when(coreModule.getName()).thenReturn("core");
|
||||
return coreModule;
|
||||
}
|
||||
|
||||
protected void runAndWait(ProcessManager processManager, Process process, int timeout) {
|
||||
process.startPerforming(processManager);
|
||||
Assert.assertTrue(process.isRunning());
|
||||
int time = 0;
|
||||
try {
|
||||
while (process.isRunning() && time < timeout) {
|
||||
Thread.sleep(200);
|
||||
time += 200;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail("Test interrupted");
|
||||
}
|
||||
Assert.assertFalse(process.isRunning(),"Process failed to complete within timeout " + timeout);
|
||||
}
|
||||
}
|
||||
|
@ -45,13 +45,11 @@ import org.testng.annotations.Test;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
|
||||
import com.google.refine.RefineTest;
|
||||
import com.google.refine.browsing.EngineConfig;
|
||||
import com.google.refine.model.Cell;
|
||||
import com.google.refine.model.Project;
|
||||
import com.google.refine.model.Recon;
|
||||
import com.google.refine.model.ReconCandidate;
|
||||
import com.google.refine.model.Row;
|
||||
import com.google.refine.model.recon.StandardReconConfig.ColumnDetail;
|
||||
import com.google.refine.model.recon.StandardReconConfig.ReconResult;
|
||||
@ -269,13 +267,13 @@ public class StandardReconConfigTests extends RefineTest {
|
||||
|
||||
assertNotNull(request1);
|
||||
|
||||
// We won't have gotten a result, but we want to make sure things didn't die
|
||||
// We won't have gotten a result, but we want to make sure things didn't die.
|
||||
Row row = project.rows.get(0);
|
||||
Cell cell = row.cells.get(1);
|
||||
assertNotNull(cell.recon);
|
||||
assertEquals(cell.recon.service, url.toString());
|
||||
ReconCandidate candidate = cell.recon.getBestCandidate();
|
||||
assertNull(candidate);
|
||||
assertNotNull(cell.value);
|
||||
assertNull(cell.recon);
|
||||
// the recon object is left null, so that it can be told apart from
|
||||
// empty recon objects (the service legitimately did not return any candidate)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,20 +121,9 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
|
||||
private void runAndWait(EngineDependentOperation op, int timeout) throws Exception {
|
||||
ProcessManager pm = project.getProcessManager();
|
||||
Process process = op.createProcess(project, options);
|
||||
process.startPerforming(pm);
|
||||
Assert.assertTrue(process.isRunning());
|
||||
int time = 0;
|
||||
try {
|
||||
while (process.isRunning() && time < timeout) {
|
||||
Thread.sleep(200);
|
||||
time += 200;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Assert.fail("Test interrupted");
|
||||
}
|
||||
Assert.assertFalse(process.isRunning(),"Process failed to complete within timeout " + timeout);
|
||||
runAndWait(pm, process, timeout);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void serializeColumnAdditionByFetchingURLsOperation() throws Exception {
|
||||
TestUtils.isSerializedTo(ParsingUtilities.mapper.readValue(json, ColumnAdditionByFetchingURLsOperation.class), json);
|
||||
|
@ -27,18 +27,29 @@
|
||||
package com.google.refine.operations.recon;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.stubbing.OngoingStubbing;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.BeforeSuite;
|
||||
import org.testng.annotations.BeforeTest;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.refine.RefineTest;
|
||||
import com.google.refine.browsing.EngineConfig;
|
||||
import com.google.refine.model.Column;
|
||||
import com.google.refine.model.Project;
|
||||
import com.google.refine.model.Recon;
|
||||
import com.google.refine.model.recon.ReconConfig;
|
||||
import com.google.refine.model.recon.ReconJob;
|
||||
import com.google.refine.model.recon.StandardReconConfig;
|
||||
import com.google.refine.operations.OperationRegistry;
|
||||
import com.google.refine.operations.recon.ReconOperation;
|
||||
import com.google.refine.process.Process;
|
||||
import com.google.refine.util.ParsingUtilities;
|
||||
import com.google.refine.util.TestUtils;
|
||||
|
||||
@ -59,7 +70,6 @@ public class ReconOperationTests extends RefineTest {
|
||||
+ " \"limit\":0"
|
||||
+ "},"
|
||||
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]}}";
|
||||
private Project project = mock(Project.class);
|
||||
|
||||
private String processJson = ""
|
||||
+ " {\n" +
|
||||
@ -90,6 +100,7 @@ public class ReconOperationTests extends RefineTest {
|
||||
" \"progress\" : 0,\n" +
|
||||
" \"status\" : \"pending\"\n" +
|
||||
" }";
|
||||
|
||||
|
||||
@BeforeSuite
|
||||
public void registerOperation() {
|
||||
@ -105,7 +116,37 @@ public class ReconOperationTests extends RefineTest {
|
||||
@Test
|
||||
public void serializeReconProcess() throws Exception {
|
||||
ReconOperation op = ParsingUtilities.mapper.readValue(json, ReconOperation.class);
|
||||
com.google.refine.process.Process process = op.createProcess(project, new Properties());
|
||||
Project project = mock(Project.class);
|
||||
Process process = op.createProcess(project, new Properties());
|
||||
TestUtils.isSerializedTo(process, String.format(processJson, process.hashCode()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailingRecon() throws Exception {
|
||||
Project project = createCSVProject("my recon test project",
|
||||
"column\n"
|
||||
+ "valueA\n"
|
||||
+ "valueB\n"
|
||||
+ "valueC");
|
||||
StandardReconConfig reconConfig = mock(StandardReconConfig.class);
|
||||
List<Recon> reconList = Arrays.asList((Recon)null, (Recon)null, (Recon)null);
|
||||
ReconJob reconJob = mock(ReconJob.class);
|
||||
when(reconConfig.batchRecon(Mockito.any(), Mockito.anyLong())).thenReturn(reconList);
|
||||
when(reconConfig.getBatchSize()).thenReturn(10);
|
||||
when(reconConfig.createJob(Mockito.eq(project), Mockito.anyInt(), Mockito.any(), Mockito.any(), Mockito.any()))
|
||||
.thenReturn(reconJob);
|
||||
|
||||
ReconOperation op = new ReconOperation(EngineConfig.reconstruct("{}"), "column", reconConfig);
|
||||
|
||||
Process process = op.createProcess(project, new Properties());
|
||||
runAndWait(project.getProcessManager(), process, 1000);
|
||||
|
||||
Column column = project.columnModel.columns.get(0);
|
||||
Assert.assertNotNull(column.getReconStats());
|
||||
Assert.assertEquals(column.getReconStats().matchedTopics, 0);
|
||||
|
||||
Assert.assertNull(project.rows.get(0).getCell(0).recon);
|
||||
Assert.assertNull(project.rows.get(1).getCell(0).recon);
|
||||
Assert.assertNull(project.rows.get(2).getCell(0).recon);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user