Skip to main content

Async Processing and Webhooks Implementation Guide

Overview

This document outlines the implementation approach for adding async processing and webhook notifications to the BotSKYC API. These features will be implemented in a future phase.


1. Async Processing Architecture

Use Case

Use async processing when handling large batches of documents (5+ files) or when clients prefer non-blocking operations.

Architecture

Implementation Steps

Step 1: Add Dependencies (pom.xml)

<!-- Async Processing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Redis for job storage -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- Job scheduling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

Step 2: Create Job Entity

package com.nirvac.botskyc.domain.job.model;

import java.time.Instant;
import java.util.UUID;

/**
 * Persistent record of one asynchronous KYC job, mapped to the {@code kyc_jobs} table.
 *
 * <p>Carries the job's lifecycle state and timestamps, the JSON-encoded input metadata and
 * result, per-file counters, and bookkeeping for webhook delivery.
 */
@Entity
@Table(name = "kyc_jobs")
public class KYCJob {

// Primary key; defaults to a random UUID string assigned when the object is constructed.
@Id
private String id = UUID.randomUUID().toString();

// Persisted by enum name (EnumType.STRING) rather than by ordinal.
@Enumerated(EnumType.STRING)
private JobStatus status; // PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED

// Persisted by enum name (EnumType.STRING) rather than by ordinal.
@Enumerated(EnumType.STRING)
private JobType type; // IDENTITY, ADDRESS, INCOME, etc.

// Identifier of the client that owns the job.
private String clientId;
// Lifecycle timestamps: creation, start of processing, and end of processing.
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;

@Column(columnDefinition = "TEXT")
private String inputMetadata; // JSON with file info

@Column(columnDefinition = "TEXT")
private String result; // JSON result when complete

@Column(columnDefinition = "TEXT")
private String error; // Error message if failed

// Per-file counters for the batch handled by this job.
private Integer totalFiles;
private Integer processedFiles;
private Integer failedFiles;

// Webhooks
private String webhookUrl; // Callback URL supplied with the job, if any
private Integer webhookAttempts = 0; // Starts at zero for a new job
private Instant lastWebhookAttempt;

// Getters/Setters
}

/**
 * Lifecycle states of a {@code KYCJob}.
 *
 * <p>The constants are persisted by name, so renaming one changes the stored value.
 */
public enum JobStatus {
// Job has been accepted and is waiting to be processed.
PENDING,
// Job is currently being processed.
PROCESSING,
// Processing finished and a result is available.
COMPLETED,
// Processing ended with an error.
FAILED,
// Job was cancelled on request.
CANCELLED
}

/**
 * Kind of KYC verification a {@code KYCJob} performs.
 *
 * <p>The constants are persisted by name, so renaming one changes the stored value.
 */
public enum JobType {
IDENTITY,
ADDRESS,
INCOME,
ENTITY,
COMPLIANCE,
MULTIPURPOSE
}

Step 3: Create Async Controller

package com.nirvac.botskyc.api.controller.v;

/**
 * REST endpoints for submitting KYC jobs for asynchronous processing and for querying,
 * cancelling and listing those jobs.
 *
 * <p>Constructor injection, the {@code logger}, and the helpers {@code createMetadata},
 * {@code calculateProgress} and {@code toSummary} are omitted from this guide.
 */
@RestController
@RequestMapping("/api/kyc/async")
public class AsyncKYCController {

    private final AsyncJobService asyncJobService;
    private final KYCJobRepository jobRepository;
    // Used by submitIdentityJob to park uploads until the worker picks the job up.
    private final FileStorageService fileStorageService;
    // Used by getJobStatus to turn the stored JSON result back into a response object.
    private final ObjectMapper objectMapper;

