Jackson serialization for long running processes

This commit is contained in:
Antonin Delpeuch 2018-09-29 19:28:55 +01:00
parent 18c2183cbc
commit 7d3af420ce
6 changed files with 172 additions and 78 deletions

View File

@ -316,20 +316,7 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
}); });
} }
} }
@Override
public void write(JSONWriter writer, Properties options)
throws JSONException {
writer.object();
writer.key("id"); writer.value(hashCode());
writer.key("description"); writer.value(_description);
writer.key("immediate"); writer.value(false);
writer.key("status"); writer.value(_thread == null ? "pending" : (_thread.isAlive() ? "running" : "done"));
writer.key("progress"); writer.value(_progress);
writer.endObject();
}
@Override @Override
protected Runnable getRunnable() { protected Runnable getRunnable() {
return this; return this;

View File

@ -178,19 +178,6 @@ public class ExtendDataOperation extends EngineDependentOperation {
_job = new ReconciledDataExtensionJob(_extension, _endpoint); _job = new ReconciledDataExtensionJob(_extension, _endpoint);
} }
@Override
public void write(JSONWriter writer, Properties options)
throws JSONException {
writer.object();
writer.key("id"); writer.value(hashCode());
writer.key("description"); writer.value(_description);
writer.key("immediate"); writer.value(false);
writer.key("status"); writer.value(_thread == null ? "pending" : (_thread.isAlive() ? "running" : "done"));
writer.key("progress"); writer.value(_progress);
writer.endObject();
}
@Override @Override
protected Runnable getRunnable() { protected Runnable getRunnable() {
return this; return this;

View File

@ -33,6 +33,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package com.google.refine.operations.recon; package com.google.refine.operations.recon;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -46,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.refine.browsing.Engine; import com.google.refine.browsing.Engine;
import com.google.refine.browsing.EngineConfig; import com.google.refine.browsing.EngineConfig;
@ -69,6 +71,7 @@ import com.google.refine.operations.EngineDependentOperation;
import com.google.refine.operations.OperationRegistry; import com.google.refine.operations.OperationRegistry;
import com.google.refine.process.LongRunningProcess; import com.google.refine.process.LongRunningProcess;
import com.google.refine.process.Process; import com.google.refine.process.Process;
import com.google.refine.util.ParsingUtilities;
public class ReconOperation extends EngineDependentOperation { public class ReconOperation extends EngineDependentOperation {
final static Logger logger = LoggerFactory.getLogger("recon-operation"); final static Logger logger = LoggerFactory.getLogger("recon-operation");
@ -76,7 +79,7 @@ public class ReconOperation extends EngineDependentOperation {
final protected String _columnName; final protected String _columnName;
final protected ReconConfig _reconConfig; final protected ReconConfig _reconConfig;
static public AbstractOperation reconstruct(Project project, JSONObject obj) throws Exception { static public ReconOperation reconstruct(Project project, JSONObject obj) throws Exception {
JSONObject engineConfig = obj.getJSONObject("engineConfig"); JSONObject engineConfig = obj.getJSONObject("engineConfig");
return new ReconOperation( return new ReconOperation(
@ -159,6 +162,32 @@ public class ReconOperation extends EngineDependentOperation {
protected List<ReconEntry> _entries; protected List<ReconEntry> _entries;
protected int _cellIndex; protected int _cellIndex;
protected final String _addJudgmentFacetJson =
"{\n" +
" \"action\" : \"createFacet\",\n" +
" \"facetConfig\" : {\n" +
" \"columnName\" : \"researcher\",\n" +
" \"expression\" : \"forNonBlank(cell.recon.judgment, v, v, if(isNonBlank(value), \\\"(unreconciled)\\\", \\\"(blank)\\\"))\",\n" +
" \"name\" : \"researcher: judgment\"\n" +
" },\n" +
" \"facetOptions\" : {\n" +
" \"scroll\" : false\n" +
" },\n" +
" \"facetType\" : \"list\"\n" +
" }";
protected final String _addScoreFacetJson =
"{\n" +
" \"action\" : \"createFacet\",\n" +
" \"facetConfig\" : {\n" +
" \"columnName\" : \"researcher\",\n" +
" \"expression\" : \"cell.recon.best.score\",\n" +
" \"mode\" : \"range\",\n" +
" \"name\" : \"researcher: best candidate's score\"\n" +
" },\n" +
" \"facetType\" : \"range\"\n" +
"}";
protected JsonNode _addJudgmentFacet, _addScoreFacet;
public ReconProcess( public ReconProcess(
Project project, Project project,
EngineConfig engineConfig, EngineConfig engineConfig,
@ -168,6 +197,12 @@ public class ReconOperation extends EngineDependentOperation {
_project = project; _project = project;
_engineConfig = engineConfig; _engineConfig = engineConfig;
_historyEntryID = HistoryEntry.allocateID(); _historyEntryID = HistoryEntry.allocateID();
try {
_addJudgmentFacet = ParsingUtilities.mapper.readValue(_addJudgmentFacetJson, JsonNode.class);
_addScoreFacet = ParsingUtilities.mapper.readValue(_addScoreFacetJson, JsonNode.class);
} catch (IOException e) {
e.printStackTrace();
}
} }
@Override @Override
@ -214,6 +249,16 @@ public class ReconOperation extends EngineDependentOperation {
writer.endObject(); writer.endObject();
} }
@JsonProperty("onDone")
public List<JsonNode> onDoneActions() {
List<JsonNode> onDone = new ArrayList<>();
onDone.add(_addJudgmentFacet);
if (_reconConfig instanceof StandardReconConfig) {
onDone.add(_addScoreFacet);
}
return onDone;
}
@Override @Override
protected Runnable getRunnable() { protected Runnable getRunnable() {
return this; return this;

View File

@ -49,6 +49,7 @@ import org.testng.annotations.Test;
import com.google.refine.browsing.EngineConfig; import com.google.refine.browsing.EngineConfig;
import com.google.refine.expr.ExpressionUtils; import com.google.refine.expr.ExpressionUtils;
import com.google.refine.model.AbstractOperation;
import com.google.refine.model.Cell; import com.google.refine.model.Cell;
import com.google.refine.model.ModelException; import com.google.refine.model.ModelException;
import com.google.refine.model.Project; import com.google.refine.model.Project;
@ -67,6 +68,31 @@ import com.google.refine.tests.util.TestUtils;
public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest { public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
static final String ENGINE_JSON_URLS = "{\"mode\":\"row-based\"}"; static final String ENGINE_JSON_URLS = "{\"mode\":\"row-based\"}";
private String json = "{\"op\":\"core/column-addition-by-fetching-urls\","
+ "\"description\":\"Create column employments at index 2 by fetching URLs based on column orcid using expression grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\","
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]},"
+ "\"newColumnName\":\"employments\","
+ "\"columnInsertIndex\":2,"
+ "\"baseColumnName\":\"orcid\","
+ "\"urlExpression\":\"grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\","
+ "\"onError\":\"set-to-blank\","
+ "\"delay\":500,"
+ "\"cacheResponses\":true,"
+ "\"httpHeadersJson\":["
+ " {\"name\":\"authorization\",\"value\":\"\"},"
+ " {\"name\":\"user-agent\",\"value\":\"OpenRefine 3.0 rc.1 [TRUNK]\"},"
+ " {\"name\":\"accept\",\"value\":\"application/json\"}"
+ "]}";
private String processJson = ""
+"{\n" +
" \"description\" : \"Create column employments at index 2 by fetching URLs based on column orcid using expression grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\",\n" +
" \"id\" : %d,\n" +
" \"immediate\" : false,\n" +
" \"progress\" : 0,\n" +
" \"status\" : \"pending\"\n" +
" }";
@Override @Override
@BeforeTest @BeforeTest
@ -99,24 +125,16 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
@Test @Test
public void serializeColumnAdditionByFetchingURLsOperation() throws JSONException, Exception { public void serializeColumnAdditionByFetchingURLsOperation() throws JSONException, Exception {
String json = "{\"op\":\"core/column-addition-by-fetching-urls\","
+ "\"description\":\"Create column employments at index 2 by fetching URLs based on column orcid using expression grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\","
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]},"
+ "\"newColumnName\":\"employments\","
+ "\"columnInsertIndex\":2,"
+ "\"baseColumnName\":\"orcid\","
+ "\"urlExpression\":\"grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\","
+ "\"onError\":\"set-to-blank\","
+ "\"delay\":500,"
+ "\"cacheResponses\":true,"
+ "\"httpHeadersJson\":["
+ " {\"name\":\"authorization\",\"value\":\"\"},"
+ " {\"name\":\"user-agent\",\"value\":\"OpenRefine 3.0 rc.1 [TRUNK]\"},"
+ " {\"name\":\"accept\",\"value\":\"application/json\"}"
+ "]}";
TestUtils.isSerializedTo(ColumnAdditionByFetchingURLsOperation.reconstruct(project, new JSONObject(json)), json); TestUtils.isSerializedTo(ColumnAdditionByFetchingURLsOperation.reconstruct(project, new JSONObject(json)), json);
} }
@Test
public void serializeUrlFetchingProcess() throws Exception {
AbstractOperation op = ColumnAdditionByFetchingURLsOperation.reconstruct(project, new JSONObject(json));
Process process = op.createProcess(project, new Properties());
TestUtils.isSerializedTo(process, String.format(processJson, process.hashCode()));
}
/** /**
* Test for caching * Test for caching
*/ */

View File

@ -86,6 +86,34 @@ public class ExtendDataOperationTests extends RefineTest {
+ " ]" + " ]"
+ "}"; + "}";
private String operationJson = "{\"op\":\"core/extend-reconciled-data\","
+ "\"description\":\"Extend data at index 3 based on column organization_name\","
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":["
+ " {\"selectNumeric\":true,\"expression\":\"cell.recon.best.score\",\"selectBlank\":false,\"selectNonNumeric\":true,\"selectError\":true,\"name\":\"organization_name: best candidate's score\",\"from\":13,\"to\":101,\"type\":\"range\",\"columnName\":\"organization_name\"},"
+ " {\"selectNonTime\":true,\"expression\":\"grel:toDate(value)\",\"selectBlank\":true,\"selectError\":true,\"selectTime\":true,\"name\":\"start_year\",\"from\":410242968000,\"to\":1262309184000,\"type\":\"timerange\",\"columnName\":\"start_year\"}"
+ "]},"
+ "\"columnInsertIndex\":3,"
+ "\"baseColumnName\":\"organization_name\","
+ "\"endpoint\":\"https://tools.wmflabs.org/openrefine-wikidata/en/api\","
+ "\"identifierSpace\":\"http://www.wikidata.org/entity/\","
+ "\"schemaSpace\":\"http://www.wikidata.org/prop/direct/\","
+ "\"extension\":{"
+ " \"properties\":["
+ " {\"name\":\"inception\",\"id\":\"P571\"},"
+ " {\"name\":\"headquarters location\",\"id\":\"P159\"},"
+ " {\"name\":\"coordinate location\",\"id\":\"P625\"}"
+ " ]"
+ "}}";
private String processJson = ""
+ " {\n" +
" \"description\" : \"Extend data at index 3 based on column organization_name\",\n" +
" \"id\" : %d,\n" +
" \"immediate\" : false,\n" +
" \"progress\" : 0,\n" +
" \"status\" : \"pending\"\n" +
" }";
static public class ReconciledDataExtensionJobStub extends ReconciledDataExtensionJob { static public class ReconciledDataExtensionJobStub extends ReconciledDataExtensionJob {
public ReconciledDataExtensionJobStub(DataExtensionConfig obj, String endpoint) throws JSONException { public ReconciledDataExtensionJobStub(DataExtensionConfig obj, String endpoint) throws JSONException {
super(obj, endpoint); super(obj, endpoint);
@ -137,25 +165,14 @@ public class ExtendDataOperationTests extends RefineTest {
@Test @Test
public void serializeExtendDataOperation() throws JSONException, Exception { public void serializeExtendDataOperation() throws JSONException, Exception {
String json = "{\"op\":\"core/extend-reconciled-data\"," TestUtils.isSerializedTo(ExtendDataOperation.reconstruct(project, new JSONObject(operationJson)), operationJson);
+ "\"description\":\"Extend data at index 3 based on column organization_name\"," }
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":["
+ " {\"selectNumeric\":true,\"expression\":\"cell.recon.best.score\",\"selectBlank\":false,\"selectNonNumeric\":true,\"selectError\":true,\"name\":\"organization_name: best candidate's score\",\"from\":13,\"to\":101,\"type\":\"range\",\"columnName\":\"organization_name\"}," @Test
+ " {\"selectNonTime\":true,\"expression\":\"grel:toDate(value)\",\"selectBlank\":true,\"selectError\":true,\"selectTime\":true,\"name\":\"start_year\",\"from\":410242968000,\"to\":1262309184000,\"type\":\"timerange\",\"columnName\":\"start_year\"}" public void serializeExtendDataProcess() throws JSONException, Exception {
+ "]}," Process p = ExtendDataOperation.reconstruct(project, new JSONObject(operationJson))
+ "\"columnInsertIndex\":3," .createProcess(project, new Properties());
+ "\"baseColumnName\":\"organization_name\"," TestUtils.isSerializedTo(p, String.format(processJson, p.hashCode()));
+ "\"endpoint\":\"https://tools.wmflabs.org/openrefine-wikidata/en/api\","
+ "\"identifierSpace\":\"http://www.wikidata.org/entity/\","
+ "\"schemaSpace\":\"http://www.wikidata.org/prop/direct/\","
+ "\"extension\":{"
+ " \"properties\":["
+ " {\"name\":\"inception\",\"id\":\"P571\"},"
+ " {\"name\":\"headquarters location\",\"id\":\"P159\"},"
+ " {\"name\":\"coordinate location\",\"id\":\"P625\"}"
+ " ]"
+ "}}";
TestUtils.isSerializedTo(ExtendDataOperation.reconstruct(project, new JSONObject(json)), json);
} }
@Test @Test

View File

@ -2,6 +2,8 @@ package com.google.refine.tests.operations.recon;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.util.Properties;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import org.testng.annotations.BeforeSuite; import org.testng.annotations.BeforeSuite;
@ -17,6 +19,53 @@ import com.google.refine.tests.util.TestUtils;
public class ReconOperationTests extends RefineTest { public class ReconOperationTests extends RefineTest {
private String json= "{"
+ "\"op\":\"core/recon\","
+ "\"description\":\"Reconcile cells in column researcher to type Q5\","
+ "\"columnName\":\"researcher\","
+ "\"config\":{"
+ " \"mode\":\"standard-service\","
+ " \"service\":\"https://tools.wmflabs.org/openrefine-wikidata/en/api\","
+ " \"identifierSpace\":\"http://www.wikidata.org/entity/\","
+ " \"schemaSpace\":\"http://www.wikidata.org/prop/direct/\","
+ " \"type\":{\"id\":\"Q5\",\"name\":\"human\"},"
+ " \"autoMatch\":true,"
+ " \"columnDetails\":[],"
+ " \"limit\":0"
+ "},"
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]}}";
private Project project = mock(Project.class);
private String processJson = ""
+ " {\n" +
" \"description\" : \"Reconcile cells in column researcher to type Q5\",\n" +
" \"id\" : %d,\n" +
" \"immediate\" : false,\n" +
" \"onDone\" : [ {\n" +
" \"action\" : \"createFacet\",\n" +
" \"facetConfig\" : {\n" +
" \"columnName\" : \"researcher\",\n" +
" \"expression\" : \"forNonBlank(cell.recon.judgment, v, v, if(isNonBlank(value), \\\"(unreconciled)\\\", \\\"(blank)\\\"))\",\n" +
" \"name\" : \"researcher: judgment\"\n" +
" },\n" +
" \"facetOptions\" : {\n" +
" \"scroll\" : false\n" +
" },\n" +
" \"facetType\" : \"list\"\n" +
" }, {\n" +
" \"action\" : \"createFacet\",\n" +
" \"facetConfig\" : {\n" +
" \"columnName\" : \"researcher\",\n" +
" \"expression\" : \"cell.recon.best.score\",\n" +
" \"mode\" : \"range\",\n" +
" \"name\" : \"researcher: best candidate's score\"\n" +
" },\n" +
" \"facetType\" : \"range\"\n" +
" } ],\n" +
" \"progress\" : 0,\n" +
" \"status\" : \"pending\"\n" +
" }";
@BeforeSuite @BeforeSuite
public void registerOperation() { public void registerOperation() {
OperationRegistry.registerOperation(getCoreModule(), "recon", ReconOperation.class); OperationRegistry.registerOperation(getCoreModule(), "recon", ReconOperation.class);
@ -25,22 +74,13 @@ public class ReconOperationTests extends RefineTest {
@Test @Test
public void serializeReconOperation() throws JSONException, Exception { public void serializeReconOperation() throws JSONException, Exception {
String json = "{"
+ "\"op\":\"core/recon\","
+ "\"description\":\"Reconcile cells in column researcher to type Q5\","
+ "\"columnName\":\"researcher\","
+ "\"config\":{"
+ " \"mode\":\"standard-service\","
+ " \"service\":\"https://tools.wmflabs.org/openrefine-wikidata/en/api\","
+ " \"identifierSpace\":\"http://www.wikidata.org/entity/\","
+ " \"schemaSpace\":\"http://www.wikidata.org/prop/direct/\","
+ " \"type\":{\"id\":\"Q5\",\"name\":\"human\"},"
+ " \"autoMatch\":true,"
+ " \"columnDetails\":[],"
+ " \"limit\":0"
+ "},"
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]}}";
Project project = mock(Project.class);
TestUtils.isSerializedTo(ReconOperation.reconstruct(project, new JSONObject(json)), json); TestUtils.isSerializedTo(ReconOperation.reconstruct(project, new JSONObject(json)), json);
} }
@Test
public void serializeReconProcess() throws JSONException, Exception {
ReconOperation op = ReconOperation.reconstruct(project, new JSONObject(json));
com.google.refine.process.Process process = op.createProcess(project, new Properties());
TestUtils.isSerializedTo(process, String.format(processJson, process.hashCode()));
}
} }