ZooKeeper¶
What is ZooKeeper?¶
Apache ZooKeeper is a centralized service for maintaining configuration information, naming, distributed synchronization, and group services in distributed systems.
- Type: Distributed coordination service
- Written in: Java
- License: Apache 2.0
- Protocol: Custom binary protocol over TCP
- Default Ports: 2181 (client), 2888 (quorum/peer communication), 3888 (leader election)
- Originally developed by: Yahoo!
Core Concepts¶
Data Model (ZNode Tree)¶
ZooKeeper stores all data in a hierarchical namespace of znodes, much like a filesystem: each znode can hold a small payload (up to 1 MB by default) and can have children. Ephemeral znodes cannot have children.
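An illustrative namespace (the paths are examples, not ZooKeeper defaults):
/
├── config
│   └── database
├── services
│   └── api
│       └── instance_0000000001
└── locks
    └── orders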
ZNode Types¶
| Type | Description |
|---|---|
| Persistent | Remains until explicitly deleted |
| Ephemeral | Deleted automatically when the creating session ends |
| Sequential | Name suffixed with a monotonically increasing counter |
| Persistent Sequential | Persistent + sequence suffix |
| Ephemeral Sequential | Ephemeral + sequence suffix |
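A minimal sketch of creating the two most common modes (paths and payloads are illustrative; assumes an open ZooKeeper handle zk):
// Persistent: stays until explicitly deleted
zk.create("/config/app", "v1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Ephemeral sequential: the server assigns a suffix, e.g. /workers/w-0000000007;
// the node vanishes automatically when this session ends
String myNode = zk.create("/workers/w-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL_SEQUENTIAL);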
Architecture¶
An ensemble (typically 3 or 5 servers) elects one leader; the leader sequences every write through the ZAB atomic broadcast protocol, while followers serve reads and acknowledge proposals so that each write commits once a quorum has acked it.
Consistency Model¶
- Linearizable writes (all writes go through the leader)
- Sequential consistency for reads (a follower may serve slightly stale data, but never out of order)
- Watches are one-time triggers that provide eventual, not immediate, notification
Write path:
Client → Leader → Propose → Quorum Ack → Commit → Response
Read path (default):
Client → Any Server (may be stale)
Read path (sync):
Client → sync() → read() (sees all writes committed before the sync)
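A minimal sketch of the sync-then-read pattern (the path is illustrative; sync() is asynchronous, so we wait on a latch):
// Ask this server to catch up with the leader before reading
CountDownLatch synced = new CountDownLatch(1);
zk.sync("/config/app", (rc, path, ctx) -> synced.countDown(), null);
synced.await();
byte[] fresh = zk.getData("/config/app", false, null); // at least as new as the sync point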
Core Features¶
ZooKeeper offers:
- Distributed coordination
- Leader election
- Configuration management
- Service discovery
- Distributed locks
- Cluster membership
- Barriers/synchronization
- Strong consistency guarantees
Common Use Cases¶
1. Configuration Management¶
// Store configuration
public class ConfigManager {
private ZooKeeper zk;
public void saveConfig(String path, String config) throws Exception {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, config.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, config.getBytes(), stat.getVersion());
}
}
public String getConfig(String path) throws Exception {
byte[] data = zk.getData(path, false, null);
return new String(data);
}
// Watch for changes
public void watchConfig(String path, Consumer<String> callback) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
String newConfig = getConfig(path);
callback.accept(newConfig);
// Re-register the watch (watches fire only once); note that a change
// between the read and the re-registration can be missed
watchConfig(path, callback);
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
zk.getData(path, watcher, null);
}
}
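Typical usage (sketch; assumes a ConfigManager constructor taking the open handle, which is not shown above):
ConfigManager mgr = new ConfigManager(zk); // hypothetical constructor
mgr.saveConfig("/config/app", "{\"maxConnections\":10}");
mgr.watchConfig("/config/app", cfg -> System.out.println("config changed: " + cfg));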
2. Leader Election¶
public class LeaderElection implements Watcher {
private ZooKeeper zk;
private String electionPath = "/election";
private String myNode;
private Runnable onBecomingLeader;
public void participate() throws Exception {
// Create an ephemeral sequential node (assumes the persistent /election parent exists)
myNode = zk.create(electionPath + "/candidate_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
checkLeadership();
}
private void checkLeadership() throws Exception {
List<String> children = zk.getChildren(electionPath, false);
Collections.sort(children);
String smallest = electionPath + "/" + children.get(0);
if (myNode.equals(smallest)) {
// I am the leader!
onBecomingLeader.run();
} else {
// Watch only the node immediately before mine (avoids a thundering herd on the leader node)
int myIndex = children.indexOf(myNode.substring(electionPath.length() + 1));
String nodeToWatch = electionPath + "/" + children.get(myIndex - 1);
Stat stat = zk.exists(nodeToWatch, this);
if (stat == null) {
// Node disappeared, check again
checkLeadership();
}
}
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
try {
checkLeadership();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3. Distributed Locking¶
public class DistributedLock {
private ZooKeeper zk;
private String lockPath;
private String myLockNode;
public DistributedLock(ZooKeeper zk, String resource) {
this.zk = zk;
this.lockPath = "/locks/" + resource;
}
public void acquire() throws Exception {
// Create lock base path if needed
if (zk.exists(lockPath, false) == null) {
try {
zk.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// Another thread created it
}
}
// Create ephemeral sequential node
myLockNode = zk.create(lockPath + "/lock_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// Wait until we have the lock
while (true) {
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
String smallest = lockPath + "/" + children.get(0);
if (myLockNode.equals(smallest)) {
// We have the lock!
return;
}
// Wait for the node before us
int myIndex = children.indexOf(myLockNode.substring(lockPath.length() + 1));
String nodeToWatch = lockPath + "/" + children.get(myIndex - 1);
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(nodeToWatch, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
});
if (stat != null) {
latch.await();
}
// Loop again to check if we're now first
}
}
public void release() throws Exception {
zk.delete(myLockNode, -1);
}
}
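Typical usage (sketch): hold the lock only around the critical section and always release in a finally block.
DistributedLock lock = new DistributedLock(zk, "orders");
lock.acquire();
try {
    // critical section
} finally {
    lock.release();
}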
4. Service Discovery¶
public class ServiceRegistry {
private ZooKeeper zk;
private String servicesPath = "/services";
// Register a service instance
public String register(String serviceName, String address) throws Exception {
String servicePath = servicesPath + "/" + serviceName;
// Create service path if needed
if (zk.exists(servicePath, false) == null) {
zk.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
// Create ephemeral node for this instance
return zk.create(servicePath + "/instance_",
address.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
// Get all instances of a service
public List<String> discover(String serviceName) throws Exception {
String servicePath = servicesPath + "/" + serviceName;
List<String> instances = new ArrayList<>();
List<String> children = zk.getChildren(servicePath, false);
for (String child : children) {
byte[] data = zk.getData(servicePath + "/" + child, false, null);
instances.add(new String(data));
}
return instances;
}
// Watch for changes
public void watch(String serviceName, Consumer<List<String>> callback) throws Exception {
String servicePath = servicesPath + "/" + serviceName;
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
List<String> instances = discover(serviceName);
callback.accept(instances);
watch(serviceName, callback); // Re-register
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
zk.getChildren(servicePath, watcher);
}
}
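Usage sketch (assumes a ServiceRegistry constructor taking the open handle, not shown above; refreshClientPool is a hypothetical callback):
ServiceRegistry registry = new ServiceRegistry(zk); // hypothetical constructor
registry.register("payments", "10.0.0.5:8080");
List<String> addresses = registry.discover("payments"); // e.g. ["10.0.0.5:8080"]
registry.watch("payments", addrs -> refreshClientPool(addrs));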
5. Barrier / Synchronization¶
public class DistributedBarrier {
private ZooKeeper zk;
private String barrierPath;
private int expectedParticipants;
private String myNode;
public DistributedBarrier(ZooKeeper zk, String name, int participants) {
this.zk = zk;
this.barrierPath = "/barriers/" + name;
this.expectedParticipants = participants;
}
public void enter() throws Exception {
// Create barrier path
if (zk.exists(barrierPath, false) == null) {
try {
zk.create(barrierPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// OK
}
}
// Register as participant
myNode = zk.create(barrierPath + "/participant_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// Wait until all participants arrive
while (true) {
List<String> children = zk.getChildren(barrierPath, false);
if (children.size() >= expectedParticipants) {
return; // All arrived, proceed
}
// Wait for more participants
CountDownLatch latch = new CountDownLatch(1);
zk.getChildren(barrierPath, event -> {
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
latch.countDown();
}
});
latch.await(1, TimeUnit.SECONDS);
}
}
public void leave() throws Exception {
zk.delete(myNode, -1);
}
}
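Usage sketch with the constructor shown above:
DistributedBarrier barrier = new DistributedBarrier(zk, "batch-start", 3);
barrier.enter(); // blocks until 3 participants have entered
// ... phase that requires all participants ...
barrier.leave();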
Curator Framework¶
Apache Curator provides higher-level abstractions over ZooKeeper.
// Setup
CuratorFramework client = CuratorFrameworkFactory.newClient(
"zk1:2181,zk2:2181,zk3:2181",
new ExponentialBackoffRetry(1000, 3)
);
client.start();
// Leader Election
LeaderSelector selector = new LeaderSelector(client, "/leader", new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
// I am the leader now
// Method should block while leader
Thread.sleep(Long.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// Handle connection state changes; throw CancelLeadershipException
// on SUSPENDED/LOST to relinquish leadership
}
});
selector.start();
// Distributed Lock
InterProcessMutex lock = new InterProcessMutex(client, "/locks/mylock");
if (lock.acquire(10, TimeUnit.SECONDS)) {
try {
// Do work
} finally {
lock.release();
}
}
// Service Discovery
ServiceDiscovery<InstanceDetails> discovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
.client(client)
.basePath("/services")
.build();
discovery.start();
// Register
ServiceInstance<InstanceDetails> instance = ServiceInstance.<InstanceDetails>builder()
.name("myservice")
.address("10.0.0.1")
.port(8080)
.build();
discovery.registerService(instance);
// Discover
Collection<ServiceInstance<InstanceDetails>> instances =
discovery.queryForInstances("myservice");
// Cache with automatic updates (in Curator 5.x, CuratorCache supersedes PathChildrenCache)
PathChildrenCache cache = new PathChildrenCache(client, "/config", true);
cache.getListenable().addListener((client1, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
case CHILD_UPDATED:
case CHILD_REMOVED:
// Handle change
break;
}
});
cache.start();
Operations¶
Session Management¶
// Session timeout: 10-40 seconds typical
ZooKeeper zk = new ZooKeeper(
"zk1:2181,zk2:2181,zk3:2181",
30000, // Session timeout in ms
watchedEvent -> {
switch (watchedEvent.getState()) {
case SyncConnected:
// Connected
break;
case Disconnected:
// Temporarily disconnected (will retry)
break;
case Expired:
// Session expired, need to recreate
break;
}
}
);
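An expired session cannot be recovered: the handle must be rebuilt and all ephemeral state re-created. A sketch (handleEvent and recreateEphemeralState are hypothetical application methods):
// Rebuild the client after session expiry
void onSessionExpired() throws IOException {
    zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 30000, this::handleEvent);
    recreateEphemeralState(); // re-create ephemeral nodes, locks, registrations
}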
Watches¶
// One-time watches (need to re-register)
zk.getData("/path", event -> {
// Called once when data changes
}, null);
zk.getChildren("/path", event -> {
// Called once when children change
});
zk.exists("/path", event -> {
// Called once when node created/deleted
});
Trade-offs¶
| Pros | Cons |
|---|---|
| Strong consistency | Limited storage (in-memory) |
| Simple API | Complex clustering setup |
| Ordered operations | Session management complexity |
| Ephemeral nodes | Single write leader (bottleneck) |
| Watches | Watch storm potential |
| Mature and proven | JVM dependency |
| High availability | Limited throughput |
Performance Characteristics¶
| Metric | Typical Value |
|---|---|
| Read throughput | 10,000+ ops/sec |
| Write throughput | 1,000-5,000 ops/sec |
| Latency | 1-10ms |
| Max ZNode size | 1 MB |
| Max children | Millions (practical: thousands) |
| Session timeout | 10-40 seconds |
When to Use ZooKeeper¶
Good For:
- Distributed coordination
- Leader election
- Configuration management
- Service discovery
- Distributed locks
- Cluster membership
- Barriers/synchronization
Not Good For:
- Large data storage
- Message queuing (use Kafka)
- General-purpose database
- High write throughput
- Large numbers of clients per node
ZooKeeper vs Alternatives¶
| Feature | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consistency | CP (linearizable writes) | CP (Raft) | CP (Raft) |
| Language | Java | Go | Go |
| Protocol | Custom | gRPC | HTTP/gRPC |
| Watch | Yes | Yes | Yes |
| TTL | Session-based | Key-based | Both |
| ACL | Yes | Yes | Yes |
| UI | No (third-party) | No | Yes |
| Multi-DC | No | No | Yes |
Best Practices¶
- Use an odd number of servers - 3 or 5; an even count adds no extra fault tolerance
- Separate ZK from application - Dedicated servers
- Use Curator - Higher-level API, handles edge cases
- Handle session expiry - Recreate ephemeral nodes
- Keep znodes small - < 1KB typically
- Limit children - Thousands, not millions
- Re-register watches - They are one-time
- Monitor ensemble - Four-letter commands, JMX
- Use transactions - Multi-op for atomicity (see the sketch after this list)
- Set appropriate session timeout - Balance between detection and stability
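A minimal multi-op sketch (path, payload, and version are illustrative): all operations are applied atomically or none are.
byte[] newBytes = "v2".getBytes(); // illustrative payload
int expectedVersion = 3;           // illustrative version
List<OpResult> results = zk.multi(Arrays.asList(
    Op.check("/config/app", expectedVersion),
    Op.setData("/config/app", newBytes, expectedVersion),
    Op.create("/config/history/v-", newBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,
              CreateMode.PERSISTENT_SEQUENTIAL)
));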
Four-Letter Commands¶
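Note: since ZooKeeper 3.5 these commands must be whitelisted in zoo.cfg, e.g. 4lw.commands.whitelist=stat,ruok,conf,cons,srvr,wchs,dump.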
# Server status
echo stat | nc localhost 2181
# Configuration
echo conf | nc localhost 2181
# Connection info
echo cons | nc localhost 2181
# Server mode (leader/follower)
echo srvr | nc localhost 2181
# Watch info
echo wchs | nc localhost 2181
# Is server alive
echo ruok | nc localhost 2181
# Dump sessions
echo dump | nc localhost 2181
Ensemble Configuration¶
# zoo.cfg
tickTime=2000        # base time unit in ms
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=10         # ticks a follower may take to connect and sync with the leader
syncLimit=5          # ticks a follower may lag behind the leader before being dropped
# Ensemble members
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
# Create myid file
# On zk1: echo "1" > /var/lib/zookeeper/myid
# On zk2: echo "2" > /var/lib/zookeeper/myid
# On zk3: echo "3" > /var/lib/zookeeper/myid
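Commonly added housekeeping settings (values illustrative):
# Automatically purge old snapshots and transaction logs
autopurge.snapRetainCount=3   # keep the 3 most recent snapshots
autopurge.purgeInterval=24    # purge every 24 hours (0 disables purging)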