
Long Running Tasks

The Problem

How do you handle operations that take a long time to complete?

- Video transcoding (minutes to hours)
- Report generation
- Data exports/imports
- ML model training
- Batch processing
- Payment processing with retries
- Email campaigns

Challenges:

- HTTP timeouts
- Client disconnections
- Server restarts/crashes
- Progress tracking
- Resource management
- User experience


Options & Trade-offs

1. Async Request-Response Pattern

Philosophy: "Accept request immediately, process in background, notify when complete"


Implementation:

// Controller
@RestController
public class TaskController {

    @PostMapping("/api/reports")
    public ResponseEntity<TaskResponse> generateReport(@RequestBody ReportRequest request) {
        String taskId = UUID.randomUUID().toString();

        // Create task record
        Task task = Task.builder()
            .id(taskId)
            .type("REPORT_GENERATION")
            .status(TaskStatus.PENDING)
            .request(serialize(request))
            .createdAt(Instant.now())
            .build();
        taskRepository.save(task);

        // Queue for async processing
        taskQueue.send(new TaskMessage(taskId, request));

        return ResponseEntity
            .accepted()
            .header("Location", "/api/tasks/" + taskId)
            .body(new TaskResponse(taskId, TaskStatus.PENDING));
    }

    @GetMapping("/api/tasks/{taskId}")
    public ResponseEntity<TaskStatusResponse> getTaskStatus(@PathVariable String taskId) {
        Task task = taskRepository.findById(taskId)
            .orElseThrow(() -> new NotFoundException("Task not found"));

        TaskStatusResponse response = TaskStatusResponse.builder()
            .taskId(taskId)
            .status(task.getStatus())
            .progress(task.getProgress())
            .result(task.getResult())
            .error(task.getError())
            .createdAt(task.getCreatedAt())
            .completedAt(task.getCompletedAt())
            // Only expose the download URL once the task has completed
            .resultUrl(task.getStatus() == TaskStatus.COMPLETED ? task.getResultUrl() : null)
            .build();

        return ResponseEntity.ok(response);
    }
}

// Worker
@Component
public class ReportWorker {

    @KafkaListener(topics = "report-tasks")
    public void processReport(TaskMessage message) {
        Task task = taskRepository.findById(message.getTaskId())
            .orElseThrow();

        try {
            task.setStatus(TaskStatus.IN_PROGRESS);
            task.setStartedAt(Instant.now());
            taskRepository.save(task);

            // Long-running work with progress updates
            ReportResult result = reportService.generate(
                message.getRequest(),
                progress -> updateProgress(task, progress)
            );

            task.setStatus(TaskStatus.COMPLETED);
            task.setResult(serialize(result));
            task.setResultUrl(result.getDownloadUrl());
            task.setCompletedAt(Instant.now());

        } catch (Exception e) {
            // Marks every failure terminal; in production, re-throw retryable
            // errors so the message is redelivered, and reserve FAILED for
            // permanent ones
            task.setStatus(TaskStatus.FAILED);
            task.setError(e.getMessage());
            task.setCompletedAt(Instant.now());
        }

        taskRepository.save(task);
    }

    private void updateProgress(Task task, int progress) {
        // Throttle in practice (e.g. save only on whole-percent changes) so
        // chatty progress callbacks don't hammer the database
        task.setProgress(progress);
        taskRepository.save(task);
    }
}

Client-side Polling:

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitForTask(taskId, intervalMs = 2000) {
    while (true) {
        const response = await fetch(`/api/tasks/${taskId}`);
        if (!response.ok) {
            throw new Error(`Status check failed: ${response.status}`);
        }
        const task = await response.json();

        updateProgressUI(task.progress);

        if (task.status === 'COMPLETED') {
            return task.result;
        }
        if (task.status === 'FAILED') {
            throw new Error(task.error);
        }

        await delay(intervalMs);
    }
}
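Polling at a fixed 2-second interval wastes requests on long tasks and adds latency on short ones. A common refinement is exponential backoff with a cap; a minimal sketch (the constants are illustrative, not from the original):

```java
import java.time.Duration;

// Exponential backoff for status polling: 2s, 4s, 8s, ... capped at 60s.
// Constants are illustrative; tune them to expected task durations.
final class PollingBackoff {

    private static final Duration BASE = Duration.ofSeconds(2);
    private static final Duration MAX = Duration.ofSeconds(60);

    private PollingBackoff() {}

    static Duration nextDelay(int attempt) {
        // Clamp the exponent so the shift cannot overflow for large attempts
        long multiplier = 1L << Math.min(attempt, 30);
        Duration candidate = BASE.multipliedBy(multiplier);
        return candidate.compareTo(MAX) > 0 ? MAX : candidate;
    }
}
```

The client then sleeps `nextDelay(attempt)` between status checks instead of a constant interval.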

Pros:
- Simple to implement
- Works with any client
- Stateless API

Cons:
- Polling overhead
- Delayed feedback
- Many status requests

2. Webhook Callbacks

Philosophy: "Server notifies client when task completes"


Implementation:

@PostMapping("/api/exports")
public ResponseEntity<TaskResponse> startExport(
        @RequestBody ExportRequest request,
        @RequestHeader("X-Callback-URL") String callbackUrl) {

    String taskId = UUID.randomUUID().toString();

    Task task = Task.builder()
        .id(taskId)
        .type("DATA_EXPORT")
        .status(TaskStatus.PENDING)
        .callbackUrl(callbackUrl)
        .build();
    taskRepository.save(task);

    taskQueue.send(new ExportTaskMessage(taskId, request));

    return ResponseEntity.accepted()
        .body(new TaskResponse(taskId, TaskStatus.PENDING));
}

// Worker sends callback on completion
@Component
public class ExportWorker {

    @KafkaListener(topics = "export-tasks")
    public void processExport(ExportTaskMessage message) {
        Task task = taskRepository.findById(message.getTaskId()).orElseThrow();

        try {
            ExportResult result = exportService.export(message.getRequest());

            task.setStatus(TaskStatus.COMPLETED);
            task.setResult(serialize(result));
            taskRepository.save(task);

            // Send webhook callback
            sendCallback(task.getCallbackUrl(), CallbackPayload.builder()
                .taskId(task.getId())
                .status("COMPLETED")
                .resultUrl(result.getDownloadUrl())
                .build());

        } catch (Exception e) {
            task.setStatus(TaskStatus.FAILED);
            task.setError(e.getMessage());
            taskRepository.save(task);

            sendCallback(task.getCallbackUrl(), CallbackPayload.builder()
                .taskId(task.getId())
                .status("FAILED")
                .error(e.getMessage())
                .build());
        }
    }

    private void sendCallback(String url, CallbackPayload payload) {
        // Retry with exponential backoff
        retryTemplate.execute(context -> {
            restTemplate.postForEntity(url, payload, Void.class);
            return null;
        });
    }
}

Pros:
- Real-time notification
- No polling overhead
- Efficient

Cons:
- Client needs a public endpoint
- Security concerns (payload validation)
- Callback failures need handling
- Firewall issues
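The validation concern is commonly addressed by signing callback bodies with a shared secret, so the receiver can verify origin and integrity. A minimal HMAC-SHA256 sketch; the header name you transmit the signature under (e.g. `X-Signature`) is your choice:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sign outgoing webhook bodies; the receiver recomputes the signature over
// the raw body and compares in constant time.
final class WebhookSigner {

    private WebhookSigner() {}

    static String sign(String body, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return HexFormat.of().formatHex(mac.doFinal(body.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalStateException("HMAC-SHA256 unavailable", e);
        }
    }

    static boolean verify(String body, String secret, String signature) {
        // Constant-time comparison to avoid timing side channels
        return MessageDigest.isEqual(
            sign(body, secret).getBytes(StandardCharsets.UTF_8),
            signature.getBytes(StandardCharsets.UTF_8));
    }
}
```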

3. Server-Sent Events (SSE)

Philosophy: "Push progress updates to client in real-time"

@GetMapping(value = "/api/tasks/{taskId}/stream",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamProgress(@PathVariable String taskId) {
    SseEmitter emitter = new SseEmitter(3600000L); // 1 hour timeout

    // Subscribe to task updates
    taskUpdateSubscriber.subscribe(taskId, update -> {
        try {
            emitter.send(SseEmitter.event()
                .name("progress")
                .data(update));

            if (update.getStatus().isFinal()) {
                emitter.complete();
            }
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });

    emitter.onCompletion(() -> taskUpdateSubscriber.unsubscribe(taskId));
    emitter.onTimeout(() -> emitter.complete());

    return emitter;
}

Client:

const eventSource = new EventSource(`/api/tasks/${taskId}/stream`);

eventSource.addEventListener('progress', (event) => {
    const update = JSON.parse(event.data);
    updateProgressBar(update.progress);

    if (update.status === 'COMPLETED') {
        showResult(update.result);
        eventSource.close();
    }
});

eventSource.onerror = (error) => {
    console.error('SSE error:', error);
    eventSource.close();
    // Fallback to polling
    pollForCompletion(taskId);
};

Pros:
- Real-time updates
- No polling needed
- Easy client implementation
- Auto-reconnect built in

Cons:
- Connection held open
- Browser connection limits
- Server resource usage
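The `taskUpdateSubscriber` used by the emitter above is assumed rather than shown. A minimal single-process version might look like the following; across multiple instances, back it with Redis pub/sub or a message broker so a worker on one node can reach an SSE connection held on another:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Minimal in-memory registry mapping task IDs to update listeners.
// Single-node only; distribute via pub/sub for multi-instance deployments.
class TaskUpdateSubscriber<T> {

    private final Map<String, List<Consumer<T>>> listeners = new ConcurrentHashMap<>();

    void subscribe(String taskId, Consumer<T> listener) {
        listeners.computeIfAbsent(taskId, id -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void unsubscribe(String taskId) {
        listeners.remove(taskId);
    }

    // Called by workers (or a message listener) whenever task state changes
    void publish(String taskId, T update) {
        for (Consumer<T> listener : listeners.getOrDefault(taskId, List.of())) {
            listener.accept(update);
        }
    }
}
```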

4. Workflow Engines

Philosophy: "Use dedicated orchestration for complex long-running processes"

Tools: - Temporal (recommended) - Cadence (Uber) - AWS Step Functions - Apache Airflow - Netflix Conductor

Temporal Example:

// Workflow Interface
@WorkflowInterface
public interface VideoProcessingWorkflow {
    @WorkflowMethod
    VideoResult processVideo(VideoRequest request);

    @QueryMethod
    ProcessingStatus getStatus();

    @SignalMethod
    void cancel();
}

// Workflow Implementation
public class VideoProcessingWorkflowImpl implements VideoProcessingWorkflow {

    private final VideoActivities activities =
        Workflow.newActivityStub(VideoActivities.class, activityOptions);

    private ProcessingStatus status = new ProcessingStatus();

    @Override
    public VideoResult processVideo(VideoRequest request) {
        // Step 1: Download
        status.setStage("DOWNLOADING");
        String localPath = activities.download(request.getSourceUrl());

        // Step 2: Transcode (long running)
        status.setStage("TRANSCODING");
        TranscodeResult transcoded = activities.transcode(localPath, request.getOutputFormats());

        // Step 3: Generate thumbnails
        status.setStage("THUMBNAILS");
        List<String> thumbnails = activities.generateThumbnails(localPath);

        // Step 4: Upload results
        status.setStage("UPLOADING");
        List<String> outputUrls = activities.uploadResults(transcoded.getOutputFiles());

        status.setStage("COMPLETED");
        return new VideoResult(outputUrls, thumbnails);
    }

    @Override
    public ProcessingStatus getStatus() {
        return status;
    }

    @Override
    public void cancel() {
        // newDetachedCancellationScope returns a scope that must be run;
        // detached so cleanup survives cancellation of the workflow itself
        Workflow.newDetachedCancellationScope(() -> activities.cleanup()).run();
    }
}

// Activities (the actual work). In Temporal's Java SDK, timeouts are not
// attributes of @ActivityMethod; they are configured on the ActivityOptions
// used when creating the activity stub, so activities needing different
// timeouts (e.g. the multi-hour transcode vs. a short download) get
// separate stubs.
@ActivityInterface
public interface VideoActivities {
    @ActivityMethod
    String download(String url);

    // Long-running: needs a generous start-to-close timeout plus heartbeats
    // so a dead worker is detected quickly
    @ActivityMethod
    TranscodeResult transcode(String path, List<String> formats);

    @ActivityMethod
    List<String> generateThumbnails(String path);

    @ActivityMethod
    List<String> uploadResults(List<String> files);

    // Invoked from the cancel() signal handler above
    @ActivityMethod
    void cleanup();
}
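The `activityOptions` passed to `Workflow.newActivityStub` earlier is where per-activity timeouts and retry policy live in Temporal's Java SDK; a sketch with illustrative values (activities needing different timeouts would use separately configured stubs):

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

ActivityOptions activityOptions = ActivityOptions.newBuilder()
    .setStartToCloseTimeout(Duration.ofHours(2))   // upper bound for transcode
    .setHeartbeatTimeout(Duration.ofSeconds(30))   // detect stuck workers fast
    .setRetryOptions(RetryOptions.newBuilder()
        .setMaximumAttempts(3)
        .build())
    .build();
```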

// Starting workflow
@Service
public class VideoService {
    private final WorkflowClient workflowClient;

    public String startProcessing(VideoRequest request) {
        String workflowId = "video-" + UUID.randomUUID();

        VideoProcessingWorkflow workflow = workflowClient.newWorkflowStub(
            VideoProcessingWorkflow.class,
            WorkflowOptions.newBuilder()
                .setTaskQueue("video-processing")
                .setWorkflowId(workflowId)
                .build()
        );

        // Start async; the typed stub exposes no getWorkflowId(), so return
        // the id we generated above
        WorkflowClient.start(workflow::processVideo, request);

        return workflowId;
    }

    public ProcessingStatus getStatus(String workflowId) {
        VideoProcessingWorkflow workflow = workflowClient.newWorkflowStub(
            VideoProcessingWorkflow.class, workflowId);
        return workflow.getStatus();
    }
}

Pros:
- Handles failures automatically
- Built-in retries & timeouts
- Visual monitoring
- State persistence
- Versioning support
- Timer/sleep support

Cons:
- Infrastructure complexity
- Learning curve
- Additional service to run

5. Job Queues with Priority

Philosophy: "Manage job execution with configurable priority and resources"

public class PriorityJobQueue {

    // Assumes lower Priority values sort first (e.g. an enum declared
    // CRITICAL, HIGH, DEFAULT, LOW); ties break on submission time, so
    // equal-priority jobs run FIFO
    private final PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(
        100,
        Comparator.comparing(Job::getPriority)
            .thenComparing(Job::getCreatedAt)
    );

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void submit(Job job) {
        queue.offer(job);
    }

    @Scheduled(fixedDelay = 100)
    public void processJobs() {
        Job job = queue.poll();
        if (job != null) {
            executor.submit(() -> processJob(job));
        }
    }
}

// Using Sidekiq-style approach (Redis-backed)
@Component
public class JobScheduler {
    private final StringRedisTemplate redis;

    public void enqueue(Job job, Priority priority) {
        String queueKey = "jobs:" + priority.name().toLowerCase();
        redis.opsForList().rightPush(queueKey, serialize(job));
    }

    public void enqueueDelayed(Job job, Duration delay) {
        double score = Instant.now().plus(delay).toEpochMilli();
        redis.opsForZSet().add("jobs:scheduled", serialize(job), score);
    }

    // Worker processes from queues in priority order
    @Scheduled(fixedDelay = 100)
    public void processQueues() {
        // Check queues in priority order
        for (String queue : List.of("jobs:critical", "jobs:high", "jobs:default", "jobs:low")) {
            String jobData = redis.opsForList().leftPop(queue);
            if (jobData != null) {
                Job job = deserialize(jobData);
                executor.submit(() -> processWithRetry(job));
                return;
            }
        }
    }

    // Move scheduled jobs to the ready queue. The read-then-remove sequence
    // is not atomic: with multiple scheduler instances, use a Lua script (or
    // a distributed lock) so a job cannot be promoted twice. Checking the
    // return value of remove() at least prevents duplicate pushes here.
    @Scheduled(fixedDelay = 1000)
    public void promoteScheduledJobs() {
        double now = Instant.now().toEpochMilli();
        Set<String> ready = redis.opsForZSet().rangeByScore("jobs:scheduled", 0, now);

        for (String jobData : ready) {
            Long removed = redis.opsForZSet().remove("jobs:scheduled", jobData);
            if (removed != null && removed > 0) {
                redis.opsForList().rightPush("jobs:default", jobData);
            }
        }
    }
}

6. Heartbeating & Liveness

Philosophy: "Detect stuck/dead workers and recover"

@Component
public class HeartbeatWorker {

    @Value("${worker.id}")
    private String workerId;

    private volatile String currentTaskId;

    // Send heartbeat every 10 seconds: to Redis for fast liveness lookups,
    // and onto the task row itself, which is what WorkerMonitor queries
    // (updateWorkerHeartbeat is an assumed repository method)
    @Scheduled(fixedRate = 10000)
    public void sendHeartbeat() {
        if (currentTaskId != null) {
            redis.opsForValue().set(
                "worker:heartbeat:" + workerId,
                currentTaskId,
                30,
                TimeUnit.SECONDS
            );
            taskRepository.updateWorkerHeartbeat(currentTaskId, Instant.now());
        }
    }

    public void processTask(Task task) {
        currentTaskId = task.getId();
        try {
            // Long-running work
            doWork(task);
        } finally {
            currentTaskId = null;
        }
    }
}

// Monitor for dead workers
@Component
public class WorkerMonitor {

    @Scheduled(fixedRate = 60000)
    public void checkWorkers() {
        // Find tasks claimed by workers with expired heartbeats
        List<Task> orphanedTasks = taskRepository.findByStatusAndWorkerHeartbeatBefore(
            TaskStatus.IN_PROGRESS,
            Instant.now().minusSeconds(60)
        );

        for (Task task : orphanedTasks) {
            log.warn("Recovering orphaned task: {}", task.getId());

            if (task.getRetryCount() < task.getMaxRetries()) {
                task.setStatus(TaskStatus.PENDING);
                task.setRetryCount(task.getRetryCount() + 1);
                task.setWorkerId(null);
                taskRepository.save(task);
            } else {
                task.setStatus(TaskStatus.FAILED);
                task.setError("Max retries exceeded after worker failure");
                taskRepository.save(task);
            }
        }
    }
}
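Recovery like the above is only safe if two workers can never own the same task at once. The sketch below models atomic claiming with an in-memory compare-and-set; in production the same effect typically comes from a conditional update such as `UPDATE tasks SET status='IN_PROGRESS', worker_id=? WHERE id=? AND status='PENDING'`, where exactly one worker sees an update count of 1:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for atomic task claiming. putIfAbsent is the CAS:
// only the first worker to claim a task id wins; everyone else backs off.
class TaskClaims {

    private final Map<String, String> taskOwners = new ConcurrentHashMap<>();

    // Returns the claiming worker id if we won, empty if already owned
    Optional<String> claim(String taskId, String workerId) {
        String existingOwner = taskOwners.putIfAbsent(taskId, workerId);
        return existingOwner == null ? Optional.of(workerId) : Optional.empty();
    }

    // Called on completion, or by the monitor when requeueing an orphan
    void release(String taskId) {
        taskOwners.remove(taskId);
    }
}
```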

7. Checkpointing & Resume

Philosophy: "Save progress so tasks can resume after failure"

public class CheckpointedProcessor {

    public void processLargeDataset(Task task) {
        Checkpoint checkpoint = loadCheckpoint(task.getId());
        long startOffset = checkpoint != null ? checkpoint.getOffset() : 0;

        try (DatasetReader reader = new DatasetReader(task.getDataPath())) {
            reader.seekTo(startOffset);

            int batchSize = 1000;
            List<Record> batch;

            while (!(batch = reader.readBatch(batchSize)).isEmpty()) {
                // processBatch must be idempotent: a crash after processing
                // but before the checkpoint write replays this batch on resume
                processBatch(batch);

                // Save checkpoint every batch
                long currentOffset = reader.getOffset();
                saveCheckpoint(task.getId(), new Checkpoint(
                    currentOffset,
                    Instant.now(),
                    computeStats()
                ));
            }
        }

        // Complete
        deleteCheckpoint(task.getId());
    }

    private void saveCheckpoint(String taskId, Checkpoint checkpoint) {
        redis.opsForValue().set(
            "checkpoint:" + taskId,
            serialize(checkpoint),
            Duration.ofDays(7)
        );
    }

    private Checkpoint loadCheckpoint(String taskId) {
        String data = redis.opsForValue().get("checkpoint:" + taskId);
        return data != null ? deserialize(data) : null;
    }
}

8. Resource Management

Philosophy: "Control concurrent execution to prevent overload"

@Configuration
public class ExecutorConfig {

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        return executor;
    }
}

// Semaphore for limiting concurrent heavy tasks
@Component
public class ResourceLimitedProcessor {

    private final Semaphore heavyTaskSemaphore = new Semaphore(3);

    public void processHeavyTask(Task task) {
        try {
            if (!heavyTaskSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
                throw new ResourceUnavailableException("No capacity for heavy task");
            }

            try {
                doHeavyWork(task);
            } finally {
                heavyTaskSemaphore.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TaskException("Interrupted waiting for resources", e);
        }
    }
}

// Rate limiting
@Component
public class RateLimitedProcessor {

    private final RateLimiter rateLimiter = RateLimiter.create(10.0); // Guava RateLimiter: 10 permits/second

    public void process(Task task) {
        rateLimiter.acquire();
        doWork(task);
    }
}
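`RateLimiter` above is Guava's; if pulling in Guava is undesirable, a small token bucket gives the same non-blocking check (capacity and rate below are illustrative):

```java
// Minimal token bucket: tokens refill continuously at permitsPerSecond,
// up to capacity; tryAcquire consumes one token and never blocks.
class TokenBucket {

    private final long capacity;
    private final double refillPerNano;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, double permitsPerSecond) {
        this.capacity = capacity;
        this.refillPerNano = permitsPerSecond / 1_000_000_000.0;
        this.tokens = capacity;
        this.lastRefillNanos = System.nanoTime();
    }

    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * refillPerNano);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

Callers that must not drop work can loop with a short sleep instead of discarding on a false return.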

Comparison Matrix

| Approach        | Complexity | Real-time | Reliability | Best For             |
|-----------------|------------|-----------|-------------|----------------------|
| Async + Polling | Low        | No        | Medium      | Simple tasks         |
| Webhooks        | Medium     | Yes       | Medium      | B2B integrations     |
| SSE             | Medium     | Yes       | Medium      | User-facing progress |
| Workflow Engine | High       | Yes       | High        | Complex processes    |
| Priority Queues | Medium     | No        | High        | Mixed workloads      |
| Checkpointing   | Medium     | No        | High        | Data processing      |

Decision Tree

(Diagram: decision tree for choosing among the approaches above)


Architecture Example

(Diagram: long-running tasks reference architecture)


Key Takeaways

  1. Never block HTTP requests - Return 202 Accepted immediately
  2. Persist task state - Survive restarts and failures
  3. Heartbeat for liveness - Detect and recover from stuck workers
  4. Checkpoint for resumability - Don't lose progress on failure
  5. Use workflow engines - For complex multi-step processes
  6. Provide progress updates - Users need feedback
  7. Plan for cleanup - Tasks can be cancelled or fail
  8. Set timeouts everywhere - Tasks shouldn't run forever