Add caching in URL fetching

This commit is contained in:
Antonin Delpeuch 2017-03-07 20:21:27 +00:00
parent 5d8d372244
commit 782a2f5b48

View File

@ -23,8 +23,8 @@ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@ -43,6 +43,8 @@ import java.net.URLConnection;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Map;
import java.util.HashMap;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
@ -76,14 +78,14 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
final protected String _baseColumnName; final protected String _baseColumnName;
final protected String _urlExpression; final protected String _urlExpression;
final protected OnError _onError; final protected OnError _onError;
final protected String _newColumnName; final protected String _newColumnName;
final protected int _columnInsertIndex; final protected int _columnInsertIndex;
final protected int _delay; final protected int _delay;
static public AbstractOperation reconstruct(Project project, JSONObject obj) throws Exception { static public AbstractOperation reconstruct(Project project, JSONObject obj) throws Exception {
JSONObject engineConfig = obj.getJSONObject("engineConfig"); JSONObject engineConfig = obj.getJSONObject("engineConfig");
return new ColumnAdditionByFetchingURLsOperation( return new ColumnAdditionByFetchingURLsOperation(
engineConfig, engineConfig,
obj.getString("baseColumnName"), obj.getString("baseColumnName"),
@ -94,32 +96,32 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
obj.getInt("delay") obj.getInt("delay")
); );
} }
public ColumnAdditionByFetchingURLsOperation( public ColumnAdditionByFetchingURLsOperation(
JSONObject engineConfig, JSONObject engineConfig,
String baseColumnName, String baseColumnName,
String urlExpression, String urlExpression,
OnError onError, OnError onError,
String newColumnName, String newColumnName,
int columnInsertIndex, int columnInsertIndex,
int delay int delay
) { ) {
super(engineConfig); super(engineConfig);
_baseColumnName = baseColumnName; _baseColumnName = baseColumnName;
_urlExpression = urlExpression; _urlExpression = urlExpression;
_onError = onError; _onError = onError;
_newColumnName = newColumnName; _newColumnName = newColumnName;
_columnInsertIndex = columnInsertIndex; _columnInsertIndex = columnInsertIndex;
_delay = delay; _delay = delay;
} }
@Override @Override
public void write(JSONWriter writer, Properties options) public void write(JSONWriter writer, Properties options)
throws JSONException { throws JSONException {
writer.object(); writer.object();
writer.key("op"); writer.value(OperationRegistry.s_opClassToName.get(this.getClass())); writer.key("op"); writer.value(OperationRegistry.s_opClassToName.get(this.getClass()));
writer.key("description"); writer.value(getBriefDescription(null)); writer.key("description"); writer.value(getBriefDescription(null));
@ -135,44 +137,45 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
@Override @Override
protected String getBriefDescription(Project project) { protected String getBriefDescription(Project project) {
return "Create column " + _newColumnName + return "Create column " + _newColumnName +
" at index " + _columnInsertIndex + " at index " + _columnInsertIndex +
" by fetching URLs based on column " + _baseColumnName + " by fetching URLs based on column " + _baseColumnName +
" using expression " + _urlExpression; " using expression " + _urlExpression;
} }
protected String createDescription(Column column, List<CellAtRow> cellsAtRows) { protected String createDescription(Column column, List<CellAtRow> cellsAtRows) {
return "Create new column " + _newColumnName + return "Create new column " + _newColumnName +
", filling " + cellsAtRows.size() + ", filling " + cellsAtRows.size() +
" rows by fetching URLs based on column " + column.getName() + " rows by fetching URLs based on column " + column.getName() +
" and formulated as " + _urlExpression; " and formulated as " + _urlExpression;
} }
@Override @Override
public Process createProcess(Project project, Properties options) throws Exception { public Process createProcess(Project project, Properties options) throws Exception {
Engine engine = createEngine(project); Engine engine = createEngine(project);
engine.initializeFromJSON(_engineConfig); engine.initializeFromJSON(_engineConfig);
Evaluable eval = MetaParser.parse(_urlExpression); Evaluable eval = MetaParser.parse(_urlExpression);
return new ColumnAdditionByFetchingURLsProcess( return new ColumnAdditionByFetchingURLsProcess(
project, project,
engine, engine,
eval, eval,
getBriefDescription(null) getBriefDescription(null)
); );
} }
public class ColumnAdditionByFetchingURLsProcess extends LongRunningProcess implements Runnable { public class ColumnAdditionByFetchingURLsProcess extends LongRunningProcess implements Runnable {
final protected Project _project; final protected Project _project;
final protected Engine _engine; final protected Engine _engine;
final protected Evaluable _eval; final protected Evaluable _eval;
final protected long _historyEntryID; final protected long _historyEntryID;
protected int _cellIndex; protected int _cellIndex;
protected Map<URL, Serializable> _urlCache;
public ColumnAdditionByFetchingURLsProcess( public ColumnAdditionByFetchingURLsProcess(
Project project, Project project,
Engine engine, Engine engine,
Evaluable eval, Evaluable eval,
String description String description
@ -182,12 +185,13 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
_engine = engine; _engine = engine;
_eval = eval; _eval = eval;
_historyEntryID = HistoryEntry.allocateID(); _historyEntryID = HistoryEntry.allocateID();
_urlCache = new HashMap<URL, Serializable>();
} }
@Override @Override
public void write(JSONWriter writer, Properties options) public void write(JSONWriter writer, Properties options)
throws JSONException { throws JSONException {
writer.object(); writer.object();
writer.key("id"); writer.value(hashCode()); writer.key("id"); writer.value(hashCode());
writer.key("description"); writer.value(_description); writer.key("description"); writer.value(_description);
@ -196,12 +200,12 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
writer.key("progress"); writer.value(_progress); writer.key("progress"); writer.value(_progress);
writer.endObject(); writer.endObject();
} }
@Override @Override
protected Runnable getRunnable() { protected Runnable getRunnable() {
return this; return this;
} }
@Override @Override
public void run() { public void run() {
Column column = _project.columnModel.getColumnByName(_baseColumnName); Column column = _project.columnModel.getColumnByName(_baseColumnName);
@ -213,66 +217,89 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
_project.processManager.onFailedProcess(this, new Exception("Another column already named " + _newColumnName)); _project.processManager.onFailedProcess(this, new Exception("Another column already named " + _newColumnName));
return; return;
} }
List<CellAtRow> urls = new ArrayList<CellAtRow>(_project.rows.size()); List<CellAtRow> urls = new ArrayList<CellAtRow>(_project.rows.size());
FilteredRows filteredRows = _engine.getAllFilteredRows(); FilteredRows filteredRows = _engine.getAllFilteredRows();
filteredRows.accept(_project, createRowVisitor(urls)); filteredRows.accept(_project, createRowVisitor(urls));
List<CellAtRow> responseBodies = new ArrayList<CellAtRow>(urls.size()); List<CellAtRow> responseBodies = new ArrayList<CellAtRow>(urls.size());
for (int i = 0; i < urls.size(); i++) { for (int i = 0; i < urls.size(); i++) {
CellAtRow urlData = urls.get(i); CellAtRow urlData = urls.get(i);
long start = System.currentTimeMillis(); CellAtRow cellAtRow = cachedFetch(urlData);
CellAtRow cellAtRow = fetch(urlData);
if (cellAtRow != null) { if (cellAtRow != null) {
responseBodies.add(cellAtRow); responseBodies.add(cellAtRow);
} }
_progress = i * 100 / urls.size(); _progress = i * 100 / urls.size();
try {
long delay = _delay - (System.currentTimeMillis() - start); if (_canceled) {
if (delay > 0) { break;
Thread.sleep(delay); }
}
} catch (InterruptedException e) {
if (_canceled) {
break;
}
}
} }
_urlCache.clear();
if (!_canceled) { if (!_canceled) {
HistoryEntry historyEntry = new HistoryEntry( HistoryEntry historyEntry = new HistoryEntry(
_historyEntryID, _historyEntryID,
_project, _project,
_description, _description,
ColumnAdditionByFetchingURLsOperation.this, ColumnAdditionByFetchingURLsOperation.this,
new ColumnAdditionChange( new ColumnAdditionChange(
_newColumnName, _newColumnName,
_columnInsertIndex, _columnInsertIndex,
responseBodies) responseBodies)
); );
_project.history.addEntry(historyEntry); _project.history.addEntry(historyEntry);
_project.processManager.onDoneProcess(this); _project.processManager.onDoneProcess(this);
} }
} }
CellAtRow fetch(CellAtRow urlData) { CellAtRow cachedFetch(CellAtRow urlData) {
String urlString = urlData.cell.value.toString(); String urlString = urlData.cell.value.toString();
URL url = null; URL url = null;
try {
try { url = new URL(urlString);
url = new URL(urlString); } catch (MalformedURLException e) {
} catch (MalformedURLException e) { return null;
return null; }
}
Serializable cellResult = _urlCache.get(url);
if (cellResult == null) {
cellResult = fetch(url);
if (cellResult != null) {
_urlCache.put(url, cellResult);
}
try {
// Always sleep for the delay, no matter how long the
// request took. This is more responsible than subtracting
// the time spent requesting the URL, because it naturally
// slows us down if the server is busy and takes a long time
// to reply.
if (_delay > 0) {
Thread.sleep(_delay);
}
} catch (InterruptedException e) {
return null;
}
}
if (cellResult != null) {
return new CellAtRow(
urlData.row,
new Cell(cellResult, null));
}
return null;
}
Serializable fetch(URL url) {
try { try {
URLConnection urlConnection = url.openConnection(); URLConnection urlConnection = url.openConnection();
// urlConnection.setRequestProperty(_headerKey, _headerValue); // urlConnection.setRequestProperty(_headerKey, _headerValue);
try { try {
InputStream is = urlConnection.getInputStream(); InputStream is = urlConnection.getInputStream();
try { try {
@ -287,12 +314,8 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
} }
} }
} }
return new CellAtRow( return ParsingUtilities.inputStreamToString(
urlData.row, is, (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding);
new Cell(
ParsingUtilities.inputStreamToString(
is, (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding),
null));
} finally { } finally {
is.close(); is.close();
@ -313,11 +336,11 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
message = e.toString(); message = e.toString();
} }
return _onError == OnError.StoreError ? return _onError == OnError.StoreError ?
new CellAtRow(urlData.row, new Cell(new EvalError(message), null)) : null; new EvalError(message) : null;
} }
} catch (Exception e) { } catch (Exception e) {
return _onError == OnError.StoreError ? return _onError == OnError.StoreError ?
new CellAtRow(urlData.row, new Cell(new EvalError(e.getMessage()), null)) : null; new EvalError(e.getMessage()) : null;
} }
} }
@ -326,33 +349,33 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
int cellIndex; int cellIndex;
Properties bindings; Properties bindings;
List<CellAtRow> cellsAtRows; List<CellAtRow> cellsAtRows;
public RowVisitor init(List<CellAtRow> cellsAtRows) { public RowVisitor init(List<CellAtRow> cellsAtRows) {
Column column = _project.columnModel.getColumnByName(_baseColumnName); Column column = _project.columnModel.getColumnByName(_baseColumnName);
this.cellIndex = column.getCellIndex(); this.cellIndex = column.getCellIndex();
this.bindings = ExpressionUtils.createBindings(_project); this.bindings = ExpressionUtils.createBindings(_project);
this.cellsAtRows = cellsAtRows; this.cellsAtRows = cellsAtRows;
return this; return this;
} }
@Override @Override
public void start(Project project) { public void start(Project project) {
// nothing to do // nothing to do
} }
@Override @Override
public void end(Project project) { public void end(Project project) {
// nothing to do // nothing to do
} }
@Override @Override
public boolean visit(Project project, int rowIndex, Row row) { public boolean visit(Project project, int rowIndex, Row row) {
Cell cell = row.getCell(cellIndex); Cell cell = row.getCell(cellIndex);
Cell newCell = null; Cell newCell = null;
ExpressionUtils.bind(bindings, row, rowIndex, _baseColumnName, cell); ExpressionUtils.bind(bindings, row, rowIndex, _baseColumnName, cell);
Object o = _eval.evaluate(bindings); Object o = _eval.evaluate(bindings);
if (o != null) { if (o != null) {
if (o instanceof Cell) { if (o instanceof Cell) {
@ -366,11 +389,11 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
} }
} }
} }
if (newCell != null) { if (newCell != null) {
cellsAtRows.add(new CellAtRow(rowIndex, newCell)); cellsAtRows.add(new CellAtRow(rowIndex, newCell));
} }
return false; return false;
} }
}.init(cellsAtRows); }.init(cellsAtRows);