    /**
     * Submits an identity verification job for async processing.
     *
     * @param documents  uploaded files to verify
     * @param webhookUrl optional callback URL stored on the job
     * @param clientId   optional client identifier taken from the {@code X-Client-ID} header
     * @return {@code 202 Accepted} with the job id, its initial status and the status URL
     */
    @PostMapping("/identity/verify")
    public ResponseEntity<AsyncJobResponse> submitIdentityJob(
            @RequestParam("documents") List<MultipartFile> documents,
            @RequestParam(value = "webhookUrl", required = false) String webhookUrl,
            @RequestHeader(value = "X-Client-ID", required = false) String clientId) {

        String jobId = UUID.randomUUID().toString();

        // Store files temporarily (object storage or local filesystem) so the worker can
        // read them after this request has returned.
        List<String> fileUrls = fileStorageService.storeTemporary(documents);

        KYCJob job = new KYCJob();
        job.setId(jobId);
        job.setType(JobType.IDENTITY);
        job.setStatus(JobStatus.PENDING);
        job.setClientId(clientId);
        job.setTotalFiles(documents.size());
        job.setWebhookUrl(webhookUrl);
        job.setInputMetadata(createMetadata(fileUrls));
        job.setCreatedAt(Instant.now());

        // Persist before queueing so the worker can always find the job by id.
        jobRepository.save(job);

        asyncJobService.submitJob(jobId, JobType.IDENTITY, fileUrls);

        return ResponseEntity.accepted()
                .body(new AsyncJobResponse(
                        jobId,
                        JobStatus.PENDING,
                        "/api/kyc/async/jobs/" + jobId));
    }

    /**
     * Returns the status of a job, plus its result when completed or its error when failed.
     *
     * <p>NOTE(review): this endpoint does not check that the caller owns the job. The guide's
     * security section requires job isolation; adding that check also requires the polling
     * client to send {@code X-Client-ID}.
     *
     * @param jobId id of the job to look up
     * @return {@code 200 OK} with the job status
     * @throws ResourceNotFoundException if no job has this id
     * @throws IllegalStateException     if the stored result of a completed job cannot be parsed
     */
    @GetMapping("/jobs/{jobId}")
    public ResponseEntity<JobStatusResponse> getJobStatus(@PathVariable String jobId) {
        KYCJob job = jobRepository.findById(jobId)
                .orElseThrow(() -> new ResourceNotFoundException("Job not found"));

        JobStatusResponse response = new JobStatusResponse();
        response.setJobId(job.getId());
        response.setStatus(job.getStatus());
        response.setCreatedAt(job.getCreatedAt());
        response.setProgress(calculateProgress(job));

        if (job.getStatus() == JobStatus.COMPLETED) {
            response.setResult(parseResult(job));
        } else if (job.getStatus() == JobStatus.FAILED) {
            response.setError(job.getError());
        }

        return ResponseEntity.ok(response);
    }

    /**
     * Cancels a pending or processing job.
     *
     * <p>NOTE(review): as with {@link #getJobStatus}, ownership of the job is not verified here.
     *
     * @param jobId id of the job to cancel
     * @return {@code 204 No Content} once the job is marked cancelled
     * @throws ResourceNotFoundException if no job has this id
     * @throws IllegalStateException     if the job is not pending or processing
     */
    @DeleteMapping("/jobs/{jobId}")
    public ResponseEntity<Void> cancelJob(@PathVariable String jobId) {
        KYCJob job = jobRepository.findById(jobId)
                .orElseThrow(() -> new ResourceNotFoundException("Job not found"));

        if (job.getStatus() != JobStatus.PENDING && job.getStatus() != JobStatus.PROCESSING) {
            throw new IllegalStateException("Cannot cancel job in status: " + job.getStatus());
        }

        asyncJobService.cancelJob(jobId);
        job.setStatus(JobStatus.CANCELLED);
        jobRepository.save(job);

        return ResponseEntity.noContent().build();
    }

