Async Processing and Webhooks Implementation Guide
Overview
This document outlines the implementation approach for adding async processing and webhook notifications to the BotSKYC API. These features will be implemented in a future phase.
1. Async Processing Architecture
Use Case
Async processing is intended for large document batches (5 or more files) and for clients that prefer non-blocking operations.
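The routing rule in this use case can be sketched as a small helper. This is only an illustration: `ProcessingModeSelector` and `shouldProcessAsync` are hypothetical names that do not appear elsewhere in this guide, and the threshold of 5 is taken from the "5+ files" figure above.

```java
// Hypothetical helper (not part of the BotSKYC codebase): decides whether
// a request should take the async path described in the use case.
final class ProcessingModeSelector {

    // Batch size at which async processing applies ("5+ files").
    static final int ASYNC_FILE_THRESHOLD = 5;

    private ProcessingModeSelector() {
    }

    /** Returns true for large batches or when the client asked for non-blocking processing. */
    static boolean shouldProcessAsync(int fileCount, boolean clientPrefersAsync) {
        if (fileCount < 0) {
            throw new IllegalArgumentException("fileCount must not be negative");
        }
        return clientPrefersAsync || fileCount >= ASYNC_FILE_THRESHOLD;
    }
}
```

A synchronous endpoint could call this check and redirect large uploads to the async flow, though whether that redirect happens server-side or is left to the client is a design decision this guide does not settle.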
Architecture
Implementation Steps
Step 1: Add Dependencies (pom.xml)
<!-- Async Processing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Redis for job storage -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Job scheduling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
Step 2: Create Job Entity
package com.nirvac.botskyc.domain.job.model;
import java.time.Instant;
import java.util.UUID;
@Entity
@Table(name = "kyc_jobs")
public class KYCJob {
@Id
private String id = UUID.randomUUID().toString();
@Enumerated(EnumType.STRING)
private JobStatus status; // PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED
@Enumerated(EnumType.STRING)
private JobType type; // IDENTITY, ADDRESS, INCOME, etc.
private String clientId;
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;
@Column(columnDefinition = "TEXT")
private String inputMetadata; // JSON with file info
@Column(columnDefinition = "TEXT")
private String result; // JSON result when complete
@Column(columnDefinition = "TEXT")
private String error; // Error message if failed
private Integer totalFiles;
private Integer processedFiles;
private Integer failedFiles;
// Webhooks
private String webhookUrl;
private Integer webhookAttempts = 0;
private Instant lastWebhookAttempt;
// Getters/Setters
}
public enum JobStatus {
PENDING,
PROCESSING,
COMPLETED,
FAILED,
CANCELLED
}
public enum JobType {
IDENTITY,
ADDRESS,
INCOME,
ENTITY,
COMPLIANCE,
MULTIPURPOSE
}
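The job lifecycle implied by `JobStatus` can be made explicit with a transition table, which also gives the "state transitions" unit tests something concrete to assert against. The sketch below is an assumption about the intended lifecycle, inferred from the worker and the cancel endpoint in this guide. `JobStateMachine` is a hypothetical class, and the enum is repeated only so the snippet compiles on its own.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Repeated from the entity above so the sketch is self-contained.
enum JobStatus { PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED }

// Hypothetical helper: encodes which status changes are considered legal.
final class JobStateMachine {

    private static final Map<JobStatus, Set<JobStatus>> ALLOWED = new EnumMap<>(JobStatus.class);

    static {
        // Assumed lifecycle: jobs can be cancelled or fail before and during processing.
        ALLOWED.put(JobStatus.PENDING,
                EnumSet.of(JobStatus.PROCESSING, JobStatus.FAILED, JobStatus.CANCELLED));
        ALLOWED.put(JobStatus.PROCESSING,
                EnumSet.of(JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED));
        // Terminal states allow no further transitions.
        ALLOWED.put(JobStatus.COMPLETED, EnumSet.noneOf(JobStatus.class));
        ALLOWED.put(JobStatus.FAILED, EnumSet.noneOf(JobStatus.class));
        ALLOWED.put(JobStatus.CANCELLED, EnumSet.noneOf(JobStatus.class));
    }

    private JobStateMachine() {
    }

    static boolean canTransition(JobStatus from, JobStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    static boolean isTerminal(JobStatus status) {
        return ALLOWED.get(status).isEmpty();
    }
}
```

A check like `canTransition(job.getStatus(), JobStatus.PROCESSING)` in the worker would, for example, keep a job that was cancelled while queued from being processed.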
Step 3: Create Async Controller
package com.nirvac.botskyc.api.controller.v;
@RestController
@RequestMapping("/api/kyc/async")
public class AsyncKYCController {
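private final AsyncJobService asyncJobService;
private final KYCJobRepository jobRepository;
private final FileStorageService fileStorageService; // used by submitIdentityJob
private final ObjectMapper objectMapper; // used by getJobStatus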
/**
* Submit identity verification job for async processing
*/
@PostMapping("/identity/verify")
public ResponseEntity<AsyncJobResponse> submitIdentityJob(
@RequestParam("documents") List<MultipartFile> documents,
@RequestParam(value = "webhookUrl", required = false) String webhookUrl,
@RequestHeader("X-Client-ID") String clientId) {
// Generate job ID
String jobId = UUID.randomUUID().toString();
// Store files temporarily (object storage or local filesystem)
List<String> fileUrls = fileStorageService.storeTemporary(documents);
// Create job
KYCJob job = new KYCJob();
job.setId(jobId);
job.setType(JobType.IDENTITY);
job.setStatus(JobStatus.PENDING);
job.setClientId(clientId);
job.setTotalFiles(documents.size());
job.setWebhookUrl(webhookUrl);
job.setInputMetadata(createMetadata(fileUrls));
job.setCreatedAt(Instant.now());
jobRepository.save(job);
// Submit to queue
asyncJobService.submitJob(jobId, JobType.IDENTITY, fileUrls);
// Return response
return ResponseEntity.accepted()
.body(new AsyncJobResponse(
jobId,
JobStatus.PENDING,
"/api/kyc/async/jobs/" + jobId
));
}
/**
* Get job status and results
*/
@GetMapping("/jobs/{jobId}")
public ResponseEntity<JobStatusResponse> getJobStatus(@PathVariable String jobId)
throws JsonProcessingException { // objectMapper.readValue declares this checked exception
KYCJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new ResourceNotFoundException("Job not found"));
JobStatusResponse response = new JobStatusResponse();
response.setJobId(job.getId());
response.setStatus(job.getStatus());
response.setCreatedAt(job.getCreatedAt());
response.setProgress(calculateProgress(job));
if (job.getStatus() == JobStatus.COMPLETED) {
response.setResult(objectMapper.readValue(job.getResult(), KYCAnalysisResponse.class));
} else if (job.getStatus() == JobStatus.FAILED) {
response.setError(job.getError());
}
return ResponseEntity.ok(response);
}
/**
* Cancel a pending/processing job
*/
@DeleteMapping("/jobs/{jobId}")
public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
KYCJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new ResourceNotFoundException("Job not found"));
if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.PROCESSING) {
throw new IllegalStateException("Cannot cancel job in status: " + job.getStatus());
}
asyncJobService.cancelJob(jobId);
job.setStatus(JobStatus.CANCELLED);
jobRepository.save(job);
return ResponseEntity.noContent().build();
}
/**
* List jobs for a client
*/
@GetMapping("/jobs")
public ResponseEntity<Page<JobSummary>> listJobs(
@RequestHeader("X-Client-ID") String clientId,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) JobStatus status) {
Pageable pageable = PageRequest.of(page, size, Sort.by("createdAt").descending());
Page<KYCJob> jobs;
if (status != null) {
jobs = jobRepository.findByClientIdAndStatus(clientId, status, pageable);
} else {
jobs = jobRepository.findByClientId(clientId, pageable);
}
Page<JobSummary> summaries = jobs.map(this::toSummary);
return ResponseEntity.ok(summaries);
}
}
Step 4: Create Async Worker Service
package com.nirvac.botskyc.domain.job.service;
@Service
public class AsyncJobWorkerService {
private final KYCJobRepository jobRepository;
private final IdentityVerificationService identityService;
private final WebhookService webhookService;
private final FileStorageService fileStorageService;
private final ObjectMapper objectMapper; // serializes the result JSON
@RabbitListener(queues = "${app.queue.kyc-jobs}")
public void processJob(AsyncJobMessage message) {
String jobId = message.getJobId();
try {
// Update status to PROCESSING
KYCJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));
job.setStatus(JobStatus.PROCESSING);
job.setStartedAt(Instant.now());
jobRepository.save(job);
// Download files from temporary storage
List<MultipartFile> files = fileStorageService.retrieve(message.getFileUrls());
// Process based on job type
KYCAnalysisResponse result = switch(job.getType()) {
case IDENTITY -> identityService.analyzeKYCDocuments(files.toArray(new MultipartFile[0]));
case ADDRESS -> addressService.analyzeKYCDocuments(files.toArray(new MultipartFile[0]));
// ... other types
default -> throw new UnsupportedOperationException("Unknown job type: " + job.getType());
};
// Store result
job.setResult(objectMapper.writeValueAsString(result));
job.setStatus(JobStatus.COMPLETED);
job.setCompletedAt(Instant.now());
job.setProcessedFiles(result.getTotalDocuments());
jobRepository.save(job);
// Trigger webhook if configured
if (job.getWebhookUrl() != null) {
webhookService.sendWebhook(job);
}
} catch (Exception e) {
logger.error("Job {} failed: {}", jobId, e.getMessage(), e);
KYCJob job = jobRepository.findById(jobId).orElse(null);
if (job != null) {
job.setStatus(JobStatus.FAILED);
job.setError(e.getMessage());
job.setCompletedAt(Instant.now());
jobRepository.save(job);
// Still try to notify via webhook
if (job.getWebhookUrl() != null) {
webhookService.sendWebhook(job);
}
}
} finally {
// Clean up temporary files whether the job succeeded or failed
fileStorageService.deleteTemporary(message.getFileUrls());
}
}
}
Step 5: Response DTOs
package com.nirvac.botskyc.api.dto;
public class AsyncJobResponse {
private String jobId;
private JobStatus status;
private String statusUrl;
private Instant submittedAt;
// Constructor, getters, setters
}
public class JobStatusResponse {
private String jobId;
private JobStatus status;
private JobType type;
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;
private ProgressInfo progress;
private KYCAnalysisResponse result; // Only when completed
private String error; // Only when failed
// Getters, setters
}
public class ProgressInfo {
private int totalFiles;
private int processedFiles;
private int failedFiles;
private double percentage;
// Getters, setters
}
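The controller calls `calculateProgress(job)` without showing it. One plausible way to derive the `percentage` field of `ProgressInfo` is sketched below. `ProgressCalculator` is a hypothetical name, and counting failed files as "done" is an assumption rather than something this guide specifies.

```java
// Hypothetical helper: derives the percentage field of ProgressInfo
// from the nullable Integer counters stored on KYCJob.
final class ProgressCalculator {

    private ProgressCalculator() {
    }

    /** Percentage of files that reached a final outcome (processed or failed), from 0.0 to 100.0. */
    static double percentage(Integer totalFiles, Integer processedFiles, Integer failedFiles) {
        int total = totalFiles == null ? 0 : totalFiles;
        if (total <= 0) {
            return 0.0; // nothing submitted yet, avoid division by zero
        }
        int processed = processedFiles == null ? 0 : processedFiles;
        int failed = failedFiles == null ? 0 : failedFiles;
        double pct = 100.0 * (processed + failed) / total;
        // Clamp in case the counters drift out of sync with totalFiles.
        return Math.min(100.0, Math.max(0.0, pct));
    }
}
```

The null handling matters because the entity's counters are `Integer` fields that stay unset until the worker writes them.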
2. Webhooks Architecture
Use Case
Notify clients when async jobs complete or when specific events occur.
Architecture
Implementation Steps
Step 1: Webhook Configuration Entity
package com.nirvac.botskyc.domain.webhook.model;
@Entity
@Table(name = "webhook_configs")
public class WebhookConfig {
@Id
private String id = UUID.randomUUID().toString();
private String clientId;
private String url;
private String secret; // For HMAC signing
@ElementCollection
@CollectionTable(name = "webhook_events")
private Set<WebhookEvent> events; // Which events to listen to
private boolean active = true;
private Instant createdAt;
private Instant updatedAt;
// Statistics
private Integer successCount = 0;
private Integer failureCount = 0;
private Instant lastSuccessAt;
private Instant lastFailureAt;
// Getters/Setters
}
public enum WebhookEvent {
JOB_CREATED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
JOB_CANCELLED,
DOCUMENT_PROCESSED,
VERIFICATION_HIGH_CONFIDENCE,
VERIFICATION_LOW_CONFIDENCE
}
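The webhook service in the next step relies on a `determineEvent(job)` helper that this guide does not define. A minimal sketch of such a mapping follows. `WebhookEventMapper` is a hypothetical name, the enums are repeated so the snippet compiles alone, and mapping PENDING to JOB_CREATED and PROCESSING to JOB_STARTED is an assumption.

```java
// Repeated from this guide so the sketch is self-contained.
enum JobStatus { PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED }

enum WebhookEvent {
    JOB_CREATED, JOB_STARTED, JOB_COMPLETED, JOB_FAILED, JOB_CANCELLED,
    DOCUMENT_PROCESSED, VERIFICATION_HIGH_CONFIDENCE, VERIFICATION_LOW_CONFIDENCE
}

// Hypothetical helper: picks the job-level event that matches a job's status.
final class WebhookEventMapper {

    private WebhookEventMapper() {
    }

    static WebhookEvent forStatus(JobStatus status) {
        switch (status) {
            case PENDING:
                return WebhookEvent.JOB_CREATED;   // assumed mapping
            case PROCESSING:
                return WebhookEvent.JOB_STARTED;   // assumed mapping
            case COMPLETED:
                return WebhookEvent.JOB_COMPLETED;
            case FAILED:
                return WebhookEvent.JOB_FAILED;
            case CANCELLED:
                return WebhookEvent.JOB_CANCELLED;
            default:
                throw new IllegalArgumentException("Unmapped status: " + status);
        }
    }
}
```

The document-level and confidence events have no job status behind them, so they would need a separate trigger.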
Step 2: Webhook Service
package com.nirvac.botskyc.domain.webhook.service;
@Service
public class WebhookService {
private final WebhookConfigRepository webhookConfigRepository;
private final WebhookDeliveryRepository webhookDeliveryRepository;
private final RestTemplate restTemplate;
private final ObjectMapper objectMapper;
/**
* Send webhook notification
*/
@Async
public void sendWebhook(KYCJob job) {
// Find webhook configs for this client
List<WebhookConfig> configs = webhookConfigRepository
.findByClientIdAndActiveTrue(job.getClientId());
WebhookEvent event = determineEvent(job);
for (WebhookConfig config : configs) {
if (!config.getEvents().contains(event)) {
continue; // Skip if not subscribed to this event
}
sendWebhookToUrl(config, job, event);
}
}
private void sendWebhookToUrl(WebhookConfig config, KYCJob job, WebhookEvent event) {
try {
// Create payload
WebhookPayload payload = new WebhookPayload();
payload.setEvent(event);
payload.setJobId(job.getId());
payload.setTimestamp(Instant.now());
payload.setData(createEventData(job));
String payloadJson = objectMapper.writeValueAsString(payload);
// Sign payload
String signature = generateSignature(payloadJson, config.getSecret());
// Create request
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Webhook-Signature", "sha256=" + signature);
headers.set("X-Event-Type", event.name());
headers.set("X-Job-ID", job.getId());
headers.set("X-Delivery-ID", UUID.randomUUID().toString());
HttpEntity<String> request = new HttpEntity<>(payloadJson, headers);
// Send with retries
sendWithRetry(config, request, payload);
} catch (Exception e) {
logger.error("Failed to send webhook to {}: {}", config.getUrl(), e.getMessage(), e);
recordFailure(config, e.getMessage());
}
}
private void sendWithRetry(WebhookConfig config, HttpEntity<String> request, WebhookPayload payload) {
int maxRetries = 3;
int retryDelay = 1000; // Start with 1 second
for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
ResponseEntity<String> response = restTemplate.postForEntity(
config.getUrl(),
request,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
recordSuccess(config, payload);
return;
}
logger.warn("Webhook returned non-2xx: {}", response.getStatusCode());
} catch (Exception e) {
if (attempt == maxRetries - 1) {
throw e; // Last attempt, throw the exception
}
logger.warn("Webhook attempt {} failed, retrying in {}ms",
attempt + 1, retryDelay);
try {
Thread.sleep(retryDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
retryDelay *= 2; // Exponential backoff
}
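}
// Every attempt returned a non-2xx status; surface it so the caller records the failure
throw new IllegalStateException("Webhook delivery failed after " + maxRetries + " attempts");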
}
private String generateSignature(String payload, String secret) {
try {
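Mac mac = Mac.getInstance("HmacSHA256");
// Use an explicit charset so the signature does not depend on the platform default
SecretKeySpec secretKey = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
mac.init(secretKey);
byte[] hash = mac.doFinal(payload.getBytes(StandardCharsets.UTF_8));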
return Hex.encodeHexString(hash);
} catch (Exception e) {
throw new RuntimeException("Failed to generate signature", e);
}
}
private void recordSuccess(WebhookConfig config, WebhookPayload payload) {
config.setSuccessCount(config.getSuccessCount() + 1);
config.setLastSuccessAt(Instant.now());
webhookConfigRepository.save(config);
// Store delivery record
WebhookDelivery delivery = new WebhookDelivery();
delivery.setWebhookConfigId(config.getId());
delivery.setPayload(objectMapper.writeValueAsString(payload));
delivery.setSuccess(true);
delivery.setDeliveredAt(Instant.now());
webhookDeliveryRepository.save(delivery);
}
private void recordFailure(WebhookConfig config, String error) {
config.setFailureCount(config.getFailureCount() + 1);
config.setLastFailureAt(Instant.now());
// Disable if too many failures
if (config.getFailureCount() > 10) {
config.setActive(false);
logger.error("Disabling webhook {} due to repeated failures", config.getId());
}
webhookConfigRepository.save(config);
}
}
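The retry loop in `sendWithRetry` starts at a 1 second delay and multiplies it after each failed attempt. The resulting wait schedule can be computed and tested in isolation, as in the sketch below. `BackoffSchedule` and its `maxDelayMs` cap are hypothetical additions, not part of the service above, and the factor of 2 is the usual doubling assumed for exponential backoff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the waits between webhook delivery attempts.
final class BackoffSchedule {

    private BackoffSchedule() {
    }

    /**
     * Returns the delay before each retry. There is no wait after the final
     * attempt, so the list has maxAttempts - 1 entries.
     */
    static List<Long> delaysMillis(int maxAttempts, long initialDelayMs, long maxDelayMs) {
        if (maxAttempts < 1 || initialDelayMs < 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add(Math.min(delay, maxDelayMs));
            // Double the delay, capping it so it neither exceeds the limit nor overflows.
            delay = delay > maxDelayMs / 2 ? maxDelayMs : delay * 2;
        }
        return delays;
    }
}
```

With the values used above (3 attempts, 1000 ms initial delay) this yields waits of 1000 ms and 2000 ms. A cap becomes relevant only if the retry count is raised.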
Step 3: Webhook Management Controller
package com.nirvac.botskyc.api.controller.v;
@RestController
@RequestMapping("/api/webhooks")
public class WebhookController {
private final WebhookConfigRepository webhookConfigRepository;
private final WebhookService webhookService; // used by testWebhook
/**
* Register a new webhook
*/
@PostMapping
public ResponseEntity<WebhookConfig> registerWebhook(
@RequestHeader("X-Client-ID") String clientId,
@RequestBody @Valid WebhookRegistrationRequest request) {
// Validate URL is reachable
validateWebhookUrl(request.getUrl());
WebhookConfig config = new WebhookConfig();
config.setClientId(clientId);
config.setUrl(request.getUrl());
config.setSecret(generateSecret());
config.setEvents(request.getEvents());
config.setActive(true);
config.setCreatedAt(Instant.now());
webhookConfigRepository.save(config);
return ResponseEntity.status(HttpStatus.CREATED).body(config);
}
/**
* List webhooks for client
*/
@GetMapping
public ResponseEntity<List<WebhookConfig>> listWebhooks(
@RequestHeader("X-Client-ID") String clientId) {
List<WebhookConfig> webhooks = webhookConfigRepository.findByClientId(clientId);
return ResponseEntity.ok(webhooks);
}
/**
* Update webhook
*/
@PutMapping("/{webhookId}")
public ResponseEntity<WebhookConfig> updateWebhook(
@PathVariable String webhookId,
@RequestHeader("X-Client-ID") String clientId,
@RequestBody @Valid WebhookUpdateRequest request) {
WebhookConfig config = webhookConfigRepository.findById(webhookId)
.orElseThrow(() -> new ResourceNotFoundException("Webhook not found"));
if (!config.getClientId().equals(clientId)) {
throw new ForbiddenException("Not authorized");
}
config.setUrl(request.getUrl());
config.setEvents(request.getEvents());
config.setActive(request.isActive());
config.setUpdatedAt(Instant.now());
webhookConfigRepository.save(config);
return ResponseEntity.ok(config);
}
/**
* Delete webhook
*/
@DeleteMapping("/{webhookId}")
public ResponseEntity<Void> deleteWebhook(
@PathVariable String webhookId,
@RequestHeader("X-Client-ID") String clientId) {
WebhookConfig config = webhookConfigRepository.findById(webhookId)
.orElseThrow(() -> new ResourceNotFoundException("Webhook not found"));
if (!config.getClientId().equals(clientId)) {
throw new ForbiddenException("Not authorized");
}
webhookConfigRepository.delete(config);
return ResponseEntity.noContent().build();
}
/**
* Test webhook (sends a ping event)
*/
@PostMapping("/{webhookId}/test")
public ResponseEntity<WebhookTestResponse> testWebhook(
@PathVariable String webhookId,
@RequestHeader("X-Client-ID") String clientId) {
WebhookConfig config = webhookConfigRepository.findById(webhookId)
.orElseThrow(() -> new ResourceNotFoundException("Webhook not found"));
if (!config.getClientId().equals(clientId)) {
throw new ForbiddenException("Not authorized");
}
boolean success = webhookService.sendTestEvent(config);
return ResponseEntity.ok(new WebhookTestResponse(success));
}
}
Step 4: Webhook Payload Structure
public class WebhookPayload {
private WebhookEvent event;
private String jobId;
private Instant timestamp;
private Object data; // Event-specific data
// Getters/Setters
}
// Example payloads:
// Job Completed Event
{
"event": "JOB_COMPLETED",
"jobId": "job-123",
"timestamp": "2025-11-09T14:30:00Z",
"data": {
"type": "IDENTITY",
"totalFiles": 2,
"processedFiles": 2,
"documentsFound": 2,
"averageConfidence": 0.98,
"processingTimeMs": 2500,
"resultUrl": "/api/kyc/async/jobs/job-123"
}
}
// Job Failed Event
{
"event": "JOB_FAILED",
"jobId": "job-456",
"timestamp": "2025-11-09T14:35:00Z",
"data": {
"type": "ADDRESS",
"error": "Invalid file format",
"errorCode": "INVALID_FILE_FORMAT"
}
}
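The `data` object in these payloads comes from `createEventData(job)`, which is not shown in this guide. Assuming it returns a plain map that Jackson then serializes, a sketch for the two examples above could look like this. `WebhookEventData` and its method names are hypothetical, and only a subset of the example fields is included.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the event-specific "data" section of a webhook payload.
final class WebhookEventData {

    private WebhookEventData() {
    }

    /** Data for JOB_COMPLETED; LinkedHashMap keeps the field order stable in the JSON. */
    static Map<String, Object> completed(String jobId, String jobType, int totalFiles, int processedFiles) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("type", jobType);
        data.put("totalFiles", totalFiles);
        data.put("processedFiles", processedFiles);
        data.put("resultUrl", "/api/kyc/async/jobs/" + jobId);
        return data;
    }

    /** Data for JOB_FAILED. */
    static Map<String, Object> failed(String jobType, String error, String errorCode) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("type", jobType);
        data.put("error", error);
        data.put("errorCode", errorCode);
        return data;
    }
}
```

Sending only a `resultUrl` rather than the full result keeps payloads small and avoids pushing KYC data to the client's endpoint.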
3. Configuration
Application Properties
# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
app.queue.kyc-jobs=kyc-jobs
app.queue.kyc-jobs.dlq=kyc-jobs-dlq
# Redis Configuration
spring.redis.host=localhost
spring.redis.port=6379
spring.data.redis.repositories.enabled=true
# Async Processing
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=100
# Webhook Configuration
app.webhook.timeout=5000
app.webhook.max-retries=3
app.webhook.retry-delay=1000
4. Client Implementation Examples
Verifying Webhook Signatures (Python)
import hashlib
import hmac

from flask import Flask, request

app = Flask(__name__)

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook signature"""
    if not signature:
        return False  # Header missing
    expected_signature = hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()
    # Remove 'sha256=' prefix if present
    if signature.startswith('sha256='):
        signature = signature[7:]
    return hmac.compare_digest(expected_signature, signature)

# In your webhook endpoint:
@app.route('/webhooks/kyc', methods=['POST'])
def handle_kyc_webhook():
    payload = request.get_data(as_text=True)
    signature = request.headers.get('X-Webhook-Signature')
    secret = 'your-webhook-secret'
    if not verify_webhook_signature(payload, signature, secret):
        return {'error': 'Invalid signature'}, 401
    data = request.json
    if data['event'] == 'JOB_COMPLETED':
        # Handle completed job
        job_id = data['jobId']
        fetch_results(job_id)
    return {'status': 'received'}, 200
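Clients on the JVM can run the same check with the JDK alone. The sketch below mirrors the server-side signing (HMAC-SHA256 over the raw body, hex-encoded, optional `sha256=` prefix). `WebhookSignatureVerifier` is a hypothetical class name, and the raw request body must be passed exactly as received.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical client-side helper for checking the X-Webhook-Signature header.
final class WebhookSignatureVerifier {

    private static final String PREFIX = "sha256=";

    private WebhookSignatureVerifier() {
    }

    /** Hex-encoded HMAC-SHA256 of the payload, keyed with the webhook secret. */
    static String sign(String payload, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] hash = mac.doFinal(payload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }

    /** Compares the received header with the expected signature in constant time. */
    static boolean verify(String payload, String signatureHeader, String secret) {
        if (signatureHeader == null) {
            return false;
        }
        String received = signatureHeader.startsWith(PREFIX)
                ? signatureHeader.substring(PREFIX.length())
                : signatureHeader;
        byte[] expected = sign(payload, secret).getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expected, received.getBytes(StandardCharsets.UTF_8));
    }
}
```

`MessageDigest.isEqual` plays the role of `hmac.compare_digest` in the Python version, so the comparison time does not reveal how many leading characters matched.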
Using Async API (JavaScript)
// Submit async job
async function submitAsyncVerification(files, webhookUrl) {
const formData = new FormData();
files.forEach(file => formData.append('documents', file));
if (webhookUrl) {
formData.append('webhookUrl', webhookUrl);
}
const response = await fetch('/api/kyc/async/identity/verify', {
method: 'POST',
headers: {
'X-Client-ID': 'your-client-id'
},
body: formData
});
const job = await response.json();
console.log('Job submitted:', job.jobId);
return job;
}
// Poll for job status
async function pollJobStatus(jobId) {
const response = await fetch(`/api/kyc/async/jobs/${jobId}`);
const job = await response.json();
if (job.status === 'COMPLETED') {
console.log('Job completed:', job.result);
return job.result;
} else if (job.status === 'FAILED') {
console.error('Job failed:', job.error);
throw new Error(job.error);
} else {
// Still processing, wait and poll again
await new Promise(resolve => setTimeout(resolve, 2000));
return pollJobStatus(jobId);
}
}
// Usage
const job = await submitAsyncVerification(files, 'https://myapp.com/webhooks');
const result = await pollJobStatus(job.jobId);
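The JavaScript poller above retries indefinitely. A Java client might prefer a bounded loop, sketched here with the HTTP call abstracted behind a `Supplier<String>` that returns the job's current status. `JobPoller` and its parameters are hypothetical. In practice the supplier would call `GET /api/kyc/async/jobs/{jobId}` and extract the `status` field.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical client helper: polls until the job reaches a terminal status.
final class JobPoller {

    private static final Set<String> TERMINAL = Set.of("COMPLETED", "FAILED", "CANCELLED");

    private JobPoller() {
    }

    /** Returns the terminal status, or throws if the job is still running after maxAttempts polls. */
    static String pollUntilTerminal(Supplier<String> statusSource, int maxAttempts, long delayMs) {
        String status = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = statusSource.get();
            // Set.of(...) rejects null lookups, so guard against a missing status.
            if (status != null && TERMINAL.contains(status)) {
                return status;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Polling interrupted", e);
                }
            }
        }
        throw new IllegalStateException("Job still " + status + " after " + maxAttempts + " polls");
    }
}
```

When a `webhookUrl` is supplied, polling is only a fallback, so a small `maxAttempts` with a longer delay is usually enough.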
5. Security Considerations
Webhook Security
- HMAC Signatures: All webhooks signed with HMAC-SHA256
- HTTPS Only: Webhooks only sent to HTTPS URLs
- URL Validation: Validate webhook URLs before registration
- Rate Limiting: Limit webhook registration attempts
- IP Whitelisting: Optional IP restriction for webhook sources
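The "HTTPS Only" and "URL Validation" rules could be enforced at registration time with a syntactic check along these lines. `WebhookUrlValidator` is a hypothetical class, and this sketch does not resolve DNS or reject private address ranges, which a production check would likely add.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

// Hypothetical helper: syntactic screening of webhook URLs before registration.
final class WebhookUrlValidator {

    private WebhookUrlValidator() {
    }

    static boolean isAcceptable(String url) {
        if (url == null || url.trim().isEmpty()) {
            return false;
        }
        URI uri;
        try {
            uri = new URI(url.trim());
        } catch (URISyntaxException e) {
            return false; // not a parseable URI
        }
        if (!"https".equalsIgnoreCase(uri.getScheme())) {
            return false; // webhooks are only sent to HTTPS URLs
        }
        String host = uri.getHost();
        if (host == null || host.isEmpty() || uri.getUserInfo() != null) {
            return false; // no host, or credentials embedded in the URL
        }
        String h = host.toLowerCase(Locale.ROOT);
        // Reject obvious loopback targets; java.net.URI keeps the brackets on IPv6 literals.
        return !(h.equals("localhost") || h.equals("127.0.0.1") || h.equals("[::1]"));
    }
}
```

Running the same check again at delivery time would also cover URLs changed through the update endpoint.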
Async Job Security
- Job Isolation: Clients can only access their own jobs
- Expiration: Jobs expire after 7 days
- File Cleanup: Temporary files deleted after processing
- Client Authentication: Require X-Client-ID header
- Job Cancellation: Only owner can cancel jobs
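The job isolation and expiration rules above reduce to two small predicates. The sketch below shows one way to express them. `JobAccessPolicy` is a hypothetical class, and measuring the 7 days from `createdAt` (rather than from completion) is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: ownership and expiry checks for async jobs.
final class JobAccessPolicy {

    // "Jobs expire after 7 days"
    static final Duration JOB_TTL = Duration.ofDays(7);

    private JobAccessPolicy() {
    }

    /** A client may read or cancel a job only if it submitted that job. */
    static boolean isOwner(String jobClientId, String requestClientId) {
        return jobClientId != null && jobClientId.equals(requestClientId);
    }

    /** True once the job is older than the retention window. */
    static boolean isExpired(Instant createdAt, Instant now) {
        return createdAt.plus(JOB_TTL).isBefore(now);
    }
}
```

The status and cancel endpoints in section 1 would need to accept the `X-Client-ID` header and apply a check like `isOwner` for the isolation rule to hold. The webhook controller already does the equivalent for webhook configs.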
6. Monitoring and Observability
Metrics to Track
- Job submission rate
- Job processing time
- Job success/failure rate
- Webhook delivery success rate
- Queue depth
- Worker utilization
- Retry count
Alerts to Configure
- Queue depth > 1000
- Job failure rate > 5%
- Webhook failure rate > 10%
- Processing time > 60s
- Worker errors
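Two of these thresholds (job failure rate above 5% and processing time above 60s) can be evaluated with a few lines of code, which is handy for unit-testing alert rules before wiring them into a monitoring system. `AlertRules` and the alert names are hypothetical. A real deployment would more likely express these as rules in its metrics backend.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: evaluates two of the alert thresholds listed above.
final class AlertRules {

    static final double MAX_JOB_FAILURE_RATE = 0.05; // 5%
    static final long MAX_PROCESSING_SECONDS = 60;

    private AlertRules() {
    }

    /** Returns the names of the alerts that should fire for the given window of jobs. */
    static List<String> evaluate(long completedJobs, long failedJobs, long slowestJobSeconds) {
        List<String> alerts = new ArrayList<>();
        long finished = completedJobs + failedJobs;
        // Skip the rate check when no job has finished, to avoid dividing by zero.
        if (finished > 0 && (double) failedJobs / finished > MAX_JOB_FAILURE_RATE) {
            alerts.add("JOB_FAILURE_RATE");
        }
        if (slowestJobSeconds > MAX_PROCESSING_SECONDS) {
            alerts.add("PROCESSING_TIME");
        }
        return alerts;
    }
}
```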
7. Implementation Timeline
Phase 1: Async Processing (2 weeks)
- Week 1: Job entity, queue setup, worker service
- Week 2: Async endpoints, testing, documentation
Phase 2: Webhooks (1 week)
- Week 3: Webhook service, management endpoints, signature verification
Phase 3: Monitoring and Production (1 week)
- Week 4: Metrics, alerts, load testing, deployment
Total Estimated Time: 4 weeks
8. Testing Strategy
Unit Tests
- Job creation and state transitions
- Webhook signature generation/verification
- Retry logic
- Error handling
Integration Tests
- End-to-end async flow
- Webhook delivery
- Job cancellation
- Queue processing
Load Tests
- 1000 concurrent job submissions
- 100 jobs/second throughput
- Webhook delivery under load
Next Steps
- Review this document
- Set up RabbitMQ/Redis infrastructure
- Implement Phase 1 (Async Processing)
- Implement Phase 2 (Webhooks)
- Deploy to staging
- Load test
- Deploy to production