Long Running Tasks¶
The Problem¶
How do you handle operations that take a long time to complete?

- Video transcoding (minutes to hours)
- Report generation
- Data exports/imports
- ML model training
- Batch processing
- Payment processing with retries
- Email campaigns
Challenges:

- HTTP timeouts
- Client disconnections
- Server restarts/crashes
- Progress tracking
- Resource management
- User experience
Options & Trade-offs¶
1. Async Request-Response Pattern¶
Philosophy: "Accept request immediately, process in background, notify when complete"
Implementation:
// Controller
@RestController
public class TaskController {
@PostMapping("/api/reports")
public ResponseEntity<TaskResponse> generateReport(@RequestBody ReportRequest request) {
String taskId = UUID.randomUUID().toString();
// Create task record
Task task = Task.builder()
.id(taskId)
.type("REPORT_GENERATION")
.status(TaskStatus.PENDING)
.request(serialize(request))
.createdAt(Instant.now())
.build();
taskRepository.save(task);
// Queue for async processing
taskQueue.send(new TaskMessage(taskId, request));
return ResponseEntity
.accepted()
.header("Location", "/api/tasks/" + taskId)
.body(new TaskResponse(taskId, TaskStatus.PENDING));
}
@GetMapping("/api/tasks/{taskId}")
public ResponseEntity<TaskStatusResponse> getTaskStatus(@PathVariable String taskId) {
Task task = taskRepository.findById(taskId)
.orElseThrow(() -> new NotFoundException("Task not found"));
TaskStatusResponse response = TaskStatusResponse.builder()
.taskId(taskId)
.status(task.getStatus())
.progress(task.getProgress())
.result(task.getResult())
.error(task.getError())
.createdAt(task.getCreatedAt())
.completedAt(task.getCompletedAt())
.build();
if (task.getStatus() == TaskStatus.COMPLETED && task.getResultUrl() != null) {
response.setResultUrl(task.getResultUrl());
}
return ResponseEntity.ok(response);
}
}
// Worker
@Component
public class ReportWorker {
@KafkaListener(topics = "report-tasks")
public void processReport(TaskMessage message) {
Task task = taskRepository.findById(message.getTaskId())
.orElseThrow();
try {
task.setStatus(TaskStatus.IN_PROGRESS);
task.setStartedAt(Instant.now());
taskRepository.save(task);
// Long-running work with progress updates
ReportResult result = reportService.generate(
message.getRequest(),
progress -> updateProgress(task, progress)
);
task.setStatus(TaskStatus.COMPLETED);
task.setResult(serialize(result));
task.setResultUrl(result.getDownloadUrl());
task.setCompletedAt(Instant.now());
} catch (Exception e) {
task.setStatus(TaskStatus.FAILED);
task.setError(e.getMessage());
task.setCompletedAt(Instant.now());
}
taskRepository.save(task);
}
private void updateProgress(Task task, int progress) {
task.setProgress(progress);
taskRepository.save(task);
}
}
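The controller and worker above assume a persisted Task record. A minimal sketch of what that entity might look like, assuming JPA and Lombok (field names are inferred from the code in this page; adjust to taste):

    import jakarta.persistence.*;
    import lombok.*;
    import java.time.Instant;

    enum TaskStatus { PENDING, IN_PROGRESS, COMPLETED, FAILED }

    @Entity
    @Table(name = "tasks")
    @Getter @Setter @Builder
    @NoArgsConstructor @AllArgsConstructor
    public class Task {
        @Id
        private String id;               // UUID assigned by the controller

        private String type;             // e.g. REPORT_GENERATION

        @Enumerated(EnumType.STRING)
        private TaskStatus status;

        @Lob
        private String request;          // serialized request payload

        @Lob
        private String result;           // serialized result, set on completion

        private String resultUrl;        // download link for large results
        private String error;            // failure reason, set on FAILED
        private String callbackUrl;      // used by the webhook variant below
        private int progress;            // 0-100, updated by the worker

        private String workerId;         // worker that claimed the task
        private Instant workerHeartbeat; // used by the liveness monitor below
        private int retryCount;
        private int maxRetries;

        private Instant createdAt;
        private Instant startedAt;
        private Instant completedAt;
    }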
Client-side Polling:
async function waitForTask(taskId, intervalMs = 2000) {
while (true) {
const response = await fetch(`/api/tasks/${taskId}`);
const task = await response.json();
updateProgressUI(task.progress);
if (task.status === 'COMPLETED') {
return task.result;
}
if (task.status === 'FAILED') {
throw new Error(task.error);
}
await new Promise((resolve) => setTimeout(resolve, intervalMs));
}
}
| Pros | Cons |
|---|---|
| Simple to implement | Polling overhead |
| Works with any client | Delayed feedback |
| Stateless API | Many status requests |
2. Webhook Callbacks¶
Philosophy: "Server notifies client when task completes"
Implementation:
@PostMapping("/api/exports")
public ResponseEntity<TaskResponse> startExport(
@RequestBody ExportRequest request,
@RequestHeader("X-Callback-URL") String callbackUrl) {
String taskId = UUID.randomUUID().toString();
Task task = Task.builder()
.id(taskId)
.type("DATA_EXPORT")
.status(TaskStatus.PENDING)
.callbackUrl(callbackUrl)
.build();
taskRepository.save(task);
taskQueue.send(new ExportTaskMessage(taskId, request));
return ResponseEntity.accepted()
.body(new TaskResponse(taskId, TaskStatus.PENDING));
}
// Worker sends callback on completion
@Component
public class ExportWorker {
@KafkaListener(topics = "export-tasks")
public void processExport(ExportTaskMessage message) {
Task task = taskRepository.findById(message.getTaskId()).orElseThrow();
try {
ExportResult result = exportService.export(message.getRequest());
task.setStatus(TaskStatus.COMPLETED);
task.setResult(serialize(result));
taskRepository.save(task);
// Send webhook callback
sendCallback(task.getCallbackUrl(), CallbackPayload.builder()
.taskId(task.getId())
.status("COMPLETED")
.resultUrl(result.getDownloadUrl())
.build());
} catch (Exception e) {
task.setStatus(TaskStatus.FAILED);
task.setError(e.getMessage());
taskRepository.save(task);
sendCallback(task.getCallbackUrl(), CallbackPayload.builder()
.taskId(task.getId())
.status("FAILED")
.error(e.getMessage())
.build());
}
}
private void sendCallback(String url, CallbackPayload payload) {
// Retry with exponential backoff
retryTemplate.execute(context -> {
restTemplate.postForEntity(url, payload, Void.class);
return null;
});
}
}
| Pros | Cons |
|---|---|
| Real-time notification | Client needs public endpoint |
| No polling overhead | Security concerns (validation) |
| Efficient | Callback failures need handling |
| | Firewall issues |
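A common mitigation for the security concern above (a sketch, not something the example implements): sign each callback body with a shared secret so the receiver can verify it really came from the task service before acting on it. The header name and hex encoding are conventions, not a standard.

    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.HexFormat;

    public class WebhookSigner {

        private static final String ALGORITHM = "HmacSHA256";
        private final byte[] secret;

        public WebhookSigner(String sharedSecret) {
            this.secret = sharedSecret.getBytes(StandardCharsets.UTF_8);
        }

        // Sender: compute an HMAC over the raw JSON body and send it in a
        // header such as X-Signature alongside the callback request.
        public String sign(String rawBody) {
            try {
                Mac mac = Mac.getInstance(ALGORITHM);
                mac.init(new SecretKeySpec(secret, ALGORITHM));
                byte[] digest = mac.doFinal(rawBody.getBytes(StandardCharsets.UTF_8));
                return HexFormat.of().formatHex(digest);
            } catch (Exception e) {
                throw new IllegalStateException("Failed to sign webhook payload", e);
            }
        }

        // Receiver: recompute the signature and compare in constant time.
        public boolean verify(String rawBody, String receivedSignature) {
            byte[] expected = sign(rawBody).getBytes(StandardCharsets.UTF_8);
            byte[] actual = receivedSignature.getBytes(StandardCharsets.UTF_8);
            return MessageDigest.isEqual(expected, actual);
        }
    }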
3. Server-Sent Events (SSE)¶
Philosophy: "Push progress updates to client in real-time"
@GetMapping(value = "/api/tasks/{taskId}/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamProgress(@PathVariable String taskId) {
SseEmitter emitter = new SseEmitter(3600000L); // 1 hour timeout
// Subscribe to task updates
taskUpdateSubscriber.subscribe(taskId, update -> {
try {
emitter.send(SseEmitter.event()
.name("progress")
.data(update));
if (update.getStatus().isFinal()) {
emitter.complete();
}
} catch (IOException e) {
emitter.completeWithError(e);
}
});
emitter.onCompletion(() -> taskUpdateSubscriber.unsubscribe(taskId));
emitter.onTimeout(() -> emitter.complete());
return emitter;
}
Client:
const eventSource = new EventSource(`/api/tasks/${taskId}/stream`);
eventSource.addEventListener('progress', (event) => {
const update = JSON.parse(event.data);
updateProgressBar(update.progress);
if (update.status === 'COMPLETED') {
showResult(update.result);
eventSource.close();
}
});
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
// Fallback to polling
pollForCompletion(taskId);
};
| Pros | Cons |
|---|---|
| Real-time updates | Connection held open |
| No polling needed | Browser connection limits |
| Easy client implementation | Server resource usage |
| Auto-reconnect built in | |
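The taskUpdateSubscriber in the SSE endpoint is left abstract. One way it could be wired up, assuming Redis pub/sub and a TaskUpdate DTO (both names are illustrative): workers publish progress to a per-task channel, and the API node relays those messages to any open emitters.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Consumer;

    @Component
    public class TaskUpdateSubscriber {

        private final RedisMessageListenerContainer container;
        private final StringRedisTemplate redis;
        private final ObjectMapper mapper = new ObjectMapper();
        private final Map<String, Consumer<TaskUpdate>> listeners = new ConcurrentHashMap<>();

        public TaskUpdateSubscriber(RedisMessageListenerContainer container,
                                    StringRedisTemplate redis) {
            this.container = container;
            this.redis = redis;
        }

        // Called by the worker whenever progress changes
        public void publish(String taskId, TaskUpdate update) throws Exception {
            redis.convertAndSend("task-updates:" + taskId, mapper.writeValueAsString(update));
        }

        // Called by the SSE endpoint when a client connects
        public void subscribe(String taskId, Consumer<TaskUpdate> listener) {
            listeners.put(taskId, listener);
            container.addMessageListener((message, pattern) -> {
                try {
                    TaskUpdate update = mapper.readValue(message.getBody(), TaskUpdate.class);
                    Consumer<TaskUpdate> current = listeners.get(taskId);
                    if (current != null) {
                        current.accept(update);
                    }
                } catch (Exception ignored) {
                    // malformed update; skip
                }
            }, new ChannelTopic("task-updates:" + taskId));
        }

        // A production version would also deregister the Redis listener here
        public void unsubscribe(String taskId) {
            listeners.remove(taskId);
        }
    }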
4. Workflow Engines¶
Philosophy: "Use dedicated orchestration for complex long-running processes"
Tools:

- Temporal (recommended)
- Cadence (Uber)
- AWS Step Functions
- Apache Airflow
- Netflix Conductor
Temporal Example:
// Workflow Interface
@WorkflowInterface
public interface VideoProcessingWorkflow {
@WorkflowMethod
VideoResult processVideo(VideoRequest request);
@QueryMethod
ProcessingStatus getStatus();
@SignalMethod
void cancel();
}
// Workflow Implementation
public class VideoProcessingWorkflowImpl implements VideoProcessingWorkflow {
// One set of options for brevity; different options per activity can also be supplied
private final VideoActivities activities =
Workflow.newActivityStub(
VideoActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(2))
.setHeartbeatTimeout(Duration.ofSeconds(30))
.build());
private ProcessingStatus status = new ProcessingStatus();
@Override
public VideoResult processVideo(VideoRequest request) {
// Step 1: Download
status.setStage("DOWNLOADING");
String localPath = activities.download(request.getSourceUrl());
// Step 2: Transcode (long running)
status.setStage("TRANSCODING");
TranscodeResult transcoded = activities.transcode(localPath, request.getOutputFormats());
// Step 3: Generate thumbnails
status.setStage("THUMBNAILS");
List<String> thumbnails = activities.generateThumbnails(localPath);
// Step 4: Upload results
status.setStage("UPLOADING");
List<String> outputUrls = activities.uploadResults(transcoded.getOutputFiles());
status.setStage("COMPLETED");
return new VideoResult(outputUrls, thumbnails);
}
@Override
public ProcessingStatus getStatus() {
return status;
}
@Override
public void cancel() {
// Run cleanup in a detached scope so it is not cancelled along with the workflow
Workflow.newDetachedCancellationScope(() -> activities.cleanup()).run();
}
}
// Activities (the actual work). In the Temporal Java SDK, per-activity timeouts are
// configured through ActivityOptions rather than on the annotation; indicative budgets:
// download ~10m, transcode ~2h (with heartbeats), thumbnails ~10m, upload ~30m.
@ActivityInterface
public interface VideoActivities {
@ActivityMethod
String download(String url);
@ActivityMethod
TranscodeResult transcode(String path, List<String> formats);
@ActivityMethod
List<String> generateThumbnails(String path);
@ActivityMethod
List<String> uploadResults(List<String> files);
@ActivityMethod
void cleanup();
}
// Starting workflow
@Service
public class VideoService {
private final WorkflowClient workflowClient;
public String startProcessing(VideoRequest request) {
String workflowId = "video-" + UUID.randomUUID();
VideoProcessingWorkflow workflow = workflowClient.newWorkflowStub(
VideoProcessingWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("video-processing")
.setWorkflowId(workflowId)
.build()
);
// Start async; returns as soon as the server accepts the workflow
WorkflowClient.start(workflow::processVideo, request);
return workflowId;
}
public ProcessingStatus getStatus(String workflowId) {
VideoProcessingWorkflow workflow = workflowClient.newWorkflowStub(
VideoProcessingWorkflow.class, workflowId);
return workflow.getStatus();
}
}
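The activity options above set a 30-second heartbeat timeout, which only matters if the activity implementation actually heartbeats. A sketch of the transcode activity doing so (the Transcoder/TranscodeJob API is hypothetical; the heartbeat loop is the point):

    import io.temporal.activity.Activity;
    import io.temporal.activity.ActivityExecutionContext;
    import java.time.Duration;
    import java.util.List;

    public class VideoActivitiesImpl implements VideoActivities {

        private final Transcoder transcoder;   // hypothetical wrapper around e.g. ffmpeg

        public VideoActivitiesImpl(Transcoder transcoder) {
            this.transcoder = transcoder;
        }

        @Override
        public TranscodeResult transcode(String path, List<String> formats) {
            ActivityExecutionContext ctx = Activity.getExecutionContext();
            TranscodeJob job = transcoder.start(path, formats);
            while (!job.isDone()) {
                // Heartbeat so Temporal can detect a stuck worker within the
                // heartbeat timeout instead of waiting for the full activity timeout.
                ctx.heartbeat(job.getPercentComplete());
                job.awaitProgress(Duration.ofSeconds(10));
            }
            return job.getResult();
        }

        @Override
        public String download(String url) { /* omitted */ return null; }

        @Override
        public List<String> generateThumbnails(String path) { /* omitted */ return List.of(); }

        @Override
        public List<String> uploadResults(List<String> files) { /* omitted */ return List.of(); }

        @Override
        public void cleanup() { /* omitted */ }
    }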
| Pros | Cons |
|---|---|
| Handles failures automatically | Infrastructure complexity |
| Built-in retries & timeouts | Learning curve |
| Visual monitoring | Additional service to run |
| State persistence | |
| Versioning support | |
| Timer/sleep support | |
5. Job Queues with Priority¶
Philosophy: "Manage job execution with configurable priority and resources"
public class PriorityJobQueue {
private final PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(
100,
Comparator.comparing(Job::getPriority)
.thenComparing(Job::getCreatedAt)
);
public void submit(Job job) {
queue.offer(job);
}
@Scheduled(fixedDelay = 100)
public void processJobs() {
Job job = queue.poll();
if (job != null) {
executor.submit(() -> processJob(job));
}
}
}
// Using Sidekiq-style approach (Redis-backed)
@Component
public class JobScheduler {
private final StringRedisTemplate redis;
public void enqueue(Job job, Priority priority) {
String queueKey = "jobs:" + priority.name().toLowerCase();
redis.opsForList().rightPush(queueKey, serialize(job));
}
public void enqueueDelayed(Job job, Duration delay) {
double score = Instant.now().plus(delay).toEpochMilli();
redis.opsForZSet().add("jobs:scheduled", serialize(job), score);
}
// Worker processes from queues in priority order
@Scheduled(fixedDelay = 100)
public void processQueues() {
// Check queues in priority order
for (String queue : List.of("jobs:critical", "jobs:high", "jobs:default", "jobs:low")) {
String jobData = redis.opsForList().leftPop(queue);
if (jobData != null) {
Job job = deserialize(jobData);
executor.submit(() -> processWithRetry(job));
return;
}
}
}
// Move scheduled jobs to ready queue
@Scheduled(fixedDelay = 1000)
public void promoteScheduledJobs() {
double now = Instant.now().toEpochMilli();
Set<String> ready = redis.opsForZSet().rangeByScore("jobs:scheduled", 0, now);
for (String jobData : ready) {
redis.opsForZSet().remove("jobs:scheduled", jobData);
redis.opsForList().rightPush("jobs:default", jobData);
}
}
}
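processWithRetry(job) above is doing a lot of quiet work. One possible shape for it, as a method inside the same JobScheduler (attempt counter, backoff schedule, and dead-letter queue name are all illustrative):

    private static final int MAX_ATTEMPTS = 5;

    private void processWithRetry(Job job) {
        int attempt = job.getAttempt();   // assumes Job tracks its attempt count
        try {
            jobHandlers.handlerFor(job.getType()).handle(job);   // hypothetical dispatch
        } catch (Exception e) {
            if (attempt + 1 >= MAX_ATTEMPTS) {
                // Give up: park the job on a dead-letter list for manual inspection
                redis.opsForList().rightPush("jobs:dead", serialize(job));
                return;
            }
            // Re-enqueue with exponential backoff: 2s, 4s, 8s, ...
            job.setAttempt(attempt + 1);
            enqueueDelayed(job, Duration.ofSeconds((long) Math.pow(2, attempt + 1)));
        }
    }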
6. Heartbeating & Liveness¶
Philosophy: "Detect stuck/dead workers and recover"
@Component
public class HeartbeatWorker {
@Value("${worker.id}")
private String workerId;
private volatile String currentTaskId;
// Send heartbeat every 10 seconds
@Scheduled(fixedRate = 10000)
public void sendHeartbeat() {
if (currentTaskId != null) {
redis.opsForValue().set(
"worker:heartbeat:" + workerId,
currentTaskId,
30,
TimeUnit.SECONDS
);
}
}
public void processTask(Task task) {
currentTaskId = task.getId();
try {
// Long-running work
doWork(task);
} finally {
currentTaskId = null;
}
}
}
// Monitor for dead workers
@Component
public class WorkerMonitor {
@Scheduled(fixedRate = 60000)
public void checkWorkers() {
// Find tasks claimed by workers with expired heartbeats
List<Task> orphanedTasks = taskRepository.findByStatusAndWorkerHeartbeatBefore(
TaskStatus.IN_PROGRESS,
Instant.now().minusSeconds(60)
);
for (Task task : orphanedTasks) {
log.warn("Recovering orphaned task: {}", task.getId());
if (task.getRetryCount() < task.getMaxRetries()) {
task.setStatus(TaskStatus.PENDING);
task.setRetryCount(task.getRetryCount() + 1);
task.setWorkerId(null);
taskRepository.save(task);
} else {
task.setStatus(TaskStatus.FAILED);
task.setError("Max retries exceeded after worker failure");
taskRepository.save(task);
}
}
}
}
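The monitor above relies on each in-progress task recording which worker claimed it and when that worker last heartbeated. The claim itself should be atomic so two workers cannot both pick up the same task; a sketch using a conditional UPDATE (table and column names are illustrative):

    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;
    import java.sql.Timestamp;
    import java.time.Instant;

    @Component
    public class TaskClaimer {

        private final JdbcTemplate jdbc;

        public TaskClaimer(JdbcTemplate jdbc) {
            this.jdbc = jdbc;
        }

        // Returns true only if this worker won the claim: the row is updated
        // solely when the task is still PENDING, so a race has exactly one winner.
        public boolean claim(String taskId, String workerId) {
            int updated = jdbc.update(
                "UPDATE tasks SET status = 'IN_PROGRESS', worker_id = ?, worker_heartbeat = ? " +
                "WHERE id = ? AND status = 'PENDING'",
                workerId, Timestamp.from(Instant.now()), taskId);
            return updated == 1;
        }
    }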
7. Checkpointing & Resume¶
Philosophy: "Save progress so tasks can resume after failure"
public class CheckpointedProcessor {
public void processLargeDataset(Task task) {
Checkpoint checkpoint = loadCheckpoint(task.getId());
long startOffset = checkpoint != null ? checkpoint.getOffset() : 0;
try (DatasetReader reader = new DatasetReader(task.getDataPath())) {
reader.seekTo(startOffset);
int batchSize = 1000;
List<Record> batch;
while (!(batch = reader.readBatch(batchSize)).isEmpty()) {
processBatch(batch);
// Save checkpoint every batch
long currentOffset = reader.getOffset();
saveCheckpoint(task.getId(), new Checkpoint(
currentOffset,
Instant.now(),
computeStats()
));
}
}
// Complete
deleteCheckpoint(task.getId());
}
private void saveCheckpoint(String taskId, Checkpoint checkpoint) {
redis.opsForValue().set(
"checkpoint:" + taskId,
serialize(checkpoint),
Duration.ofDays(7)
);
}
private Checkpoint loadCheckpoint(String taskId) {
String data = redis.opsForValue().get("checkpoint:" + taskId);
return data != null ? deserialize(data) : null;
}
}
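The Checkpoint type itself can be tiny. A sketch using Lombok so the getters match the calls above (the stats type is a guess, since computeStats() isn't shown). Note that because the checkpoint is written after each batch, a crash can cause the last batch to be reprocessed, so processBatch should be idempotent.

    import lombok.Value;
    import java.io.Serializable;
    import java.time.Instant;
    import java.util.Map;

    // Where to resume, when the checkpoint was written, and any running
    // statistics worth keeping across restarts.
    @Value
    public class Checkpoint implements Serializable {
        long offset;
        Instant savedAt;
        Map<String, Long> stats;
    }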
8. Resource Management¶
Philosophy: "Control concurrent execution to prevent overload"
@Configuration
public class ExecutorConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
// Semaphore for limiting concurrent heavy tasks
@Component
public class ResourceLimitedProcessor {
private final Semaphore heavyTaskSemaphore = new Semaphore(3);
public void processHeavyTask(Task task) {
try {
if (!heavyTaskSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
throw new ResourceUnavailableException("No capacity for heavy task");
}
try {
doHeavyWork(task);
} finally {
heavyTaskSemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Interrupted waiting for resources", e);
}
}
}
// Rate limiting
@Component
public class RateLimitedProcessor {
private final RateLimiter rateLimiter = RateLimiter.create(10.0); // 10 per second
public void process(Task task) {
rateLimiter.acquire();
doWork(task);
}
}
Comparison Matrix¶
| Approach | Complexity | Real-time | Reliability | Best For |
|---|---|---|---|---|
| Async + Polling | Low | No | Medium | Simple tasks |
| Webhooks | Medium | Yes | Medium | B2B integrations |
| SSE | Medium | Yes | Medium | User-facing progress |
| Workflow Engine | High | Yes | High | Complex processes |
| Priority Queues | Medium | No | High | Mixed workloads |
| Checkpointing | Medium | No | High | Data processing |
Decision Tree¶
Architecture Example¶
Key Takeaways¶
- Never block HTTP requests: return 202 Accepted immediately
- Persist task state: survive restarts and failures
- Heartbeat for liveness: detect and recover from stuck workers
- Checkpoint for resumability: don't lose progress on failure
- Use workflow engines for complex multi-step processes
- Provide progress updates: users need feedback
- Plan for cleanup: tasks can be cancelled or fail
- Set timeouts everywhere: tasks shouldn't run forever