    /**
     * Lists a client's jobs, newest first, optionally filtered by status.
     *
     * @param clientId client identifier taken from the {@code X-Client-ID} header
     * @param page     zero-based page index
     * @param size     page size
     * @param status   optional status filter
     * @return {@code 200 OK} with one page of job summaries
     */
    @GetMapping("/jobs")
    public ResponseEntity<Page<JobSummary>> listJobs(
            @RequestHeader("X-Client-ID") String clientId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) JobStatus status) {

        Pageable pageable = PageRequest.of(page, size, Sort.by("createdAt").descending());

        Page<KYCJob> jobs = (status != null)
                ? jobRepository.findByClientIdAndStatus(clientId, status, pageable)
                : jobRepository.findByClientId(clientId, pageable);

        return ResponseEntity.ok(jobs.map(this::toSummary));
    }

    /** Deserializes the JSON result stored on a completed job. */
    private KYCAnalysisResponse parseResult(KYCJob job) {
        try {
            return objectMapper.readValue(job.getResult(), KYCAnalysisResponse.class);
        } catch (java.io.IOException e) {
            // Jackson's parsing exceptions are checked IOExceptions; surface them unchecked
            // so the handler method does not need a throws clause.
            throw new IllegalStateException(
                    "Stored result for job " + job.getId() + " could not be parsed", e);
        }
    }
}

Step 4: Create Async Worker Service

package com.nirvac.botskyc.domain.job.service;

@Service
public class AsyncJobWorkerService {

private final KYCJobRepository jobRepository;
private final IdentityVerificationService identityService;
private final WebhookService webhookService;
private final FileStorageService fileStorageService;

@RabbitListener(queues = "${app.queue.kyc-jobs}")
public void processJob(AsyncJobMessage message) {
String jobId = message.getJobId();

try {
// Update status to PROCESSING
KYCJob job = jobRepository.findById(jobId)
.orElseThrow(() -> new JobNotFoundException(jobId));

job.setStatus(JobStatus.PROCESSING);
job.setStartedAt(Instant.now());
jobRepository.save(job);

// Download files from temporary storage
List<MultipartFile> files = fileStorageService.retrieve(message.getFileUrls());

// Process based on job type
KYCAnalysisResponse result = switch(job.getType()) {
case IDENTITY -> identityService.analyzeKYCDocuments(files.toArray(new MultipartFile[0]));
case ADDRESS -> addressService.analyzeKYCDocuments(files.toArray(new MultipartFile[0]));
// ... other types
default -> throw new UnsupportedOperationException("Unknown job type: " + job.getType());
};

// Store result
job.setResult(objectMapper.writeValueAsString(result));
job.setStatus(JobStatus.COMPLETED);
job.setCompletedAt(Instant.now());
job.setProcessedFiles(result.getTotalDocuments());
jobRepository.save(job);

// Trigger webhook if configured
if (job.getWebhookUrl() != null) {
webhookService.sendWebhook(job);
}

// Clean up temporary files
fileStorageService.deleteTemporary(message.getFileUrls());

} catch (Exception e) {
logger.error("Job {} failed: {}", jobId, e.getMessage(), e);

KYCJob job = jobRepository.findById(jobId).orElse(null);
if (job != null) {
job.setStatus(JobStatus.FAILED);
job.setError(e.getMessage());
job.setCompletedAt(Instant.now());
jobRepository.save(job);

// Still try to notify via webhook
if (job.getWebhookUrl() != null) {
webhookService.sendWebhook(job);
}
}
}
}
}

Step 5: Response DTOs

package com.nirvac.botskyc.api.dto;

/**
 * Response body returned when a job has been accepted for asynchronous processing.
 */
public class AsyncJobResponse {
private String jobId; // Identifier of the accepted job
private JobStatus status; // Status of the job at submission time
private String statusUrl; // URL at which the job's status can be polled
private Instant submittedAt;

// Constructor, getters, setters
}

/**
 * Response body describing the current state of an asynchronous job.
 *
 * <p>{@code result} and {@code error} are mutually exclusive: the first is populated only for
 * completed jobs, the second only for failed jobs.
 */
