Integration Patterns
Overview
This comprehensive guide covers integration patterns for the Ink platform, including REST client implementations, retry strategies, circuit breakers, timeout handling, error recovery, and connection pooling for reliable third-party integrations.
Target Audience: Backend developers and integration engineers
Prerequisites: REST APIs, HTTP clients, resilience patterns
Estimated Time: 50-60 minutes
Prerequisites
- REST API concepts
- HTTP client libraries (RestTemplate, WebClient)
- Understanding of distributed systems
- Exception handling patterns
- Completed Service Layer Architecture
Integration Architecture
Installation Steps
1. Integration Dependencies
<!-- filepath: /Users/jetstart/dev/jetrev/ink/pom.xml -->
<!-- ...existing code... -->
<dependencies>
<!-- Spring WebClient for reactive HTTP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Resilience4j for circuit breaker and retry -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Apache HttpClient for connection pooling -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
</dependency>
</dependencies>
<!-- ...existing code... -->
2. Resilience Configuration
# filepath: /Users/jetstart/dev/jetrev/ink/src/main/resources/application.yml
# ...existing code...
resilience4j:
circuitbreaker:
instances:
stripe:
registerHealthIndicator: true
slidingWindowSize: 10
minimumNumberOfCalls: 5
permittedNumberOfCallsInHalfOpenState: 3
automaticTransitionFromOpenToHalfOpenEnabled: true
waitDurationInOpenState: 10s
failureRateThreshold: 50
eventConsumerBufferSize: 10
recordExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.io.IOException
ignoreExceptions:
- org.springframework.web.client.HttpClientErrorException
quickbooks:
registerHealthIndicator: true
slidingWindowSize: 10
minimumNumberOfCalls: 5
waitDurationInOpenState: 15s
failureRateThreshold: 60
retry:
instances:
stripe:
maxAttempts: 3
waitDuration: 1000
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
retryExceptions:
- org.springframework.web.client.HttpServerErrorException
- java.net.SocketTimeoutException
ignoreExceptions:
- org.springframework.web.client.HttpClientErrorException
quickbooks:
maxAttempts: 3
waitDuration: 2000
enableExponentialBackoff: true
exponentialBackoffMultiplier: 2
ratelimiter:
instances:
stripe:
limitForPeriod: 100
limitRefreshPeriod: 1s
timeoutDuration: 5s
quickbooks:
limitForPeriod: 50
limitRefreshPeriod: 1s
timeoutDuration: 5s
# Integration specific configuration
integration:
stripe:
baseUrl: ${STRIPE_BASE_URL:https://api.stripe.com}
apiKey: ${STRIPE_API_KEY}
timeout: 30000
connectionPoolSize: 20
quickbooks:
baseUrl: ${QUICKBOOKS_BASE_URL:https://sandbox-quickbooks.api.intuit.com}
clientId: ${QUICKBOOKS_CLIENT_ID}
clientSecret: ${QUICKBOOKS_CLIENT_SECRET}
timeout: 60000
connectionPoolSize: 10
# ...existing code...
3. WebClient Configuration
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/config/WebClientConfig.java
@Configuration
public class WebClientConfig {
@Bean
@Qualifier("stripeWebClient")
public WebClient stripeWebClient(
@Value("${integration.stripe.baseUrl}") String baseUrl,
@Value("${integration.stripe.apiKey}") String apiKey,
@Value("${integration.stripe.timeout}") int timeout) {
return WebClient.builder()
.baseUrl(baseUrl)
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + apiKey)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.responseTimeout(Duration.ofMillis(timeout))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout)
))
.filter(ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
log.debug("Request: {} {}", clientRequest.method(), clientRequest.url());
return Mono.just(clientRequest);
}
))
.filter(ExchangeFilterFunction.ofResponseProcessor(
clientResponse -> {
log.debug("Response status: {}", clientResponse.statusCode());
return Mono.just(clientResponse);
}
))
.build();
}
@Bean
@Qualifier("quickbooksWebClient")
public WebClient quickbooksWebClient(
@Value("${integration.quickbooks.baseUrl}") String baseUrl,
@Value("${integration.quickbooks.timeout}") int timeout) {
return WebClient.builder()
.baseUrl(baseUrl)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.responseTimeout(Duration.ofMillis(timeout))
))
.build();
}
}
Configuration
Base Integration Client
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/integration/client/BaseIntegrationClient.java
@Slf4j
public abstract class BaseIntegrationClient {
protected final WebClient webClient;
protected final ObjectMapper objectMapper;
public BaseIntegrationClient(WebClient webClient, ObjectMapper objectMapper) {
this.webClient = webClient;
this.objectMapper = objectMapper;
}
protected <T> Mono<T> get(String uri, Class<T> responseType) {
return webClient.get()
.uri(uri)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> handle4xxError(response)
)
.onStatus(
HttpStatusCode::is5xxServerError,
response -> handle5xxError(response)
)
.bodyToMono(responseType)
.doOnError(error -> log.error("GET request failed: {}", uri, error))
.retryWhen(getRetrySpec());
}
protected <T, R> Mono<R> post(String uri, T request, Class<R> responseType) {
return webClient.post()
.uri(uri)
.bodyValue(request)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> handle4xxError(response)
)
.onStatus(
HttpStatusCode::is5xxServerError,
response -> handle5xxError(response)
)
.bodyToMono(responseType)
.doOnError(error -> log.error("POST request failed: {}", uri, error))
.retryWhen(getRetrySpec());
}
protected <T, R> Mono<R> put(String uri, T request, Class<R> responseType) {
return webClient.put()
.uri(uri)
.bodyValue(request)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> handle4xxError(response)
)
.onStatus(
HttpStatusCode::is5xxServerError,
response -> handle5xxError(response)
)
.bodyToMono(responseType)
.doOnError(error -> log.error("PUT request failed: {}", uri, error))
.retryWhen(getRetrySpec());
}
protected Mono<Void> delete(String uri) {
return webClient.delete()
.uri(uri)
.retrieve()
.onStatus(
HttpStatusCode::is4xxClientError,
response -> handle4xxError(response)
)
.onStatus(
HttpStatusCode::is5xxServerError,
response -> handle5xxError(response)
)
.bodyToMono(Void.class)
.doOnError(error -> log.error("DELETE request failed: {}", uri, error))
.retryWhen(getRetrySpec());
}
protected Retry getRetrySpec() {
return Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(this::isRetryableException)
.doBeforeRetry(retrySignal ->
log.warn("Retrying request, attempt: {}", retrySignal.totalRetries() + 1))
.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) ->
new IntegrationException("Max retry attempts exceeded",
retrySignal.failure()));
}
protected boolean isRetryableException(Throwable throwable) {
return throwable instanceof WebClientResponseException.ServiceUnavailable ||
throwable instanceof WebClientResponseException.GatewayTimeout ||
throwable instanceof WebClientResponseException.TooManyRequests ||
throwable instanceof SocketTimeoutException;
}
protected Mono<? extends Throwable> handle4xxError(ClientResponse response) {
return response.bodyToMono(String.class)
.flatMap(body -> {
log.error("4xx error: status={}, body={}", response.statusCode(), body);
return Mono.error(new IntegrationClientException(
"Client error: " + response.statusCode(),
response.statusCode().value(),
body
));
});
}
protected Mono<? extends Throwable> handle5xxError(ClientResponse response) {
return response.bodyToMono(String.class)
.flatMap(body -> {
log.error("5xx error: status={}, body={}", response.statusCode(), body);
return Mono.error(new IntegrationServerException(
"Server error: " + response.statusCode(),
response.statusCode().value(),
body
));
});
}
}
Stripe Integration Client
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/integration/client/StripeClient.java
@Service
@Slf4j
public class StripeClient extends BaseIntegrationClient {
private final CircuitBreaker circuitBreaker;
private final RateLimiter rateLimiter;
public StripeClient(
@Qualifier("stripeWebClient") WebClient webClient,
ObjectMapper objectMapper,
CircuitBreakerRegistry circuitBreakerRegistry,
RateLimiterRegistry rateLimiterRegistry) {
super(webClient, objectMapper);
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("stripe");
this.rateLimiter = rateLimiterRegistry.rateLimiter("stripe");
}
public Mono<StripeCharge> createCharge(StripeChargeRequest request) {
return Mono.fromCallable(() ->
rateLimiter.executeCallable(() ->
circuitBreaker.executeCallable(() ->
post("/v1/charges", request, StripeCharge.class)
.block()
)
)
).subscribeOn(Schedulers.boundedElastic());
}
public Mono<StripeCustomer> createCustomer(StripeCustomerRequest request) {
return Mono.fromCallable(() ->
rateLimiter.executeCallable(() ->
circuitBreaker.executeCallable(() ->
post("/v1/customers", request, StripeCustomer.class)
.block()
)
)
).subscribeOn(Schedulers.boundedElastic());
}
public Mono<StripeSubscription> createSubscription(StripeSubscriptionRequest request) {
return Mono.fromCallable(() ->
rateLimiter.executeCallable(() ->
circuitBreaker.executeCallable(() ->
post("/v1/subscriptions", request, StripeSubscription.class)
.block()
)
)
).subscribeOn(Schedulers.boundedElastic());
}
public Mono<StripeSubscription> getSubscription(String subscriptionId) {
return Mono.fromCallable(() ->
rateLimiter.executeCallable(() ->
circuitBreaker.executeCallable(() ->
get("/v1/subscriptions/" + subscriptionId, StripeSubscription.class)
.block()
)
)
).subscribeOn(Schedulers.boundedElastic());
}
public Mono<StripeSubscription> cancelSubscription(String subscriptionId) {
return Mono.fromCallable(() ->
rateLimiter.executeCallable(() ->
circuitBreaker.executeCallable(() ->
delete("/v1/subscriptions/" + subscriptionId)
.then(Mono.empty())
.block()
)
)
).subscribeOn(Schedulers.boundedElastic());
}
}
Usage Examples
Circuit Breaker with Fallback
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/service/PaymentService.java
@Service
@Slf4j
@RequiredArgsConstructor
public class PaymentService {
private final StripeClient stripeClient;
private final PaymentRepository paymentRepository;
@CircuitBreaker(name = "stripe", fallbackMethod = "createChargeFallback")
@Retry(name = "stripe")
@RateLimiter(name = "stripe")
public PaymentResult processPayment(PaymentRequest request) {
log.info("Processing payment for amount: {}", request.getAmount());
StripeChargeRequest stripeRequest = StripeChargeRequest.builder()
.amount(request.getAmount())
.currency(request.getCurrency())
.source(request.getToken())
.description(request.getDescription())
.build();
StripeCharge charge = stripeClient.createCharge(stripeRequest)
.block();
return PaymentResult.builder()
.transactionId(charge.getId())
.status(charge.getStatus())
.amount(charge.getAmount())
.build();
}
private PaymentResult createChargeFallback(
PaymentRequest request,
Exception exception) {
log.error("Payment processing failed, using fallback", exception);
// Save payment for retry
Payment payment = Payment.builder()
.amount(request.getAmount())
.currency(request.getCurrency())
.status("PENDING_RETRY")
.errorMessage(exception.getMessage())
.build();
paymentRepository.save(payment);
return PaymentResult.builder()
.transactionId(payment.getId().toString())
.status("PENDING")
.amount(request.getAmount())
.message("Payment queued for processing")
.build();
}
}
Retry with Exponential Backoff
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/integration/retry/RetryableIntegrationClient.java
@Service
@Slf4j
public class RetryableIntegrationClient {
private final RestTemplate restTemplate;
public RetryableIntegrationClient(RestTemplateBuilder builder) {
this.restTemplate = builder
.setConnectTimeout(Duration.ofSeconds(5))
.setReadTimeout(Duration.ofSeconds(30))
.build();
}
@Retryable(
value = { RestClientException.class, SocketTimeoutException.class },
maxAttempts = 5,
backoff = @Backoff(
delay = 1000,
multiplier = 2,
maxDelay = 10000,
random = true
)
)
public <T> T executeWithRetry(String url, HttpMethod method, Object request, Class<T> responseType) {
log.info("Executing request: {} {}", method, url);
HttpEntity<?> entity = new HttpEntity<>(request, createHeaders());
try {
ResponseEntity<T> response = restTemplate.exchange(
url,
method,
entity,
responseType
);
return response.getBody();
} catch (HttpClientErrorException e) {
log.error("4xx error: {}", e.getStatusCode());
throw new IntegrationClientException("Client error", e);
} catch (HttpServerErrorException e) {
log.error("5xx error: {}", e.getStatusCode());
throw new IntegrationServerException("Server error", e);
}
}
@Recover
public <T> T recover(
RestClientException e,
String url,
HttpMethod method,
Object request,
Class<T> responseType) {
log.error("All retry attempts failed for {} {}", method, url, e);
throw new IntegrationException("Max retries exceeded", e);
}
private HttpHeaders createHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("User-Agent", "Ink-Platform/1.0");
return headers;
}
}
Connection Pooling
// filepath: /Users/jetstart/dev/jetrev/ink/src/main/java/com/jetrev/ink/config/HttpClientConfig.java
@Configuration
public class HttpClientConfig {
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder
.requestFactory(this::createRequestFactory)
.setConnectTimeout(Duration.ofSeconds(5))
.setReadTimeout(Duration.ofSeconds(30))
.build();
}
private ClientHttpRequestFactory createRequestFactory() {
PoolingHttpClientConnectionManager connectionManager =
PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnTotal(100)
.setMaxConnPerRoute(20)
.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(5))
.setSocketTimeout(Timeout.ofSeconds(30))
.build())
.setValidateAfterInactivity(TimeValue.ofSeconds(10))
.build();
HttpClient httpClient = HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectionRequestTimeout(Timeout.ofSeconds(5))
.build())
.setRetryStrategy(new DefaultHttpRequestRetryStrategy(3, TimeValue.ofSeconds(1)))
.build();
return new HttpComponentsClientHttpRequestFactory(httpClient);
}
}
Verification
Integration Testing
// filepath: /Users/jetstart/dev/jetrev/ink/src/test/java/com/jetrev/ink/integration/StripeClientTest.java
@SpringBootTest
@ActiveProfiles("test")
class StripeClientTest {
@Autowired
private StripeClient stripeClient;
private WireMockServer wireMockServer;
@BeforeEach
void setUp() {
wireMockServer = new WireMockServer(8089);
wireMockServer.start();
WireMock.configureFor("localhost", 8089);
}
@AfterEach
void tearDown() {
wireMockServer.stop();
}
@Test
void shouldCreateChargeSuccessfully() {
// Given
stubFor(post(urlEqualTo("/v1/charges"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("""
{
"id": "ch_123",
"amount": 1000,
"currency": "usd",
"status": "succeeded"
}
""")));
StripeChargeRequest request = StripeChargeRequest.builder()
.amount(1000L)
.currency("usd")
.source("tok_visa")
.build();
// When
StripeCharge charge = stripeClient.createCharge(request).block();
// Then
assertThat(charge).isNotNull();
assertThat(charge.getId()).isEqualTo("ch_123");
assertThat(charge.getStatus()).isEqualTo("succeeded");
}
@Test
void shouldRetryOnServerError() {
// Given - first two attempts fail, third succeeds
stubFor(post(urlEqualTo("/v1/charges"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(aResponse().withStatus(503))
.willSetStateTo("First Retry"));
stubFor(post(urlEqualTo("/v1/charges"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("First Retry")
.willReturn(aResponse().withStatus(503))
.willSetStateTo("Second Retry"));
stubFor(post(urlEqualTo("/v1/charges"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Second Retry")
.willReturn(aResponse()
.withStatus(200)
.withBody("{\"id\":\"ch_123\",\"status\":\"succeeded\"}")));
StripeChargeRequest request = StripeChargeRequest.builder()
.amount(1000L)
.currency("usd")
.build();
// When
StripeCharge charge = stripeClient.createCharge(request).block();
// Then
assertThat(charge).isNotNull();
verify(3, postRequestedFor(urlEqualTo("/v1/charges")));
}
}
Troubleshooting
Circuit Breaker Not Opening
# Ensure proper configuration
resilience4j:
circuitbreaker:
instances:
myService:
minimumNumberOfCalls: 5 # Reduce for testing
failureRateThreshold: 50
waitDurationInOpenState: 10s
slidingWindowSize: 10
Connection Pool Exhaustion
// Monitor pool metrics
@Component
@Slf4j
public class ConnectionPoolMonitor {
@Scheduled(fixedDelay = 60000)
public void monitorConnectionPool() {
PoolingHttpClientConnectionManager manager = getConnectionManager();
PoolStats stats = manager.getTotalStats();
log.info("Connection pool stats - Available: {}, Leased: {}, Max: {}",
stats.getAvailable(),
stats.getLeased(),
stats.getMax());
if (stats.getLeased() > stats.getMax() * 0.8) {
log.warn("Connection pool utilization above 80%");
}
}
}
Best Practices
- Use Circuit Breakers: Prevent cascading failures
- Implement Retry Logic: Handle transient failures gracefully
- Connection Pooling: Reuse HTTP connections
- Timeout Configuration: Set appropriate timeouts
- Rate Limiting: Respect API rate limits
- Idempotency: Make requests idempotent where possible
- Error Handling: Distinguish between retryable and non-retryable errors
- Monitoring: Track integration health and performance
- Fallback Strategies: Provide graceful degradation
- Testing: Use WireMock for integration testing
Performance Optimization
Async Processing
@Service
@Slf4j
public class AsyncIntegrationService {
private final StripeClient stripeClient;
@Async("integrationExecutor")
public CompletableFuture<PaymentResult> processPaymentAsync(PaymentRequest request) {
return CompletableFuture.supplyAsync(() -> {
return stripeClient.createCharge(request).block();
});
}
}
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "integrationExecutor")
public Executor integrationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("integration-");
executor.initialize();
return executor;
}
}
Related Documentation
Additional Resources
- Resilience4j Documentation
- Spring WebClient Documentation
- Circuit Breaker Pattern
- WireMock Documentation
Next Steps: Learn how to Add New Integrations to the platform.