Java Concurrency¶
Thread Basics¶
Creating Threads¶
// Method 1: Extend Thread
/** Thread subclass whose body reports the name of the thread that executes it. */
class MyThread extends Thread {
    @Override
    public void run() {
        // Resolve the executing thread first so the message is built from a named value.
        final String executingThread = Thread.currentThread().getName();
        System.out.println("Running in: " + executingThread);
    }
}
MyThread t1 = new MyThread();
t1.start(); // start() runs run() on a new thread; calling run() directly would execute on the caller's thread
// Method 2: Implement Runnable (preferred)
/** Task that reports the name of whichever thread ends up running it. */
class MyRunnable implements Runnable {
    @Override
    public void run() {
        // The task is not tied to a Thread subclass, so ask the runtime who is executing it.
        final Thread executing = Thread.currentThread();
        System.out.println("Running in: " + executing.getName());
    }
}
// The Runnable carries the work; the Thread only supplies the execution context
Thread t2 = new Thread(new MyRunnable());
t2.start();
// Method 3: Lambda (Java 8+)
// Expression lambda: the single println is the entire Runnable body
Runnable announce = () -> System.out.println("Running in: " + Thread.currentThread().getName());
Thread t3 = new Thread(announce);
t3.start();
// Method 4: Callable (returns value)
// Unlike Runnable.run(), Callable.call() returns a value and may throw checked exceptions,
// which is why Thread.sleep() needs no try/catch here.
// NOTE(review): this snippet only defines the task; nothing here runs it. Thread has no constructor
// taking a Callable - hand it to an ExecutorService via submit(), as the executor examples in this
// document do.
Callable<Integer> callable = () -> {
Thread.sleep(1000);
return 42;
};
Thread Lifecycle¶
Basic Thread Operations¶
// Lifecycle walk-through for one thread: configure it, start it, then observe and control it.
Thread thread = new Thread(() -> {
    // Do work
});
// Configure BEFORE start(): setDaemon() throws IllegalThreadStateException once the thread is alive
thread.setDaemon(true); // Set as daemon thread (does not keep the JVM alive)
thread.setPriority(Thread.MAX_PRIORITY); // Set priority (1-10); only a hint to the scheduler
thread.start(); // Start execution
thread.join(); // Wait for completion
thread.join(1000); // Wait with timeout (ms)
thread.interrupt(); // Request interruption (sets the flag; the target must cooperate)
thread.isInterrupted(); // Check interrupted status (flag is left as is)
Thread.interrupted(); // Check and clear interrupted status of the CURRENT thread
Thread.sleep(1000); // Sleep current thread (ms)
Thread.yield(); // Hint to scheduler
Thread.currentThread(); // Get current thread
Synchronization¶
synchronized Keyword¶
public class Counter {
private int count = 0;
// Synchronized method
public synchronized void increment() {
count++;
}
// Synchronized block
public void incrementBlock() {
synchronized (this) {
count++;
}
}
// Synchronized on specific object
private final Object lock = new Object();
public void incrementWithLock() {
synchronized (lock) {
count++;
}
}
// Synchronized static method (locks on Class object)
public static synchronized void staticMethod() {
// ...
}
}
volatile Keyword¶
/** Worker loop that is stopped through a volatile flag written by another thread. */
public class VolatileExample {
    // volatile gives cross-thread visibility of the write in stop(); it does not make
    // compound operations atomic.
    private volatile boolean running = true;

    /** Spins until {@link #stop()} has been called. */
    public void run() {
        while (true) {
            if (!running) {
                return;
            }
            // Do work
        }
    }

    /** Asks the loop in {@link #run()} to finish. */
    public void stop() {
        running = false;
    }
}
// volatile guarantees:
// 1. Visibility: Changes visible to all threads
// 2. Ordering: Reads and writes cannot be reordered across the volatile access (happens-before)
// Does NOT guarantee atomicity (use AtomicXxx for that)
wait(), notify(), notifyAll()¶
/**
 * Bounded hand-off buffer built on the intrinsic monitor of the instance.
 * Producers block while the buffer is full; consumers block while it is empty.
 */
public class ProducerConsumer {
    private final int maxItems = 10;
    private final Queue<Integer> buffer = new LinkedList<>();