public class JobStatusResponse {
private String jobId;
private JobStatus status;
private JobType type;
private Instant createdAt;
private Instant startedAt;
private Instant completedAt;
private ProgressInfo progress; // Per-file progress counters
private KYCAnalysisResponse result; // Only when completed
private String error; // Only when failed

// Getters, setters
}

/**
 * Per-file progress counters reported as part of a job's status.
 */
public class ProgressInfo {
private int totalFiles;
private int processedFiles;
private int failedFiles;
// NOTE(review): presumably derived from the counters above; the computation is not shown
// in this guide, so confirm the scale (0-1 vs 0-100).
private double percentage;

// Getters, setters
}

2. Webhooks Architecture

Use Case

Notify clients when async jobs complete or when specific events occur.

Architecture

Implementation Steps

Step 1: Webhook Configuration Entity

package com.nirvac.botskyc.domain.webhook.model;

/**
 * A client's webhook registration, mapped to the {@code webhook_configs} table.
 *
 * <p>Holds the target URL, the HMAC signing secret, the set of subscribed events, an active
 * flag, and running delivery statistics.
 */
@Entity
@Table(name = "webhook_configs")
public class WebhookConfig {

    // Primary key; defaults to a random UUID string assigned when the object is constructed.
    @Id
    private String id = UUID.randomUUID().toString();

    private String clientId;
    private String url;
    private String secret; // For HMAC signing

    // Stored by enum name so that adding or reordering WebhookEvent constants cannot silently
    // remap existing subscriptions (the JPA default for enums is the ordinal).
    @ElementCollection
    @CollectionTable(name = "webhook_events")
    @Enumerated(EnumType.STRING)
    private Set<WebhookEvent> events; // Which events to listen to

    private boolean active = true;
    private Instant createdAt;
    private Instant updatedAt;

    // Statistics
    private Integer successCount = 0;
    private Integer failureCount = 0;
    private Instant lastSuccessAt;
    private Instant lastFailureAt;

    // Getters/Setters
}

/**
 * Event types a webhook registration can subscribe to.
 *
 * <p>NOTE(review): how job states map to these events is decided by a helper that is not
 * shown in this guide; confirm which constants are actually emitted.
 */
public enum WebhookEvent {
JOB_CREATED,
JOB_STARTED,
JOB_COMPLETED,
JOB_FAILED,
JOB_CANCELLED,
DOCUMENT_PROCESSED,
VERIFICATION_HIGH_CONFIDENCE,
VERIFICATION_LOW_CONFIDENCE
}

Step 2: Webhook Service

package com.nirvac.botskyc.domain.webhook.service;

@Service
public class WebhookService {

private final WebhookConfigRepository webhookConfigRepository;
private final WebhookDeliveryRepository webhookDeliveryRepository;
private final RestTemplate restTemplate;
private final ObjectMapper objectMapper;

/**
* Send webhook notification
*/
@Async
public void sendWebhook(KYCJob job) {
// Find webhook configs for this client
List<WebhookConfig> configs = webhookConfigRepository
.findByClientIdAndActiveTrue(job.getClientId());

WebhookEvent event = determineEvent(job);

for (WebhookConfig config : configs) {
if (!config.getEvents().contains(event)) {
continue; // Skip if not subscribed to this event
}

sendWebhookToUrl(config, job, event);
}
}

private void sendWebhookToUrl(WebhookConfig config, KYCJob job, WebhookEvent event) {
try {
// Create payload
WebhookPayload payload = new WebhookPayload();
payload.setEvent(event);
payload.setJobId(job.getId());
payload.setTimestamp(Instant.now());
payload.setData(createEventData(job));

String payloadJson = objectMapper.writeValueAsString(payload);

// Sign payload
String signature = generateSignature(payloadJson, config.getSecret());

// Create request
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("X-Webhook-Signature", "sha56=" + signature);
headers.set("X-Event-Type", event.name());
headers.set("X-Job-ID", job.getId());
headers.set("X-Delivery-ID", UUID.randomUUID().toString());

HttpEntity<String> request = new HttpEntity<>(payloadJson, headers);

// Send with retries
sendWithRetry(config, request, payload);

} catch (Exception e) {
logger.error("Failed to send webhook to {}: {}", config.getUrl(), e.getMessage(), e);
recordFailure(config, e.getMessage());
}
}

private void sendWithRetry(WebhookConfig config, HttpEntity<String> request, WebhookPayload payload) {
int maxRetries = 3;
int retryDelay = 1000; // Start with 1 second

for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
ResponseEntity<String> response = restTemplate.postForEntity(
config.getUrl(),
request,
String.class
);

if (response.getStatusCode().isxxSuccessful()) {
recordSuccess(config, payload);
return;
}

logger.warn("Webhook returned non-xx: {}", response.getStatusCode());

} catch (Exception e) {
if (attempt == maxRetries - 1) {
throw e; // Last attempt, throw the exception
}

logger.warn("Webhook attempt {} failed, retrying in {}ms",
attempt + 1, retryDelay);

try {
Thread.sleep(retryDelay);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

retryDelay *= ; // Exponential backoff
}
}
}

