Refactor HTTP code into common module & Improve Fetch URL - fixes #3129 (#3237)

* Refactor HTTP code into a common utility class 

Centralizes the six (slightly) different implementations into a single
common Apache HTTP Client 5 based utility that implements our
strategies for retries, timeouts, error handling, etc.

Apache HTTP Client 5 adds support for Retry-After headers, HTTP/2,
and a bunch of other stuff under the covers.

Moves the request delay into a request interceptor and fixes the delay
calculation (again). Increases retries from 1 to 3 and uses delay*2 as
the default retry interval when no Retry-After header is present, with
an exponential backoff strategy for multiple retries.

* Reuses the HTTP client across requests
* Uses IOException instead of Exception for HTTP errors
Tom Morris 2020-12-07 00:38:36 -05:00 committed by GitHub
parent 9e94d32b49
commit 14f43dc2cc
11 changed files with 490 additions and 372 deletions
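
Below is a minimal usage sketch (not part of the commit) of the consolidated
com.google.refine.util.HttpClient introduced in this change, based on the
methods visible in the diff; the URLs and the query payload are placeholders.

import java.io.IOException;

import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.message.BasicHeader;

import com.google.refine.util.HttpClient;

public class HttpClientUsageSketch {
    public static void main(String[] args) throws IOException {
        // No-argument constructor: no minimum delay between requests
        // (HttpClient(int delay) enforces a per-request delay in milliseconds)
        HttpClient client = new HttpClient();

        // GET returning the response body as a String; extra headers are optional
        Header[] headers = { new BasicHeader("Accept", "application/json") };
        String body = client.getAsString("https://example.com/api", headers);
        System.out.println(body);

        // Form-encoded POST of a single name/value pair, as used by the
        // reconciliation and data extension code paths ("queries" / "extend")
        String result = client.postNameValue("https://example.com/reconcile",
                "queries", "{\"q0\":{\"query\":\"apple\"}}");
        System.out.println(result);
    }
}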


@ -290,6 +290,11 @@
<artifactId>clojure</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>


@ -48,19 +48,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.message.BasicNameValuePair;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -68,7 +55,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.refine.RefineServlet;
import com.google.refine.commands.Command;
import com.google.refine.expr.ExpressionUtils;
import com.google.refine.model.Column;
@ -76,6 +62,7 @@ import com.google.refine.model.Project;
import com.google.refine.model.ReconType;
import com.google.refine.model.Row;
import com.google.refine.model.recon.StandardReconConfig.ReconResult;
import com.google.refine.util.HttpClient;
import com.google.refine.util.ParsingUtilities;
public class GuessTypesOfColumnCommand extends Command {
@ -180,61 +167,38 @@ public class GuessTypesOfColumnCommand extends Command {
}
String queriesString = ParsingUtilities.defaultWriter.writeValueAsString(queryMap);
String responseString;
try {
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30 * 1000)
.build();
responseString = postQueries(serviceUrl, queriesString);
ObjectNode o = ParsingUtilities.evaluateJsonStringToObjectNode(responseString);
HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setRedirectStrategy(new LaxRedirectStrategy())
.setDefaultRequestConfig(defaultRequestConfig);
CloseableHttpClient httpClient = httpClientBuilder.build();
HttpPost request = new HttpPost(serviceUrl);
List<NameValuePair> body = Collections.singletonList(
new BasicNameValuePair("queries", queriesString));
request.setEntity(new UrlEncodedFormEntity(body, Consts.UTF_8));
try (CloseableHttpResponse response = httpClient.execute(request)) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() >= 400) {
throw new IOException("Failed - code:"
+ Integer.toString(statusLine.getStatusCode())
+ " message: " + statusLine.getReasonPhrase());
Iterator<JsonNode> iterator = o.iterator();
while (iterator.hasNext()) {
JsonNode o2 = iterator.next();
if (!(o2.has("result") && o2.get("result") instanceof ArrayNode)) {
continue;
}
String s = ParsingUtilities.inputStreamToString(response.getEntity().getContent());
ObjectNode o = ParsingUtilities.evaluateJsonStringToObjectNode(s);
Iterator<JsonNode> iterator = o.iterator();
while (iterator.hasNext()) {
JsonNode o2 = iterator.next();
if (!(o2.has("result") && o2.get("result") instanceof ArrayNode)) {
continue;
}
ArrayNode results = (ArrayNode) o2.get("result");
List<ReconResult> reconResults = ParsingUtilities.mapper.convertValue(results, new TypeReference<List<ReconResult>>() {});
int count = reconResults.size();
ArrayNode results = (ArrayNode) o2.get("result");
List<ReconResult> reconResults = ParsingUtilities.mapper.convertValue(results, new TypeReference<List<ReconResult>>() {});
int count = reconResults.size();
for (int j = 0; j < count; j++) {
ReconResult result = reconResults.get(j);
double score = 1.0 / (1 + j); // score by each result's rank
for (int j = 0; j < count; j++) {
ReconResult result = reconResults.get(j);
double score = 1.0 / (1 + j); // score by each result's rank
List<ReconType> types = result.types;
int typeCount = types.size();
List<ReconType> types = result.types;
int typeCount = types.size();
for (int t = 0; t < typeCount; t++) {
ReconType type = types.get(t);
double score2 = score * (typeCount - t) / typeCount;
if (map.containsKey(type.id)) {
TypeGroup tg = map.get(type.id);
tg.score += score2;
tg.count++;
} else {
map.put(type.id, new TypeGroup(type.id, type.name, score2));
}
for (int t = 0; t < typeCount; t++) {
ReconType type = types.get(t);
double score2 = score * (typeCount - t) / typeCount;
if (map.containsKey(type.id)) {
TypeGroup tg = map.get(type.id);
tg.score += score2;
tg.count++;
} else {
map.put(type.id, new TypeGroup(type.id, type.name, score2));
}
}
}
@ -243,7 +207,7 @@ public class GuessTypesOfColumnCommand extends Command {
logger.error("Failed to guess cell types for load\n" + queriesString, e);
throw e;
}
List<TypeGroup> types = new ArrayList<TypeGroup>(map.values());
Collections.sort(types, new Comparator<TypeGroup>() {
@Override
@ -258,7 +222,12 @@ public class GuessTypesOfColumnCommand extends Command {
return types;
}
private String postQueries(String serviceUrl, String queriesString) throws IOException {
HttpClient client = new HttpClient();
return client.postNameValue(serviceUrl, "queries", queriesString);
}
static protected class TypeGroup {
@JsonProperty("id")
protected String id;


@ -69,19 +69,13 @@ import org.apache.commons.fileupload.ProgressListener;
import org.apache.commons.fileupload.disk.DiskFileItemFactory;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.fileupload.util.Streams;
import org.apache.http.HttpEntity;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.http.StatusLine;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -89,10 +83,10 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.refine.ProjectManager;
import com.google.refine.ProjectMetadata;
import com.google.refine.RefineServlet;
import com.google.refine.importing.ImportingManager.Format;
import com.google.refine.importing.UrlRewriter.Result;
import com.google.refine.model.Project;
import com.google.refine.util.HttpClient;
import com.google.refine.util.JSONUtilities;
import com.google.refine.util.ParsingUtilities;
import java.util.stream.Collectors;
@ -287,65 +281,56 @@ public class ImportingUtilities {
}
if ("http".equals(url.getProtocol()) || "https".equals(url.getProtocol())) {
HttpClientBuilder clientbuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent());
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
final URL lastUrl = url;
final HttpClientResponseHandler<String> responseHandler = new HttpClientResponseHandler<String>() {
String userinfo = url.getUserInfo();
// HTTPS only - no sending password in the clear over HTTP
if ("https".equals(url.getProtocol()) && userinfo != null) {
int s = userinfo.indexOf(':');
if (s > 0) {
String user = userinfo.substring(0, s);
String pw = userinfo.substring(s + 1, userinfo.length());
CredentialsProvider credsProvider = new BasicCredentialsProvider();
credsProvider.setCredentials(new AuthScope(url.getHost(), 443),
new UsernamePasswordCredentials(user, pw));
clientbuilder = clientbuilder.setDefaultCredentialsProvider(credsProvider);
}
}
@Override
public String handleResponse(final ClassicHttpResponse response) throws IOException {
final int status = response.getCode();
if (status >= HttpStatus.SC_SUCCESS && status < HttpStatus.SC_REDIRECTION) {
final HttpEntity entity = response.getEntity();
if (entity == null) {
throw new IOException("No content found in " + lastUrl.toExternalForm());
}
CloseableHttpClient httpclient = clientbuilder.build();
HttpGet httpGet = new HttpGet(url.toURI());
CloseableHttpResponse response = httpclient.execute(httpGet);
try {
InputStream stream2 = entity.getContent();
try {
HttpEntity entity = response.getEntity();
if (entity == null) {
throw new Exception("No content found in " + url.toString());
}
StatusLine status = response.getStatusLine();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode >= 400) {
String errorString = ParsingUtilities.inputStreamToString(entity.getContent());
String message = String.format("HTTP error %d : %s | %s", statusCode,
status.getReasonPhrase(), errorString);
throw new Exception(message);
}
InputStream stream2 = entity.getContent();
String mimeType = null;
String charset = null;
ContentType contentType = ContentType.parse(entity.getContentType());
if (contentType != null) {
mimeType = contentType.getMimeType();
Charset cs = contentType.getCharset();
if (cs != null) {
charset = cs.toString();
}
}
JSONUtilities.safePut(fileRecord, "declaredMimeType", mimeType);
JSONUtilities.safePut(fileRecord, "declaredEncoding", charset);
if (saveStream(stream2, lastUrl, rawDataDir, progress, update,
fileRecord, fileRecords,
entity.getContentLength())) {
return "saved"; // signal to increment archive count
}
String mimeType = null;
String charset = null;
ContentType contentType = ContentType.get(entity);
if (contentType != null) {
mimeType = contentType.getMimeType();
Charset cs = contentType.getCharset();
if (cs != null) {
charset = cs.toString();
} catch (final IOException ex) {
throw new ClientProtocolException(ex);
}
return null;
} else {
// String errorBody = EntityUtils.toString(response.getEntity());
throw new ClientProtocolException(String.format("HTTP error %d : %s for URL %s", status,
response.getReasonPhrase(), lastUrl.toExternalForm()));
}
}
JSONUtilities.safePut(fileRecord, "declaredMimeType", mimeType);
JSONUtilities.safePut(fileRecord, "declaredEncoding", charset);
if (saveStream(stream2, url, rawDataDir, progress, update,
fileRecord, fileRecords,
entity.getContentLength())) {
archiveCount++;
}
downloadCount++;
EntityUtils.consume(entity);
} finally {
httpGet.reset();
}
};
HttpClient httpClient = new HttpClient();
if (httpClient.getResponse(urlString, null, responseHandler) != null) {
archiveCount++;
};
downloadCount++;
} else {
// Fallback handling for non HTTP connections (only FTP?)
URLConnection urlConnection = url.openConnection();
@ -418,7 +403,7 @@ public class ImportingUtilities {
private static boolean saveStream(InputStream stream, URL url, File rawDataDir, final Progress progress,
final SavingUpdate update, ObjectNode fileRecord, ArrayNode fileRecords, long length)
throws IOException, Exception {
throws IOException {
String localname = url.getPath();
if (localname.isEmpty() || localname.endsWith("/")) {
localname = localname + "temp";
@ -436,7 +421,7 @@ public class ImportingUtilities {
long actualLength = saveStreamToFile(stream, file, update);
JSONUtilities.safePut(fileRecord, "size", actualLength);
if (actualLength == 0) {
throw new Exception("No content found in " + url.toString());
throw new IOException("No content found in " + url.toString());
} else if (length >= 0) {
update.totalExpectedSize += (actualLength - length);
} else {


@ -37,30 +37,15 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package com.google.refine.model.recon;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.message.BasicNameValuePair;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
@ -69,14 +54,15 @@ import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.refine.RefineServlet;
import com.google.refine.expr.functions.ToDate;
import com.google.refine.model.ReconCandidate;
import com.google.refine.model.ReconType;
import com.google.refine.util.HttpClient;
import com.google.refine.util.JSONUtilities;
import com.google.refine.util.JsonViews;
import com.google.refine.util.ParsingUtilities;
public class ReconciledDataExtensionJob {
@ -170,15 +156,23 @@ public class ReconciledDataExtensionJob {
final public DataExtensionConfig extension;
final public String endpoint;
final public List<ColumnInfo> columns = new ArrayList<ColumnInfo>();
// not final: initialized lazily
private static CloseableHttpClient httpClient = null;
private static HttpClient httpClient = null;
public ReconciledDataExtensionJob(DataExtensionConfig obj, String endpoint) {
this.extension = obj;
this.endpoint = endpoint;
}
/**
* @todo Although the HTTP code has been unified, there may still be opportunity
* to refactor a higher level querying library out of this which could be shared
* with StandardReconConfig
*
* It may also be possible to extract a library to query reconciliation services
* which could be used outside of OpenRefine.
*/
public Map<String, ReconciledDataExtensionJob.DataExtension> extend(
Set<String> ids,
Map<String, ReconCandidate> reconCandidateMap
@ -187,7 +181,7 @@ public class ReconciledDataExtensionJob {
formulateQuery(ids, extension, writer);
String query = writer.toString();
String response = performQuery(this.endpoint, query);
String response = postExtendQuery(this.endpoint, query);
ObjectNode o = ParsingUtilities.mapper.readValue(response, ObjectNode.class);
@ -218,46 +212,17 @@ public class ReconciledDataExtensionJob {
return map;
}
/**
* @todo this should be refactored to be unified with the HTTP querying code
* from StandardReconConfig. We should ideally extract a library to query
* reconciliation services and expose it as such for others to reuse.
*/
static protected String performQuery(String endpoint, String query) throws IOException {
HttpPost request = new HttpPost(endpoint);
List<NameValuePair> body = Collections.singletonList(
new BasicNameValuePair("extend", query));
request.setEntity(new UrlEncodedFormEntity(body, Consts.UTF_8));
try (CloseableHttpResponse response = getHttpClient().execute(request)) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() >= 400) {
throw new IOException("Data extension query failed - code: "
+ Integer.toString(statusLine.getStatusCode())
+ " message: " + statusLine.getReasonPhrase());
} else {
return ParsingUtilities.inputStreamToString(response.getEntity().getContent());
}
}
static protected String postExtendQuery(String endpoint, String query) throws IOException {
return getHttpClient().postNameValue(endpoint, "extend", query);
}
private static CloseableHttpClient getHttpClient() {
if (httpClient != null) {
return httpClient;
private static HttpClient getHttpClient() {
if (httpClient == null) {
httpClient = new HttpClient();
}
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30 * 1000)
.build();
HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setRedirectStrategy(new LaxRedirectStrategy())
.setDefaultRequestConfig(defaultRequestConfig);
httpClient = httpClientBuilder.build();
return httpClient;
}
protected ReconciledDataExtensionJob.DataExtension collectResult(
ObjectNode record,
Map<String, ReconCandidate> reconCandidateMap


@ -45,18 +45,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,7 +57,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.refine.RefineServlet;
import com.google.refine.expr.ExpressionUtils;
import com.google.refine.model.Cell;
import com.google.refine.model.Project;
@ -79,6 +66,7 @@ import com.google.refine.model.ReconCandidate;
import com.google.refine.model.ReconType;
import com.google.refine.model.RecordModel.RowDependency;
import com.google.refine.model.Row;
import com.google.refine.util.HttpClient;
import com.google.refine.util.ParsingUtilities;
public class StandardReconConfig extends ReconConfig {
@ -164,7 +152,7 @@ public class StandardReconConfig extends ReconConfig {
final private int limit;
// initialized lazily
private CloseableHttpClient httpClient = null;
private HttpClient httpClient = null;
@JsonCreator
public StandardReconConfig(
@ -434,29 +422,29 @@ public class StandardReconConfig extends ReconConfig {
try {
job.code = ParsingUtilities.defaultWriter.writeValueAsString(query);
} catch (JsonProcessingException e) {
// FIXME: This error will get lost
e.printStackTrace();
return null; // TODO: Throw exception instead?
}
return job;
}
private CloseableHttpClient getHttpClient() {
if (httpClient != null) {
return httpClient;
private HttpClient getHttpClient() {
if (httpClient == null) {
httpClient = new HttpClient();
}
RequestConfig defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30 * 1000)
.setSocketTimeout(60 * 1000)
.build();
HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setRedirectStrategy(new LaxRedirectStrategy())
.setDefaultRequestConfig(defaultRequestConfig);
httpClient = httpClientBuilder.build();
return httpClient;
}
private String postQueries(String url, String queriesString) throws IOException {
try {
return getHttpClient().postNameValue(url, "queries", queriesString);
} catch (IOException e) {
throw new IOException("Failed to batch recon with load:\n" + queriesString, e);
}
}
@Override
public List<Recon> batchRecon(List<ReconJob> jobs, long historyEntryID) {
List<Recon> recons = new ArrayList<Recon>(jobs.size());
@ -475,51 +463,41 @@ public class StandardReconConfig extends ReconConfig {
stringWriter.write("}");
String queriesString = stringWriter.toString();
HttpPost request = new HttpPost(service);
List<NameValuePair> body = Collections.singletonList(
new BasicNameValuePair("queries", queriesString));
request.setEntity(new UrlEncodedFormEntity(body, Consts.UTF_8));
try (CloseableHttpResponse response = getHttpClient().execute(request)) {
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() >= 400) {
logger.error("Failed - code: "
+ Integer.toString(statusLine.getStatusCode())
+ " message: " + statusLine.getReasonPhrase());
try {
String responseString = postQueries(service, queriesString);
ObjectNode o = ParsingUtilities.evaluateJsonStringToObjectNode(responseString);
if (o == null) { // utility method returns null instead of throwing
logger.error("Failed to parse string as JSON: " + responseString);
} else {
String s = ParsingUtilities.inputStreamToString(response.getEntity().getContent());
ObjectNode o = ParsingUtilities.evaluateJsonStringToObjectNode(s);
if (o == null) { // utility method returns null instead of throwing
logger.error("Failed to parse string as JSON: " + s);
} else {
for (int i = 0; i < jobs.size(); i++) {
StandardReconJob job = (StandardReconJob) jobs.get(i);
Recon recon = null;
for (int i = 0; i < jobs.size(); i++) {
StandardReconJob job = (StandardReconJob) jobs.get(i);
Recon recon = null;
String text = job.text;
String key = "q" + i;
if (o.has(key) && o.get(key) instanceof ObjectNode) {
ObjectNode o2 = (ObjectNode) o.get(key);
if (o2.has("result") && o2.get("result") instanceof ArrayNode) {
ArrayNode results = (ArrayNode) o2.get("result");
String text = job.text;
String key = "q" + i;
if (o.has(key) && o.get(key) instanceof ObjectNode) {
ObjectNode o2 = (ObjectNode) o.get(key);
if (o2.has("result") && o2.get("result") instanceof ArrayNode) {
ArrayNode results = (ArrayNode) o2.get("result");
recon = createReconServiceResults(text, results, historyEntryID);
} else {
logger.warn("Service error for text: " + text + "\n Job code: " + job.code + "\n Response: " + o2.toString());
}
recon = createReconServiceResults(text, results, historyEntryID);
} else {
// TODO: better error reporting
logger.warn("Service error for text: " + text + "\n Job code: " + job.code);
logger.warn("Service error for text: " + text + "\n Job code: " + job.code + "\n Response: " + o2.toString());
}
if (recon != null) {
recon.service = service;
}
recons.add(recon);
} else {
// TODO: better error reporting
logger.warn("Service error for text: " + text + "\n Job code: " + job.code);
}
if (recon != null) {
recon.service = service;
}
recons.add(recon);
}
}
} catch (Exception e) {
} catch (IOException e) {
logger.error("Failed to batch recon with load:\n" + queriesString, e);
}
@ -535,7 +513,7 @@ public class StandardReconConfig extends ReconConfig {
return recons;
}
@Override
public Recon createNewRecon(long historyEntryID) {
Recon recon = new Recon(historyEntryID, identifierSpace, schemaSpace);
@ -543,7 +521,7 @@ public class StandardReconConfig extends ReconConfig {
return recon;
}
protected Recon createReconServiceResults(String text, ArrayNode resultsList, long historyEntryID) throws IOException {
protected Recon createReconServiceResults(String text, ArrayNode resultsList, long historyEntryID) {
Recon recon = new Recon(historyEntryID, identifierSpace, schemaSpace);
List<ReconResult> results = ParsingUtilities.mapper.convertValue(resultsList, new TypeReference<List<ReconResult>>() {});


@ -37,27 +37,13 @@ import static com.google.common.base.Strings.isNullOrEmpty;
import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.message.BasicHeader;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -65,7 +51,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.refine.RefineServlet;
import com.google.refine.browsing.Engine;
import com.google.refine.browsing.EngineConfig;
import com.google.refine.browsing.FilteredRows;
@ -86,7 +71,7 @@ import com.google.refine.operations.EngineDependentOperation;
import com.google.refine.operations.OnError;
import com.google.refine.process.LongRunningProcess;
import com.google.refine.process.Process;
import com.google.refine.util.ParsingUtilities;
import com.google.refine.util.HttpClient;
public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperation {
@ -117,8 +102,8 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
final protected boolean _cacheResponses;
final protected List<HttpHeader> _httpHeadersJson;
private Header[] httpHeaders = new Header[0];
final private RequestConfig defaultRequestConfig;
private HttpClientBuilder httpClientBuilder;
private HttpClient _httpClient;
@JsonCreator
public ColumnAdditionByFetchingURLsOperation(
@ -163,22 +148,8 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
}
}
httpHeaders = headers.toArray(httpHeaders);
_httpClient = new HttpClient(_delay);
defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30 * 1000)
.setConnectionRequestTimeout(30 * 1000)
.setSocketTimeout(10 * 1000).build();
// TODO: Placeholder for future Basic Auth implementation
// CredentialsProvider credsProvider = new BasicCredentialsProvider();
// credsProvider.setCredentials(new AuthScope(host, 443),
// new UsernamePasswordCredentials(user, password));
httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setDefaultRequestConfig(defaultRequestConfig);
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
// .setDefaultCredentialsProvider(credsProvider);
}
@JsonProperty("newColumnName")
@ -281,20 +252,7 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
.build(
new CacheLoader<String, Serializable>() {
public Serializable load(String urlString) throws Exception {
Serializable result = fetch(urlString);
try {
// Always sleep for the delay, no matter how long the
// request took. This is more responsible than substracting
// the time spend requesting the URL, because it naturally
// slows us down if the server is busy and takes a long time
// to reply.
if (_delay > 0) {
Thread.sleep(_delay);
}
} catch (InterruptedException e) {
result = null;
}
Serializable result = fetch(urlString, httpHeaders);
if (result == null) {
// the load method should not return any null value
throw new Exception("null result returned by fetch");
@ -335,9 +293,9 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
Serializable response = null;
if (_urlCache != null) {
response = cachedFetch(urlString); // TODO: Why does this need a separate method?
response = cachedFetch(urlString);
} else {
response = fetch(urlString);
response = fetch(urlString, httpHeaders);
}
if (response != null) {
@ -380,68 +338,19 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
}
}
Serializable fetch(String urlString) {
HttpGet httpGet;
try {
// Use of URL constructor below is purely to get additional error checking to mimic
// previous behavior for the tests.
httpGet = new HttpGet(new URL(urlString).toURI());
} catch (IllegalArgumentException | MalformedURLException | URISyntaxException e) {
return null;
}
try {
httpGet.setHeaders(httpHeaders);
httpGet.setConfig(defaultRequestConfig);
CloseableHttpClient httpclient = httpClientBuilder.build();
CloseableHttpResponse response = null;
Serializable fetch(String urlString, Header[] headers) {
try { //HttpClients.createDefault()) {
try {
response = httpclient.execute(httpGet);
HttpEntity entity = response.getEntity();
if (entity == null) {
throw new Exception("No content found in " + httpGet.getURI().toString());
}
String encoding = null;
if (entity.getContentEncoding() != null) {
encoding = entity.getContentEncoding().getValue();
} else {
Charset charset = ContentType.getOrDefault(entity).getCharset();
if (charset != null) {
encoding = charset.name();
}
}
String result = ParsingUtilities.inputStreamToString(
entity.getContent(), (encoding == null) || ( encoding.equalsIgnoreCase("\"UTF-8\"")) ? "UTF-8" : encoding);
EntityUtils.consume(entity);
return result;
return _httpClient.getAsString(urlString, headers);
} catch (IOException e) {
String message;
if (response == null) {
message = "Unknown HTTP error " + e.getLocalizedMessage();
} else {
StatusLine status = response.getStatusLine();
HttpEntity errorEntity = response.getEntity();
String errorString = ParsingUtilities.inputStreamToString(errorEntity.getContent());
message = String.format("HTTP error %d : %s | %s", status.getStatusCode(),
status.getReasonPhrase(),
errorString);
}
return _onError == OnError.StoreError ? new EvalError(message) : null;
return _onError == OnError.StoreError ? new EvalError(e) : null;
}
} catch (Exception e) {
return _onError == OnError.StoreError ? new EvalError(e.getMessage()) : null;
}
}
RowVisitor createRowVisitor(List<CellAtRow> cellsAtRows) {
return new RowVisitor() {
int cellIndex;
@ -497,4 +406,5 @@ public class ColumnAdditionByFetchingURLsOperation extends EngineDependentOperat
}.init(cellsAtRows);
}
}
}


@ -0,0 +1,208 @@
package com.google.refine.util;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryStrategy;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import com.google.refine.RefineServlet;
public class HttpClient {
final private RequestConfig defaultRequestConfig;
private HttpClientBuilder httpClientBuilder;
private CloseableHttpClient httpClient;
private int _delay;
public HttpClient() {
this(0);
}
public HttpClient(int delay) {
_delay = delay;
// Create a connection manager with a custom socket timeout
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
final SocketConfig socketConfig = SocketConfig.custom()
.setSoTimeout(10, TimeUnit.SECONDS)
.build();
connManager.setDefaultSocketConfig(socketConfig);
defaultRequestConfig = RequestConfig.custom()
.setConnectTimeout(30, TimeUnit.SECONDS)
.setConnectionRequestTimeout(30, TimeUnit.SECONDS) // TODO: 60 seconds in some places in old code
.build();
httpClientBuilder = HttpClients.custom()
.setUserAgent(RefineServlet.getUserAgent())
.setDefaultRequestConfig(defaultRequestConfig)
.setConnectionManager(connManager)
// Default Apache HC retry is 1x @1 sec (or the value in Retry-Header)
.setRetryStrategy(new ExponentialBackoffRetryStrategy(3, TimeValue.ofMilliseconds(_delay)))
// .setRedirectStrategy(new LaxRedirectStrategy()) // TODO: No longer needed since default doesn't exclude POST?
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
.addRequestInterceptorFirst(new HttpRequestInterceptor() {
private long nextRequestTime = System.currentTimeMillis();
@Override
public void process(
final HttpRequest request,
final EntityDetails entity,
final HttpContext context) throws HttpException, IOException {
long delay = nextRequestTime - System.currentTimeMillis();
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}
}
nextRequestTime = System.currentTimeMillis() + _delay;
}
});
// TODO: Placeholder for future Basic Auth implementation
// String userinfo = url.getUserInfo();
// // HTTPS only - no sending password in the clear over HTTP
// if ("https".equals(url.getProtocol()) && userinfo != null) {
// int s = userinfo.indexOf(':');
// if (s > 0) {
// String user = userinfo.substring(0, s);
// String pw = userinfo.substring(s + 1, userinfo.length());
// CredentialsProvider credsProvider = new BasicCredentialsProvider();
// credsProvider.setCredentials(new AuthScope(url.getHost(), 443),
// new UsernamePasswordCredentials(user, pw.toCharArray()));
// httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credsProvider);
// }
// }
httpClient = httpClientBuilder.build();
}
public String getAsString(String urlString, Header[] headers) throws IOException {
final HttpClientResponseHandler<String> responseHandler = new HttpClientResponseHandler<String>() {
@Override
public String handleResponse(final ClassicHttpResponse response) throws IOException {
final int status = response.getCode();
if (status >= HttpStatus.SC_SUCCESS && status < HttpStatus.SC_REDIRECTION) {
final HttpEntity entity = response.getEntity();
if (entity == null) {
throw new IOException("No content found in " + urlString);
}
try {
return EntityUtils.toString(entity);
} catch (final ParseException ex) {
throw new ClientProtocolException(ex);
}
} else {
// String errorBody = EntityUtils.toString(response.getEntity());
throw new ClientProtocolException(String.format("HTTP error %d : %s for URL %s", status,
response.getReasonPhrase(), urlString));
}
}
};
return getResponse(urlString, headers, responseHandler);
}
public String getResponse(String urlString, Header[] headers, HttpClientResponseHandler<String> responseHandler) throws IOException {
try {
// Use of URL constructor below is purely to get additional error checking to mimic
// previous behavior for the tests.
new URL(urlString).toURI();
} catch (IllegalArgumentException | MalformedURLException | URISyntaxException e) {
return null;
}
HttpGet httpGet = new HttpGet(urlString);
if (headers != null && headers.length > 0) {
httpGet.setHeaders(headers);
}
httpGet.setConfig(defaultRequestConfig); // FIXME: Redundant? already includes in client builder
return httpClient.execute(httpGet, responseHandler);
}
public String postNameValue(String serviceUrl, String name, String value) throws IOException {
HttpPost request = new HttpPost(serviceUrl);
List<NameValuePair> body = Collections.singletonList(
new BasicNameValuePair(name, value));
request.setEntity(new UrlEncodedFormEntity(body, StandardCharsets.UTF_8));
try (CloseableHttpResponse response = httpClient.execute(request)) {
String reasonPhrase = response.getReasonPhrase();
int statusCode = response.getCode();
if (statusCode >= 400) { // We should never see 3xx since they get handled automatically
throw new IOException(String.format("HTTP error %d : %s for URL %s", statusCode, reasonPhrase,
request.getRequestUri()));
}
return ParsingUtilities.inputStreamToString(response.getEntity().getContent());
}
}
/**
* Use binary exponential backoff strategy, instead of the default fixed
* retry interval, if the server doesn't provide a Retry-After time.
*/
class ExponentialBackoffRetryStrategy extends DefaultHttpRequestRetryStrategy {
private final TimeValue defaultInterval;
public ExponentialBackoffRetryStrategy(final int maxRetries, final TimeValue defaultRetryInterval) {
super(maxRetries, defaultRetryInterval);
this.defaultInterval = defaultRetryInterval;
}
@Override
public TimeValue getRetryInterval(HttpResponse response, int execCount, HttpContext context) {
// Get the default implementation's interval
TimeValue interval = super.getRetryInterval(response, execCount, context);
// If it's the same as the default, there was no Retry-After, so use binary
// exponential backoff
if (interval.compareTo(defaultInterval) == 0) {
interval = TimeValue.of(((Double) (Math.pow(2, execCount) * defaultInterval.getDuration())).longValue(),
defaultInterval.getTimeUnit() );
return interval;
}
return interval;
}
}
}
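
As an illustration (not part of the commit) of the retry interval computed by
ExponentialBackoffRetryStrategy above: when the server sends no Retry-After header,
the wait doubles with each attempt. With an assumed 100 ms default interval this
yields roughly 200, 400, and 800 ms, consistent with the comment in
testExponentialRetries further down.

import org.apache.hc.core5.util.TimeValue;

public class BackoffIntervalSketch {
    public static void main(String[] args) {
        TimeValue defaultInterval = TimeValue.ofMilliseconds(100); // assumed default interval
        for (int execCount = 1; execCount <= 3; execCount++) {
            // Same arithmetic as getRetryInterval() when no Retry-After header was sent
            long millis = ((Double) (Math.pow(2, execCount) * defaultInterval.getDuration())).longValue();
            System.out.println("retry " + execCount + " waits "
                    + TimeValue.of(millis, defaultInterval.getTimeUnit()));
        }
    }
}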


@ -29,6 +29,7 @@ package com.google.refine.importing;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -98,8 +99,6 @@ public class ImportingUtilitiesTests extends ImporterTest {
public void urlImporting() throws IOException {
String RESPONSE_BODY = "{code:401,message:Unauthorised}";
String MESSAGE = String.format("HTTP error %d : %s | %s", 401,
"Client Error", RESPONSE_BODY);
MockWebServer server = new MockWebServer();
MockResponse mockResponse = new MockResponse();
@ -108,6 +107,8 @@ public class ImportingUtilitiesTests extends ImporterTest {
server.start();
server.enqueue(mockResponse);
HttpUrl url = server.url("/random");
String MESSAGE = String.format("HTTP error %d : %s for URL %s", 401,
"Client Error", url);
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
StringBody stringBody = new StringBody(url.toString(), ContentType.MULTIPART_FORM_DATA);
@ -145,9 +146,9 @@ public class ImportingUtilitiesTests extends ImporterTest {
return job.canceled;
}
});
Assert.fail("No Exception was thrown");
fail("No Exception was thrown");
} catch (Exception exception) {
Assert.assertEquals(MESSAGE, exception.getMessage());
assertEquals(exception.getMessage(), MESSAGE);
} finally {
server.close();
}


@ -91,7 +91,7 @@ public class StandardReconConfigTests extends RefineTest {
return wordDistance(s1, s2);
}
protected Recon createReconServiceResults(String text, ArrayNode resultsList, long historyEntryID) throws IOException {
protected Recon createReconServiceResults(String text, ArrayNode resultsList, long historyEntryID) {
return super.createReconServiceResults(text, resultsList, historyEntryID);
}
}


@ -33,6 +33,9 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package com.google.refine.operations.column;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -129,7 +132,7 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
} catch (InterruptedException e) {
Assert.fail("Test interrupted");
}
Assert.assertFalse(process.isRunning());
Assert.assertFalse(process.isRunning(),"Process failed to complete within timeout " + timeout);
}
@Test
@ -273,4 +276,102 @@ public class ColumnAdditionByFetchingURLsOperationTests extends RefineTest {
}
}
@Test
public void testRetries() throws Exception {
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/retries");
for (int i = 0; i < 2; i++) {
Row row = new Row(2);
row.setCell(0, new Cell("test" + (i + 1), null));
project.rows.add(row);
}
// Queue 5 error responses with 1 sec. Retry-After interval
for (int i = 0; i < 5; i++) {
server.enqueue(new MockResponse()
.setHeader("Retry-After", 1)
.setResponseCode(429)
.setBody(Integer.toString(i,10)));
}
server.enqueue(new MockResponse().setBody("success"));
EngineDependentOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"\"" + url + "?city=\"+value",
OnError.StoreError,
"rand",
1,
100,
false,
null);
// 6 requests (4 retries @1 sec) + final response
long start = System.currentTimeMillis();
runAndWait(op, 4500);
// Make sure that our Retry-After headers were obeyed (4*1 sec vs 4*100msec)
long elapsed = System.currentTimeMillis() - start;
assertTrue(elapsed > 4000, "Retry-After retries didn't take long enough - elapsed = " + elapsed );
// 1st row fails after 4 tries (3 retries), 2nd row tries twice and gets value
assertTrue(project.rows.get(0).getCellValue(1).toString().contains("HTTP error 429"), "missing 429 error");
assertEquals(project.rows.get(1).getCellValue(1).toString(), "success");
server.shutdown();
}
}
@Test
public void testExponentialRetries() throws Exception {
try (MockWebServer server = new MockWebServer()) {
server.start();
HttpUrl url = server.url("/retries");
for (int i = 0; i < 3; i++) {
Row row = new Row(2);
row.setCell(0, new Cell("test" + (i + 1), null));
project.rows.add(row);
}
// Use 503 Server Unavailable with no Retry-After header this time
for (int i = 0; i < 5; i++) {
server.enqueue(new MockResponse()
.setResponseCode(503)
.setBody(Integer.toString(i,10)));
}
server.enqueue(new MockResponse().setBody("success"));
server.enqueue(new MockResponse().setBody("not found").setResponseCode(404));
ColumnAdditionByFetchingURLsOperation op = new ColumnAdditionByFetchingURLsOperation(engine_config,
"fruits",
"\"" + url + "?city=\"+value",
OnError.StoreError,
"rand",
1,
100,
false,
null);
// 6 requests (4 retries 200, 400, 800, 200 msec) + final response
long start = System.currentTimeMillis();
runAndWait(op, 2500);
// Make sure that our exponential back off is working
long elapsed = System.currentTimeMillis() - start;
assertTrue(elapsed > 1600, "Exponential retries didn't take enough time - elapsed = " + elapsed);
// 1st row fails after 4 tries (3 retries), 2nd row tries twice and gets value, 3rd row is hard error
assertTrue(project.rows.get(0).getCellValue(1).toString().contains("HTTP error 503"), "Missing 503 error");
assertEquals(project.rows.get(1).getCellValue(1).toString(), "success");
assertTrue(project.rows.get(2).getCellValue(1).toString().contains("HTTP error 404"),"Missing 404 error");
server.shutdown();
}
}
}


@ -38,9 +38,7 @@ import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -49,7 +47,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@ -225,7 +222,6 @@ public class ExtendDataOperationTests extends RefineTest {
* Test to fetch simple strings
* @throws Exception
*/
@BeforeMethod
public void mockHttpCalls() throws Exception {
mockStatic(ReconciledDataExtensionJob.class);
@ -236,9 +232,9 @@ public class ExtendDataOperationTests extends RefineTest {
return fakeHttpCall(invocation.getArgument(0), invocation.getArgument(1));
}
};
PowerMockito.doAnswer(mockedResponse).when(ReconciledDataExtensionJob.class, "performQuery", anyString(), anyString());
PowerMockito.doAnswer(mockedResponse).when(ReconciledDataExtensionJob.class, "postExtendQuery", anyString(), anyString());
}
@AfterMethod
public void cleanupHttpMocks() {
mockedResponses.clear();