Use Apache HttpComponents for Fetch URL (#2692)

* Use mockwebserver instead of live network for tests

Fixes #2680. Fixes #1904.

* Remove use of deprecated methods

* Convert to the Apache HttpComponents client library

Fixes #1410, because following redirects is a built-in capability
of the library, as are retries with binary backoff, built-in
decompression, etc.

* Address review comments
This commit is contained in:
Tom Morris 2020-06-16 03:38:06 -04:00 committed by GitHub
parent 983c8bd422
commit 749704518c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 228 deletions

View File

@ -377,6 +377,12 @@
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>4.7.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -69,12 +69,15 @@ import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.fileupload.util.Streams;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DecompressingHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -218,7 +221,6 @@ public class ImportingUtilities {
}
});
@SuppressWarnings("unchecked")
List<FileItem> tempFiles = (List<FileItem>)upload.parseRequest(request);
progress.setProgress("Uploading data ...", -1);
@ -280,28 +282,28 @@ public class ImportingUtilities {
}
if ("http".equals(url.getProtocol()) || "https".equals(url.getProtocol())) {
DefaultHttpClient client = new DefaultHttpClient();
DecompressingHttpClient httpclient =
new DecompressingHttpClient(client);
HttpGet httpGet = new HttpGet(url.toURI());
httpGet.setHeader("User-Agent", RefineServlet.getUserAgent());
if ("https".equals(url.getProtocol())) {
// HTTPS only - no sending password in the clear over HTTP
String userinfo = url.getUserInfo();
if (userinfo != null) {
int s = userinfo.indexOf(':');
if (s > 0) {
String user = userinfo.substring(0, s);
String pw = userinfo.substring(s + 1, userinfo.length());
client.getCredentialsProvider().setCredentials(
new AuthScope(url.getHost(), 443),
new UsernamePasswordCredentials(user, pw));
}
HttpClientBuilder clientbuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent());
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
String userinfo = url.getUserInfo();
// HTTPS only - no sending password in the clear over HTTP
if ("https".equals(url.getProtocol()) && userinfo != null) {
int s = userinfo.indexOf(':');
if (s > 0) {
String user = userinfo.substring(0, s);
String pw = userinfo.substring(s + 1, userinfo.length());
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(new AuthScope(url.getHost(), 443),
new UsernamePasswordCredentials(user, pw));
clientbuilder = clientbuilder.setDefaultCredentialsProvider(credsProvider);
}
}
HttpResponse response = httpclient.execute(httpGet);
CloseableHttpClient httpclient = clientbuilder.build();
HttpGet httpGet = new HttpGet(url.toURI());
CloseableHttpResponse response = httpclient.execute(httpGet);
try {
response.getStatusLine();
HttpEntity entity = response.getEntity();
@ -327,7 +329,7 @@ public class ImportingUtilities {
downloadCount++;
EntityUtils.consume(entity);
} finally {
httpGet.releaseConnection();
httpGet.reset();
}
} else {
// Fallback handling for non HTTP connections (only FTP?)
@ -355,7 +357,6 @@ public class ImportingUtilities {
parameters.put(name, value);
// TODO: We really want to store this on the request so it's available for everyone
// request.getParameterMap().put(name, value);
}
} else { // is file content

View File

@ -33,23 +33,39 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package com.google.refine.operations.column;
import static com.google.common.base.Strings.isNullOrEmpty;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.refine.RefineServlet;
import com.google.refine.browsing.Engine;
import com.google.refine.browsing.EngineConfig;
import com.google.refine.browsing.FilteredRows;
@ -100,6 +116,9 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
final protected int _delay;
final protected boolean _cacheResponses;
final protected List<HttpHeader> _httpHeadersJson;
private Header[] httpHeaders = new Header[0];
final private RequestConfig defaultRequestConfig;
private HttpClientBuilder httpClientBuilder;
@JsonCreator
public ColumnAdditionByFetchingURLsOperation(
@ -134,13 +153,39 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
_delay = delay;
_cacheResponses = cacheResponses;
_httpHeadersJson = httpHeadersJson;
List<Header> headers = new ArrayList<Header>();
if (_httpHeadersJson != null) {
for (HttpHeader header : _httpHeadersJson) {
if (!isNullOrEmpty(header.name) && !isNullOrEmpty(header.value)) {
headers.add(new BasicHeader(header.name, header.value));
}
}
}
httpHeaders = headers.toArray(httpHeaders);
defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30 * 1000)
.setConnectionRequestTimeout(30 * 1000)
.setSocketTimeout(10 * 1000).build();
// TODO: Placeholder for future Basic Auth implementation
// CredentialsProvider credsProvider = new BasicCredentialsProvider();
// credsProvider.setCredentials(new AuthScope(host, 443),
// new UsernamePasswordCredentials(user, password));
httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setDefaultRequestConfig(defaultRequestConfig);
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
// .setDefaultCredentialsProvider(credsProvider);
}
/**
 * Returns the value of the {@code _newColumnName} field.
 * Exposed to Jackson under the JSON property name {@code "newColumnName"}.
 *
 * @return the stored new-column name
 */
@JsonProperty("newColumnName")
public String getNewColumnName() {
return _newColumnName;
}
@JsonProperty("columnInsertIndex")
public int getColumnInsertIndex() {
return _columnInsertIndex;
@ -282,14 +327,15 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
FilteredRows filteredRows = _engine.getAllFilteredRows();
filteredRows.accept(_project, createRowVisitor(urls));
List<CellAtRow> responseBodies = new ArrayList<CellAtRow>(urls.size());
for (int i = 0; i < urls.size(); i++) {
CellAtRow urlData = urls.get(i);
int count = urls.size();
List<CellAtRow> responseBodies = new ArrayList<CellAtRow>(count);
int i = 0;
for (CellAtRow urlData : urls) {
String urlString = urlData.cell.value.toString();
Serializable response = null;
if (_urlCache != null) {
response = cachedFetch(urlString);
response = cachedFetch(urlString); // TODO: Why does this need a separate method?
} else {
response = fetch(urlString);
}
@ -302,7 +348,7 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
responseBodies.add(cellAtRow);
}
_progress = i * 100 / urls.size();
_progress = i++ * 100 / count;
if (_canceled) {
break;
@ -335,68 +381,64 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
}
Serializable fetch(String urlString) {
URL url = null;
HttpGet httpGet;
try {
url = new URL(urlString);
} catch (MalformedURLException e) {
// Use of URL constructor below is purely to get additional error checking to mimic
// previous behavior for the tests.
httpGet = new HttpGet(new URL(urlString).toURI());
} catch (IllegalArgumentException | MalformedURLException | URISyntaxException e) {
return null;
}
try {
URLConnection urlConnection = url.openConnection();
urlConnection.setRequestProperty("Accept-Encoding", "gzip");
if (_httpHeadersJson != null) {
for (int i = 0; i < _httpHeadersJson.size(); i++) {
String headerLabel = _httpHeadersJson.get(i).name;
String headerValue = _httpHeadersJson.get(i).value;
if (headerValue != null && !headerValue.isEmpty()) {
urlConnection.setRequestProperty(headerLabel, headerValue);
}
}
}
httpGet.setHeaders(httpHeaders);
httpGet.setConfig(defaultRequestConfig);
CloseableHttpClient httpclient = httpClientBuilder.build();
CloseableHttpResponse response = null;
try {
InputStream is = urlConnection.getInputStream();
try {
String encoding = urlConnection.getContentEncoding();
if (encoding == null) {
String contentType = urlConnection.getContentType();
if (contentType != null) {
final String charsetEqual = "charset=";
int c = contentType.lastIndexOf(charsetEqual);
if (c > 0) {
encoding = contentType.substring(c + charsetEqual.length());
}
}
}
return ParsingUtilities.inputStreamToString(
is, (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding);
response = httpclient.execute(httpGet);
} finally {
is.close();
HttpEntity entity = response.getEntity();
if (entity == null) {
throw new Exception("No content found in " + httpGet.getURI().toString());
}
String encoding = null;
if (entity.getContentEncoding() != null) {
encoding = entity.getContentEncoding().getValue();
} else {
Charset charset = ContentType.getOrDefault(entity).getCharset();
if (charset != null) {
encoding = charset.name();
}
}
String result = ParsingUtilities.inputStreamToString(
entity.getContent(), (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding);
EntityUtils.consume(entity);
return result;
} catch (IOException e) {
String message;
if (urlConnection instanceof HttpURLConnection) {
int status = ((HttpURLConnection)urlConnection).getResponseCode();
String errorString = "";
InputStream errorStream = ((HttpURLConnection)urlConnection).getErrorStream();
if (errorStream != null) {
errorString = ParsingUtilities.inputStreamToString(errorStream);
}
message = String.format("HTTP error %d : %s | %s",status,
((HttpURLConnection)urlConnection).getResponseMessage(),
errorString);
if (response == null) {
message = "Unknown HTTP error " + e.getLocalizedMessage();
} else {
message = e.toString();
StatusLine status = response.getStatusLine();
HttpEntity errorEntity = response.getEntity();
String errorString = ParsingUtilities.inputStreamToString(errorEntity.getContent());
message = String.format("HTTP error %d : %s | %s", status.getStatusCode(),
status.getReasonPhrase(),
errorString);
}
return _onError == OnError.StoreError ?
new EvalError(message) : null;
return _onError == OnError.StoreError ? new EvalError(message) : null;
}
} catch (Exception e) {
return _onError == OnError.StoreError ?
new EvalError(e.getMessage()) : null;
return _onError == OnError.StoreError ? new EvalError(e.getMessage()) : null;
}
}

View File

@ -34,10 +34,10 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package com.google.refine.operations.column;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@ -45,7 +45,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.refine.RefineTest;
import com.google.refine.browsing.EngineConfig;
import com.google.refine.expr.ExpressionUtils;
@ -64,11 +63,17 @@ import com.google.refine.process.ProcessManager;
import com.google.refine.util.ParsingUtilities;
import com.google.refine.util.TestUtils;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
static final String ENGINE_JSON_URLS = "{\"mode\":\"row-based\"}";
// This is only used for serialization tests. The URL is never fetched.
private String json = "{\"op\":\"core/column-addition-by-fetching-urls\","
+ "\"description\":\"Create column employments at index 2 by fetching URLs based on column orcid using expression grel:\\\"https://pub.orcid.org/\\\"+value+\\\"/employments\\\"\","
+ "\"engineConfig\":{\"mode\":\"row-based\",\"facets\":[]},"
@ -111,18 +116,23 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
project = createProjectWithColumns("UrlFetchingTests", "fruits");
}
private boolean isHostReachable(String host, int timeout){
boolean state = false;
private void runAndWait(EngineDependentOperation op, int timeout) throws Exception {
ProcessManager pm = project.getProcessManager();
Process process = op.createProcess(project, options);
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
int time = 0;
try {
state = InetAddress.getByName(host).isReachable(timeout);
} catch (IOException e) {
// e.printStackTrace();
while (process.isRunning() && time < timeout) {
Thread.sleep(200);
time += 200;
}
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
return state;
Assert.assertFalse(process.isRunning());
}
@Test
public void serializeColumnAdditionByFetchingURLsOperation() throws Exception {
TestUtils.isSerializedTo(ParsingUtilities.mapper.readValue(json, ColumnAdditionByFetchingURLsOperation.class), json);
@ -138,54 +148,45 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
/**
* Test for caching
*/
@Test
public void testUrlCaching() throws Exception {
if (!isHostReachable("www.random.org", 5000))
return;
for (int i = 0; i < 100; i++) {
Row row = new Row(2);
row.setCell(0, new Cell(i < 5 ? "apple":"orange", null));
project.rows.add(row);
}
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/random");
Random rand = new Random();
for (int i = 0; i < 100; i++) {
Row row = new Row(2);
row.setCell(0, new Cell(i < 5 ? "apple":"orange", null));
project.rows.add(row);
// We won't need them all, but queue 100 random responses
server.enqueue(new MockResponse().setBody(Integer.toString(rand.nextInt(100))));
}
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"\"" + url + "?city=\"+value",
OnError.StoreError,
"rand",
1,
500,
true,
null);
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"\"https://www.random.org/integers/?num=1&min=1&max=100&col=1&base=10&format=plain&rnd=new&city=\"+value",
OnError.StoreError,
"rand",
1,
500,
true,
null);
ProcessManager pm = project.getProcessManager();
Process process = op.createProcess(project, options);
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
try {
// We have 100 rows and 500 ms per row but only two distinct
// values so we should not wait more than ~2000 ms to get the
// results. Just to make sure the test passes with plenty of
// net latency we sleep for longer (but still less than
// 50,000ms).
Thread.sleep(5000);
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
// values so we should not wait much more than ~1000 ms to get the
// results.
runAndWait(op, 1500);
// Inspect rows
String ref_val = (String)project.rows.get(0).getCellValue(1).toString();
if (ref_val.startsWith("HTTP error"))
return;
Assert.assertFalse(ref_val.equals("apple")); // just to make sure I picked the right column
for (int i = 1; i < 4; i++) {
System.out.println("value:" + project.rows.get(i).getCellValue(1));
// all random values should be equal due to caching
Assert.assertEquals(project.rows.get(i).getCellValue(1).toString(), ref_val);
// Inspect rows
String ref_val = (String)project.rows.get(0).getCellValue(1).toString();
Assert.assertFalse(ref_val.equals("apple")); // just to make sure I picked the right column
for (int i = 1; i < 4; i++) {
// all random values should be equal due to caching
Assert.assertEquals(project.rows.get(i).getCellValue(1).toString(), ref_val);
}
server.shutdown();
}
Assert.assertFalse(process.isRunning());
}
@ -195,17 +196,64 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
*/
@Test
public void testInvalidUrl() throws Exception {
Row row0 = new Row(2);
row0.setCell(0, new Cell("auinrestrsc", null)); // malformed -> null
project.rows.add(row0);
Row row1 = new Row(2);
row1.setCell(0, new Cell("https://www.random.org/integers/?num=1&min=1&max=100&col=1&base=10&format=plain", null)); // fine
project.rows.add(row1);
Row row2 = new Row(2);
row2.setCell(0, new Cell("http://anursiebcuiesldcresturce.detur/anusclbc", null)); // well-formed but invalid
project.rows.add(row2);
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/random");
server.enqueue(new MockResponse());
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
Row row0 = new Row(2);
row0.setCell(0, new Cell("auinrestrsc", null)); // malformed -> null
project.rows.add(row0);
Row row1 = new Row(2);
row1.setCell(0, new Cell(url.toString(), null)); // fine
project.rows.add(row1);
Row row2 = new Row(2);
// well-formed but not resolvable.
row2.setCell(0, new Cell("http://domain.invalid/random", null));
project.rows.add(row2);
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"value",
OnError.StoreError,
"junk",
1,
50,
true,
null);
runAndWait(op, 1000);
int newCol = project.columnModel.getColumnByName("junk").getCellIndex();
// Inspect rows
Assert.assertEquals(project.rows.get(0).getCellValue(newCol), null);
Assert.assertTrue(project.rows.get(1).getCellValue(newCol) != null);
Assert.assertTrue(ExpressionUtils.isError(project.rows.get(2).getCellValue(newCol)));
}
}
@Test
public void testHttpHeaders() throws Exception {
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/checkheader");
Row row0 = new Row(2);
row0.setCell(0, new Cell(url.toString(), null));
project.rows.add(row0);
String userAgentValue = "OpenRefine";
String authorizationValue = "Basic";
String acceptValue = "*/*";
List<HttpHeader> headers = new ArrayList<>();
headers.add(new HttpHeader("authorization", authorizationValue));
headers.add(new HttpHeader("user-agent", userAgentValue));
headers.add(new HttpHeader("accept", acceptValue));
server.enqueue(new MockResponse().setBody("first"));
server.enqueue(new MockResponse().setBody("second"));
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"value",
OnError.StoreError,
@ -213,89 +261,17 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
1,
50,
true,
null);
headers);
ProcessManager pm = project.getProcessManager();
Process process = op.createProcess(project, options);
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
runAndWait(op, 1000);
RecordedRequest request = server.takeRequest();
Assert.assertEquals(request.getHeader("user-agent"), userAgentValue);
Assert.assertEquals(request.getHeader("authorization"), authorizationValue);
Assert.assertEquals(request.getHeader("accept"), acceptValue);
server.shutdown();
}
Assert.assertFalse(process.isRunning());
int newCol = project.columnModel.getColumnByName("junk").getCellIndex();
// Inspect rows
Assert.assertEquals(project.rows.get(0).getCellValue(newCol), null);
Assert.assertTrue(project.rows.get(1).getCellValue(newCol) != null);
Assert.assertTrue(ExpressionUtils.isError(project.rows.get(2).getCellValue(newCol)));
}
@Test
public void testHttpHeaders() throws Exception {
Row row0 = new Row(2);
row0.setCell(0, new Cell("http://headers.jsontest.com", null));
/*
http://headers.jsontest.com is a service which returns the HTTP request headers
as JSON. For example:
{
"X-Cloud-Trace-Context": "579a1a2ee5c778dfc0810a3bf131ba4e/11053223648711966807",
"Authorization": "Basic",
"Host": "headers.jsontest.com",
"User-Agent": "OpenRefine",
"Accept": "*"
}
*/
project.rows.add(row0);
String userAgentValue = "OpenRefine";
String authorizationValue = "Basic";
String acceptValue = "*/*";
List<HttpHeader> headers = new ArrayList<>();
headers.add(new HttpHeader("authorization", authorizationValue));
headers.add(new HttpHeader("user-agent", userAgentValue));
headers.add(new HttpHeader("accept", acceptValue));
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"value",
OnError.StoreError,
"junk",
1,
50,
true,
headers);
ProcessManager pm = project.getProcessManager();
Process process = op.createProcess(project, options);
process.startPerforming(pm);
Assert.assertTrue(process.isRunning());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
Assert.assertFalse(process.isRunning());
int newCol = project.columnModel.getColumnByName("junk").getCellIndex();
ObjectNode headersUsed = null;
// sometime, we got response:
// Error
// Over Quota
// This application is temporarily over its serving quota. Please try again later.
try {
String response = project.rows.get(0).getCellValue(newCol).toString();
headersUsed = ParsingUtilities.mapper.readValue(response, ObjectNode.class);
} catch (IOException ex) {
return;
}
// Inspect the results we got from remote service
Assert.assertEquals(headersUsed.get("user-agent").asText(), userAgentValue);
Assert.assertEquals(headersUsed.get("authorization").asText(), authorizationValue);
Assert.assertEquals(headersUsed.get("accept").asText(), acceptValue);
}
}