private String generateSignature(String payload, String secret) {
try {
Mac mac = Mac.getInstance("HmacSHA56");
SecretKeySpec secretKey = new SecretKeySpec(secret.getBytes(), "HmacSHA56");
mac.init(secretKey);
byte[] hash = mac.doFinal(payload.getBytes());
return Hex.encodeHexString(hash);
} catch (Exception e) {
throw new RuntimeException("Failed to generate signature", e);
}
}

private void recordSuccess(WebhookConfig config, WebhookPayload payload) {
config.setSuccessCount(config.getSuccessCount() + );
config.setLastSuccessAt(Instant.now());
webhookConfigRepository.save(config);

// Store delivery record
WebhookDelivery delivery = new WebhookDelivery();
delivery.setWebhookConfigId(config.getId());
delivery.setPayload(objectMapper.writeValueAsString(payload));
delivery.setSuccess(true);
delivery.setDeliveredAt(Instant.now());
webhookDeliveryRepository.save(delivery);
}

private void recordFailure(WebhookConfig config, String error) {
config.setFailureCount(config.getFailureCount() + );
config.setLastFailureAt(Instant.now());

// Disable if too many failures
if (config.getFailureCount() > 0) {
config.setActive(false);
logger.error("Disabling webhook {} due to repeated failures", config.getId());
}

webhookConfigRepository.save(config);
}
}

Step 3: Webhook Management Controller

package com.nirvac.botskyc.api.controller.v;

/**
 * REST endpoints that let a client register, list, update, delete and test its webhooks.
 *
 * <p>Constructor injection and the helpers {@code validateWebhookUrl} and
 * {@code generateSecret} are omitted from this guide.
 *
 * <p>NOTE(review): the responses expose the {@code WebhookConfig} entity, which includes the
 * signing secret. Returning it at registration is necessary; returning it from list and
 * update responses should be reviewed.
 */
@RestController
@RequestMapping("/api/webhooks")
public class WebhookController {

    private final WebhookConfigRepository webhookConfigRepository;
    // Used by testWebhook to send the ping event.
    private final WebhookService webhookService;

    /**
     * Registers a new webhook for the calling client and generates its signing secret.
     *
     * @param clientId client identifier taken from the {@code X-Client-ID} header
     * @param request  target URL and subscribed events
     * @return {@code 201 Created} with the stored registration
     */
    @PostMapping
    public ResponseEntity<WebhookConfig> registerWebhook(
            @RequestHeader("X-Client-ID") String clientId,
            @RequestBody @Valid WebhookRegistrationRequest request) {

        // Validate URL is reachable
        validateWebhookUrl(request.getUrl());

        WebhookConfig config = new WebhookConfig();
        config.setClientId(clientId);
        config.setUrl(request.getUrl());
        config.setSecret(generateSecret());
        config.setEvents(request.getEvents());
        config.setActive(true);
        config.setCreatedAt(Instant.now());

        webhookConfigRepository.save(config);

        return ResponseEntity.status(HttpStatus.CREATED).body(config);
    }