    /**
     * Appends an item, waiting for free space if necessary.
     *
     * @param item value to enqueue
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void produce(int item) throws InterruptedException {
        // Re-check in a loop: being woken up does not guarantee that space exists.
        while (true) {
            if (buffer.size() != maxItems) {
                break;
            }
            wait();
        }
        buffer.add(item);
        notifyAll(); // wake consumers
    }

    /**
     * Removes the oldest item, waiting for one to arrive if necessary.
     *
     * @return the item at the head of the buffer
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized int consume() throws InterruptedException {
        while (true) {
            if (!buffer.isEmpty()) {
                break;
            }
            wait();
        }
        final int head = buffer.poll();
        notifyAll(); // wake producers
        return head;
    }
}
Locks (java.util.concurrent.locks)¶
ReentrantLock¶
/** Counter guarded by an explicit {@link ReentrantLock}, showing each way to acquire it. */
public class Counter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    /** Blocks until the lock is available, then increments. */
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // release on every exit path
        }
    }

    /**
     * Increments only if the lock is free right now.
     *
     * @return {@code true} if the increment happened, {@code false} if the lock was held elsewhere
     */
    public boolean tryIncrement() {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Increments if the lock can be obtained within one second.
     *
     * @return {@code true} if the increment happened, {@code false} on timeout
     * @throws InterruptedException if interrupted while waiting for the lock
     */
    public boolean tryIncrementWithTimeout() throws InterruptedException {
        final boolean acquired = lock.tryLock(1, TimeUnit.SECONDS);
        if (!acquired) {
            return false;
        }
        try {
            count++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Increments after acquiring the lock, giving up if the thread is interrupted first.
     *
     * @throws InterruptedException if interrupted before or while acquiring the lock
     */
    public void incrementInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}
ReadWriteLock¶
/**
 * Map-backed cache protected by a read/write lock: lookups may overlap with each other,
 * while an update excludes both lookups and other updates.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Cache<K, V> {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock shared = rwLock.readLock();
    private final Lock exclusive = rwLock.writeLock();
    private final Map<K, V> entries = new HashMap<>();

    /**
     * Looks up a key under the shared (read) lock.
     *
     * @param key key to look up
     * @return the mapped value, or {@code null} if the key is absent
     */
    public V get(K key) {
        shared.lock();
        try {
            final V found = entries.get(key);
            return found;
        } finally {
            shared.unlock();
        }
    }

    /**
     * Stores a mapping under the exclusive (write) lock.
     *
     * @param key   key to store
     * @param value value to associate with the key
     */
    public void put(K key, V value) {
        exclusive.lock();
        try {
            entries.put(key, value);
        } finally {
            exclusive.unlock();
        }
    }
}
StampedLock (Java 8+)¶
/** Mutable 2-D point whose reads first try a lock-free optimistic pass via {@link StampedLock}. */
public class Point {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    /**
     * Translates the point under the exclusive write lock.
     *
     * @param deltaX amount added to x
     * @param deltaY amount added to y
     */
    public void move(double deltaX, double deltaY) {
        final long writeStamp = lock.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            lock.unlockWrite(writeStamp);
        }
    }

    /**
     * Computes the Euclidean distance to (0, 0).
     *
     * @return the distance of the current coordinates from the origin
     */
    public double distanceFromOrigin() {
        // Optimistic pass: copy both fields, then verify no writer intervened.
        final long optimisticStamp = lock.tryOptimisticRead();
        double snapX = x;
        double snapY = y;
        if (lock.validate(optimisticStamp)) {
            return norm(snapX, snapY);
        }
        // A write overlapped the copy: repeat it under a real read lock.
        final long readStamp = lock.readLock();
        try {
            snapX = x;
            snapY = y;
        } finally {
            lock.unlockRead(readStamp);
        }
        return norm(snapX, snapY);
    }

    /** Length of the vector (a, b). */
    private static double norm(double a, double b) {
        return Math.sqrt(a * a + b * b);
    }
}
Atomic Classes¶
// AtomicInteger: int updated atomically without locks
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet(); // ++counter (returns new value)
counter.getAndIncrement(); // counter++ (returns old value)
counter.addAndGet(5); // counter += 5 (returns new value)
counter.compareAndSet(10, 20); // CAS: if value==10, set to 20; returns false here because the value is 7 at this point
counter.updateAndGet(x -> x * 2); // Atomic update with function (may be re-applied under contention, so keep it side-effect free)
// AtomicLong
AtomicLong longCounter = new AtomicLong(0);
// AtomicBoolean
AtomicBoolean flag = new AtomicBoolean(false);
flag.compareAndSet(false, true); // Only one caller can win this transition: useful as a run-once guard
// AtomicReference
AtomicReference<String> ref = new AtomicReference<>("initial");
ref.compareAndSet("initial", "updated"); // Compares references with ==, not equals(); matches here because string literals are interned
ref.updateAndGet(s -> s.toUpperCase());
// AtomicIntegerArray
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.incrementAndGet(0); // Increment element at index 0
// LongAdder (better for high contention)
LongAdder adder = new LongAdder();
adder.increment();
adder.add(10);
long sum = adder.sum(); // Not an atomic snapshot if other threads are updating concurrently
ExecutorService¶
Creating Executors¶
// Fixed thread pool: exactly 4 threads; extra tasks wait in an unbounded queue
ExecutorService fixed = Executors.newFixedThreadPool(4);
// Cached thread pool (creates threads as needed, reuses idle ones; no upper bound on thread count)
ExecutorService cached = Executors.newCachedThreadPool();
// Single thread executor: tasks run sequentially, in submission order
ExecutorService single = Executors.newSingleThreadExecutor();
// Scheduled executor: supports delayed and periodic tasks
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// Work-stealing pool (Java 8+): parallelism defaults to the number of available processors
ExecutorService workStealing = Executors.newWorkStealingPool();
// Custom ThreadPoolExecutor
// Threads beyond the core size are only created once the work queue is full.
ThreadPoolExecutor custom = new ThreadPoolExecutor(
4, // Core pool size
8, // Max pool size
60L, TimeUnit.SECONDS, // Keep-alive time for idle threads above the core size
new LinkedBlockingQueue<>(100), // Work queue (bounded to 100 pending tasks)
new ThreadPoolExecutor.CallerRunsPolicy() // Rejection policy: the submitting thread runs the task itself
);
Submitting Tasks¶
ExecutorService executor = Executors.newFixedThreadPool(4);
// Submit Runnable (no return value, no handle to wait on)
executor.execute(() -> System.out.println("Task executed"));
// Submit Runnable with Future
Future<?> future1 = executor.submit(() -> System.out.println("Task"));
future1.get(); // Returns null on completion; throws ExecutionException if the task failed
// Submit Callable (with return value)
Future<Integer> future2 = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
Integer result = future2.get(); // Blocking wait
Integer result2 = future2.get(5, TimeUnit.SECONDS); // With timeout (throws TimeoutException when it elapses)
// Check status
future2.isDone();
future2.isCancelled();
future2.cancel(true); // Cancel with interrupt; returns false here because the task has already completed
// Shutdown: stop accepting work, give running tasks a bounded grace period, then force.
// Calling shutdownNow() straight after shutdown() would skip the graceful phase entirely.
executor.shutdown(); // Graceful shutdown: previously submitted tasks still run
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for completion
    executor.shutdownNow(); // Force shutdown: interrupts running tasks, discards queued ones
}
invokeAll and invokeAny¶
// Three tasks of increasing duration; `executor` is an ExecutorService created elsewhere
List<Callable<Integer>> tasks = List.of(
() -> { Thread.sleep(1000); return 1; },
() -> { Thread.sleep(2000); return 2; },
() -> { Thread.sleep(3000); return 3; }
);
// Wait for all tasks to complete (blocks; returned futures are in the same order as `tasks`)
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> f : futures) {
System.out.println(f.get()); // Already done, so get() does not block; still throws ExecutionException on failure
}
// Get result of first completed task (tasks that have not completed are cancelled)
Integer first = executor.invokeAny(tasks);
ScheduledExecutorService¶
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// Run once after delay
scheduler.schedule(() -> System.out.println("Delayed"), 5, TimeUnit.SECONDS);
// Run at fixed rate (every 2 seconds, measured from start to start)
// If a run throws, all subsequent runs are suppressed.
scheduler.scheduleAtFixedRate(
() -> System.out.println("Fixed rate"),
0, // Initial delay
2, // Period
TimeUnit.SECONDS
);
// Run with fixed delay (2 seconds after previous completes, measured from end to start)
scheduler.scheduleWithFixedDelay(
() -> System.out.println("Fixed delay"),
0, // Initial delay
2, // Delay after completion
TimeUnit.SECONDS
);
CompletableFuture¶
Creating CompletableFutures¶
// Run async task (no return value); without an executor argument it runs on ForkJoinPool.commonPool()
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
System.out.println("Running async");
});
// Supply async task (with return value)
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// With custom executor
// NOTE(review): customExecutor is not declared in this snippet - presumably an Executor created elsewhere
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(
() -> "Hello",
customExecutor
);
// Already completed: dependents attached to it run immediately
CompletableFuture<String> completed = CompletableFuture.completedFuture("Done");
Chaining Operations¶
// The variable's type is that of the LAST stage: thenAccept() and thenRun() both produce
// CompletableFuture<Void>, so the whole chain is a CompletableFuture<Void>, not <String>.
CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    // Transform result (runs on the completing thread, or the caller if already complete)
    .thenApply(s -> s + " World")
    // Transform result (asynchronous: always dispatched to the default executor)
    .thenApplyAsync(s -> s.toUpperCase())
    // Consume result (no return)
    .thenAccept(s -> System.out.println(s))
    // Run after completion (no access to result)
    .thenRun(() -> System.out.println("Done"));
