从源码的角度再学「Thread」

原创: zhangshaolin 张少林同学
微信号: zhangshaolin_tonxue

功能介绍 分享

前言

Java中的线程是使用Thread类实现的,Thread在初学Java的时候就学过了,也在实践中用过,不过一直没从源码的角度去看过它的实现,今天从源码的角度出发,再次学习Java Thread,愿此后对Thread的实践更加得心应手。

从注释开始

相信阅读过JDK源码的同学都能感受到JDK源码中有非常详尽的注释,阅读某个类的源码应当先看看注释对它的介绍,注释原文就不贴了,以下是我对它的总结:

Thread是程序中执行的线程,Java虚拟机允许应用程序同时允许多个执行线程

每个线程都有优先级的概念,具有较高优先级的线程优先于优先级较低的线程执行

每个线程都可以被设置为守护线程

当在某个线程中运行的代码创建一个新的Thread对象时,新的线程优先级跟创建线程一致

当Java虚拟机启动的时候都会启动一个叫做main的线程,它没有守护线程,main线程会继续执行,直到以下情况发送

Runtime 类的退出方法exit被调用并且安全管理器允许进行退出操作

所有非守护线程均已死亡,或者run方法执行结束正常返回结果,或者run方法抛出异常

创建线程第一种方式:继承Thread类,重写run方法

1 //定义线程类 2 class PrimeThread extends Thread { 3 long minPrime; 4 PrimeThread(long minPrime) { 5 this.minPrime = minPrime; 6 } 7 public void run() { 8 // compute primes larger than minPrime 9  . . . 10 } 11 } 12 //启动线程 13 PrimeThread p = new PrimeThread(143); 14 p.start();

创建线程第二种方式:实现Runnable接口,重写run方法,因为Java的单继承限制,通常使用这种方式创建线程更加灵活

1 //定义线程 2 class PrimeRun implements Runnable { 3 long minPrime; 4 PrimeRun(long minPrime) { 5 this.minPrime = minPrime; 6 } 7 public void run() { 8 // compute primes larger than minPrime 9  . . . 10 } 11 } 12 //启动线程 13 PrimeRun p = new PrimeRun(143); 14 new Thread(p).start();

创建线程时可以给线程指定名字,如果没有指定,会自动为它生成名字

除非另有说明,否则将null参数传递给Thread类中的构造函数或方法将导致抛出 NullPointerException

Thread 常用属性

阅读一个Java类,先从它拥有哪些属性入手:

1 //线程名称,创建线程时可以指定线程的名称 2 private volatile String name; 3 4 //线程优先级,可以设置线程的优先级 5 private int priority; 6 7 //可以配置线程是否为守护线程,默认为false 8 private boolean daemon = false; 9 10 //最终执行线程任务的`Runnable` 11 private Runnable target; 12 13 //描述线程组的类 14 private ThreadGroup group; 15 16 //此线程的上下文ClassLoader 17 private ClassLoader contextClassLoader; 18 19 //所有初始化线程的数目,用于自动编号匿名线程,当没有指定线程名称时,会自动为其编号 20 private static int threadInitNumber; 21 22 //此线程请求的堆栈大小,如果创建者没有指定堆栈大小,则为0。, 虚拟机可以用这个数字做任何喜欢的事情。, 一些虚拟机会忽略它。 23 private long stackSize; 24 25 //线程id 26 private long tid; 27 28 //用于生成线程ID 29 private static long threadSeqNumber; 30 31 //线程状态 32 private volatile int threadStatus = 0; 33 34 //线程可以拥有的最低优先级 35 public final static int MIN_PRIORITY = 1; 36 37 //分配给线程的默认优先级。 38 public final static int NORM_PRIORITY = 5; 39 40 //线程可以拥有的最大优先级 41 public final static int MAX_PRIORITY = 10; Thread 构造方法

了解了属性之后,看看Thread实例是怎么构造的?先预览下它大致有多少个构造方法:

查看每个构造方法内部源码,发现均调用的是名为init的私有方法,再看init方法有两个重载,而其核心方法如下:

1 /** 2 * Initializes a Thread. 3 * 4 * @param g 线程组 5 * @param target 最终执行任务的 `run()` 方法的对象 6 * @param name 新线程的名称 7 * @param stackSize 新线程所需的堆栈大小,或者 0 表示要忽略此参数 8 * @param acc 要继承的AccessControlContext,如果为null,则为 AccessController.getContext() 9 * @param inheritThreadLocals 如果为 true,从构造线程继承可继承的线程局部的初始值 10 */ 11 private void init(ThreadGroup g, Runnable target, String name, 12 long stackSize, AccessControlContext acc, 13 boolean inheritThreadLocals) { 14 //线程名称为空,直接抛出空指针异常 15 if (name == null) { 16 throw new NullPointerException("name cannot be null"); 17 } 18 //初始化当前线程对象的线程名称 19 this.name = name; 20 //获取当前正在执行的线程为父线程 21 Thread parent = currentThread(); 22 //获取系统安全管理器 23 SecurityManager security = System.getSecurityManager(); 24 //如果线程组为空 25 if (g == null) { 26 //如果安全管理器不为空 27 if (security != null) { 28 //获取SecurityManager中的线程组 29 g = security.getThreadGroup(); 30 } 31 //如果获取的线程组还是为空 32 if (g == null) { 33 //则使用父线程的线程组 34 g = parent.getThreadGroup(); 35 } 36 } 37 38 //检查安全权限 39 g.checkAccess(); 40 41 //使用安全管理器检查是否有权限 42 if (security != null) { 43 if (isCCLOverridden(getClass())) { 44 security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); 45 } 46 } 47 48 //线程组中标记未启动的线程数+1,这里方法是同步的,防止出现线程安全问题 49 g.addUnstarted(); 50 51 //初始化当前线程对象的线程组 52 this.group = g; 53 //初始化当前线程对象的是否守护线程属性,注意到这里初始化时跟父线程一致 54 this.daemon = parent.isDaemon(); 55 //初始化当前线程对象的线程优先级属性,注意到这里初始化时跟父线程一致 56 this.priority = parent.getPriority(); 57 //这里初始化类加载器 58 if (security == null || isCCLOverridden(parent.getClass())) 59 this.contextClassLoader = parent.getContextClassLoader(); 60 else 61 this.contextClassLoader = parent.contextClassLoader; 62 this.inheritedAccessControlContext = 63 acc != null ? acc : AccessController.getContext(); 64 //初始化当前线程对象的最终执行任务对象 65 this.target = target; 66 //这里再对线程的优先级字段进行处理 67 setPriority(priority); 68 if (inheritThreadLocals && parent.inheritableThreadLocals != null) 69 this.inheritableThreadLocals = 70 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 71 //初始化当前线程对象的堆栈大小 72 this.stackSize = stackSize; 73 74 //初始化当前线程对象的线程ID,该方法是同步的,内部实际上是threadSeqNumber++ 75 tid = nextThreadID(); 76 }

另一个重载init私有方法如下,实际上内部调用的是上述init方法:

1 private void init(ThreadGroup g, Runnable target, String name, 2 long stackSize) { 3 init(g, target, name, stackSize, null, true); 4 }

接下来看看所有构造方法:

空构造方法

1 public Thread() { 2 init(null, null, "Thread-" + nextThreadNum(), 0); 3 }

内部调用的是init第二个重载方法,参数基本都是默认值,线程名称写死为"Thread-" + nextThreadNum()格式,nextThreadNum()为一个同步方法,内部维护一个静态属性表示线程的初始化数量+1:

1 private static int threadInitNumber; 2 private static synchronized int nextThreadNum() { 3 return threadInitNumber++; 4 }

与第一个构造方法区别在于可以自定义Runnable对象

自定义执行任务Runnable对象的构造方法

1 private static int threadInitNumber; 2 private static synchronized int nextThreadNum() { 3 return threadInitNumber++; 4 }

自定义执行任务Runnable对象和AccessControlContext对象的构造方法

1 Thread(Runnable target, AccessControlContext acc) { 2 init(null, target, "Thread-" + nextThreadNum(), 0, acc, false); 3 }

自定义线程组ThreadGroup和执行任务Runnable对象的构造方法

1 public Thread(ThreadGroup group, Runnable target) { 2 init(group, target, "Thread-" + nextThreadNum(), 0); 3 }

自定义线程名称name的构造方法

1 public Thread(String name) { 2 init(null, null, name, 0); 3 }

自定义线程组ThreadGroup和线程名称name的构造方法

1 public Thread(String name) { 2 init(null, null, name, 0); 3 }

自定义执行任务Runnable对象和线程名称name的构造方法

1 public Thread(Runnable target, String name) { 2 init(null, target, name, 0); 3 }

自定义线程组ThreadGroup和线程名称name和执行任务Runnable对象的构造方法

1 public Thread(ThreadGroup group, Runnable target, String name) { 2 init(group, target, name, 0); 3 }

全部属性都是自定义的构造方法

1 public Thread(ThreadGroup group, Runnable target, String name, 2 long stackSize) { 3 init(group, target, name, stackSize); 4 }

Thread提供了非常灵活的重载构造方法,方便开发者自定义各种参数的Thread对象。

常用方法

这里记录一些比较常见的方法吧,对于Thread中存在的一些本地方法,我们暂且不用管它~

设置线程名称

设置线程名称,该方法为同步方法,为了防止出现线程安全问题,可以手动调用Thread的实例方法设置名称,也可以在构造Thread时在构造方法中传入线程名称,我们通常都是在构造参数时设置

1 public final synchronized void setName(String name) { 2   //检查安全权限 3 checkAccess(); 4   //如果形参为空,抛出空指针异常 5 if (name == null) { 6 throw new NullPointerException("name cannot be null"); 7 } 8 //给当前线程对象设置名称 9 this.name = name; 10 if (threadStatus != 0) { 11 setNativeName(name); 12 } 13 } 获取线程名称

内部直接返回当前线程对象的名称属性

1 public final String getName() { 2 return name; 3 } 启动线程 1 public synchronized void start() { 2 //如果不是刚创建的线程,抛出异常 3 if (threadStatus != 0) 4 throw new IllegalThreadStateException(); 5 6 //通知线程组,当前线程即将启动,线程组当前启动线程数+1,未启动线程数-1 7 group.add(this); 8 9 //启动标识 10 boolean started = false; 11 try { 12 //直接调用本地方法启动线程 13 start0(); 14 //设置启动标识为启动成功 15 started = true; 16 } finally { 17 try { 18 //如果启动呢失败 19 if (!started) { 20 //线程组内部移除当前启动的线程数量-1,同时启动失败的线程数量+1 21 group.threadStartFailed(this); 22 } 23 } catch (Throwable ignore) { 24 /* do nothing. If start0 threw a Throwable then 25 it will be passed up the call stack */ 26 } 27 } 28 }

我们正常的启动线程都是调用Thread的start()方法,然后Java虚拟机内部会去调用Thred的run方法,可以看到Thread类也是实现Runnable接口,重写了run方法的:

1 @Override 2 public void run() { 3 //当前执行任务的Runnable对象不为空,则调用其run方法 4 if (target != null) { 5 target.run(); 6 } 7 }

Thread的两种使用方式:

继承Thread类,重写run方法,那么此时是直接执行run方法的逻辑,不会使用target.run();

实现Runnable接口,重写run方法,因为Java的单继承限制,通常使用这种方式创建线程更加灵活,这里真正的执行逻辑就会交给自定义Runnable去实现

设置守护线程

本质操作是设置daemon属性

1 public final void setDaemon(boolean on) { 2 //检查是否有安全权限 3 checkAccess(); 4 //本地方法,测试此线程是否存活。, 如果一个线程已经启动并且尚未死亡,则该线程处于活动状态 5 if (isAlive()) { 6 //如果线程先启动后再设置守护线程,将抛出异常 7 throw new IllegalThreadStateException(); 8 } 9 //设置当前守护线程属性 10 daemon = on; 11 } 判断线程是否为守护线程 1 public final boolean isDaemon() { 2 //直接返回当前对象的守护线程属性 3 return daemon; 4 } 线程状态

先来个线程状态图:

获取线程状态:

1 public State getState() { 2 //由虚拟机实现,获取当前线程的状态 3 return sun.misc.VM.toThreadState(threadStatus); 4 }

线程状态主要由内部枚举类State组成:

1 public enum State { 2 3 NEW, 4 5 6 RUNNABLE, 7 8 9 BLOCKED, 10 11 12 WAITING, 13 14 15 TIMED_WAITING, 16 17 18 TERMINATED; 19 }

NEW:刚刚创建,尚未启动的线程处于此状态

RUNNABLE:在Java虚拟机中执行的线程处于此状态

BLOCKED:被阻塞等待监视器锁的线程处于此状态,比如线程在执行过程中遇到synchronized同步块,就会进入此状态,此时线程暂停执行,直到获得请求的锁

WAITING:无限期等待另一个线程执行特定操作的线程处于此状态

通过 wait() 方法等待的线程在等待 notify() 方法

通过 join() 方法等待的线程则会等待目标线程的终止

TIMED_WAITING:正在等待另一个线程执行动作,直到指定等待时间的线程处于此状态

通过 wait() 方法,携带超时时间,等待的线程在等待 notify() 方法

通过 join() 方法,携带超时时间,等待的线程则会等待目标线程的终止

TERMINATED:已退出的线程处于此状态,此时线程无法再回到 RUNNABLE 状态

线程休眠

这是一个静态的本地方法,使当前执行的线程休眠暂停执行 millis 毫秒,当休眠被中断时会抛出InterruptedException中断异常

1 /** 2 * Causes the currently executing thread to sleep (temporarily cease 3 * execution) for the specified number of milliseconds, subject to 4 * the precision and accuracy of system timers and schedulers. The thread 5 * does not lose ownership of any monitors. 6 * 7 * @param millis 8 * the length of time to sleep in milliseconds 9 * 10 * @throws IllegalArgumentException 11 * if the value of {@code millis} is negative 12 * 13 * @throws InterruptedException 14 * if any thread has interrupted the current thread. The 15 * <i>interrupted status</i> of the current thread is 16 * cleared when this exception is thrown. 17 */ 18 public static native void sleep(long millis) throws InterruptedException; 检查线程是否存活

本地方法,测试此线程是否存活。 如果一个线程已经启动并且尚未死亡,则该线程处于活动状态。

1 /** 2 * Tests if this thread is alive. A thread is alive if it has 3 * been started and has not yet died. 4 * 5 * @return <code>true</code> if this thread is alive; 6 * <code>false</code> otherwise. 7 */ 8 public final native boolean isAlive(); 线程优先级

设置线程优先级

1 /** 2 * Changes the priority of this thread. 3 * <p> 4 * First the <code>checkAccess</code> method of this thread is called 5 * with no arguments. This may result in throwing a 6 * <code>SecurityException</code>. 7 * <p> 8 * Otherwise, the priority of this thread is set to the smaller of 9 * the specified <code>newPriority</code> and the maximum permitted 10 * priority of the thread's thread group. 11 * 12 * @param newPriority priority to set this thread to 13 * @exception IllegalArgumentException If the priority is not in the 14 * range <code>MIN_PRIORITY</code> to 15 * <code>MAX_PRIORITY</code>. 16 * @exception SecurityException if the current thread cannot modify 17 * this thread. 18 * @see #getPriority 19 * @see #checkAccess() 20 * @see #getThreadGroup() 21 * @see #MAX_PRIORITY 22 * @see #MIN_PRIORITY 23 * @see ThreadGroup#getMaxPriority() 24 */ 25 public final void setPriority(int newPriority) { 26 //线程组 27 ThreadGroup g; 28 //检查安全权限 29 checkAccess(); 30 //检查优先级形参范围 31 if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { 32 throw new IllegalArgumentException(); 33 } 34 if((g = getThreadGroup()) != null) { 35 //如果优先级形参大于线程组最大线程最大优先级 36 if (newPriority > g.getMaxPriority()) { 37 //则使用线程组的优先级数据 38 newPriority = g.getMaxPriority(); 39 } 40 //调用本地设置线程优先级方法 41 setPriority0(priority = newPriority); 42 } 43 } 线程中断

有一个stop()实例方法可以强制终止线程,不过这个方法因为太过于暴力,已经被标记为过时方法,不建议程序员再使用,因为强制终止线程会导致数据不一致的问题。

这里关于线程中断的方法涉及三个:

1 //实例方法,通知线程中断,设置标志位 2 public void interrupt(){} 3 //静态方法,检查当前线程的中断状态,同时会清除当前线程的中断标志位状态 4 public static boolean interrupted(){} 5 //实例方法,检查当前线程是否被中断,其实是检查中断标志位 6 public boolean isInterrupted(){}

interrupt() 方法解析

1 /** 2 * Interrupts this thread. 3 * 4 * <p> Unless the current thread is interrupting itself, which is 5 * always permitted, the {@link #checkAccess() checkAccess} method 6 * of this thread is invoked, which may cause a {@link 7 * SecurityException} to be thrown. 8 * 9 * <p> If this thread is blocked in an invocation of the {@link 10 * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link 11 * Object#wait(long, int) wait(long, int)} methods of the {@link Object} 12 * class, or of the {@link #join()}, {@link #join(long)}, {@link 13 * #join(long, int)}, {@link #sleep(long)}, or {@link #sleep(long, int)}, 14 * methods of this class, then its interrupt status will be cleared and it 15 * will receive an {@link InterruptedException}. 16 * 17 * <p> If this thread is blocked in an I/O operation upon an {@link 18 * java.nio.channels.InterruptibleChannel InterruptibleChannel} 19 * then the channel will be closed, the thread's interrupt 20 * status will be set, and the thread will receive a {@link 21 * java.nio.channels.ClosedByInterruptException}. 22 * 23 * <p> If this thread is blocked in a {@link java.nio.channels.Selector} 24 * then the thread's interrupt status will be set and it will return 25 * immediately from the selection operation, possibly with a non-zero 26 * value, just as if the selector's {@link 27 * java.nio.channels.Selector#wakeup wakeup} method were invoked. 28 * 29 * <p> If none of the previous conditions hold then this thread's interrupt 30 * status will be set. </p> 31 * 32 * <p> Interrupting a thread that is not alive need not have any effect. 33 * 34 * @throws SecurityException 35 * if the current thread cannot modify this thread 36 * 37 * @revised 6.0 38 * @spec JSR-51 39 */ 40 public void interrupt() { 41 //检查是否是自身调用 42 if (this != Thread.currentThread()) 43 //检查安全权限,这可能导致抛出{@link * SecurityException}。 44 checkAccess(); 45 46 //同步代码块 47 synchronized (blockerLock) { 48 Interruptible b = blocker; 49 //检查是否是阻塞线程调用 50 if (b != null) { 51 //设置线程中断标志位 52 interrupt0(); 53 //此时抛出异常,将中断标志位设置为false,此时我们正常会捕获该异常,重新设置中断标志位 54 b.interrupt(this); 55 return; 56 } 57 } 58 //如无意外,则正常设置中断标志位 59 interrupt0(); 60 }

线程中断方法不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出啦~

只能由自身调用,否则可能会抛出 SecurityException

调用中断方法是由目标线程自己决定是否中断,而如果同时调用了wait,join,sleep等方法,会使当前线程进入阻塞状态,此时有可能发生InterruptedException异常

被阻塞的线程再调用中断方法是不合理的

中断不活动的线程不会产生任何影响

检查线程是否被中断:

1 /** 2 * Tests whether this thread has been interrupted. The <i>interrupted 3 * status</i> of the thread is unaffected by this method. 4 5 测试此线程是否已被中断。, 线程的<i>中断*状态</ i>不受此方法的影响。 6 * 7 * <p>A thread interruption ignored because a thread was not alive 8 * at the time of the interrupt will be reflected by this method 9 * returning false. 10 * 11 * @return <code>true</code> if this thread has been interrupted; 12 * <code>false</code> otherwise. 13 * @see #interrupted() 14 * @revised 6.0 15 */ 16 public boolean isInterrupted() { 17 return isInterrupted(false); 18 }

静态方法,会清空当前线程的中断标志位:

1 /** 2 *测试当前线程是否已被中断。, 此方法清除线程的* <i>中断状态</ i>。, 换句话说,如果要连续两次调用此方法,则* second调用将返回false(除非当前线程再次被中断,在第一次调用已清除其中断的*状态 之后且在第二次调用已检查之前), 它) 3 * 4 * <p>A thread interruption ignored because a thread was not alive 5 * at the time of the interrupt will be reflected by this method 6 * returning false. 7 * 8 * @return <code>true</code> if the current thread has been interrupted; 9 * <code>false</code> otherwise. 10 * @see #isInterrupted() 11 * @revised 6.0 12 */ 13 public static boolean interrupted() { 14 return currentThread().isInterrupted(true); 15 } 总结

记录自己阅读Thread类源码的一些思考,不过对于其中用到的很多本地方法只能望而却步,还有一些代码没有看明白,暂且先这样吧,如果有不足之处,请留言告知我,谢谢!后续会在实践中对Thread做出更多总结记录。

read more
JVM内存结构:堆、栈、方法区

一、定义

1、堆:FIFO队列优先,先进先出。jvm只有一个堆区被所有线程所共享!堆存放在二级缓存中,调用对象的速度相对慢一些,生命周期由虚拟机的垃圾回收机制定。

2、栈:FILO先进后出,暂存数据的地方。每个线程都包含一个栈区!栈存放在一级缓存中,存取速度较快,“栈是限定仅在表头进行插入和删除操作的线性表”。

3、方法区:用来存放方法和static变量。

二、存储的数据类型

1、堆用来存储new出来的对象和数组

2、栈用来存储基本类型变量和对象的引用变量的地址

3、方法区存储方法和static变量

三、优缺点

1、堆的优点-可以动态的分配内存大小,生命周期不确定。缺点-速度略慢

2、栈的优点-速度快,缺点-存在栈中的数据大小和生命周期必须是明确的,缺少灵活性。

四、直接内存

直接内存并不是虚拟机运行时数据区的一部分,也不是Java 虚拟机规范中农定义的内存区域。在JDK1.4 中新加入了NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O 方式,它可以使用native 函数库直接分配堆外内存,然后通脱一个存储在Java堆中的DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。

本机直接内存的分配不会受到Java 堆大小的限制,受到本机总内存大小限制

配置虚拟机参数时,不要忽略直接内存 防止出现OutOfMemoryError异常

直接内存(堆外内存)与堆内存比较

直接内存申请空间耗费更高的性能,当频繁申请到一定量时尤为明显 直接内存IO读写的性能要优于普通的堆内存,在多次读写操作的情况下差异明显

代码验证:

package com.youyuan.web.controller.user; import java.nio.ByteBuffer; /** * 直接内存 与 堆内存的比较 */ public class ByteBufferCompare { public static void main(String[] args) { allocateCompare(); //分配比较 operateCompare(); //读写比较 } /** * 直接内存 和 堆内存的 分配空间比较 * <p> * 结论: 在数据量提升时,直接内存相比非直接内的申请,有很严重的性能问题 */ public static void allocateCompare() { int time = 10000000; //操作次数 long st = System.currentTimeMillis(); for (int i = 0; i < time; i++) { //ByteBuffer.allocate(int capacity) 分配一个新的字节缓冲区。 ByteBuffer buffer = ByteBuffer.allocate(2); //非直接内存分配申请 } long et = System.currentTimeMillis(); System.out.println("在进行" + time + "次分配操作时,堆内存 分配耗时:" + (et - st) + "ms"); long st_heap = System.currentTimeMillis(); for (int i = 0; i < time; i++) { //ByteBuffer.allocateDirect(int capacity) 分配新的直接字节缓冲区。 ByteBuffer buffer = ByteBuffer.allocateDirect(2); //直接内存分配申请 } long et_direct = System.currentTimeMillis(); System.out.println("在进行" + time + "次分配操作时,直接内存 分配耗时:" + (et_direct - st_heap) + "ms"); } /** * 直接内存 和 堆内存的 读写性能比较 * <p> * 结论:直接内存在直接的IO 操作上,在频繁的读写时 会有显著的性能提升 */ public static void operateCompare() { int time = 1000000000; ByteBuffer buffer = ByteBuffer.allocate(2 * time); long st = System.currentTimeMillis(); for (int i = 0; i < time; i++) { // putChar(char value) 用来写入 char 值的相对 put 方法 buffer.putChar('a'); } buffer.flip(); for (int i = 0; i < time; i++) { buffer.getChar(); } long et = System.currentTimeMillis(); System.out.println("在进行" + time + "次读写操作时,非直接内存读写耗时:" + (et - st) + "ms"); ByteBuffer buffer_d = ByteBuffer.allocateDirect(2 * time); long st_direct = System.currentTimeMillis(); for (int i = 0; i < time; i++) { // putChar(char value) 用来写入 char 值的相对 put 方法 buffer_d.putChar('a'); } buffer_d.flip(); for (int i = 0; i < time; i++) { buffer_d.getChar(); } long et_direct = System.currentTimeMillis(); System.out.println("在进行" + time + "次读写操作时,直接内存读写耗时:" + (et_direct - st_direct) + "ms"); } }

输出:
在进行10000000次分配操作时,堆内存 分配耗时:12ms
在进行10000000次分配操作时,直接内存 分配耗时:8233ms
在进行1000000000次读写操作时,非直接内存读写耗时:4055ms
在进行1000000000次读写操作时,直接内存读写耗时:745ms

可以自己设置不同的time 值进行比较

分析

从数据流的角度,来看

非直接内存作用链:
本地IO –>直接内存–>非直接内存–>直接内存–>本地IO
直接内存作用链:
本地IO–>直接内存–>本地IO

直接内存使用场景

有很大的数据需要存储,它的生命周期很长 适合频繁的IO操作,例如网络并发场景

参考

《深入理解Java虚拟机》 –周志明

博文:https://www.cnblogs.com/xing901022/p/5243657.html (rel=undefined)

read more
Java直接内存与非直接内存性能测试
什么是直接内存与非直接内存

根据官方文档的描述:

A byte bufferis either direct or non-direct. Given a direct byte buffer, the Java virtual machine will make a best effort to perform native I/O operations directly upon it. That is, it will attempt to avoid copying the buffer's content to (or from) an intermediate buffer before (orafter) each invocation of one of the underlying operating system's native I/O operations.

byte byffer可以是两种类型,一种是基于直接内存(也就是非堆内存);另一种是非直接内存(也就是堆内存)。

对于直接内存来说,JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作。而非直接内存,也就是堆内存中的数据,如果要作IO操作,会先复制到直接内存,再利用本地IO处理。

从数据流的角度,非直接内存是下面这样的作用链:

本地IO-->直接内存-->非直接内存-->直接内存-->本地IO

而直接内存是:

本地IO-->直接内存-->本地IO

很明显,再做IO处理时,比如网络发送大量数据时,直接内存会具有更高的效率。

A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.

但是,不要高兴的太早。文档中也说了,直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,这部分的数据是在JVM之外的,因此它不会占用应用的内存。

所以呢,当你有很大的数据要缓存,并且它的生命周期又很长,那么就比较适合使用直接内存。只是一般来说,如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。

关于直接内存需要注意的,就是上面两点了,其他的关于视图啊、作用链啊,都是使用上的问题了。如果有兴趣,可以参考官方API ( 进去后搜索ByteBuffer,就能看到!),里面有少量的描述!重要的一些用法,还得自己摸索。

使用场景

通过上面的官方文档,与一些资料的搜索。可以总结下,直接内存的使用场景:

1 有很大的数据需要存储,它的生命周期又很长 2 适合频繁的IO操作,比如网络并发场景 申请分配地址速度比较

下面用一段简单的代码,测试下申请内存空间的速度:

inttime = 10000000; Date begin = newDate(); for(int i=0;i<time;i++){ ByteBuffer buffer = ByteBuffer.allocate(2); } Dateend = newDate(); System.out.println(end.getTime()-begin.getTime()); begin = newDate(); for(int i=0;i<time;i++){ ByteBuffer buffer = ByteBuffer.allocateDirect(2); } end = newDate(); System.out.println(end.getTime()-begin.getTime());

得到的测试结果如下:

在数据量提升时,直接内存相比于非直接内存的申请 有十分十分十分明显的性能问题!

读写速度比较

然后在写段代码,测试下读写的速度:

inttime = 1000; Date begin = newDate(); ByteBuffer buffer = ByteBuffer.allocate(2*time); for(int i=0;i<time;i++){ buffer.putChar('a'); } buffer.flip(); for(int i=0;i<time;i++){ buffer.getChar(); } Dateend = newDate(); System.out.println(end.getTime()-begin.getTime()); begin = newDate(); ByteBuffer buffer2 = ByteBuffer.allocateDirect(2*time); for(int i=0;i<time;i++){ buffer2.putChar('a'); } buffer2.flip(); for(int i=0;i<time;i++){ buffer2.getChar(); } end = newDate(); System.out.println(end.getTime()-begin.getTime());

测试的结果如下:

可以看到直接内存在直接的IO操作上,还是有明显的差异的!

作者:xingoo

出处:http://www.cnblogs.com/xing901022

read more
Spring 组合注解&注解继承
组合注解

被注解的注解称为组合注解。

1.好处

简单化注解配置,用很少的注解来标注特定含义的多个元注解 提供了很好的扩展性,可以根据实际需要灵活的自定义注解。

2 如何使用

(1)自定义一个组合注解

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Configuration //实际上就是一个bean注解 @ComponentScan//自动扫描对应value(package路径)值下面的所有bean public @interface WiselyConfiguration { String[] value() default{};//可以不写,实际在@ComponentScan注解中已经定义过,所以可以拿过来直接用 }

(2)使用自定义注解

//@Configuration //属性注解 //@ComponentScan("com.gdb.spingboot.service") // 要扫描的bean路径 @WiselyConfiguration(value = "com.gdb.spingboot.service") //自定义注解,扫描的所有的bean来源于value值所对应的包路径下 public class ElConfig{ }

组合注解,和上面两个注解实现的功能完全一致,所以如果在繁琐的注解被多次使用的情况下,可以考虑自定义组合注解。

注解继承

@Inhberited注解可以让指定的注解在某个类上使用后,这个类的子类也将自动被该注解标记。

@Retention(RetentionPolicy.RUNTIME) @Inherited public @interface Hello { } public class AnnotationTest3{ //在基类上使用被@Inherited标记了的注解@Hello@HelloclassBase{} //派生类没有直接注解@HelloclassDerivedextendsBase{} public static void main(String[] args){ //派生类也会自动被注解@Hello标记。if( Derived.class.isAnnotationPresent(Hello.class)){ Hello hello = (Hello)Derived.class.getAnnotation(Hello.class); System.out.println("Hello"); } } } 注解的其它事项

(1)当注解中含有数组属性时,使用{}赋值,各个元素使用逗号分隔。

(2)注解的属性可以是另外一个注解。

(3)注解的属性可以是另外一个注解的数组。

(4)注解的默认属性是value,只有一个value属性时可以不写value=xxx,直接写值即可。

(5)注解的属性的默认值使用default来定义。

//定义注解 @Retention(RetentionPolicy.RUNTIME) public @interface SomeFuture { String value();//默认的属性int[] arrayValue(); //数组Hello helloValue();//是另外一个注解@Hello Hello[] helloArrayValue() default {@Hello,@Hello}; //注解的数组,而且提供默认值。 } //使用注解 @SomeFuture( value="hello", //默认属性 arrayValue={1,2,3},//数组属性 helloValue=@Hello,//属性是另外一个注解 helloArrayValue={@Hello,@Hello,@Hello}//属性是另外一个注解的数组。 ) public class AnnotationTest4{ }

read more
大厨小鲜——基于Netty自己动手实现RPC框架

今天我们要来做一道小菜,这道菜就是RPC通讯框架。它使用netty作为原料,fastjson序列化工具作为调料,来实现一个极简的多线程RPC服务框架。

我们暂且命名该RPC框架为rpckids。

食用指南

在告诉读者完整的制作菜谱之前,我们先来试试这个小菜怎么个吃法,好不好吃,是不是吃起来很方便。如果读者觉得很难吃,那后面的菜谱就没有多大意义了,何必花心思去学习制作一门谁也不爱吃的大烂菜呢?

例子中我会使用rpckids提供的远程RPC服务,用于计算斐波那契数和指数,客户端通过rpckids提供的RPC客户端向远程服务传送参数,并接受返回结果,然后呈现出来。你可以使用rpckids定制任意的业务rpc服务。

斐波那契数输入输出比较简单,一个Integer,一个Long。 指数输入有两个值,输出除了计算结果外还包含计算耗时,以纳秒计算。之所以包含耗时,只是为了呈现一个完整的自定义的输入和输出类。

指数服务自定义输入输出类 // 指数RPC的输入 public class ExpRequest { private int base; private int exp; // constructor & getter & setter } // 指数RPC的输出 public class ExpResponse { private long value; private long costInNanos; // constructor & getter & setter } 斐波那契和指数计算处理 public class FibRequestHandler implements IMessageHandler<Integer> { private List<Long> fibs = new ArrayList<>(); { fibs.add(1L); // fib(0) = 1 fibs.add(1L); // fib(1) = 1 } @Override public void handle(ChannelHandlerContext ctx, String requestId, Integer n) { for (int i = fibs.size(); i < n + 1; i++) { long value = fibs.get(i - 2) + fibs.get(i - 1); fibs.add(value); } // 输出响应 ctx.writeAndFlush(new MessageOutput(requestId, "fib_res", fibs.get(n))); } } public class ExpRequestHandler implements IMessageHandler<ExpRequest> { @Override public void handle(ChannelHandlerContext ctx, String requestId, ExpRequest message) { int base = message.getBase(); int exp = message.getExp(); long start = System.nanoTime(); long res = 1; for (int i = 0; i < exp; i++) { res *= base; } long cost = System.nanoTime() - start; // 输出响应 ctx.writeAndFlush(new MessageOutput(requestId, "exp_res", new ExpResponse(res, cost))); } } 构建RPC服务器

RPC服务类要监听指定IP端口,设定io线程数和业务计算线程数,然后注册斐波那契服务输入类和指数服务输入类,还有相应的计算处理器。

public class DemoServer { public static void main(String[] args) { RPCServer server = new RPCServer("localhost", 8888, 2, 16); server.service("fib", Integer.class, new FibRequestHandler()) .service("exp", ExpRequest.class, new ExpRequestHandler()); server.start(); } } 构建RPC客户端

RPC客户端要链接远程IP端口,并注册服务输出类(RPC响应类),然后分别调用20次斐波那契服务和指数服务,输出结果

public class DemoClient { private RPCClient client; public DemoClient(RPCClient client) { this.client = client; // 注册服务返回类型 this.client.rpc("fib_res", Long.class).rpc("exp_res", ExpResponse.class); } public long fib(int n) { return (Long) client.send("fib", n); } public ExpResponse exp(int base, int exp) { return (ExpResponse) client.send("exp", new ExpRequest(base, exp)); } public static void main(String[] args) { RPCClient client = new RPCClient("localhost", 8888); DemoClient demo = new DemoClient(client); for (int i = 0; i < 20; i++) { System.out.printf("fib(%d) = %d\n", i, demo.fib(i)); } for (int i = 0; i < 20; i++) { ExpResponse res = demo.exp(2, i); System.out.printf("exp2(%d) = %d cost=%dns\n", i, res.getValue(), res.getCostInNanos()); } } } 运行

先运行服务器,服务器输出如下,从日志中可以看到客户端链接过来了,然后发送了一系列消息,最后关闭链接走了。

server started @ localhost:8888 connection comes read a message read a message ... connection leaves

再运行客户端,可以看到一些列的计算结果都成功完成了输出。

fib(0) = 1 fib(1) = 1 fib(2) = 2 fib(3) = 3 fib(4) = 5 ... exp2(0) = 1 cost=559ns exp2(1) = 2 cost=495ns exp2(2) = 4 cost=524ns exp2(3) = 8 cost=640ns exp2(4) = 16 cost=711ns ... 牢骚

本以为是小菜一碟,但是编写完整的代码和文章却将近花费了一天的时间,深感写码要比做菜耗时太多了。因为只是为了教学目的,所以在实现细节上还有好多没有仔细去雕琢的地方。如果是要做一个开源项目,力求非常完美的话。至少还要考虑一下几点。

客户端连接池 多服务进程负载均衡 日志输出 参数校验,异常处理 客户端流量攻击 服务器压力极限

如果要参考grpc的话,还得实现流式响应处理。如果还要为了节省网络流量的话,又需要在协议上下功夫。这一大堆的问题还是抛给读者自己思考去吧。

关注公众号「码洞」,发送「RPC」即可获取以上完整菜谱的GitHub开源代码链接。读者有什么不明白的地方,洞主也会一一解答。

下面我们接着讲RPC服务器和客户端精细的制作过程

服务器菜谱

定义消息输入输出格式,消息类型、消息唯一ID和消息的json序列化字符串内容。消息唯一ID是用来客户端验证服务器请求和响应是否匹配。

public class MessageInput { private String type; private String requestId; private String payload; public MessageInput(String type, String requestId, String payload) { this.type = type; this.requestId = requestId; this.payload = payload; } public String getType() { return type; } public String getRequestId() { return requestId; } // 因为我们想直接拿到对象,所以要提供对象的类型参数 public <T> T getPayload(Class<T> clazz) { if (payload == null) { return null; } return JSON.parseObject(payload, clazz); } } public class MessageOutput { private String requestId; private String type; private Object payload; public MessageOutput(String requestId, String type, Object payload) { this.requestId = requestId; this.type = type; this.payload = payload; } public String getType() { return this.type; } public String getRequestId() { return requestId; } public Object getPayload() { return payload; } }

消息解码器,使用Netty的ReplayingDecoder实现。简单起见,这里没有使用checkpoint去优化性能了,感兴趣的话读者可以参考一下我之前在公众号里发表的相关文章,将checkpoint相关的逻辑自己添加进去。

public class MessageDecoder extends ReplayingDecoder<MessageInput> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String requestId = readStr(in); String type = readStr(in); String content = readStr(in); out.add(new MessageInput(type, requestId, content)); } private String readStr(ByteBuf in) { // 字符串先长度后字节数组,统一UTF8编码 int len = in.readInt(); if (len < 0 || len > (1 << 20)) { throw new DecoderException("string too long len=" + len); } byte[] bytes = new byte[len]; in.readBytes(bytes); return new String(bytes, Charsets.UTF8); } }

消息处理器接口,每个自定义服务必须实现handle方法

public interface IMessageHandler<> { Tvoid handle(ChannelHandlerContext ctx, String requestId, T message); } // 找不到类型的消息统一使用默认处理器处理 public class DefaultHandler implements IMessageHandler<MessageInput> { @Override public void handle(ChannelHandlerContext ctx, String requesetId, MessageInput input) { System.out.println("unrecognized message type=" + input.getType() + " comes"); } }

消息类型注册中心和消息处理器注册中心,都是用静态字段和方法,其实也是为了图方便,写成非静态的可能会优雅一些。

public class MessageRegistry { private static Map<String, Class<?>> clazzes = new HashMap<>(); public static void register(String type, Class<?> clazz) { clazzes.put(type, clazz); } public static Class<?> get(String type) { return clazzes.get(type); } } public class MessageHandlers { private static Map<String, IMessageHandler<?>> handlers = new HashMap<>(); public static DefaultHandler defaultHandler = new DefaultHandler(); public static void register(String type, IMessageHandler<?> handler) { handlers.put(type, handler); } public static IMessageHandler<?> get(String type) { IMessageHandler<?> handler = handlers.get(type); return handler; } }

响应消息的编码器比较简单

@Sharable public class MessageEncoder extends MessageToMessageEncoder<MessageOutput> { @Override protected void encode(ChannelHandlerContext ctx, MessageOutput msg, List<Object> out) throws Exception { ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(); writeStr(buf, msg.getRequestId()); writeStr(buf, msg.getType()); writeStr(buf, JSON.toJSONString(msg.getPayload())); out.add(buf); } private void writeStr(ByteBuf buf, String s) { buf.writeInt(s.length()); buf.writeBytes(s.getBytes(Charsets.UTF8)); } }

好,接下来进入关键环节,将上面的小模小块凑在一起,构建一个完整的RPC服务器框架,这里就需要读者有必须的Netty基础知识了,需要编写Netty的事件回调类和服务构建类。

@Sharable public class MessageCollector extends ChannelInboundHandlerAdapter { // 业务线程池 private ThreadPoolExecutor executor; public MessageCollector(int workerThreads) { // 业务队列最大1000,避免堆积 // 如果子线程处理不过来,io线程也会加入处理业务逻辑(callerRunsPolicy) BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000); // 给业务线程命名 ThreadFactory factory = new ThreadFactory() { AtomicInteger seq = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("rpc-" + seq.getAndIncrement()); return t; } }; // 闲置时间超过30秒的线程自动销毁 this.executor = new ThreadPoolExecutor(1, workerThreads, 30, TimeUnit.SECONDS, queue, factory, new CallerRunsPolicy()); } public void closeGracefully() { // 优雅一点关闭,先通知,再等待,最后强制关闭 this.executor.shutdown(); try { this.executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { } this.executor.shutdownNow(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 客户端来了一个新链接 System.out.println("connection comes"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 客户端走了一个 System.out.println("connection leaves"); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof MessageInput) { System.out.println("read a message"); // 用业务线程池处理消息 this.executor.execute(() -> { this.handleMessage(ctx, (MessageInput) msg); }); } } private void handleMessage(ChannelHandlerContext ctx, MessageInput input) { // 业务逻辑在这里 Class<?> clazz = MessageRegistry.get(input.getType()); if (clazz == null) { // 没注册的消息用默认的处理器处理 MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input); return; } Object o = input.getPayload(clazz); // 这里是小鲜的瑕疵,代码外观上比较难看,但是大厨表示才艺不够,很无奈 // 读者如果感兴趣可以自己想办法解决 @SuppressWarnings("unchecked") IMessageHandler<Object> handler = (IMessageHandler<Object>) MessageHandlers.get(input.getType()); if (handler != null) { handler.handle(ctx, input.getRequestId(), o); } else { // 用默认的处理器处理吧 MessageHandlers.defaultHandler.handle(ctx, input.getRequestId(), input); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 此处可能因为客户端机器突发重启 // 也可能是客户端链接闲置时间超时,后面的ReadTimeoutHandler抛出来的异常 // 也可能是消息协议错误,序列化异常 // etc. // 不管它,链接统统关闭,反正客户端具备重连机制 System.out.println("connection error"); cause.printStackTrace(); ctx.close(); } } public class RPCServer { private String ip; private int port; private int ioThreads; // 用来处理网络流的读写线程 private int workerThreads; // 用于业务处理的计算线程 public RPCServer(String ip, int port, int ioThreads, int workerThreads) { this.ip = ip; this.port = port; this.ioThreads = ioThreads; this.workerThreads = workerThreads; } private ServerBootstrap bootstrap; private EventLoopGroup group; private MessageCollector collector; private Channel serverChannel; // 注册服务的快捷方式 public RPCServer service(String type, Class<?> reqClass, IMessageHandler<?> handler) { MessageRegistry.register(type, reqClass); MessageHandlers.register(type, handler); return this; } // 启动RPC服务 public void start() { bootstrap = new ServerBootstrap(); group = new NioEventLoopGroup(ioThreads); bootstrap.group(group); collector = new MessageCollector(workerThreads); MessageEncoder encoder = new MessageEncoder(); bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipe = ch.pipeline(); // 如果客户端60秒没有任何请求,就关闭客户端链接 pipe.addLast(new ReadTimeoutHandler(60)); // 挂上解码器 pipe.addLast(new MessageDecoder()); // 挂上编码器 pipe.addLast(encoder); // 将业务处理器放在最后 pipe.addLast(collector); } }); bootstrap.option(ChannelOption.SO_BACKLOG, 100) // 客户端套件字接受队列大小 .option(ChannelOption.SO_REUSEADDR, true) // reuse addr,避免端口冲突 .option(ChannelOption.TCP_NODELAY, true) // 关闭小流合并,保证消息的及时性 .childOption(ChannelOption.SO_KEEPALIVE, true); // 长时间没动静的链接自动关闭 serverChannel = bootstrap.bind(this.ip, this.port).channel(); System.out.printf("server started @ %s:%d\n", ip, port); } public void stop() { // 先关闭服务端套件字 serverChannel.close(); // 再斩断消息来源,停止io线程池 group.shutdownGracefully(); // 最后停止业务线程 collector.closeGracefully(); } }

上面就是完整的服务器菜谱,代码较多,读者如果没有Netty基础的话,可能会看得眼花缭乱。如果你不常使用JDK的Executors框架,阅读起来估计也够呛。如果读者需要相关学习资料,可以找我索取。

客户端菜谱

服务器使用NIO实现,客户端也可以使用NIO实现,不过必要性不大,用同步的socket实现也是没有问题的。更重要的是,同步的代码比较简短,便于理解。所以简单起见,这里使用了同步IO。

定义RPC请求对象和响应对象,和服务器一一对应。

public class RPCRequest { private String requestId; private String type; private Object payload; public RPCRequest(String requestId, String type, Object payload) { this.requestId = requestId; this.type = type; this.payload = payload; } public String getRequestId() { return requestId; } public String getType() { return type; } public Object getPayload() { return payload; } } public class RPCResponse { private String requestId; private String type; private Object payload; public RPCResponse(String requestId, String type, Object payload) { this.requestId = requestId; this.type = type; this.payload = payload; } public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getType() { return type; } public void setType(String type) { this.type = type; } public Object getPayload() { return payload; } public void setPayload(Object payload) { this.payload = payload; } }

定义客户端异常,用于统一抛出RPC错误

public class RPCException extends RuntimeException { private static final long serialVersionUID = 1L; public RPCException(String message, Throwable cause) { super(message, cause); } public RPCException(String message) { super(message); } public RPCException(Throwable cause) { super(cause); } }

请求ID生成器,简单的UUID64

public class RequestId { public static String next() { return UUID.randomUUID().toString(); } }

响应类型注册中心,和服务器对应

public class ResponseRegistry { private static Map<String, Class<?>> clazzes = new HashMap<>(); public static void register(String type, Class<?> clazz) { clazzes.put(type, clazz); } public static Class<?> get(String type) { return clazzes.get(type); } }

好,接下来进入客户端的关键环节,链接管理、读写消息、链接重连都在这里

public class RPCClient { private String ip; private int port; private Socket sock; private DataInputStream input; private OutputStream output; public RPCClient(String ip, int port) { this.ip = ip; this.port = port; } public void connect() throws IOException { SocketAddress addr = new InetSocketAddress(ip, port); sock = new Socket(); sock.connect(addr, 5000); // 5s超时 input = new DataInputStream(sock.getInputStream()); output = sock.getOutputStream(); } public void close() { // 关闭链接 try { sock.close(); sock = null; input = null; output = null; } catch (IOException e) { } } public Object send(String type, Object payload) { // 普通rpc请求,正常获取响应 try { return this.sendInternal(type, payload, false); } catch (IOException e) { throw new RPCException(e); } } public RPCClient rpc(String type, Class<?> clazz) { // rpc响应类型注册快捷入口 ResponseRegistry.register(type, clazz); return this; } public void cast(String type, Object payload) { // 单向消息,服务器不得返回结果 try { this.sendInternal(type, payload, true); } catch (IOException e) { throw new RPCException(e); } } private Object sendInternal(String type, Object payload, boolean cast) throws IOException { if (output == null) { connect(); } String requestId = RequestId.next(); ByteArrayOutputStream bytes = new ByteArrayOutputStream(); DataOutputStream buf = new DataOutputStream(bytes); writeStr(buf, requestId); writeStr(buf, type); writeStr(buf, JSON.toJSONString(payload)); buf.flush(); byte[] fullLoad = bytes.toByteArray(); try { // 发送请求 output.write(fullLoad); } catch (IOException e) { // 网络异常要重连 close(); connect(); output.write(fullLoad); } if (!cast) { // RPC普通请求,要立即获取响应 String reqId = readStr(); // 校验请求ID是否匹配 if (!requestId.equals(reqId)) { close(); throw new RPCException("request id mismatch"); } String typ = readStr(); Class<?> clazz = ResponseRegistry.get(typ); // 响应类型必须提前注册 if (clazz == null) { throw new RPCException("unrecognized rpc response type=" + typ); } // 反序列化json串 String payld = readStr(); Object res = JSON.parseObject(payld, clazz); return res; } return null; } private String readStr() throws IOException { int len = input.readInt(); byte[] bytes = new byte[len]; input.readFully(bytes); return new String(bytes, Charsets.UTF8); } private void writeStr(DataOutputStream out, String s) throws IOException { out.writeInt(s.length()); out.write(s.getBytes(Charsets.UTF8)); } } 牢骚重提

本以为是小菜一碟,但是编写完整的代码和文章却将近花费了一天的时间,深感写码要比做菜耗时太多了。因为只是为了教学目的,所以在实现细节上还有好多没有仔细去雕琢的地方。如果是要做一个开源项目,力求非常完美的话。至少还要考虑一下几点。

客户端连接池 多服务进程负载均衡 日志输出 参数校验,异常处理 客户端流量攻击 服务器压力极限

如果要参考grpc的话,还得实现流式响应处理。如果还要为了节省网络流量的话,又需要在协议上下功夫。这一大堆的问题还是抛给读者自己思考去吧。

关注公众号「码洞」,发送「RPC」即可获取以上完整菜谱的GitHub开源代码链接。读者有什么不明白的地方,洞主也会一一解答。
原文:https://juejin.im/post/5ad2a99ff265da238d51264d

read more
java 中单例模式DCL的缺陷及单例的正确写法

首先在说明单例设计模式中的 DCL 问题之前我们首先看看实现单例设计模式的两种方式:饿汉式和懒汉式。

什么是饿汉式?

饿汉式就是不管你是否用的上,一开始就先初始化对象(也叫做提前初始化)

代码示例:

public class EagerInitialization{ private EagerInitialization() {} private static Resource resource = new Resource(); public static Resource getResource(){ return resource; } } 什么是懒汉式?

懒汉式就是当你真正需要使用时才创建对象。

于是,关于懒汉式的问题也就随之产生了~~~

我们先看一下有问题的代码:

代码示例:

public class LazyInitialization{ private static Resource resource; public static Resource getResource(){ if (resource == null) resource = new Resource();//不安全! return resource; } }

我们都知道上面的这个代码在单线程中运行是没有问题的,但是在平时的开发中常常会使用多线程,此时这个方法就会出现问题,假设有两个线程 A、B,当 A 线程满足判断还未来得及执行到 resource = new Resource() 时,线程执行资格被 B 拿走,此时线程 B 进入 getResource(), 而此时它也满足 resource 的值为 null, 于是导致最后产生两个实例。

针对上面的问题,于是有了相应的解决方案,即线程安全的延迟初始化,可以解决懒汉式出现的上述问题:

代码示例:

public class LazyInitialization{ private static Resource resource; public synchronized static Resource getResource(){ if (resource == null) resource = new Resource(); return resource; } }

上面代码通过使用 synchronized 关键字将 getResource 变成同步函数来保证方法的原子性,从而保证了线程安全而防止最后多个线程产生多个实例的现象。

我们都知道,在上述例子当中,每次在调用 getResource() 时都需要进行同步,而且在大多数时这种同步是没有必要的,并且大量无用的同步会对性能造成极大的影响。为什么呢?因为在第一次调用 getResource() 方法时就已经创建了 resource 实例了,之后 resource 就不再为空,然而之后再调用 getResource 时都需要进行同步,从而对性能造成了很大的影响。基于这些问题,一个新的方法也就产生了,这也是我们需要着重讨论的一个方法——双重检查加锁 (Double Check Locking) DCL。

双重检查加锁 DCL (Double Check Locking)

首先我们看看 DCL 的代码:

示例代码:

public class DoubleCheckedLocking{ private static Resource resource; public static Resource getResource(){ if (resource == null) { synchronized (DoubleCheckedLocking.class) { if (resource == null) resource = new Resource(); } } return resource; } }

你可能会疑惑,这样做不是挺好么,这样就可以解决刚刚说的那些问题了么,当 resource 被实例化后再调用 getResource() 方法不就不会再进行同步,这样不就节约了资源,提升了性能么?

说的对,DCL 确实存在着这些优点,但是与此同时,这个方法也会带来相应的问题,因为这个方法是含有缺陷的。再次之前,先了解一下JVM内存模型。

JVM内存模型

JVM模型如下图:

Thread Stack 是线程私有的区域。他是java方法执行时的字典:它里面记录了局部变量表、 操作数栈、 动态链接、 方法出口等信息。

在《java虚拟机规范》一书中对这部分的描述如下:

栈帧( Frame)是用来存储数据和部分过程结果的数据结构,同时也被用来处理动态链接 (Dynamic Linking)、 方法返回值和异常分派( Dispatch Exception)。

栈帧随着方法调用而创建,随着方法结束而销毁——无论方法是正常完成还是异常完成(抛出了在方法内未被捕获的异常)都算作方法结束。

栈帧的存储空间分配在 Java 虚拟机栈( §2.5.5)之中,每一个栈帧都有自己的局部变量表( Local Variables, §2.6.1)、操作数栈( OperandStack, §2.6.2)和指向当前方法所属的类的运行时常量池( §2.5.5)的引用。

Java 中某个线程在访问堆中的线程共享变量时,为了加快访问速度,提升效率,会把该变量临时拷贝一份到自己的 Thread Stack 中,并保持和堆中数据的同步。

缺陷

首先我们看到,DCL 方法包含了层判断语句,第一层判断语句用于判断 resource 对象是否为空,也就是是否被实例化,如果为空时就进入同步代码块进一步判断,问题就出在了 resource 的实例化语句 resource = new Resource() 上,因为这个语句实际上不是原子性的。这句话可以大致分解为如下步骤:

1. 给 Resource 的实例分配内存 2. 初始化 Resource 构造器 3. 将 resource 实例指向分配的内存空间,此时 resource 实例就不再为空

我们都希望这条语句的执行顺序是上述的 1——>2——>3,但是,由于 Java 编译器允许处理器乱序执行,以及 JDK1.5 之前 JMM(Java Memory Medel,即 Java 内存模型)中 Cache、寄存器到主内存回写顺序的规定,上面的第二点和第三点的顺序是无法保证的,也就是说,执行顺序可能是 1——>2——>3 也可能是 1——>3——>2。

如果有两个线程 A 和 B,如果 A 线程执行完 1 后先执行 3 然后执行 2,并且在 3 执行完毕、2 未执行之前,被切换到线程 B 上,这时候 resource 因为已经在线程 A 内执行过了第三点(jvm将未完成 Resource 构造器的值拷贝回堆中),resource 已经是非空了,所以线程 B 直接拿走 resource,然后使用,然后顺理成章地报错,而且这种难以跟踪难以重现的错误很可能会隐藏很久。

好了,关于 DCL 的问题阐述完了,那么这个方法既然有问题,那么该如何修改呢?

Happen-Before 原则

通过遵守 Happen-Before 原则,解决并发顺序问题。

1. 同一个线程中,书写在前面的操作happen-before书写在后面的操作。这条规则是说,在单线程中操作间happen-before关系完全是由源代码的顺序决定的,这里的前提“在同一个线程中”是很重要的,这条规则也称为单线程规则 。这个规则多少说得有些简单了,考虑到控制结构和循环结构,书写在后面的操作可能happen-before书写在前面的操作,不过我想读者应该明白我的意思。 2. 对锁的unlock操作happen-before后续的对同一个锁的lock操作。这里的“后续”指的是时间上的先后关系,unlock操作发生在退出同步块之后,lock操作发生在进入同步块之前。这是条最关键性的规则,线程安全性主要依赖于这条规则。但是仅仅是这条规则仍然不起任何作用,它必须和下面这条规则联合起来使用才显得意义重大。这里关键条件是必须对“同一个锁”的lock和unlock。 如果操作A happen-before操作B,操作B happen-before操作C,那么操作A happen-before操作C。这条规则也称为传递规。 通过 volatile 防止指令重排序

在 JMM 的后续版本(Java 5.0 及以上)中,如果把 resource 声明为 volatile 类型,因为 volatile 可以防止指令的重排序(对 volatile 字段的写操作 happen-before 后续的对同一个字段的读操作),那么这样就可以启用 DCL,并且这种方式对性能的影响很小,因为 volatile 变量读取操作的性能通常只是略高于非 volatile 变量读取操作的性能。改进后的 DCL 方法如下代码所示

代码示例:

public class DoubleCheckedLocking{ private static volatile Resource resource; public static Resource getResource{ if (resource == null) { synchronized (DoubleCheckedLocking.class) { if (resource == null) resource = new Resource(); } } return resource; } }

但是,DCL 的这种方法已经被广泛地遗弃了,因为促使该模式出现的驱动力(无竞争同步的执行速度很慢,以及 JVM 启动时很慢)已经不复存在,因为它不是一种高效的优化措施。延迟初始化占位类模式能带来同样的优势,并且更容易理解,延迟初始化占位类模式代码如下:

代码示例:

public class ResourceFactory{ private static class ResourceHolder { public static Resource resource = new Resource(); } public static Resource getResource(){ return ResourceHolder.resource; } }

关于单例和 DCL 问题就分析到这里了,在实际开发当中由于经常要考虑到代码的效率和安全性,一般使用饿汉式和延长初始化占位类模式,而延迟占位类模式更是优势明显并且容易使用和理解,是良好的单例设计模式的实现方法。

参考资料:

《java 并发编程实战》

关于 volatile 的问题可以参考:
http://blog.csdn.net/wxwzy738/article/details/43238089

关于 DCL 的其他问题可以参考:
http://blog.csdn.net/ns_code/article/details/17359719
https://blog.csdn.net/qiyei2009/article/details/71813069
https://blog.csdn.net/u013393958/article/details/70941579

read more
Learn Gradle - 3 Java 快速入门

上一节主要对Gradle的脚本进行了简要的介绍,本节将继续学习Gradle的另外一个特性——插件(plugins)。

1、插件介绍

插件是对Gradle功能的扩展,Gradle有着丰富的插件,你可以在这里搜索相关插件(传送门)。本章将简要介绍Gradle的Java插件(Java plugin),这个插件会给你的构建项目添加一些任务,比如编译java类、执行单元测试和将编译的class文件打包成jar文件等。

Java插件是基于约定的(约定优于配置),它在项目的很多方面定义了默认值,例如,Java源文件应该位于什么位置。我们只要遵循插件的约定,就不需要在Gradle配置脚本进行额外的相关配置。当然,在某些情况下,你的项目不想或不能遵循这个约定也是可以的,这样你就需要额外的配置你的构建脚本。

Gradle Java插件对于项目文件存放的默认位置与maven类似。

<!--more-->

Java源码存放在目录:src/main/java

Java测试代码存放目录:src/test/java

资源文件存放目录:src/main/resources

测试相关资源文件存放目录:src/test/resources

所有输出文件位于目录:build

输出的jar文件位于目录:build/libs

2、一个简单的Java项目

新建一个文件build.gradle,添加代码:

apply plugin: 'java'

以上代码即配置java插件到构建脚本中。当执行构建脚本时,它将给项目添加一系列任务。我们执行:gradle build,来看看输出的结果:

:compileJava UP-TO-DATE :processResources UP-TO-DATE :classes UP-TO-DATE :jar UP-TO-DATE :assemble UP-TO-DATE :compileTestJava UP-TO-DATE :processTestResources UP-TO-DATE :testClasses UP-TO-DATE :test UP-TO-DATE :check UP-TO-DATE :build UP-TO-DATE BUILD SUCCESSFUL

根据输出结果可以看出,我们执行的build这个任务依赖其他任务,比如compileJava等,这就是java插件预先定义好的一系列任务。

你还可以执行一些其他的任务,比如执行:gradle clean,gradle assemble,gradle check等。

gradle clean:删除构建目录以及已经构建完成的文件;

gradle assemble(装配):编译和打包java代码,但是不会执行单元测试(从上面的任务依赖结果也可以看出来)。如果你应用了其他插件,那么还会完成一下其他动作。例如,如果你应用了War这个插件,那么这个任务将会为你的项目生成war文件。

gradle check:编译且执行测试。与assemble类似,如果你应用了其他包含check任务的插件,例如,Checkstyle插件,那么这个任务将会检查你的项目代码的质量,并且生成检测报告。

如果想知道Gradle当前配置下哪些任务可执行,可以执行:gradle tasks,例如应用了java插件的配置,执行该命令,输出:

:tasks ------------------------------------------------------------ All tasks runnable from root project ------------------------------------------------------------ Build tasks ----------- assemble - Assembles the outputs of this project. build - Assembles and tests this project. buildDependents - Assembles and tests this project and all projects that depend on it. buildNeeded - Assembles and tests this project and all projects it depends on. classes - Assembles classes 'main'. clean - Deletes the build directory. jar - Assembles a jar archive containing the main classes. testClasses - Assembles classes 'test'. Build Setup tasks ----------------- init - Initializes a new Gradle build. [incubating] wrapper - Generates Gradle wrapper files. [incubating] Documentation tasks ------------------- javadoc - Generates Javadoc API documentation for the main source code. Help tasks ---------- components - Displays the components produced by root project 'learn-gradle'. [incubating] dependencies - Displays all dependencies declared in root project 'learn-gradle'. dependencyInsight - Displays the insight into a specific dependency in root project 'learn-gradle'. help - Displays a help message. model - Displays the configuration model of root project 'learn-gradle'. [incubating] projects - Displays the sub-projects of root project 'learn-gradle'. properties - Displays the properties of root project 'learn-gradle'. tasks - Displays the tasks runnable from root project 'learn-gradle'. Verification tasks ------------------ check - Runs all checks. test - Runs the unit tests. Rules ----- Pattern: clean<TaskName>: Cleans the output files of a task. Pattern: build<ConfigurationName>: Assembles the artifacts of a configuration. Pattern: upload<ConfigurationName>: Assembles and uploads the artifacts belonging to a configuration. To see all tasks and more detail, run gradle tasks --all To see more detail about a task, run gradle help --task <task> BUILD SUCCESSFUL

小伙伴们看到这里会不会有疑问,如果在构建脚本中定义了名为tasks的任务,执行会是如何?好奇的小伙伴可以自己试一试噢。事实上,是会覆盖原有的任务的。

3、外部依赖

通常一个Java项目会依赖多个其他项目或jar文件,我们可以通过配置gradle仓库(repository)告诉gradle从哪里获取需要的依赖,并且gradle还可以配置使用maven仓库。例如,我们配置gradle使用maven中央仓库,在build.gradle中添加代码:

repositories { mavenCentral() }

接下来,我们来添加一些依赖。代码示例:

dependencies { compile group: 'commons-collections', name: 'commons-collections', version: '3.2' testCompile group: 'junit', name: 'junit', version: '4.+' }

关于依赖,暂时就点这么多。详细可以参考gradle依赖管理基础,也可以关注后续文章。

4、定义项目属性

Java插件会为项目添加一系列的属性,通常情况下,初始的Java项目使用这些默认配置就足够了,我们不需要进行额外的配置。但是如果默认属性不满足于你的项目,你也可以进行自定义项目的一些信息。例如我们为项目指定版本号和一些jar manifest信息。

sourceCompatibility = 1.5 version = '1.0' jar { manifest { attributes 'Implementation-Title': 'Gradle Quickstart', 'Implementation-Version': version } }

事实上,Java插件添加的一系列任务与我们之前在脚本中自定义的任务没什么区别,都是很常规的任务。我们可以随意定制和修改这些任务。例如,设置任务的属性、为任务添加行为、改变任务的依赖,甚至替换已有的任务。例如我们可以配置Test类型的test任务,当test任务执行的时候,添加一个系统属性。配置脚本如下:

test { systemProperties 'property': 'value' }

另外,与之前提到的“gradle tasks”命令类型,我们可以通过“gradle properties”来查看当前配置所支持的可配置属性有哪些。

5、将Jar文件发布到仓库 uploadArchives { repositories { flatDir { dirs 'repos' } } }

执行gradle uploadArchives,将会把相关jar文件发布到reops仓库中。更多参考:Publishing artifacts

6、构建多个Java项目

假设我们的项目结构如下所示:

multiproject/ --api/ --services/webservice/ --shared/ --services/shared/

项目api生成jar文件,Java客户端通过jar提供的接口访问web服务;项目services/webservice是一个webapp,提供web服务;项目shared 包含api和webservice公共代码;项目services/shared依赖shared项目,包含webservice公共代码。

接下来,我们开始定义多项目构建。

1)首先,我们需要添加一个配置文件:settings.gradle文件。settings.gradle位于项目的根目录,也就是multiproject目录。编辑settings.gradle,输入配置信息:

include "shared", "api", "services:webservice", "services:shared"

include是Gradle DSL定义的核心类型Settings的方法,用于构建指定项目。配置中指定的参数“shared”、“api”等值默认是当前配置目录的目录名称,而“services:webservice”将根据默认约定映射系统物理路径"services/webservice"(相对于根目录)。关于include更详细的信息可以参考:构建树

2)定义所有子项目公用配置。在根目录创建文件:build.gradle,输入配置信息:

subprojects { apply plugin: 'java' apply plugin: 'eclipse-wtp' repositories { mavenCentral() } dependencies { testCompile 'junit:junit:4.12' } version = '1.0' jar { manifest.attributes provider: 'gradle' } }

subprojects 是Gradle DSL定义的构建脚本模块之一,用于定义所有子项目的配置信息。在以上配置中,我们给所有子项目定义了使用“java”和“eclipse-wtp”插件,还定义了仓库、依赖、版本号以及jar(jar是Gradle的任务类型之一,任务是装配jar包,jar任务包含属性manifest,用于描述jar的信息,具体参考:Jar)。

我们在根目录执行gradle build命令时,这些配置会应用到所有子项目中。

3)给项目添加依赖

新建文件:api/build.gradle,添加配置:

dependencies { compile project(':shared') }

以上,我们定义了api项目依赖shared项目,当我们在根目录执行gradle build命令时,gradle会确保在编译api之前,先完成shared项目编译,然后才会编译api项目。

同样,添加services/webservice/build.gradle,添加配置:

dependencies { compile project(':services:shared') }

在根目录执行:gradle compileJava,输出:

:shared:compileJava UP-TO-DATE :shared:processResources UP-TO-DATE :shared:classes UP-TO-DATE :shared:jar UP-TO-DATE :api:compileJava UP-TO-DATE :services:compileJava UP-TO-DATE :services:shared:compileJava UP-TO-DATE :services:shared:processResources UP-TO-DATE :services:shared:classes UP-TO-DATE :services:shared:jar UP-TO-DATE :services:webservice:compileJava UP-TO-DATE BUILD SUCCESSFUL

通过输出信息我们就可以清楚看出依赖配置是否正确啦。

read more
Zookeeper是如何解决脑裂问题
前言

这是分布式系统中一个很实际的问题,书上说的不是很详细,整理总结一下。

1、脑裂和假死 1.1 脑裂

官方定义:当一个集群的不同部分在同一时间都认为自己是活动的时候,我们就可以将这个现象称为脑裂症状。通俗的说,就是比如当你的 cluster 里面有两个结点,它们都知道在这个 cluster 里需要选举出一个 master。那么当它们两之间的通信完全没有问题的时候,就会达成共识,选出其中一个作为 master。但是如果它们之间的通信出了问题,那么两个结点都会觉得现在没有 master,所以每个都把自己选举成 master。于是 cluster 里面就会有两个 master。举例:

UserA和UserB分别将自己的信息注册在RouterA和RouterB中。RouterA和RouterB使用数据同步(2PC),来同步信息。那么当UserA想要向UserB发送一个消息的时候,需要现在RouterA中查询出UserA到UserB的消息路由路径,然后再交付给相应的路径进行路由。

当脑裂发生的时候,相当RouterA和RouterB直接的联系丢失了,RouterA认为整个系统中只有它一个Router,RouterB也是这样认为的。那么相当于RouterA中没有UserB的信息,RouterB中没有UserA的信息了,此时UserA再发送消息给UserB的时候,RouterA会认为UserB已经离线了,然后将该信息进行离线持久化,这样整个网络的路由是不是就乱掉了。

对于Zookeeper来说有一个很重要的问题,就是到底是根据一个什么样的情况来判断一个节点死亡down掉了。 在分布式系统中这些都是有监控者来判断的,但是监控者也很难判定其他的节点的状态,唯一一个可靠的途径就是心跳,Zookeeper也是使用心跳来判断客户端是否仍然活着,但是使用心跳机制来判断节点的存活状态也带来了假死问题。

1.2 假死

ZooKeeper每个节点都尝试注册一个象征master的临时节点,其他没有注册成功的则成为slaver,并且通过watch机制监控着master所创建的临时节点,Zookeeper通过内部心跳机制来确定master的状态,一旦master出现意外Zookeeper能很快获悉并且通知其他的slaver,其他slaver在之后作出相关反应。这样就完成了一个切换。

这种模式也是比较通用的模式,基本大部分都是这样实现的,但是这里面有个很严重的问题,如果注意不到会导致短暂的时间内系统出现脑裂,因为心跳出现超时可能是master挂了,但是也可能是master,zookeeper之间网络出现了问题,也同样可能导致。这种情况就是假死,master并未死掉,但是与ZooKeeper之间的网络出现问题导致Zookeeper认为其挂掉了然后通知其他节点进行切换,这样slaver中就有一个成为了master,但是原本的master并未死掉,这时候client也获得master切换的消息,但是仍然会有一些延时,zookeeper需要通讯需要一个一个通知,这时候整个系统就很混乱可能有一部分client已经通知到了连接到新的master上去了,有的client仍然连接在老的master上如果同时有两个client需要对master的同一个数据更新并且刚好这两个client此刻分别连接在新老的master上,就会出现很严重问题。

1.3 总结

假死:由于心跳超时(网络原因导致的)认为master死了,但其实master还存活着。

脑裂:由于假死会发起新的master选举,选举出一个新的master,但旧的master网络又通了,导致出现了两个master ,有的客户端连接到老的master 有的客户端链接到新的master。

2、Zookeeper的解决方案

要解决Split-Brain的问题,一般有3种方式:

Quorums(ˈkwôrəm 法定人数) :比如3个节点的集群,Quorums = 2, 也就是说集群可以容忍1个节点失效,这时候还能选举出1个lead,集群还可用。比如4个节点的集群,它的Quorums = 3,Quorums要超过3,相当于集群的容忍度还是1,如果2个节点失效,那么整个集群还是无效的 Redundant communications:冗余通信的方式,集群中采用多种通信方式,防止一种通信方式失效导致集群中的节点无法通信。 Fencing, 共享资源的方式:比如能看到共享资源就表示在集群中,能够获得共享资源的锁的就是Leader,看不到共享资源的,就不在集群中。

ZooKeeper默认采用了Quorums这种方式,即只有集群中超过半数节点投票才能选举出Leader。这样的方式可以确保leader的唯一性,要么选出唯一的一个leader,要么选举失败。在ZooKeeper中Quorums有2个作用:

集群中最少的节点数用来选举Leader保证集群可用:通知客户端数据已经安全保存前集群中最少数量的节点数已经保存了该数据。一旦这些节点保存了该数据,客户端将被通知已经安全保存了,可以继续其他任务。而集群中剩余的节点将会最终也保存了该数据。 假设某个leader假死,其余的followers选举出了一个新的leader。这时,旧的leader复活并且仍然认为自己是leader,这个时候它向其他followers发出写请求也是会被拒绝的。因为每当新leader产生时,会生成一个epoch,这个epoch是递增的,followers如果确认了新的leader存在,知道其epoch,就会拒绝epoch小于现任leader epoch的所有请求。那有没有follower不知道新的leader存在呢,有可能,但肯定不是大多数,否则新leader无法产生。Zookeeper的写也遵循quorum机制,因此,得不到大多数支持的写是无效的,旧leader即使各种认为自己是leader,依然没有什么作用。 3、总结

总结一下就是,通过Quorums机制来防止脑裂和假死,当leader挂掉之后,可以重新选举出新的leader节点使整个集群达成一致;当出现假死现象时,通过epoch大小来拒绝旧的leader发起的请求,在前面也已经讲到过,这个时候,重新恢复通信的老的leader节点会进入恢复模式,与新的leader节点做数据同步,perfect。

原文: https://blog.csdn.net/u013374645/article/details/93140148

read more
There are no topics in this category.
Why don't you try posting one?