    /**
     * Lists the calling client's webhooks.
     *
     * @param clientId client identifier taken from the {@code X-Client-ID} header
     * @return {@code 200 OK} with the client's registrations
     */
    @GetMapping
    public ResponseEntity<List<WebhookConfig>> listWebhooks(
            @RequestHeader("X-Client-ID") String clientId) {

        return ResponseEntity.ok(webhookConfigRepository.findByClientId(clientId));
    }

    /**
     * Updates the URL, events and active flag of one of the caller's webhooks.
     *
     * @param webhookId id of the webhook to update
     * @param clientId  client identifier taken from the {@code X-Client-ID} header
     * @param request   new URL, events and active flag
     * @return {@code 200 OK} with the updated registration
     * @throws ResourceNotFoundException if no webhook has this id
     * @throws ForbiddenException        if the webhook belongs to another client
     */
    @PutMapping("/{webhookId}")
    public ResponseEntity<WebhookConfig> updateWebhook(
            @PathVariable String webhookId,
            @RequestHeader("X-Client-ID") String clientId,
            @RequestBody @Valid WebhookUpdateRequest request) {

        WebhookConfig config = findOwnedWebhook(webhookId, clientId);

        // Apply the same URL checks as registration; otherwise an update could point the
        // webhook at a URL that registration would have rejected.
        validateWebhookUrl(request.getUrl());

        config.setUrl(request.getUrl());
        config.setEvents(request.getEvents());
        config.setActive(request.isActive());
        config.setUpdatedAt(Instant.now());

        webhookConfigRepository.save(config);

        return ResponseEntity.ok(config);
    }

    /**
     * Deletes one of the caller's webhooks.
     *
     * @param webhookId id of the webhook to delete
     * @param clientId  client identifier taken from the {@code X-Client-ID} header
     * @return {@code 204 No Content}
     * @throws ResourceNotFoundException if no webhook has this id
     * @throws ForbiddenException        if the webhook belongs to another client
     */
    @DeleteMapping("/{webhookId}")
    public ResponseEntity<Void> deleteWebhook(
            @PathVariable String webhookId,
            @RequestHeader("X-Client-ID") String clientId) {

        webhookConfigRepository.delete(findOwnedWebhook(webhookId, clientId));

        return ResponseEntity.noContent().build();
    }

    /**
     * Tests one of the caller's webhooks by sending it a ping event.
     *
     * @param webhookId id of the webhook to test
     * @param clientId  client identifier taken from the {@code X-Client-ID} header
     * @return {@code 200 OK} with whether the test delivery succeeded
     * @throws ResourceNotFoundException if no webhook has this id
     * @throws ForbiddenException        if the webhook belongs to another client
     */
    @PostMapping("/{webhookId}/test")
    public ResponseEntity<WebhookTestResponse> testWebhook(
            @PathVariable String webhookId,
            @RequestHeader("X-Client-ID") String clientId) {

        boolean success = webhookService.sendTestEvent(findOwnedWebhook(webhookId, clientId));

        return ResponseEntity.ok(new WebhookTestResponse(success));
    }

    /** Loads a webhook by id and verifies that it belongs to the given client. */
    private WebhookConfig findOwnedWebhook(String webhookId, String clientId) {
        WebhookConfig config = webhookConfigRepository.findById(webhookId)
                .orElseThrow(() -> new ResourceNotFoundException("Webhook not found"));

        // clientId comes from a required header, so comparing from its side also covers a
        // stored registration that has no client id.
        if (!clientId.equals(config.getClientId())) {
            throw new ForbiddenException("Not authorized");
        }
        return config;
    }
}

Step 4: Webhook Payload Structure

/**
 * Body of a webhook notification: which event occurred, for which job, when, and the
 * event-specific details.
 */
public class WebhookPayload {
private WebhookEvent event; // Event being announced
private String jobId; // Job the event refers to
private Instant timestamp; // When the notification was created
private Object data; // Event-specific data

// Getters/Setters
}