// Chain with another CompletableFuture
// thenCompose flattens the inner future; thenApply with the same function would
// yield CompletableFuture<CompletableFuture<String>> instead.
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
Combining Futures¶
// Two independent futures that run concurrently
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "World");
// Combine two futures: the function runs once both have completed
CompletableFuture<String> combined = cf1.thenCombine(cf2, (s1, s2) -> s1 + " " + s2);
// Wait for both (consume the two results, produce nothing)
CompletableFuture<Void> both = cf1.thenAcceptBoth(cf2, (s1, s2) -> {
System.out.println(s1 + " " + s2);
});
// Wait for all (allOf carries no results, so read them from the source futures)
CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
all.thenRun(() -> {
String result1 = cf1.join(); // Does not block: cf1 is already complete when this runs
String result2 = cf2.join();
});
// Wait for any (result type is Object because the inputs may have different types)
CompletableFuture<Object> any = CompletableFuture.anyOf(cf1, cf2);
Error Handling¶
// The three stages below are chained, so each one sees the outcome of the stage before it.
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Error!"); // `if (true)` keeps the return below reachable for the compiler
return "Success";
})
// Handle exception (return default)
// ex arrives wrapped in a CompletionException; use ex.getCause() for the original exception
.exceptionally(ex -> {
System.out.println("Error: " + ex.getMessage());
return "Default";
})
// Handle both success and failure
// In this chain ex is always null here, because exceptionally() above already recovered
.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
})
// Like handle but doesn't transform: the stage keeps the incoming result or exception
.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Failed: " + ex);
} else {
System.out.println("Success: " + result);
}
});
Concurrent Collections¶
ConcurrentHashMap¶
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Thread-safe operations
map.put("key", 1);
map.putIfAbsent("key", 2); // No-op here: "key" is already mapped
map.computeIfAbsent("key", k -> expensiveComputation()); // Function is only invoked when the key is absent
map.merge("key", 1, Integer::sum); // Insert 1, or add 1 to the existing value
// Atomic operations
map.compute("counter", (k, v) -> v == null ? 1 : v + 1);
// Bulk operations (parallel); the first argument is the parallelism threshold
map.forEach(1, (k, v) -> System.out.println(k + ": " + v));
// reduceValues() returns null for an empty map, which would throw a NullPointerException when
// unboxed into a long. The primitive variant takes a basis (0) and accumulates in long.
long sum = map.reduceValuesToLong(1, Integer::longValue, 0L, Long::sum);
CopyOnWriteArrayList¶
// Thread-safe list optimized for read-heavy workloads
// Iterators work on a snapshot of the array and never throw ConcurrentModificationException
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item"); // Creates new copy of array
String item = list.get(0); // Fast read, no locking
// Good for: Event listeners, rarely modified lists
// Bad for: Frequent modifications (every write copies the whole array)
BlockingQueue¶
// ArrayBlockingQueue (bounded, capacity fixed at construction)
BlockingQueue<String> bounded = new ArrayBlockingQueue<>(100);
// LinkedBlockingQueue (optionally bounded; no capacity argument means effectively unbounded)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// PriorityBlockingQueue (priority ordered)
// NOTE(review): Task is not declared in this document; with no comparator supplied it
// presumably has to implement Comparable - confirm.
BlockingQueue<Task> priority = new PriorityBlockingQueue<>();
// Operations
// NOTE(review): `queue` and `item` are placeholders in the lines below; `queue` is only
// declared later, in the producer-consumer snippet.
queue.put(item); // Blocks if full
queue.take(); // Blocks if empty
queue.offer(item); // Returns false if full
queue.poll(); // Returns null if empty
queue.offer(item, 1, TimeUnit.SECONDS); // With timeout: returns false if still full after 1 second
// Producer-Consumer pattern
// Bounded to 10 so that a fast producer blocks instead of growing the queue without limit
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// Producer
// Because the body never completes normally, the lambda resolves to Callable, whose call()
// may throw - that is why put()'s InterruptedException needs no try/catch here.
// NOTE(review): produce(), consume() and executor are defined outside this snippet.
executor.submit(() -> {
while (true) {
queue.put(produce());
}
});
// Consumer
// The loop only ends when take() throws, e.g. after executor.shutdownNow() interrupts the worker.
executor.submit(() -> {
while (true) {
Integer item = queue.take();
consume(item);
}
});
Synchronizers¶
CountDownLatch¶
// One-time barrier - wait for N events
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        try {
            doWork();
        } finally {
            // Count down even if doWork() throws; otherwise await() below would block forever
            latch.countDown(); // Signal completion
        }
    });
}
latch.await(); // Wait for all 3 to finish (successfully or not)
System.out.println("All tasks completed");
CyclicBarrier¶
// Reusable barrier - all threads wait for each other
// The barrier action runs once per trip, after the last party arrives and before any is released.
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("All threads reached barrier");
});
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        doPhase1();
        barrier.await(); // Wait for all threads
        doPhase2();
        barrier.await(); // Barrier is reusable
        doPhase3();
        // await() throws InterruptedException and BrokenBarrierException. Returning a value
        // makes this lambda a Callable, whose call() may throw; as a Runnable it would not compile.
        return null;
    });
}
Semaphore¶
// Limit concurrent access to resource
Semaphore semaphore = new Semaphore(3); // 3 permits: at most 3 holders at a time
// acquire() sits outside the try block so that release() only runs when a permit was actually obtained
void accessResource() throws InterruptedException {
semaphore.acquire(); // Get permit (blocks if none available)
try {
// Access limited resource
} finally {
semaphore.release(); // Return permit
}
}
// Try with timeout: skip the work if no permit becomes free within 1 second
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
try {
// Access resource
} finally {
semaphore.release();
}
}
Phaser¶
// Flexible synchronizer (like CountDownLatch + CyclicBarrier)
// The initial party (the main thread) stays un-arrived while the loop runs, so phase 0
// cannot complete before every worker has been registered.
Phaser phaser = new Phaser(1); // Register main thread
for (int i = 0; i < 3; i++) {
phaser.register(); // Register worker (done on the submitting thread, before the task can run)
executor.submit(() -> {
doPhase1();
phaser.arriveAndAwaitAdvance(); // Wait for phase 1
doPhase2();
phaser.arriveAndAwaitAdvance(); // Wait for phase 2
phaser.arriveAndDeregister(); // Done
});
}
phaser.arriveAndDeregister(); // Main thread done: arrives without waiting, leaving the 3 workers as the only parties
Virtual Threads (Java 21+)¶
// Create virtual thread
// ofVirtual().start(...) creates the thread and starts it in one step
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("Running in virtual thread");
});
// Virtual thread factory
ThreadFactory factory = Thread.ofVirtual().factory();
Thread t = factory.newThread(() -> doWork()); // Created but NOT started; call t.start() to run it
// Executor with virtual threads
// close() at the end of the try block waits for all submitted tasks to finish
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 10000; i++) {
        executor.submit(() -> {
            // Each task gets its own virtual thread
            // Can have millions of virtual threads
            Thread.sleep(1000);
            // Returning a value makes this a Callable, whose call() may throw the checked
            // InterruptedException from sleep(); as a Runnable it would not compile.
            return null;
        });
    }
}
// Virtual threads are:
// - Lightweight (low memory footprint)
// - Managed by JVM, not OS
// - Best for I/O-bound tasks
// - Don't pool - create new ones
Common Interview Questions¶
- synchronized vs Lock?
  - Lock: More flexible (tryLock, timeout, interruptible)
  - synchronized: Simpler, released automatically, no risk of forgetting to unlock
- volatile vs synchronized?
  - volatile: Visibility only, no atomicity
  - synchronized: Both visibility and atomicity
- wait() vs sleep()?
  - wait(): Releases the lock, must be called inside synchronized
  - sleep(): Doesn't release the lock, can be called anywhere
- Why use Executor over new Thread()?
  - Thread pooling, resource management, better control
- CompletableFuture vs Future?
  - CompletableFuture: Chaining, combining, non-blocking
- When to use virtual threads?
  - I/O-bound tasks, high concurrency, simple blocking code