From 782a2f5b48d54dd839398b14b72c878f7b526a10 Mon Sep 17 00:00:00 2001 From: Antonin Delpeuch Date: Tue, 7 Mar 2017 20:21:27 +0000 Subject: [PATCH] Add caching in URL fetching --- ...ColumnAdditionByFetchingURLsOperation.java | 187 ++++++++++-------- 1 file changed, 105 insertions(+), 82 deletions(-) diff --git a/main/src/com/google/refine/operations/column/ColumnAdditionByFetchingURLsOperation.java b/main/src/com/google/refine/operations/column/ColumnAdditionByFetchingURLsOperation.java index 3160a0153..ebcb4fcbd 100644 --- a/main/src/com/google/refine/operations/column/ColumnAdditionByFetchingURLsOperation.java +++ b/main/src/com/google/refine/operations/column/ColumnAdditionByFetchingURLsOperation.java @@ -23,8 +23,8 @@ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
@@ -43,6 +43,8 @@ import java.net.URLConnection; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.Map; +import java.util.HashMap; import org.json.JSONException; import org.json.JSONObject; @@ -76,14 +78,14 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat final protected String _baseColumnName; final protected String _urlExpression; final protected OnError _onError; - + final protected String _newColumnName; final protected int _columnInsertIndex; final protected int _delay; static public AbstractOperation reconstruct(Project project, JSONObject obj) throws Exception { JSONObject engineConfig = obj.getJSONObject("engineConfig"); - + return new ColumnAdditionByFetchingURLsOperation( engineConfig, obj.getString("baseColumnName"), @@ -94,32 +96,32 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat obj.getInt("delay") ); } - + public ColumnAdditionByFetchingURLsOperation( JSONObject engineConfig, String baseColumnName, String urlExpression, OnError onError, - String newColumnName, + String newColumnName, int columnInsertIndex, int delay ) { super(engineConfig); - + _baseColumnName = baseColumnName; _urlExpression = urlExpression; _onError = onError; - + _newColumnName = newColumnName; _columnInsertIndex = columnInsertIndex; - + _delay = delay; } @Override public void write(JSONWriter writer, Properties options) throws JSONException { - + writer.object(); writer.key("op"); writer.value(OperationRegistry.s_opClassToName.get(this.getClass())); writer.key("description"); writer.value(getBriefDescription(null)); @@ -135,44 +137,45 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat @Override protected String getBriefDescription(Project project) { - return "Create column " + _newColumnName + - " at index " + _columnInsertIndex + - " by fetching URLs based on column " + _baseColumnName + + return "Create column " + _newColumnName 
+ + " at index " + _columnInsertIndex + + " by fetching URLs based on column " + _baseColumnName + " using expression " + _urlExpression; } protected String createDescription(Column column, List cellsAtRows) { - return "Create new column " + _newColumnName + + return "Create new column " + _newColumnName + ", filling " + cellsAtRows.size() + - " rows by fetching URLs based on column " + column.getName() + + " rows by fetching URLs based on column " + column.getName() + " and formulated as " + _urlExpression; } - - + + @Override public Process createProcess(Project project, Properties options) throws Exception { Engine engine = createEngine(project); engine.initializeFromJSON(_engineConfig); - + Evaluable eval = MetaParser.parse(_urlExpression); - + return new ColumnAdditionByFetchingURLsProcess( - project, + project, engine, eval, getBriefDescription(null) ); } - + public class ColumnAdditionByFetchingURLsProcess extends LongRunningProcess implements Runnable { - final protected Project _project; - final protected Engine _engine; - final protected Evaluable _eval; - final protected long _historyEntryID; - protected int _cellIndex; + final protected Project _project; + final protected Engine _engine; + final protected Evaluable _eval; + final protected long _historyEntryID; + protected int _cellIndex; + protected Map _urlCache; public ColumnAdditionByFetchingURLsProcess( - Project project, + Project project, Engine engine, Evaluable eval, String description @@ -182,12 +185,13 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat _engine = engine; _eval = eval; _historyEntryID = HistoryEntry.allocateID(); + _urlCache = new HashMap(); } - + @Override public void write(JSONWriter writer, Properties options) throws JSONException { - + writer.object(); writer.key("id"); writer.value(hashCode()); writer.key("description"); writer.value(_description); @@ -196,12 +200,12 @@ public class ColumnAdditionByFetchingURLsOperation extends 
EngineDependentOperat writer.key("progress"); writer.value(_progress); writer.endObject(); } - + @Override protected Runnable getRunnable() { return this; } - + @Override public void run() { Column column = _project.columnModel.getColumnByName(_baseColumnName); @@ -213,66 +217,89 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat _project.processManager.onFailedProcess(this, new Exception("Another column already named " + _newColumnName)); return; } - + List urls = new ArrayList(_project.rows.size()); - + FilteredRows filteredRows = _engine.getAllFilteredRows(); filteredRows.accept(_project, createRowVisitor(urls)); - + List responseBodies = new ArrayList(urls.size()); for (int i = 0; i < urls.size(); i++) { CellAtRow urlData = urls.get(i); - long start = System.currentTimeMillis(); - CellAtRow cellAtRow = fetch(urlData); + CellAtRow cellAtRow = cachedFetch(urlData); if (cellAtRow != null) { responseBodies.add(cellAtRow); } - + _progress = i * 100 / urls.size(); - try { - long delay = _delay - (System.currentTimeMillis() - start); - if (delay > 0) { - Thread.sleep(delay); - } - } catch (InterruptedException e) { - if (_canceled) { - break; - } - } + + if (_canceled) { + break; + } } - + + _urlCache.clear(); + if (!_canceled) { - HistoryEntry historyEntry = new HistoryEntry( _historyEntryID, - _project, - _description, - ColumnAdditionByFetchingURLsOperation.this, + _project, + _description, + ColumnAdditionByFetchingURLsOperation.this, new ColumnAdditionChange( _newColumnName, _columnInsertIndex, responseBodies) ); - + _project.history.addEntry(historyEntry); _project.processManager.onDoneProcess(this); } } - - CellAtRow fetch(CellAtRow urlData) { - String urlString = urlData.cell.value.toString(); - URL url = null; - - try { - url = new URL(urlString); - } catch (MalformedURLException e) { - return null; - } - + + CellAtRow cachedFetch(CellAtRow urlData) { + String urlString = urlData.cell.value.toString(); + URL url = null; + try 
{ + url = new URL(urlString); + } catch (MalformedURLException e) { + return null; + } + + Serializable cellResult = _urlCache.get(url); + if (cellResult == null) { + cellResult = fetch(url); + if (cellResult != null) { + _urlCache.put(url, cellResult); + } + + try { + // Always sleep for the delay, no matter how long the + // request took. This is more responsible than subtracting + // the time spent requesting the URL, because it naturally + // slows us down if the server is busy and takes a long time + // to reply. + if (_delay > 0) { + Thread.sleep(_delay); + } + } catch (InterruptedException e) { + return null; + } + } + + if (cellResult != null) { + return new CellAtRow( + urlData.row, + new Cell(cellResult, null)); + } + return null; + } + + Serializable fetch(URL url) { try { URLConnection urlConnection = url.openConnection(); // urlConnection.setRequestProperty(_headerKey, _headerValue); - + try { InputStream is = urlConnection.getInputStream(); try { @@ -287,12 +314,8 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat } } } - return new CellAtRow( - urlData.row, - new Cell( - ParsingUtilities.inputStreamToString( - is, (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding), - null)); + return ParsingUtilities.inputStreamToString( + is, (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding); } finally { is.close(); @@ -313,11 +336,11 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat message = e.toString(); } return _onError == OnError.StoreError ? - new CellAtRow(urlData.row, new Cell(new EvalError(message), null)) : null; + new EvalError(message) : null; } } catch (Exception e) { return _onError == OnError.StoreError ? 
- new CellAtRow(urlData.row, new Cell(new EvalError(e.getMessage()), null)) : null; + new EvalError(e.getMessage()) : null; } } @@ -326,33 +349,33 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat int cellIndex; Properties bindings; List cellsAtRows; - + public RowVisitor init(List cellsAtRows) { Column column = _project.columnModel.getColumnByName(_baseColumnName); - + this.cellIndex = column.getCellIndex(); this.bindings = ExpressionUtils.createBindings(_project); this.cellsAtRows = cellsAtRows; return this; } - + @Override public void start(Project project) { // nothing to do } - + @Override public void end(Project project) { // nothing to do } - + @Override public boolean visit(Project project, int rowIndex, Row row) { Cell cell = row.getCell(cellIndex); Cell newCell = null; - + ExpressionUtils.bind(bindings, row, rowIndex, _baseColumnName, cell); - + Object o = _eval.evaluate(bindings); if (o != null) { if (o instanceof Cell) { @@ -366,11 +389,11 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat } } } - + if (newCell != null) { cellsAtRows.add(new CellAtRow(rowIndex, newCell)); } - + return false; } }.init(cellsAtRows);