// Example payloads:

// Job Completed Event
{
"event": "JOB_COMPLETED",
"jobId": "job-123",
"timestamp": "2025-11-09T14:30:00Z",
"data": {
"type": "IDENTITY",
"totalFiles": 2,
"processedFiles": 2,
"documentsFound": 2,
"averageConfidence": 0.98,
"processingTimeMs": 2500,
"resultUrl": "/api/kyc/async/jobs/job-123"
}
}

// Job Failed Event
{
"event": "JOB_FAILED",
"jobId": "job-456",
"timestamp": "2025-11-09T14:35:00Z",
"data": {
"type": "ADDRESS",
"error": "Invalid file format",
"errorCode": "INVALID_FILE_FORMAT"
}
}

3. Configuration

Application Properties

# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
app.queue.kyc-jobs=kyc-jobs
app.queue.kyc-jobs.dlq=kyc-jobs-dlq

# Redis Configuration
spring.redis.host=localhost
spring.redis.port=6379
spring.data.redis.repositories.enabled=true

# Async Processing
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=100

# Webhook Configuration
app.webhook.timeout=5000
app.webhook.max-retries=3
app.webhook.retry-delay=1000

4. Client Implementation Examples

Verifying Webhook Signatures (Python)

import hmac
import hashlib

def verify_webhook_signature(payload, signature, secret):
    """Check a webhook signature against the HMAC-SHA256 of the payload.

    Args:
        payload: Raw request body as text, exactly as received.
        signature: Value of the ``X-Webhook-Signature`` header, with or without
            the ``sha256=`` prefix. May be ``None`` when the header is absent.
        secret: Webhook signing secret as text.

    Returns:
        True if the signature matches, False otherwise (including when the
        signature is missing or empty).
    """
    # A missing header is an invalid signature, not an error.
    if not signature:
        return False

    expected_signature = hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256,
    ).hexdigest()

    # Remove 'sha256=' prefix if present
    prefix = 'sha256='
    if signature.startswith(prefix):
        signature = signature[len(prefix):]

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and the
    # header value is attacker-controlled.
    return hmac.compare_digest(expected_signature.encode(), signature.encode())

# In your webhook endpoint:
@app.route('/webhooks/kyc', methods=['POST'])
def handle_kyc_webhook():
    """Receive a KYC webhook, reject it unless its signature verifies, then act on the event.

    Returns a ``(body, status)`` pair: 401 for an invalid signature, 200 once
    the event has been received.
    """
    # Verify against the raw body text: the signature covers the exact bytes sent.
    payload = request.get_data(as_text=True)
    signature = request.headers.get('X-Webhook-Signature')
    secret = 'your-webhook-secret'

    if not verify_webhook_signature(payload, signature, secret):
        return {'error': 'Invalid signature'}, 401

    data = request.json

    if data['event'] == 'JOB_COMPLETED':
        # Handle completed job
        job_id = data['jobId']
        fetch_results(job_id)

    return {'status': 'received'}, 200

Using Async API (JavaScript)

// Submit async job
/**
 * Submits documents for asynchronous identity verification.
 *
 * @param {Array<File|Blob>} files - Documents to upload.
 * @param {string} [webhookUrl] - Optional callback URL to notify when the job finishes.
 * @returns {Promise<object>} The accepted job (jobId, status, statusUrl).
 * @throws {Error} If the server does not accept the submission.
 */
async function submitAsyncVerification(files, webhookUrl) {
  const formData = new FormData();
  files.forEach(file => formData.append('documents', file));

  if (webhookUrl) {
    formData.append('webhookUrl', webhookUrl);
  }

  const response = await fetch('/api/kyc/async/identity/verify', {
    method: 'POST',
    headers: {
      'X-Client-ID': 'your-client-id'
    },
    body: formData
  });

  // fetch only rejects on network failure; an HTTP error must be checked explicitly,
  // otherwise the error body would be treated as a job.
  if (!response.ok) {
    throw new Error(`Job submission failed with HTTP ${response.status}`);
  }

  const job = await response.json();
  console.log('Job submitted:', job.jobId);

  return job;
}

