为什么是zk,因为用到zk的场景实在太多了,大部分耳熟能详的分布式系统都有zookeeper的影子,比如hbase,storm,dubbo,kafka等等,另外前面提到的也用到了zookeeper。1 环境准备
1.1 导入代码
然后,将src/java下面的main和generated设置为源码目录,同时将lib目录添加为liabary。1.2 设置配置文件
1.3 调试配置
set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain....endlocal
2 启动分析
2.1 QuorumPeerMain
protected void initializeAndRun(String[] args) throws ConfigException, IOException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task 清理任务 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); // 集群模式 if (args.length == 1 && config.servers.size() > 0) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone // 单机模式 ZooKeeperServerMain.main(args); } }
- 加载解析配置文件到QuorumPeerConfig
- 执行清理任务
- 判断是集群模式还是单机模式,我们的配置文件未配置server,所以是单机模式,执行 ZooKeeperServerMain.main
2.2 ZooKeeperServerMain
``` java
protected void initializeAndRun(String[] args) throws ConfigException, IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); }ServerConfig config = new ServerConfig(); if (args.length == 1) { config.parse(args[0]); } else { config.parse(args); } runFromConfig(config);}```
public void runFromConfig(ServerConfig config) throws IOException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call // run() in this thread. // create a file logger url from the command line args final ZooKeeperServer zkServer = new ZooKeeperServer(); // Registers shutdown handler which will be used to know the // server error or shutdown state changes. final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); // 快照 txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File( config.dataDir)); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); // socket工厂 cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnFactory.startup(zkServer); // Watch status of ZooKeeper server. It will do a graceful shutdown // if the server is not running or hits an internal error. shutdownLatch.await(); shutdown(); cnxnFactory.join(); if (zkServer.canShutdown()) { zkServer.shutdown(); } } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Server interrupted", e); } finally { if (txnLog != null) { txnLog.close(); } } }
- 创建zkServer,对ZooKeeperServer设置一些配置参数,如tickTime、minSessionTimeout、maxSessionTimeout
- 创建CountDownLatch,注释里写了,用来watch zk的状态,当zk关闭或者出现内部错误的时候优雅的关闭服务
- 根据配置参数dataLogDir和dataDir创建FileTxnSnapLog,用来存储zk数据和日志快照
- 创建cnxnFactory,zk的 socket工厂,负责处理网络请求,zk里有netty和NIO两种实现
- cnxnFactory.startup(zkServer),启动zk服务器
2.3 ServerCnxnFactory
static public ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .newInstance(); } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; } }
2.4 ZooKeeperServer
我们先来看ZooKeeperServer包含哪些内容:public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; /** value of -1 indicates unset, use default */ protected int minSessionTimeout = -1; /** value of -1 indicates unset, use default */ protected int maxSessionTimeout = -1; protected SessionTracker sessionTracker; //创建和管理session private FileTxnSnapLog txnLogFactory = null; //文件快照 private ZKDatabase zkDb; // ZooKeeper树形数据的模型 private final AtomicLong hzxid = new AtomicLong(0); //原子增长Long,用于分配事务编号 public final static Exception ok = new Exception("No prob"); protected RequestProcessor firstProcessor; // ZooKeeperServer请求处理器链中的第一个处理器 protected volatile State state = State.INITIAL; protected enum State { INITIAL, RUNNING, SHUTDOWN, ERROR; } /** * This is the secret that we use to generate passwords, for the moment it * is more of a sanity check. */ static final private long superSecret = 0XB3415C00L; private final AtomicInteger requestsInProcess = new AtomicInteger(0); final ListoutstandingChanges = new ArrayList (); // this data structure must be accessed under the outstandingChanges lock final HashMap outstandingChangesForPath = new HashMap (); private ServerCnxnFactory serverCnxnFactory; //ServerSocket工厂,接受客户端的socket连接 private final ServerStats serverStats; //server的运行状态统计 private final ZooKeeperServerListener listener; // ZK运行状态监听 private ZooKeeperServerShutdownHandler zkShutdownHandler;
2.5 服务启动
cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnFactory.startup(zkServer);
2.5.1 配置cnxnFactory
@Override public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); // ZK网络请求主线程 thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); thread.setDaemon(true); maxClientCnxns = maxcc; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
- configureSaslLogin,具体不细看,应该是处理鉴权
- 初始化ZooKeeperThread,这个ZooKeeperThread的作用是负责处理未处理异常:
public class ZooKeeperThread extends Thread { private static final Logger LOG = LoggerFactory .getLogger(ZooKeeperThread.class); private UncaughtExceptionHandler uncaughtExceptionalHandler = new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { handleException(t.getName(), e); } }; public ZooKeeperThread(Runnable thread, String threadName) { super(thread, threadName); setUncaughtExceptionHandler(uncaughtExceptionalHandler); } protected void handleException(String thName, Throwable e) { LOG.warn("Exception occured from thread {}", thName, e); }}
- 启动ServerSocketChannel,并绑定配置的addr,并且注册selector(可以搜索NIO了解细节)
2.5.2 启动cnxnFactory
@Override public void startup(ZooKeeperServer zks) throws IOException, InterruptedException { start(); setZooKeeperServer(zks); zks.startdata(); zks.startup(); }
@Override public void start() { // ensure thread is started once and only once if (thread.getState() == Thread.State.NEW) { thread.start(); } }
public void run() { while (!ss.socket().isClosed()) { try { selector.select(1000); Setselected; synchronized (this) { selected = selector.selectedKeys(); } ArrayList selectedList = new ArrayList ( selected); Collections.shuffle(selectedList); for (SelectionKey k : selectedList) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { SocketChannel sc = ((ServerSocketChannel) k .channel()).accept(); InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){ LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns ); sc.close(); } else { LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection(sc, sk); sk.attach(cnxn); addCnxn(cnxn); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } else { if (LOG.isDebugEnabled()) { LOG.debug("Unexpected ops in select " + k.readyOps()); } } } selected.clear(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring exception", e); } } closeAll(); LOG.info("NIOServerCnxn factory exited run method"); }
- 首先打乱 Collections.shuffle(selectedList);
- for循环处理
- 如果SelectionKey.OP_ACCEPT,代表一个新连接请求,创建SocketChannel,创建NIOServerCnxn,然后addCnxn
- 如果可读写,则 NIOServerCnxn.doIO(k),执行IO操作
void doIO(SelectionKey k) throws InterruptedException { try { if (isSocketOpen() == false) { LOG.warn("trying to do i/o on a null socket for session:0x" + Long.toHexString(sessionId)); return; } if (k.isReadable()) { // 读取4个字节 int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } // 读满了 if (incomingBuffer.remaining() == 0) { boolean isPayload; if (incomingBuffer == lenBuffer) { // start of next request incomingBuffer.flip(); // 复位 isPayload = readLength(k); // 读取载荷长度 incomingBuffer.clear(); } else { // continuation isPayload = true; } if (isPayload) { // not the case for 4letterword readPayload(); } else { // four letter words take care // need not do anything else return; } } }
private void readPayload() throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip(); // 复位 if (!initialized) { readConnectRequest(); // 读取连接请求 } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } }
private void readConnectRequest() throws IOException, InterruptedException { if (zkServer == null) { throw new IOException("ZooKeeperServer not running"); } zkServer.processConnectRequest(this, incomingBuffer); initialized = true; }
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); // 反序列化请求 .... // 客户端设置的超时时间 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } // 服务端设置的最大超时时间 int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); // 请求是否带上sessionid long sessionId = connReq.getSessionId(); if (sessionId != 0) { // 请求带了sessionid long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); // 关闭请求 serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); // 重新打开请求 reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); // 创建新sesssion createSession(cnxn, passwd, sessionTimeout); } }
- 将读取出来的incomingBuffer反序列化为ConnectRequest对象
- 然后设置超时时间,ServerCnxn接收到该申请后,根据客户端传递过来的sessionTimeout时间以及ZooKeeperServer本身的minSessionTimeout、maxSessionTimeout参数,确定最终的sessionTimeout时间
- 判断客户端的请求是否已经含有sessionId
- 如果含有,则执行sessionId的是否过期、密码是否正确等检查
- 如果没有sessionId,则创建一个session
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; }
- 使用sessionTracker生成一个sessionId
- submitRequest构建一个Request请求,请求的类型为OpCode.createSession
private void submitRequest(ServerCnxn cnxn, long sessionId, int type, int xid, ByteBuffer bb, ListauthInfo) { Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); submitRequest(si); } public void submitRequest(Request si) { if (firstProcessor == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } }
- 创建一个Request
- 等待firstProcessor创建完成,然后调用firstProcessor.processRequest
2.5.3 zk服务器启动
再次回到startup, setZooKeeperServer(zks),代码很简单
final public void setZooKeeperServer(ZooKeeperServer zk) { this.zkServer = zk; if (zk != null) { zk.setServerCnxnFactory(this); } }
public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } }
public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); registerJMX(); setState(State.RUNNING); notifyAll(); }
- createSessionTracker创建sessionTracker
- startSessionTracker启动SessionTracker
- setupRequestProcessors 创建请求处理器链
- registerJMX 注册JMX
- setState(State.RUNNING) 设置状态为运行中
This is the basic interface that ZooKeeperServer uses to track sessions.
protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); }
2.5.4 ZooKeeperServer请求处理器链介绍
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
- 首先是PrepRequestProcessor
- 然后是SyncRequestProcessor
- 最后是finalProcessor
RequestProcessors are chained together to process transactions.
public interface RequestProcessor { @SuppressWarnings("serial") public static class RequestProcessorException extends Exception { public RequestProcessorException(String msg, Throwable t) { super(msg, t); } } void processRequest(Request request) throws RequestProcessorException; void shutdown();}
RequestProcessor nextProcessor; ZooKeeperServer zks; public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):", zks.getZooKeeperServerListener()); this.nextProcessor = nextProcessor; this.zks = zks; } protected void pRequest(Request request) throws RequestProcessorException { …… nextProcessor.processRequest(request); }
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { LinkedBlockingQueuesubmittedRequests = new LinkedBlockingQueue (); RequestProcessor nextProcessor; }
- 继承自ZooKeeperCriticalThread,是一个Thread
- 重要属性submittedRequests 是一个LinkedBlockingQueue,LinkedBlockingQueue实现是线程安全的,实现了先进先出特性,是作为生产者消费者的首选。
public void processRequest(Request request) { // request.addRQRec(">prep="+zks.outstandingChanges.size()); submittedRequests.add(request); }
public void run() { while (true) { // 取出一个请求 Request request = submittedRequests.take(); if (Request.requestOfDeath == request) { break; } // 处理请求 pRequest(request); } }
根据request的type,构造对应的请求,对于增删改等影响数据状态的操作都被认为是事务(txn:transaction) ,需要创建出事务请求头(hdr),调用pRequest2Txn,其他操作则不属于事务操作,需要验证下sessionId是否合法。
//create/close session don't require request record case OpCode.createSession: case OpCode.closeSession: pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); break; //All the rest don't need to create a Txn - just verify session case OpCode.sync: case OpCode.exists: case OpCode.getData: case OpCode.getACL: case OpCode.getChildren: case OpCode.getChildren2: case OpCode.ping: case OpCode.setWatches: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); break;
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type); switch (type) { case OpCode.create: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if(deserialize) ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); String path = createRequest.getPath(); int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) { LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(request.sessionId)); throw new KeeperException.BadArgumentsException(path); } ListlistACL = removeDuplicates(createRequest.getAcl()); if (!fixupACL(request.authInfo, listACL)) { throw new KeeperException.InvalidACLException(path); } String parentPath = path.substring(0, lastSlash); ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NoNodeException e) { // ignore this one } boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0; if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } int newCversion = parentRecord.stat.getCversion()+1; request.txn = new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(), newCversion); StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } parentRecord = parentRecord.duplicate(request.hdr.getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL)); break;
- 首先是 zks.getNextZxid()创建一个事务id,AtomicLong hzxid是自增长id,初始化为0,每次加一
- 在pRequest2Txn内部,先给request创建一个TxnHeader,这个header包含事务id
- 然后判断请求类型
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner()) 检查session
- 反序列化为CreateRequest
出处:jqpeng的技术记事本-- 您的支持是对博主最大的鼓励,感谢您的认真阅读。 本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。