Add exponential backoff retries for reconciliation calls. (#3770)
* Update test to demonstrate retry mechanism for recon queries. This was fixed earlier by #3237. Closes #3369. * Ensure non-zero retry interval in HttpClient * Restore original bound
This commit is contained in:
parent
fa9d670d30
commit
577caabec1
@ -46,13 +46,19 @@ public class HttpClient {
|
|||||||
private HttpClientBuilder httpClientBuilder;
|
private HttpClientBuilder httpClientBuilder;
|
||||||
private CloseableHttpClient httpClient;
|
private CloseableHttpClient httpClient;
|
||||||
private int _delay;
|
private int _delay;
|
||||||
|
private int _retryInterval; // delay between original request and first retry, in ms
|
||||||
|
|
||||||
public HttpClient() {
|
public HttpClient() {
|
||||||
this(0);
|
this(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public HttpClient(int delay) {
|
public HttpClient(int delay) {
|
||||||
|
this(delay, Math.max(delay, 200));
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpClient(int delay, int retryInterval) {
|
||||||
_delay = delay;
|
_delay = delay;
|
||||||
|
_retryInterval = retryInterval;
|
||||||
// Create a connection manager with a custom socket timeout
|
// Create a connection manager with a custom socket timeout
|
||||||
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
|
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
|
||||||
final SocketConfig socketConfig = SocketConfig.custom()
|
final SocketConfig socketConfig = SocketConfig.custom()
|
||||||
@ -70,7 +76,7 @@ public class HttpClient {
|
|||||||
.setDefaultRequestConfig(defaultRequestConfig)
|
.setDefaultRequestConfig(defaultRequestConfig)
|
||||||
.setConnectionManager(connManager)
|
.setConnectionManager(connManager)
|
||||||
// Default Apache HC retry is 1x @1 sec (or the value in Retry-Header)
|
// Default Apache HC retry is 1x @1 sec (or the value in Retry-Header)
|
||||||
.setRetryStrategy(new ExponentialBackoffRetryStrategy(3, TimeValue.ofMilliseconds(_delay)))
|
.setRetryStrategy(new ExponentialBackoffRetryStrategy(3, TimeValue.ofMilliseconds(_retryInterval)))
|
||||||
// .setRedirectStrategy(new LaxRedirectStrategy()) // TODO: No longer needed since default doesn't exclude POST?
|
// .setRedirectStrategy(new LaxRedirectStrategy()) // TODO: No longer needed since default doesn't exclude POST?
|
||||||
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
|
// .setConnectionBackoffStrategy(ConnectionBackoffStrategy)
|
||||||
.addRequestInterceptorFirst(new HttpRequestInterceptor() {
|
.addRequestInterceptorFirst(new HttpRequestInterceptor() {
|
||||||
@ -198,7 +204,7 @@ public class HttpClient {
|
|||||||
// If it's the same as the default, there was no Retry-After, so use binary
|
// If it's the same as the default, there was no Retry-After, so use binary
|
||||||
// exponential backoff
|
// exponential backoff
|
||||||
if (interval.compareTo(defaultInterval) == 0) {
|
if (interval.compareTo(defaultInterval) == 0) {
|
||||||
interval = TimeValue.of(((Double) (Math.pow(2, execCount) * defaultInterval.getDuration())).longValue(),
|
interval = TimeValue.of(((Double) (Math.pow(2, execCount - 1) * defaultInterval.getDuration())).longValue(),
|
||||||
defaultInterval.getTimeUnit() );
|
defaultInterval.getTimeUnit() );
|
||||||
return interval;
|
return interval;
|
||||||
}
|
}
|
||||||
|
@ -337,9 +337,8 @@ public class StandardReconConfigTests extends RefineTest {
|
|||||||
try (MockWebServer server = new MockWebServer()) {
|
try (MockWebServer server = new MockWebServer()) {
|
||||||
server.start();
|
server.start();
|
||||||
HttpUrl url = server.url("/openrefine-wikidata/en/api");
|
HttpUrl url = server.url("/openrefine-wikidata/en/api");
|
||||||
// FIXME: Retry doesn't currently work, but should be tested
|
server.enqueue(new MockResponse().setResponseCode(503)); // service initially overloaded
|
||||||
// server.enqueue(new MockResponse().setResponseCode(503)); // service overloaded
|
server.enqueue(new MockResponse().setBody(reconResponse)); // service returns successfully
|
||||||
server.enqueue(new MockResponse().setBody(reconResponse));
|
|
||||||
server.enqueue(new MockResponse());
|
server.enqueue(new MockResponse());
|
||||||
|
|
||||||
String configJson = " {\n" +
|
String configJson = " {\n" +
|
||||||
@ -366,13 +365,13 @@ public class StandardReconConfigTests extends RefineTest {
|
|||||||
process.startPerforming(pm);
|
process.startPerforming(pm);
|
||||||
Assert.assertTrue(process.isRunning());
|
Assert.assertTrue(process.isRunning());
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000); // TODO: timeout will need to increase for retries
|
Thread.sleep(1500);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Assert.fail("Test interrupted");
|
Assert.fail("Test interrupted");
|
||||||
}
|
}
|
||||||
Assert.assertFalse(process.isRunning());
|
Assert.assertFalse(process.isRunning());
|
||||||
|
|
||||||
// RecordedRequest scratchFirstRquest = server.takeRequest();
|
server.takeRequest(); // ignore the first request which was a 503 error
|
||||||
RecordedRequest request1 = server.takeRequest();
|
RecordedRequest request1 = server.takeRequest();
|
||||||
|
|
||||||
assertNotNull(request1);
|
assertNotNull(request1);
|
||||||
|
Loading…
Reference in New Issue
Block a user