Better error handling for reconciliation process - fixes #2590 (#2671)

* Harden reconciliation - Fixes #2590

- check for non-JSON / unparseable JSON returns
- handle malformed results response with no name for candidates
- catch any Exception, not just IOExceptions
- call processManager.onFailedProcess() for cleanup on error

* Add default constructor for Jackson

Jackson complains about needing a default constructor for the
NON_DEFAULT annotation, but I'm not sure why this worked before.

* Clean up indentation and unused variable - no functional changes

Make indentation consistent throughout the module, changing recently
added lines to use the standard all spaces convention.

Remove unused count variable

* Simplify control flow

* Update limit parameter comment. No functional change.

* Replace ternary expression which is causing NPE

* Add reconciliation tests using mock HTTP server
This commit is contained in:
Tom Morris 2020-06-23 15:54:54 -04:00 committed by GitHub
parent 6e66cb5144
commit 1849e62234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 340 additions and 137 deletions

View File

@ -74,10 +74,10 @@ import com.google.refine.util.ParsingUtilities;
public class StandardReconConfig extends ReconConfig {
final static Logger logger = LoggerFactory.getLogger("refine-standard-recon");
private static final String DEFAULT_SCHEMA_SPACE = "http://localhost/schema";
private static final String DEFAULT_IDENTIFIER_SPACE = "http://localhost/identifier";
private static final String DEFAULT_SCHEMA_SPACE = "http://localhost/schema";
private static final String DEFAULT_IDENTIFIER_SPACE = "http://localhost/identifier";
static public class ColumnDetail {
@JsonProperty("column")
final public String columnName;
@ -107,14 +107,14 @@ public class StandardReconConfig extends ReconConfig {
this.propertyID = property == null ? propertyID : property.id;
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
}
static public StandardReconConfig reconstruct(String json) throws IOException {
@ -129,13 +129,13 @@ public class StandardReconConfig extends ReconConfig {
public int getKey() {
return code.hashCode();
}
@Override
public String toString() {
return code;
return code;
}
}
@JsonProperty("service")
final public String service;
@JsonProperty("identifierSpace")
@ -171,11 +171,11 @@ public class StandardReconConfig extends ReconConfig {
@JsonProperty("limit")
int limit) {
this(service, identifierSpace, schemaSpace,
type != null ? type.id : null,
type != null ? type.name : null,
autoMatch, columnDetails, limit);
type != null ? type.id : null,
type != null ? type.name : null,
autoMatch, columnDetails, limit);
}
public StandardReconConfig(
String service,
String identifierSpace,
@ -275,14 +275,14 @@ public class StandardReconConfig extends ReconConfig {
this.v = v;
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
}
protected static class ReconQuery {
@ -306,12 +306,21 @@ public class StandardReconConfig extends ReconConfig {
@JsonInclude(Include.NON_EMPTY)
protected List<QueryProperty> properties;
// Only send limit if it's non-default to preserve backward compatibility with
// services which might choke on this
// Only send limit if it's non-default (default = 0) to preserve backward
// compatibility with services which might choke on this (pre-2013)
@JsonProperty("limit")
@JsonInclude(Include.NON_DEFAULT)
protected int limit;
public ReconQuery() {
super();
this.query = "";
this.typeID = null;
this.properties = null;
this.limit = 0;
}
@JsonCreator
public ReconQuery(
String query,
String typeID,
@ -323,52 +332,47 @@ public class StandardReconConfig extends ReconConfig {
this.limit = limit;
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
}
public static class ReconResult {
@JsonProperty("name")
public String name;
@JsonProperty("id")
public String id;
@JsonProperty("type")
public List<ReconType> types = Collections.emptyList();
@JsonProperty("score")
public double score;
@JsonProperty("match")
public boolean match = false;
@JsonIgnore
public ReconCandidate toCandidate() {
String[] bareTypes = new String[types.size()];
for(int i = 0; i != types.size(); i++) {
bareTypes[i] = types.get(i).id;
}
ReconCandidate result = new ReconCandidate(
id,
name,
bareTypes,
score
);
return result;
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
@JsonProperty("name")
public String name;
@JsonProperty("id")
public String id;
@JsonProperty("type")
public List<ReconType> types = Collections.emptyList();
@JsonProperty("score")
public double score;
@JsonProperty("match")
public boolean match = false;
@JsonIgnore
public ReconCandidate toCandidate() {
String[] bareTypes = new String[types.size()];
for (int i = 0; i != types.size(); i++) {
bareTypes[i] = types.get(i).id;
}
ReconCandidate result = new ReconCandidate(id, name, bareTypes, score);
return result;
}
@Override
public String toString() {
try {
return ParsingUtilities.mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
return super.toString();
}
}
}
@Override
@ -419,6 +423,7 @@ public class StandardReconConfig extends ReconConfig {
job.code = ParsingUtilities.defaultWriter.writeValueAsString(query);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null; // TODO: Throw exception instead?
}
return job;
}
@ -446,7 +451,7 @@ public class StandardReconConfig extends ReconConfig {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
{
connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
connection.setConnectTimeout(30000);
connection.setConnectTimeout(30000); // TODO parameterize
connection.setDoOutput(true);
DataOutputStream dos = new DataOutputStream(connection.getOutputStream());
@ -464,48 +469,53 @@ public class StandardReconConfig extends ReconConfig {
if (connection.getResponseCode() >= 400) {
InputStream is = connection.getErrorStream();
logger.error("Failed - code:"
+ Integer.toString(connection.getResponseCode())
+ " message: " + is == null ? ""
: ParsingUtilities.inputStreamToString(is));
String msg = is == null ? "" : ParsingUtilities.inputStreamToString(is);
logger.error("Failed - code: "
+ Integer.toString(connection.getResponseCode())
+ " message: " + msg);
} else {
InputStream is = connection.getInputStream();
try {
String s = ParsingUtilities.inputStreamToString(is);
ObjectNode o = ParsingUtilities.evaluateJsonStringToObjectNode(s);
if (o == null) { // utility method returns null instead of throwing
logger.error("Failed to parse string as JSON: " + s);
} else {
for (int i = 0; i < jobs.size(); i++) {
StandardReconJob job = (StandardReconJob) jobs.get(i);
Recon recon = null;
for (int i = 0; i < jobs.size(); i++) {
StandardReconJob job = (StandardReconJob) jobs.get(i);
Recon recon = null;
String text = job.text;
String key = "q" + i;
if (o.has(key) && o.get(key) instanceof ObjectNode) {
ObjectNode o2 = (ObjectNode) o.get(key);
if (o2.has("result") && o2.get("result") instanceof ArrayNode) {
ArrayNode results = (ArrayNode) o2.get("result");
String text = job.text;
String key = "q" + i;
if (o.has(key) && o.get(key) instanceof ObjectNode) {
ObjectNode o2 = (ObjectNode) o.get(key);
if (o2.has("result") && o2.get("result") instanceof ArrayNode) {
ArrayNode results = (ArrayNode) o2.get("result");
recon = createReconServiceResults(text, results, historyEntryID);
recon = createReconServiceResults(text, results, historyEntryID);
} else {
logger.warn("Service error for text: " + text + "\n Job code: " + job.code + "\n Response: " + o2.toString());
}
} else {
logger.warn("Service error for text: " + text + "\n Job code: " + job.code + "\n Response: " + o2.toString());
// TODO: better error reporting
logger.warn("Service error for text: " + text + "\n Job code: " + job.code);
}
} else {
logger.warn("Service error for text: " + text + "\n Job code: " + job.code);
}
if (recon != null) {
recon.service = service;
if (recon != null) {
recon.service = service;
}
recons.add(recon);
}
recons.add(recon);
}
} finally {
is.close();
}
}
} catch (IOException e) {
} catch (Exception e) {
logger.error("Failed to batch recon with load:\n" + queriesString, e);
}
// TODO: This code prevents the retry mechanism in ReconOperation from working
while (recons.size() < jobs.size()) {
Recon recon = new Recon(historyEntryID, identifierSpace, schemaSpace);
recon.service = service;
@ -531,28 +541,26 @@ public class StandardReconConfig extends ReconConfig {
// Sort results by decreasing score
results.sort(new Comparator<ReconResult>() {
@Override
public int compare(ReconResult a, ReconResult b) {
return Double.compare(b.score, a.score);
}
@Override
public int compare(ReconResult a, ReconResult b) {
return Double.compare(b.score, a.score);
}
});
int length = results.size();
int count = 0;
for (int i = 0; i < length; i++) {
ReconResult result = results.get(i);
ReconCandidate candidate = result.toCandidate();
if (autoMatch && i == 0 && result.match) {
recon.match = candidate;
recon.matchRank = 0;
recon.judgment = Judgment.Matched;
recon.judgmentAction = "auto";
}
if (autoMatch && i == 0 && result.match) {
recon.match = candidate;
recon.matchRank = 0;
recon.judgment = Judgment.Matched;
recon.judgmentAction = "auto";
}
recon.addCandidate(candidate);
count++;
}
computeFeatures(recon, text);
@ -563,18 +571,18 @@ public class StandardReconConfig extends ReconConfig {
* Recomputes the features associated with this reconciliation
* object (only if we have at least one candidate).
*
* @param text
* the cell value to compare the reconciliation data to
* @param text the cell value to compare the reconciliation data to
*/
public void computeFeatures(Recon recon, String text) {
if (recon.candidates != null && !recon.candidates.isEmpty() && text != null) {
ReconCandidate candidate = recon.candidates.get(0);
recon.setFeature(Recon.Feature_nameMatch, text.equalsIgnoreCase(candidate.name));
recon.setFeature(Recon.Feature_nameLevenshtein,
StringUtils.getLevenshteinDistance(StringUtils.lowerCase(text), StringUtils.lowerCase(candidate.name)));
recon.setFeature(Recon.Feature_nameWordDistance, wordDistance(text, candidate.name));
if (candidate.name != null) {
recon.setFeature(Recon.Feature_nameMatch, text.equalsIgnoreCase(candidate.name));
recon.setFeature(Recon.Feature_nameLevenshtein,
StringUtils.getLevenshteinDistance(StringUtils.lowerCase(text), StringUtils.lowerCase(candidate.name)));
recon.setFeature(Recon.Feature_nameWordDistance, wordDistance(text, candidate.name));
}
recon.setFeature(Recon.Feature_typeMatch, false);
if (this.typeID != null) {
for (String typeID : candidate.types) {
@ -585,7 +593,7 @@ public class StandardReconConfig extends ReconConfig {
}
}
} else {
recon.features = new Object[Recon.Feature_max];
recon.features = new Object[Recon.Feature_max];
}
}

View File

@ -285,23 +285,27 @@ public class ReconOperation extends EngineDependentOperation {
Recon recon = j < recons.size() ? recons.get(j) : null;
JobGroup group = jobToGroup.get(job);
List<ReconEntry> entries = group.entries;
/*
* TODO: Not sure what this retry is meant to handle, but it's currently
* non-functional due the code at the end of StandardReconConfig#batchRecon()
* which tops up any missing entries.
*/
if (recon == null) {
group.trials++;
if (group.trials < 3) {
logger.warn("Re-trying job including cell containing: " + entries.get(0).cell.value);
continue; // try again next time
}
logger.warn("Failed after 3 trials for job including cell containing: " + entries.get(0).cell.value);
String msg = "Failed after 3 trials for job including cell containing: " + entries.get(0).cell.value;
logger.warn(msg);
recon = _reconConfig.createNewRecon(_historyEntryID);
}
jobToGroup.remove(job);
jobs.remove(j);
done++;
if (recon == null) {
recon = _reconConfig.createNewRecon(_historyEntryID);
}
recon.judgmentBatchSize = entries.size();
for (ReconEntry entry : entries) {
@ -328,6 +332,7 @@ public class ReconOperation extends EngineDependentOperation {
}
}
// TODO: Option to keep partial results after cancellation?
if (!_canceled) {
Change reconChange = new ReconChange(
cellChanges,

View File

@ -32,7 +32,9 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Properties;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@ -43,9 +45,13 @@ import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.refine.RefineTest;
import com.google.refine.browsing.EngineConfig;
import com.google.refine.model.Cell;
import com.google.refine.model.Project;
import com.google.refine.model.Recon;
import com.google.refine.model.ReconCandidate;
import com.google.refine.model.Row;
import com.google.refine.model.recon.ReconConfig;
import com.google.refine.model.recon.ReconJob;
@ -54,9 +60,16 @@ import com.google.refine.model.recon.StandardReconConfig.ColumnDetail;
import com.google.refine.model.recon.StandardReconConfig.ReconResult;
import com.google.refine.operations.OperationRegistry;
import com.google.refine.operations.recon.ReconOperation;
import com.google.refine.process.Process;
import com.google.refine.process.ProcessManager;
import com.google.refine.util.ParsingUtilities;
import com.google.refine.util.TestUtils;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
public class StandardReconConfigTests extends RefineTest {
@BeforeMethod
@ -170,10 +183,10 @@ public class StandardReconConfigTests extends RefineTest {
@Test
public void formulateQueryTest() throws IOException {
Project project = createCSVProject("title,director\n"
+ "mulholland drive,david lynch");
String config = " {\n" +
Project project = createCSVProject("title,director\n"
+ "mulholland drive,david lynch");
String config = " {\n" +
" \"mode\": \"standard-service\",\n" +
" \"service\": \"https://tools.wmflabs.org/openrefine-wikidata/en/api\",\n" +
" \"identifierSpace\": \"http://www.wikidata.org/entity/\",\n" +
@ -190,18 +203,195 @@ public class StandardReconConfigTests extends RefineTest {
" \"propertyID\": \"P123\"\n" +
" }\n" +
" ]}";
StandardReconConfig r = StandardReconConfig.reconstruct(config);
Row row = project.rows.get(0);
StandardReconConfig r = StandardReconConfig.reconstruct(config);
Row row = project.rows.get(0);
ReconJob job = r.createJob(project, 0, row, "title", row.getCell(0));
TestUtils.assertEqualAsJson("{"
+ "\"query\":\"mulholland drive\","
+ "\"type\":\"Q1234\","
+ "\"properties\":["
+ " {\"pid\":\"P123\",\"v\":\"david lynch\"}"
+ "],"
+ "\"type_strict\":\"should\"}", job.toString());
+ "\"query\":\"mulholland drive\","
+ "\"type\":\"Q1234\","
+ "\"properties\":["
+ " {\"pid\":\"P123\",\"v\":\"david lynch\"}"
+ "],"
+ "\"type_strict\":\"should\"}", job.toString());
}
@Test
public void reconNonJsonTest() throws Exception {
Project project = createCSVProject("title,director\n"
+ "mulholland drive,david lynch");
String nonJsonResponse = "<!DOCTYPE html>\n" +
"<html lang=\"en\">\n" +
" <head>\n" +
" <meta charset=\"utf-8\">\n" +
" <title>Error</title>\n" +
" </head>\n" +
" <body>\n" +
" You have reached an error page.\n" +
" </body>\n" +
"</html>";
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/openrefine-wikidata/en/api");
server.enqueue(new MockResponse().setBody(nonJsonResponse));
server.enqueue(new MockResponse());
String configJson = " {\n" +
" \"mode\": \"standard-service\",\n" +
" \"service\": \"" + url + "\",\n" +
" \"identifierSpace\": \"http://www.wikidata.org/entity/\",\n" +
" \"schemaSpace\": \"http://www.wikidata.org/prop/direct/\",\n" +
" \"type\": {\n" +
" \"id\": \"Q11424\",\n" +
" \"name\": \"film\"\n" +
" },\n" +
" \"autoMatch\": true,\n" +
" \"columnDetails\": [\n" +
" {\n" +
" \"column\": \"director\",\n" +
" \"propertyName\": \"Director\",\n" +
" \"propertyID\": \"P57\"\n" +
" }\n" +
" ]}";
StandardReconConfig config = StandardReconConfig.reconstruct(configJson);
ReconOperation op = new ReconOperation(EngineConfig.reconstruct(null), "director", config);
Process process = op.createProcess(project, new Properties());
ProcessManager pm = project.getProcessManager();
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
Assert.assertFalse(process.isRunning());
RecordedRequest request1 = server.takeRequest();
assertNotNull(request1);
// We won't have gotten a result, but we want to make sure things didn't die
Row row = project.rows.get(0);
Cell cell = row.cells.get(1);
assertNotNull(cell.recon);
assertEquals(cell.recon.service, url.toString());
ReconCandidate candidate = cell.recon.getBestCandidate();
assertNull(candidate);
}
}
@Test
public void reconTest() throws Exception {
Project project = createCSVProject("title,director\n"
+ "mulholland drive,david lynch");
String reconResponse = "{\n" +
"q0: {\n" +
" result: [\n" +
" {\n" +
" P57: {\n" +
"score: 100,\n" +
"weighted: 40\n" +
"},\n" +
"all_labels: {\n" +
"score: 59,\n" +
"weighted: 59\n" +
"},\n" +
"score: 70.71428571428572,\n" +
"id: \"Q3989262\",\n" +
"name: \"The Short Films of David Lynch\",\n" +
"type: [\n" +
"{\n" +
"id: \"Q24862\",\n" +
"name: \"short film\"\n" +
"},\n" +
"{\n" +
"id: \"Q202866\",\n" +
"name: \"animated film\"\n" +
"}\n" +
"],\n" +
"match: false\n" +
"},\n" +
"{\n" +
"P57: {\n" +
"score: 100,\n" +
"weighted: 40\n" +
"},\n" +
"all_labels: {\n" +
"score: 44,\n" +
"weighted: 44\n" +
"},\n" +
"score: 60.00000000000001,\n" +
"id: \"Q83365219\",\n" +
"name: \"What Did Jack Do?\",\n" +
"type: [\n" +
"{\n" +
"id: \"Q24862\",\n" +
"name: \"short film\"\n" +
"}\n" +
"],\n" +
"match: false\n" +
" }\n" +
" ]\n" +
" }\n" +
"}\n";
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/openrefine-wikidata/en/api");
// FIXME: Retry doesn't currently work, but should be tested
// server.enqueue(new MockResponse().setResponseCode(503)); // service overloaded
server.enqueue(new MockResponse().setBody(reconResponse));
server.enqueue(new MockResponse());
String configJson = " {\n" +
" \"mode\": \"standard-service\",\n" +
" \"service\": \"" + url + "\",\n" +
" \"identifierSpace\": \"http://www.wikidata.org/entity/\",\n" +
" \"schemaSpace\": \"http://www.wikidata.org/prop/direct/\",\n" +
" \"type\": {\n" +
" \"id\": \"Q11424\",\n" +
" \"name\": \"film\"\n" +
" },\n" +
" \"autoMatch\": true,\n" +
" \"columnDetails\": [\n" +
" {\n" +
" \"column\": \"director\",\n" +
" \"propertyName\": \"Director\",\n" +
" \"propertyID\": \"P57\"\n" +
" }\n" +
" ]}";
StandardReconConfig config = StandardReconConfig.reconstruct(configJson);
ReconOperation op = new ReconOperation(EngineConfig.reconstruct(null), "director", config);
Process process = op.createProcess(project, new Properties());
ProcessManager pm = project.getProcessManager();
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
try {
Thread.sleep(1000); // TODO: timeout will need to increase for retries
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
Assert.assertFalse(process.isRunning());
// RecordedRequest scratchFirstRquest = server.takeRequest();
RecordedRequest request1 = server.takeRequest();
assertNotNull(request1);
String query = request1.getBody().readUtf8Line();
assertNotNull(query);
String expected = "queries=" + URLEncoder.encode("{\"q0\":{\"query\":\"david lynch\",\"type\":\"Q11424\",\"properties\":[{\"pid\":\"P57\",\"v\":\"david lynch\"}],\"type_strict\":\"should\"}}", "UTF-8");
assertEquals(query, expected);
Row row = project.rows.get(0);
Cell cell = row.cells.get(1);
assertNotNull(cell.recon);
assertEquals(cell.recon.service, url.toString());
assertEquals(cell.recon.getBestCandidate().types[0], "Q24862");
}
}
/**
* The UI format and the backend format differ for serialization
* (the UI never deserializes and the backend serialization did not matter).