Better MQL batching during extending data operations.
Tried to use JSON streaming in changes as well.

git-svn-id: http://google-refine.googlecode.com/svn/trunk@479 7d457c2a-affb-35e4-300a-418c747d4874
This commit is contained in:
parent
2277f45ef6
commit
1d938bc4d0
@ -7,6 +7,7 @@ import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.codehaus.jackson.JsonFactory;
|
||||
import org.codehaus.jackson.JsonParser;
|
||||
import org.codehaus.jackson.JsonToken;
|
||||
import org.json.JSONException;
|
||||
@ -106,6 +107,17 @@ public class Cell implements HasFields, Jsonizable {
|
||||
return new Cell(value, recon);
|
||||
}
|
||||
|
||||
static public Cell loadStreaming(String s, Map<Long, Recon> reconCache) throws Exception {
|
||||
JsonFactory jsonFactory = new JsonFactory();
|
||||
JsonParser jp = jsonFactory.createJsonParser(s);
|
||||
|
||||
if (jp.nextToken() != JsonToken.START_OBJECT) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return loadStreaming(jp, reconCache);
|
||||
}
|
||||
|
||||
static public Cell loadStreaming(JsonParser jp, Map<Long, Recon> reconCache) throws Exception {
|
||||
JsonToken t = jp.getCurrentToken();
|
||||
if (t == JsonToken.VALUE_NULL || t != JsonToken.START_OBJECT) {
|
||||
|
@ -29,7 +29,7 @@ public class CellAtRow {
|
||||
static public CellAtRow load(String s, Map<Long, Recon> reconCache) throws Exception {
|
||||
int semicolon = s.indexOf(';');
|
||||
int row = Integer.parseInt(s.substring(0, semicolon));
|
||||
Cell cell = semicolon < s.length() - 1 ? Cell.load(s.substring(semicolon + 1), reconCache) : null;
|
||||
Cell cell = semicolon < s.length() - 1 ? Cell.loadStreaming(s.substring(semicolon + 1), reconCache) : null;
|
||||
|
||||
return new CellAtRow(row, cell);
|
||||
}
|
||||
|
@ -72,9 +72,9 @@ public class CellChange implements Change {
|
||||
} else if ("cell".equals(field)) {
|
||||
cellIndex = Integer.parseInt(value);
|
||||
} else if ("new".equals(field) && value.length() > 0) {
|
||||
newCell = Cell.load(value, reconCache);
|
||||
newCell = Cell.loadStreaming(value, reconCache);
|
||||
} else if ("old".equals(field) && value.length() > 0) {
|
||||
oldCell = Cell.load(value, reconCache);
|
||||
oldCell = Cell.loadStreaming(value, reconCache);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,10 +157,12 @@ public class ExtendDataOperation extends EngineDependentOperation {
|
||||
}.init(rowIndices));
|
||||
}
|
||||
|
||||
protected void extendRows(List<Integer> rowIndices, List<DataExtension> dataExtensions, int from, int to) {
|
||||
protected int extendRows(List<Integer> rowIndices, List<DataExtension> dataExtensions, int from, int limit) {
|
||||
Set<String> guids = new HashSet<String>();
|
||||
for (int i = from; i < to; i++) {
|
||||
int index = rowIndices.get(i);
|
||||
|
||||
int end;
|
||||
for (end = from; end < limit && guids.size() < 10; end++) {
|
||||
int index = rowIndices.get(end);
|
||||
Row row = _project.rows.get(index);
|
||||
Cell cell = row.getCell(_cellIndex);
|
||||
|
||||
@ -174,7 +176,7 @@ public class ExtendDataOperation extends EngineDependentOperation {
|
||||
map = new HashMap<String, DataExtension>();
|
||||
}
|
||||
|
||||
for (int i = from; i < to; i++) {
|
||||
for (int i = from; i < end; i++) {
|
||||
int index = rowIndices.get(i);
|
||||
Row row = _project.rows.get(index);
|
||||
Cell cell = row.getCell(_cellIndex);
|
||||
@ -186,6 +188,8 @@ public class ExtendDataOperation extends EngineDependentOperation {
|
||||
dataExtensions.add(null);
|
||||
}
|
||||
}
|
||||
|
||||
return end;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
@ -201,9 +205,7 @@ public class ExtendDataOperation extends EngineDependentOperation {
|
||||
|
||||
int start = 0;
|
||||
while (start < rowIndices.size()) {
|
||||
int end = Math.min(start + 20, rowIndices.size());
|
||||
|
||||
extendRows(rowIndices, dataExtensions, start, end);
|
||||
int end = extendRows(rowIndices, dataExtensions, start, rowIndices.size());
|
||||
start = end;
|
||||
|
||||
_progress = end * 100 / rowIndices.size();
|
||||
|
Loading…
Reference in New Issue
Block a user