// Poll for job status
/**
 * Polls a job until it reaches a terminal state.
 *
 * @param {string} jobId - Id of the job to poll.
 * @param {object} [options]
 * @param {number} [options.intervalMs=2000] - Delay between polls.
 * @param {number} [options.maxAttempts=Infinity] - Maximum number of polls before giving up.
 * @returns {Promise<object>} The job result once the job has completed.
 * @throws {Error} If the job failed or was cancelled, the status request fails,
 *   or maxAttempts is exhausted.
 */
async function pollJobStatus(jobId, { intervalMs = 2000, maxAttempts = Infinity } = {}) {
  // Iterative rather than recursive so a long-running job does not grow the promise chain.
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const response = await fetch(`/api/kyc/async/jobs/${jobId}`);
    if (!response.ok) {
      throw new Error(`Status request for job ${jobId} failed with HTTP ${response.status}`);
    }
    const job = await response.json();

    if (job.status === 'COMPLETED') {
      console.log('Job completed:', job.result);
      return job.result;
    }
    if (job.status === 'FAILED') {
      console.error('Job failed:', job.error);
      throw new Error(job.error);
    }
    if (job.status === 'CANCELLED') {
      // CANCELLED is terminal too; without this branch polling would never end.
      throw new Error(`Job ${jobId} was cancelled`);
    }

    // Still pending or processing, wait and poll again
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }

  throw new Error(`Job ${jobId} did not finish after ${maxAttempts} status checks`);
}

// Usage
const job = await submitAsyncVerification(files, 'https://myapp.com/webhooks');
const result = await pollJobStatus(job.jobId);

5. Security Considerations

Webhook Security

  1. HMAC Signatures: All webhooks signed with HMAC-SHA256
  2. HTTPS Only: Webhooks only sent to HTTPS URLs
  3. URL Validation: Validate webhook URLs before registration
  4. Rate Limiting: Limit webhook registration attempts
  5. IP Whitelisting: Optional IP restriction for webhook sources

Async Job Security

  1. Job Isolation: Clients can only access their own jobs
  2. Expiration: Jobs expire after 7 days
  3. File Cleanup: Temporary files deleted after processing
  4. Client Authentication: Require X-Client-ID header
  5. Job Cancellation: Only owner can cancel jobs

6. Monitoring and Observability

Metrics to Track

  • Job submission rate
  • Job processing time
  • Job success/failure rate
  • Webhook delivery success rate
  • Queue depth
  • Worker utilization
  • Retry count

Alerts to Configure

  • Queue depth > 1000
  • Job failure rate > 5%
  • Webhook failure rate > 10%
  • Processing time > 60s
  • Worker errors

7. Implementation Timeline

Phase 1: Async Processing (2 weeks)

  • Week 1: Job entity, queue setup, worker service
  • Week 2: Async endpoints, testing, documentation

Phase 2: Webhooks (1 week)

  • Week 3: Webhook service, management endpoints, signature verification

Phase 3: Monitoring and Production (1 week)

  • Week 4: Metrics, alerts, load testing, deployment

Total Estimated Time: 4 weeks


8. Testing Strategy

Unit Tests

  • Job creation and state transitions
  • Webhook signature generation/verification
  • Retry logic
  • Error handling

Integration Tests

  • End-to-end async flow
  • Webhook delivery
  • Job cancellation
  • Queue processing

Load Tests

  • 1000 concurrent job submissions
  • 100 jobs/second throughput
  • Webhook delivery under load

Next Steps

  1. Review this document
  2. Set up RabbitMQ/Redis infrastructure
  3. Implement Phase 1 (Async Processing)
  4. Implement Phase 2 (Webhooks)
  5. Deploy to staging
  6. Load test
  7. Deploy to production