欢迎光临
我们一直在努力

经典分布式算法 —— 浅显易懂的 Raft 算法实现

mumupudding阅读(16)


一、Raft概念

copy一下其他小伙伴写的文章: Raft算法详解

不同于Paxos算法直接从分布式一致性问题出发推导出来,Raft算法则是从多副本状态机的角度提出,用于管理多副本状态机的日志复制。Raft实现了和Paxos相同的功能,它将一致性分解为多个子问题:Leader选举(Leader election)、日志同步(Log replication)、安全性(Safety)、日志压缩(Log compaction)、成员变更(Membership change)等。同时,Raft算法使用了更强的假设来减少了需要考虑的状态,使之变的易于理解和实现。

Raft将系统中的角色分为领导者(Leader)、跟从者(Follower)和候选人(Candidate):

  • Leader:接受客户端请求,并向Follower同步请求日志,当日志同步到大多数节点上后告诉Follower提交日志。
  • Follower:接受并持久化Leader同步的日志,在Leader告之日志可以提交之后,提交日志。
  • Candidate:Leader选举过程中的临时角色。

本文不过多赘述 raft 算法是个什么东西… 这里再贴一个十分好理解的文章:The Raft Consensus Algorithm


二、系统初步设计

在对raft有一定理解后,我们简单梳理一下在raft选举过程中,我们需要的一些角色,以及角色的司职。

首先我们需要一个选举控制类,单例实现即可,节点的选举全权交给此选举控制类的实现,我们称其为 ElectOperator。

先讲一个 raft 中重要的概念:世代,也称为 epoch,但在这篇文章,将其称为 generation(不要纠结这个 = =)。 世代可以认为是一个标记当前发送的操作是否有效的标识,如果收到了小于本节点世代的请求,则可无视其内容,如果收到了大于本世代的请求,则需要更新本节点世代,并重置自己的身份,变为 Follower,类似于乐观锁的设计理念。

我们知道,raft中一共有三种角色:Follower、Candidate、Leader

(1)Follower

Follower 需要做什么呢:

  • 接收心跳
  • Follower 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
  • 接收拉票请求,并返回自己的投票

好的,Follower非常简单,只需要做三件事即可。

(2)Candidate

Candidate 扮演什么样的职能呢:

  • 接收心跳
  • Candidate 在 ELECTION_TIMEOUT_MS 时间内,若没有收到来自 Leader的心跳,则转变为 Candidate
  • 接收拉票请求,并返回自己的投票
  • 向集群中的其他节点发起拉票请求
  • 当收到的投票大于半数( n/2 + 1, n为集群内的节点数量),转变为 Leader

Candidate 比起 Follower 稍微复杂一些,但前三件事情都是一样的。

(3)Leader

Leader 在选举过程中扮演的角色最为简单:

  • 接收心跳
  • 向集群内所有节点发送心跳

Leader 也是可以接收心跳的,当收到大于当前世代的心跳或请求后,Leader 需要转变为 Follower。Leader 不可能收到同世代的心跳请求,因为 (1) 在 raft 算法中,同一世代中,节点仅对同一个节点进行投票。(2) 需要收到过半投票才可以转变为 Leader。


三、系统初步实现

简单贴一下选举控制器需要的一些属性代码,下面的注释都说的很清楚了,其中需要补充的一点是定时任务使用了时间轮来实现,不理解没有关系…就是个定时任务,定时任务的一个引用放在 Map<TaskEnum, TimedTask> taskMap; 中,便于取消任务。

public class ElectOperator extends ReentrantLocker implements Runnable {    // 成为 Candidate 的退避时间(真实退避时间需要 randomized to be between 150ms and 300ms )    private static final long ELECTION_TIMEOUT_MS = ElectConfigHelper.getElectionTimeoutMs();    // 心跳间隔    private static final long HEART_BEAT_MS = ElectConfigHelper.getHeartBeatMs();    /**     * 该投票箱的世代信息,如果一直进行选举,一直能达到 {@link #ELECTION_TIMEOUT_MS},而选不出 Leader ,也需要15年,generation才会不够用,如果     * generation 的初始值设置为 Long.Min (现在是0,则可以撑30年,所以完全呆胶布)     */    private long generation;    /**     * 当前节点的角色     */    private NodeRole nodeRole;    /**     * 所有正在跑的定时任务     */    private Map<TaskEnum, TimedTask> taskMap;    /**     * 投票箱     */    private Map<String/* serverName */, Boolean> box;    /**     * 投票给了谁的投票记录     */    private Votes voteRecord;    /**     * 缓存一份集群信息,因为集群信息是可能变化的,我们要保证在一次选举中,集群信息是不变的     */    private List<HanabiNode> clusters;    /**     * 心跳内容     */    private HeartBeat heartBeat;    /**     * 现在集群的leader是哪个节点     */    private String leaderServerName;    private volatile static ElectOperator INSTANCE;    public static ElectOperator getInstance() {        if (INSTANCE == null) {            synchronized (ElectOperator.class) {                if (INSTANCE == null) {                    INSTANCE = new ElectOperator();                    ElectControllerPool.execute(INSTANCE);                }            }        }        return INSTANCE;    }

另外,上面罗列的这些值大都是需要在更新世代时重置的,我们先拟定一下更新世代的逻辑,通用的来讲,就是清除投票记录,清除自己的投票箱,更新自己的世代,身份变更为 Follower 等等,我们将这个方法称为 init。

    /**     * 初始化     *     * 1、成为follower     * 2、先取消所有的定时任务     * 3、重置本地变量     * 4、新增成为Candidate的定时任务     */    private boolean init(long generation, String reason) {        return this.lockSupplier(() -> {            if (generation > this.generation) {// 如果有选票的世代已经大于当前世代,那么重置投票箱                logger.debug("初始化投票箱,原因:{}", reason);                // 1、成为follower                this.becomeFollower();                // 2、先取消所有的定时任务                this.cancelAllTask();                // 3、重置本地变量                logger.debug("更新世代:旧世代 {} => 新世代 {}", this.generation, generation);                this.generation = generation;                this.voteRecord = null;                this.box = new HashMap<>();                this.leaderServerName = null;                // 4、新增成为Candidate的定时任务                this.becomeCandidateAndBeginElectTask(this.generation);                return true;            } else {                return false;            }        });    }

(1) Follower的实现

基于上面的分析,我们可以归纳一下 Follower 需要一些什么样的方法:

1、转变为 Candidate 的定时任务

实际上就是 ELECTION_TIMEOUT_MS (randomized to be between 150ms and 300ms) 后,如果没收到 Leader 的心跳,或者自己变为 Candidate 后,在这个时间内没有成功上位,则继续转变为 Candidate。

为什么我们成为 Candidate 的退避时间需要随机 150ms – 300ms呢?这是为了避免所有节点的选举发起发生碰撞,如果说都是相同的退避时间,每个节点又会优先投自己一票,那么这个集群系统就会陷入无限发起投票,但又无法成为 Leader 的局面。

简而言之就是我们需要提供一个可刷新的定时任务,如果在一定时间内没刷新这个任务,则节点转变为 Candidate,并发起选举,代码如下。首先取消之前的 becomeCandidate 定时定时任务,然后设定在 electionTimeout 后调用 beginElect(generation) 方法。

   /**     * 成为候选者的任务,(重复调用则会取消之前的任务,收到来自leader的心跳包,就可以重置一下这个任务)     *     * 没加锁,因为这个任务需要频繁被调用,只要收到leader来的消息就可以调用一下     */    private void becomeCandidateAndBeginElectTask(long generation) {        this.lockSupplier(() -> {            this.cancelCandidateAndBeginElectTask("正在重置发起下一轮选举的退避时间");            // The election timeout is randomized to be between 150ms and 300ms.            long electionTimeout = ELECTION_TIMEOUT_MS + (int) (ELECTION_TIMEOUT_MS * RANDOM.nextFloat());            TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));            Timer.getInstance()                 .addTask(timedTask);            taskMap.put(TaskEnum.BECOME_CANDIDATE, timedTask);            return null;        });    }
2、接收心跳与心跳回复

接收心跳十分简单,如果当前心跳大于等于当前世代,且还未认定某个节点为 Leader,则取消所有定时任务,成为Follower,并记录心跳包中 Leader 节点的信息,最后重置一下成为候选者的任务。

如果已经成为某个 Leader 的 Follower,则直接成为候选者的任务即可。

另外一个要注意的是,needToSendHeartBeatInfection,是否需要发送心跳感染包,当收到低世代 Leader 的心跳时,如果当前集群已经选出 Leader ,则回复此心跳包,告诉旧 Leader,现在已经是新世代了!(代码中没有展现,其实就是再次封装一个心跳包,带上世代信息和 Leader 节点信息,回复给 Leader 即可)

    public void receiveHeatBeat(String leaderServerName, long generation, String msg) {       return this.lockSupplier(() -> {     boolean needToSendHeartBeatInfection = true;            // 世代大于当前世代            if (generation >= this.generation) {                needToSendHeartBeatInfection = false;                if (this.leaderServerName == null) {                                        logger.info("集群中,节点 {} 已经成功在世代 {} 上位成为 Leader,本节点将成为 Follower,直到与 Leader 的网络通讯出现问题", leaderServerName, generation);                    // 取消所有任务                    this.cancelAllTask();                    // 成为follower                    this.becomeFollower();                    // 将那个节点设为leader节点                    this.leaderServerName = leaderServerName;                }                // 重置成为候选者任务                this.becomeCandidateAndBeginElectTask(this.generation);            }            return null;        });    }
3、接收拉票请求与回复投票

我们知道,raft 在一个世代只能投票给一个节点,且发起投票者会首先投票给自己。所以逻辑就很简单了,只有当世代大于等于当前,且还未投票时,则拉票请求成功,返回true即可,否则都视为失败,返回false。

    /**     * 某个节点来请求本节点给他投票了,只有当世代大于当前世代,才有投票一说,其他情况都是失败的     *     * 返回结果     *     * 为true代表接受投票成功。     * 为false代表已经给其他节点投过票了,     */    public VotesResponse receiveVotes(Votes votes) {        return this.lockSupplier(() -> {            logger.debug("收到节点 {} 的投票请求,其世代为 {}", votes.getServerName(), votes.getGeneration());            String cause = "";            if (votes.getGeneration() < this.generation) {                cause = String.format("投票请求 %s 世代小于当前世代 %s", votes.getGeneration(), this.generation);            } else if (this.voteRecord != null) {                cause = String.format("在世代 %s,本节点已投票给 => %s 节点", this.generation, this.voteRecord.getServerName());            } else {                this.voteRecord = votes; // 代表投票成功了            }            boolean result = votes.equals(this.voteRecord);            if (result) {                logger.debug("投票记录更新成功:在世代 {},本节点投票给 => {} 节点", this.generation, this.voteRecord.getServerName());            } else {                logger.debug("投票记录更新失败:原因:{}", cause);            }            String serverName = InetSocketAddressConfigHelper.getServerName();            return new VotesResponse(this.generation, serverName, result, serverName.equals(this.leaderServerName), votes.getGeneration());        });    }

(2) Candidate的实现

可以看出 Follower 十分简单, Candidate 在 Follower 的基础上增加了发起选举的拉票请求,与接收投票,并上位成为Leader两个功能,实际上也十分简单。

1、发起拉票请求

回顾一下前面的转变成 Candidate 的定时任务,定时任务实际上就是调用一个方法

TimedTask timedTask = new TimedTask(electionTimeout, () -> this.beginElect(generation));

这个 beginElect 就是转变为 Candidate 并发起选举的实现。让我们先想想需要做什么,首先肯定是

  1. 更新一下自己的世代,因为已经长时间没收到 Leader 的心跳包了,我们需要自立门户。
  2. 给自己投一票
  3. 要求其他节点给自己投票

分析到这里就很明了了。下面首先执行 updateGeneration 方法,实际上就是执行前面所说的 init 方法,传入 generation + 1 的世代,重置一下上个世代各种保存的状态;然后调用 becomeCandidate,实际上就是切换一下身份,将 Follower 或者 Candidate 切换为 Candidate;给自己的 voteRecord 投一票,最后带上自己的节点标识和世代信息,去拉票。

    /**     * 开始进行选举     *     * 1、首先更新一下世代信息,重置投票箱和投票记录     * 2、成为候选者     * 3、给自己投一票     * 4、请求其他节点,要求其他节点给自己投票     */    private void beginElect(long generation) {        this.lockSupplier(() -> {            if (this.generation != generation) {// 存在这么一种情况,虽然取消了选举任务,但是选举任务还是被执行了,所以这里要多做一重处理,避免上个周期的任务被执行                return null;            }            logger.info("Election Timeout 到期,可能期间内未收到来自 Leader 的心跳包或上一轮选举没有在期间内选出 Leader,故本节点即将发起选举");            updateGeneration("本节点发起了选举");// this.generation ++            // 成为候选者            logger.info("本节点正式开始世代 {} 的选举", this.generation);            if (this.becomeCandidate()) {                VotesResponse votes = new VotesResponse(this.generation, InetSocketAddressConfigHelper.getServerName(), true, false, this.generation);                // 给自己投票箱投票                this.receiveVotesResponse(votes);                // 记录一下,自己给自己投了票                this.voteRecord = votes;                // 让其他节点给自己投一票                this.askForVoteTask(new Votes(this.generation, InetSocketAddressConfigHelper.getServerName()), 0);            }            return null;        });    }
2、接收投票,并成为 Leader

如果说在 150ms and 300ms 之间,本节点收到了过半投票,则可上位成 Leader,否则定时任务会再次调用 beginElect,再次更新本节点世代,然后发起新一轮选举。

接收投票其实十分简单,回忆一下前面接收拉票请求与回复投票,实际上就是拉票成功,就返回true,否则返回flase。

我们每次都判断一下是否拿到过半的票数,如果拿到,则成为 Leader,另外有一个值得注意的是,为了加快集群恢复可用的进程,类似于心跳感染(如果心跳发到Leader那里去了,Leader会告诉本节点,它才是真正的Leader),投票也存在投票感染,下面的代码由 votesResponse.isFromLeaderNode() 来表示。

投票的记录也是十分简单,就是把每个投票记录扔到 Map<String/* serverName */, Boolean> box; 里,true 表示同意投给本节点,flase 则不同意,如果同意达到半数以上,则调用 becomeLeader 成为本世代 Leader。

    /**     * 给当前节点的投票箱投票     */    public void receiveVotesResponse(VotesResponse votesResponse) {        this.lockSupplier(() -> {            if (votesResponse.isFromLeaderNode()) {                logger.info("来自节点 {} 的投票应答表明其身份为 Leader,本轮拉票结束。", votesResponse.getServerName());                this.receiveHeatBeat(votesResponse.getServerName(), votesResponse.getGeneration(),                    String.format("收到来自 Leader 节点的投票应答,自动将其视为来自 Leader %s 世代 %s 节点的心跳包", heartBeat.getServerName(), votesResponse.getGeneration()));            }            if (this.generation > votesResponse.getAskVoteGeneration()) {// 如果选票的世代小于当前世代,投票无效                logger.info("来自节点 {} 的投票应答世代是以前世代 {} 的选票,选票无效", votesResponse.getServerName(), votesResponse.getAskVoteGeneration());                return null;            }            if (votesResponse.isAgreed()) {                if (!voteSelf) {                    logger.info("来自节点 {} 的投票应答有效,投票箱 + 1", votesResponse.getServerName());                }                // 记录一下投票结果                box.put(votesResponse.getServerName(), votesResponse.isAgreed());                List<HanabiNode> hanabiNodeList = this.clusters;                int clusterSize = hanabiNodeList.size();                int votesNeed = clusterSize / 2 + 1;                long voteCount = box.values()                                    .stream()                                    .filter(aBoolean -> aBoolean)                                    .count();                logger.info("集群中共 {} 个节点,本节点当前投票箱进度 {}/{}", hanabiNodeList.size(), voteCount, votesNeed);                // 如果获得的选票已经大于了集群数量的一半以上,则成为leader                if (voteCount == votesNeed) {                    logger.info("选票过半,准备上位成为 leader 节点", votesResponse.getServerName());                    this.becomeLeader();                }            } else {                logger.info("节点 {} 在世代 {} 的投票应答为:拒绝给本节点在世代 {} 的选举投票(当前世代 {})", votesResponse.getServerName(), votesResponse.getGeneration(), votesResponse.getAskVoteGeneration(), this.generation);                // 记录一下投票结果                box.put(votesResponse.getServerName(), votesResponse.isAgreed());            }            return null;        });    }

(3) Leader 的实现

作为 Leader,在 raft 中的实现却是最简单的,我们只需要给子节点发心跳包即可。然后如果收到大于自己世代的心跳感染,则成为新世代的 Follower,接收心跳的逻辑和 Follower 没有区别。

    /**     * 当选票大于一半以上时调用这个方法,如何去成为一个leader     */    private void becomeLeader() {        this.lockSupplier(() -> {            long becomeLeaderCostTime = TimeUtil.getTime() - this.beginElectTime;            this.beginElectTime = 0L;            logger.info("本节点 {} 在世代 {} 角色由 {} 变更为 {} 选举耗时 {} ms,并开始向其他节点发送心跳包 ......", InetSocketAddressConfigHelper.getServerName(), this.generation, this.nodeRole.name(), NodeRole.Leader.name(),                becomeLeaderCostTime);            this.nodeRole = NodeRole.Leader;            this.cancelAllTask();            this.heartBeatTask();            this.leaderServerName = InetSocketAddressConfigHelper.getServerName();            return null;        });    }

四、运行我们的 raft!

看到这里,不用怀疑.. 一个 raft 算法已经实现了。至于一些细枝末节的东西,我相信大家都能处理好的.. 比如如何给其他节点发送各种包,包怎么去定义之类的,都和 raft 本身没什么关系。

一般来说,在集群可用后,我们就可以让 Follower 连接 Leader 的业务端口,开始真正的业务了。 raft作为一个能快速选主的分布式算法,一次选主基本只需要一次 RTT(Round-Trip Time)时间即可,非常迅速。

运行一下我们的项目,简单测试,我们只用三台机子,想测试多台机子可以自己去玩玩…我们可以看到就像 zookeeper,我们需要配置两个端口,前一个作为选举端口,后一个则作为业务端口。

本文章只讲了怎么选举,后面的端口可以无视,但是必填…

依次启动 hanabi.1,hanabi.2,hanabi.3

很快,我们就能看到 hanabi.1 成为了世代28的 Leader,第一次选举耗时久是因为启动的时候有各种初始化 = =

此时,我们关闭 hanabi.1,因为集群还有2台机器,它们之间完全可以选出新的 Leader,我们关闭 hanabi.1 试试。观察 hanabi.3,我们发现,很快,hanabi.3 就发现 Leader 已经挂掉,并发起了世代 29 的选举。

在世代29中,仅存的 hanabi.2 拒绝为本节点投票,所以在 ELECTION_TIMEOUT_MS 到期后,hanabi.3 再次发起了选举,此次选举成功,因为 hanabi.2 还未到达 ELECTION_TIMEOUT_MS,所以还在世代 28,收到了世代 29 的拉票请求后,hanabi.2 节点将自己的票投给了 hanabi.3,hanabi.3 成功上位。

本项目github地址 : 基于raft算法实现的分布式kv存储框架 (项目实际上还有日志写入,日志提交,日志同步等功能,直接无视它…还没写完 = =)

mysql8+mybatis-plus3.1自动生成带lombok和swagger和增删改查接口

mumupudding阅读(22)

mybatis-dsc-generator

完美集成lombok,swagger的代码生成工具,让你不再为繁琐的注释和简单的接口实现而烦恼:entity集成,格式校验,swagger; dao自动加@ mapper,service自动注释和依赖; 控制器实现单表的增副改查,并实现swaggers的api文档。

源码地址

MAVEN地址

2.1.0版本是未集成Mybatis-plus版本——源码分支master

<dependency>
    <groupId>com.github.flying-cattle</groupId>
    <artifactId>mybatis-dsc-generator</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

 3.0.0版本是集成了Mybatis-plus版本——源码分支mybatisPlus

<dependency>
    <groupId>com.github.flying-cattle</groupId>
    <artifactId>mybatis-dsc-generator</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>

数据表结构样式

CREATE TABLE `user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `login_name` varchar(40) DEFAULT NULL COMMENT '登录名',
  `password` varchar(100) NOT NULL COMMENT '秘密',
  `nickname` varchar(50) NOT NULL COMMENT '昵称',
  `type` int(10) unsigned DEFAULT NULL COMMENT '类型',
  `state` int(10) unsigned NOT NULL DEFAULT '1' COMMENT '状态:-1失败,0等待,1成功',
  `note` varchar(255) DEFAULT NULL COMMENT '备注',
  `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `update_uid` bigint(20) DEFAULT '0' COMMENT '修改人用户ID',
  `login_ip` varchar(50) DEFAULT NULL COMMENT '登录IP地址',
  `login_addr` varchar(100) DEFAULT NULL COMMENT '登录地址',
  PRIMARY KEY (`id`),
  UNIQUE KEY `login_name` (`login_name`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

要求必须有表注释,要求必须有主键为id,所有字段必须有注释(便于生成java注释swagger等)。

生成的实体类

生成方法参考源码中的:https://github.com/flying-cattle/mybatis-dsc-generator/blob/mybatisPlus/src/main/java/com/github/mybatis/fl/test/TestMain.java

执行结果

实体类

/**
 * @filename:Order 2018年7月5日
 * @project deal-center  V1.0
 * Copyright(c) 2018 BianP Co. Ltd. 
 * All right reserved. 
 */
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.util.Date;
import org.springframework.format.annotation.DateTimeFormat;
import java.io.Serializable;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户实体类</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng    V1.0         initialize
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class User extends Model<User> {

    private static final long serialVersionUID = 1L;
    @TableId(value = "id", type = IdType.AUTO)
    @ApiModelProperty(name = "id" , value = "用户ID")
    private Long id;

    @ApiModelProperty(name = "loginName" , value = "登录账户")
    private String loginName;

    @ApiModelProperty(name = "password" , value = "登录密码")
    private String password;

    @ApiModelProperty(name = "nickname" , value = "用户昵称")
    private String nickname;

    @ApiModelProperty(name = "type" , value = "用户类型")
    private Integer type;

    @ApiModelProperty(name = "state" , value = "用户状态")
    private Integer state;

    @ApiModelProperty(name = "note" , value = "备注")
    private String note;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    @ApiModelProperty(name = "createTime" , value = "用户创建时间")
    private Date createTime;

    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
    @ApiModelProperty(name = "updateTime" , value = "修改时间")
    private Date updateTime;
 
    @ApiModelProperty(name = "updateUid" , value = "修改人用户ID")
    private Long updateUid;

    @ApiModelProperty(name = "loginIp" , value = "登录IP")
    private String loginIp;

    @ApiModelProperty(name = "loginIp" , value = "登录地址")
    private String loginAddr;
 
    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}

DAO

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import com.xin.usercenter.entity.User;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户数据访问层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng         V1.0            initialize
 */
@Mapper
public interface UserDao extends BaseMapper<User> {
 
}

生成的XML

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xin.usercenter.dao.UserDao">

 <resultMap id="BaseResultMap" type="com.xin.usercenter.entity.User">
  <id column="id" property="id" />
  <id column="login_name" property="loginName" />
  <id column="password" property="password" />
  <id column="nickname" property="nickname" />
  <id column="type" property="type" />
  <id column="state" property="state" />
  <id column="note" property="note" />
  <id column="create_time" property="createTime" />
  <id column="update_time" property="updateTime" />
  <id column="update_uid" property="updateUid" />
  <id column="login_ip" property="loginIp" />
  <id column="login_addr" property="loginAddr" />
 </resultMap>
 <sql id="Base_Column_List">
  id, login_name, password, nickname, type, state, note, create_time, update_time, update_uid, login_ip, login_addr
 </sql>
</mapper>

生成的SERVICE

import com.xin.usercenter.entity.User;
import com.baomidou.mybatisplus.extension.service.IService;
/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version        Description
 *------------------------------------------------------------*
 * 2019年4月9日      BianPeng        V1.0           initialize
 */
public interface UserService extends IService<User> {
 
}

生成的SERVICE_IMPL

import com.xin.usercenter.entity.User;
import com.xin.usercenter.dao.UserDao;
import com.xin.usercenter.service.UserService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务实现层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version        Description
 *------------------------------------------------------------*
 * 2019年4月9日      BianPeng        V1.0           initialize
 */
@Service
public class UserServiceImpl  extends ServiceImpl<UserDao, User> implements UserService  {
 
}

生成的CONTROLLER

import com.item.util.JsonResult;
import com.xin.usercenter.entity.User;
import com.xin.usercenter.service.UserService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户API接口层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date          Author          Version          Description
 *---------------------------------------------------------------*
 * 2019年4月9日      BianPeng    V1.0           initialize
 */
@Api(description = "用户",value="用户" )
@RestController
@RequestMapping("/user")
public class UserController {

    Logger logger = LoggerFactory.getLogger(this.getClass());
 
    @Autowired
    public UserService userServiceImpl;
 
    /**
    * @explain 查询用户对象  <swagger GET请求>
    * @param   对象参数:id
    * @return  user
    * @author  BianPeng
    * @time    2019年4月9日
    */
    @GetMapping("/getUserById/{id}")
    @ApiOperation(value = "获取用户信息", notes = "获取用户信息[user],作者:BianPeng")
    @ApiImplicitParam(paramType="path", name = "id", value = "用户id", required = true, dataType = "Long")
    public JsonResult<User> getUserById(@PathVariable("id")Long id){
     JsonResult<User> result=new JsonResult<User>();
     try {
      User user=userServiceImpl.getById(id);
      if (user!=null) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(user);
      } else {
       logger.error("获取用户失败ID:"+id);
       result.setType("fail");
       result.setMessage("你获取的用户不存在");
      }
     } catch (Exception e) {
      logger.error("获取用户执行异常:"+e.getMessage());
      result=new JsonResult<User>(e);
     }
     return result;
    }
    /**
     * @explain 添加或者更新用户对象
     * @param   对象参数:user
     * @return  int
     * @author  BianPeng
     * @time    2019年4月9日
     */
    @PostMapping("/insertSelective")
    @ApiOperation(value = "添加用户", notes = "添加用户[user],作者:BianPeng")
    public JsonResult<User> insertSelective(User user){
     JsonResult<User> result=new JsonResult<User>();
     try {
      boolean rg=userServiceImpl.saveOrUpdate(user);
      if (rg) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(user);
      } else {
       logger.error("添加用户执行失败:"+user.toString());
       result.setType("fail");
       result.setMessage("执行失败,请稍后重试");
      }
     } catch (Exception e) {
      logger.error("添加用户执行异常:"+e.getMessage());
      result=new JsonResult<User>(e);
     }
       return result;
    }
 
    /**
     * @explain 删除用户对象
     * @param   对象参数:id
     * @return  int
     * @author  BianPeng
     * @time    2019年4月9日
     */
    @PostMapping("/deleteByPrimaryKey")
    @ApiOperation(value = "删除用户", notes = "删除用户,作者:BianPeng")
    @ApiImplicitParam(paramType="query", name = "id", value = "用户id", required = true, dataType = "Long")
    public JsonResult<Object> deleteByPrimaryKey(Long id){
     JsonResult<Object> result=new JsonResult<Object>();
     try {
      boolean reg=userServiceImpl.removeById(id);
      if (reg) {
       result.setType("success");
       result.setMessage("成功");
       result.setData(id);
      } else {
       logger.error("删除用户失败ID:"+id);
       result.setType("fail");
       result.setMessage("执行错误,请稍后重试");
      }
     } catch (Exception e) {
      logger.error("删除用户执行异常:"+e.getMessage());
      result=new JsonResult<Object>(e);
     }
     return result;
 }
 
 /**
  * @explain 分页条件查询用户   
  * @param   对象参数:AppPage<User>
  * @return  PageInfo<User>
  * @author  BianPeng
  * @time    2019年4月9日
  */
 @GetMapping("/getUserPages")
 @ApiOperation(value = "分页查询", notes = "分页查询返回对象[IPage<User>],作者:边鹏")
 @ApiImplicitParams({
        @ApiImplicitParam(paramType="query", name = "pageNum", value = "当前页", required = true, dataType = "int"),
        @ApiImplicitParam(paramType="query", name = "pageSize", value = "页行数", required = true, dataType = "int")
    })
 public JsonResult<Object> getUserPages(Integer pageNum,Integer pageSize){
 
  JsonResult<Object> result=new JsonResult<Object>();
  Page<User> page=new Page<User>(pageNum,pageSize);
  QueryWrapper<User> queryWrapper =new QueryWrapper<User>();
  //分页数据
  try {
   //List<User> list=userServiceImpl.list(queryWrapper); 
   IPage<User> pageInfo=userServiceImpl.page(page, queryWrapper);
   result.setType("success");
   result.setMessage("成功");
   result.setData(pageInfo);
  } catch (Exception e) {
   logger.error("分页查询用户执行异常:"+e.getMessage());
   result=new JsonResult<Object>(e);
  }
  return result;
 }
}

生成完毕,控制器中的JsonResult

import java.io.Serializable;
import java.net.ConnectException;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**   
 * Copyright: Copyright (c) 2019 
 * 
 * <p>说明: 用户服务层</P>
 * @version: V1.0
 * @author: BianPeng
 * 
 * Modification History:
 * Date         Author         Version         Description
 *---------------------------------------------------------*
 * 2019/4/9  flying-cattle  V1.0            initialize
 */
public class JsonResult<T> implements Serializable{
 
 Logger logger = LoggerFactory.getLogger(this.getClass());
 private static final long serialVersionUID = 1071681926787951549L;

 /**
     * <p>返回状态</p>
     */
    private Boolean isTrue=true;
    /**
     *<p> 状态码</p>
     */
    private String code;
    /**
     * <p>业务码</p>
     */
    private String type;
    /**
     *<p> 状态说明</p>
     */
    private String message;
    /**
     * <p>返回数据</p>
     */
    private T data;
    public Boolean getTrue() {
        return isTrue;
    }
    public void setTrue(Boolean aTrue) {
        isTrue = aTrue;
    }
    public String getCode() {
        return code;
    }
    public void setCode(String code) {
        this.code = code;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    public T getData() {
        return data;
    }
    public void setData(T data) {
        this.data = data;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
    /**
     * <p>返回成功</p>
     * @param type 业务码
     * @param message 错误说明
     * @param data 数据
     */
    public JsonResult(String type, String message, T data) {
        this.isTrue=true;
        this.code ="0000";
        this.type=type;
        this.message = message;
        this.data=data;
    }
    public JsonResult() {
        this.isTrue=true;
        this.code ="0000";
    }
    public JsonResult(Throwable throwable) {
     logger.error(throwable+"tt");
        this.isTrue=false;
        if(throwable instanceof NullPointerException){
            this.code= "1001";
            this.message="空指针:"+throwable;
        }else if(throwable instanceof ClassCastException ){
            this.code= "1002";
            this.message="类型强制转换异常:"+throwable;
        }else if(throwable instanceof ConnectException){
            this.code= "1003";
            this.message="链接失败:"+throwable;
        }else if(throwable instanceof IllegalArgumentException ){
            this.code= "1004";
            this.message="传递非法参数异常:"+throwable;
        }else if(throwable instanceof NumberFormatException){
            this.code= "1005";
            this.message="数字格式异常:"+throwable;
        }else if(throwable instanceof IndexOutOfBoundsException){
            this.code= "1006";
            this.message="下标越界异常:"+throwable;
        }else if(throwable instanceof SecurityException){
            this.code= "1007";
            this.message="安全异常:"+throwable;
        }else if(throwable instanceof SQLException){
            this.code= "1008";
            this.message="数据库异常:"+throwable;
        }else if(throwable instanceof ArithmeticException){
            this.code= "1009";
            this.message="算术运算异常:"+throwable;
        }else if(throwable instanceof RuntimeException){
            this.code= "1010";
            this.message="运行时异常:"+throwable;
        }else if(throwable instanceof Exception){ 
         logger.error("未知异常:"+throwable);
            this.code= "9999";
            this.message="未知异常"+throwable;
        }
    }
}

如果你生成的分页的方法不能分页:根据官方提升,记得在启动类中加入

@Bean
public PaginationInterceptor paginationInterceptor() {
    return new PaginationInterceptor();
}

 

并行化-你的高并发大杀器

mumupudding阅读(12)

1.前言

想必热爱游戏的同学小时候,都幻想过要是自己要是能像鸣人那样会多重影分身之术,就能一边打游戏一边上课了,可惜漫画就是漫画,现实中并没有这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。虽然在现实中我们无法实现多重影分身这样的技术,但是我们可以在计算机世界中实现我们这样的愿望。

2.计算机中的分身术

计算机中的分身术不是天生就有了。在1971年,1971年,英特尔推出的全球第一颗通用型微处理器4004,由2300个晶体管构成。当时,公司的联合创始人之一戈登摩尔就提出大名鼎鼎的“摩尔定律”——每过18个月,芯片上可以集成的晶体管数目将增加一倍。最初的主频740kHz(每秒运行74万次),现在过了快50年了,大家去买电脑的时候会发现现在的主频都能达到4.0GHZ了(每秒40亿次)。但是主频越高带来的收益却是越来越小:

  • 据测算,主频每增加1G,功耗将上升25瓦,而在芯片功耗超过150瓦后,现有的风冷散热系统将无法满足散热的需要。有部分CPU都可以用来煎鸡蛋了。
  • 流水线过长,使得单位频率效能低下,越大的主频其实整体性能反而不如小的主频。
  • 戈登摩尔认为摩尔定律未来10-20年会失效。

在单核主频遇到瓶颈的情况下,多核CPU应运而生,不仅提升了性能,并且降低了功耗。所以多核CPU逐渐成为现在市场的主流,这样让我们的多线程编程也更加的容易。

说到了多核CPU就一定要说GPU,大家可能对这个比较陌生,但是一说到显卡就肯定不陌生,笔者搞过一段时间的CUDA编程,我才意识到这个才是真正的并行计算,大家都知道图片像素点吧,比如19201080的图片有210万个像素点,如果想要把一张图片的每个像素点都进行转换一下,那在我们java里面可能就要循环遍历210万次。就算我们用多线程8核CPU,那也得循环几十万次。但是如果使用Cuda,最多可以365535*512=100661760(一亿)个线程并行执行,就这种级别的图片那也是马上处理完成。但是Cuda一般适合于图片这种,有大量的像素点需要同时处理,但是其支持指令不多所以逻辑不能太复杂。GPU只是用来扩展介绍,感兴趣可以和笔者交流。

3.应用中的并行

一说起让你的服务高性能的手段,那么异步化,并行化这些肯定会第一时间在你脑海中显现出来,在之前的文章:《异步化,你的高并发大杀器》中已经介绍过了异步化的优化手段,有兴趣的朋友可以看看。并行化可以用来配合异步化,也可以用来单独做优化。

我们可以想想有这么一个需求,在你下外卖订单的时候,这笔订单可能还需要查,用户信息,折扣信息,商家信息,菜品信息等,用同步的方式调用,如下图所示:

设想一下这5个查询服务,平均每次消耗50ms,那么本次调用至少是250ms,我们细想一下,在这个这五个服务其实并没有任何的依赖,谁先获取谁后获取都可以,那么我们可以想想,是否可以用多重影分身之术,同时获取这五个服务的信息呢?优化如下:

将这五个查询服务并行查询,在理想情况下可以优化至50ms。当然说起来简单,我们真正如何落地呢?

3.1 CountDownLatch/Phaser

CountDownLatch和Phaser是JDK提供的同步工具类Phaser是1.7版本之后提供的工具类而CountDownLatch是1.5版本之后提供的工具类。这里简单介绍一下CountDownLatch,可以将其看成是一个计数器,await()方法可以阻塞至超时或者计数器减至0,其他线程当完成自己目标的时候可以减少1,利用这个机制我们可以将其用来做并发。可以用如下的代码实现我们上面的下订单的需求:

public class CountDownTask {    private static final int CORE_POOL_SIZE = 4;    private static final int MAX_POOL_SIZE = 12;    private static final long KEEP_ALIVE_TIME = 5L;    private final static int QUEUE_SIZE = 1600;    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));    public static void main(String[] args) throws InterruptedException {        // 新建一个为5的计数器        CountDownLatch countDownLatch = new CountDownLatch(5);        OrderInfo orderInfo = new OrderInfo();        THREAD_POOL.execute(() -> {            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());            orderInfo.setCustomerInfo(new CustomerInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());            orderInfo.setDiscountInfo(new DiscountInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());            orderInfo.setFoodListInfo(new FoodListInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务Tenant,线程名字为:" + Thread.currentThread().getName());            orderInfo.setTenantInfo(new TenantInfo());            countDownLatch.countDown();        });        THREAD_POOL.execute(() -> {            System.out.println("当前任务OtherInfo,线程名字为:" + Thread.currentThread().getName());            orderInfo.setOtherInfo(new OtherInfo());            countDownLatch.countDown();        });        countDownLatch.await(1, TimeUnit.SECONDS);        System.out.println("主线程:"+ Thread.currentThread().getName());    }}

建立一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行我们的任务(生成用户信息,菜品信息等),最后利用await方法阻塞等待结果成功返回。

3.2CompletableFuture

相信各位同学已经发现,CountDownLatch虽然能实现我们需要满足的功能但是其任然有个问题是,在我们的业务代码需要耦合CountDownLatch的代码,比如在我们获取用户信息之后我们会执行countDownLatch.countDown(),很明显我们的业务代码显然不应该关心这一部分逻辑,并且在开发的过程中万一写漏了,那我们的await方法将只会被各种异常唤醒。

所以在JDK1.8中提供了一个类CompletableFuture,它是一个多功能的非阻塞的Future。(什么是Future:用来代表异步结果,并且提供了检查计算完成,等待完成,检索结果完成等方法。)在我之前的这篇文章中详细介绍了《异步技巧之CompletableFuture》,有兴趣的可以看这篇文章。我们将每个任务的计算完成的结果都用CompletableFuture来表示,利用CompletableFuture.allOf汇聚成一个大的CompletableFuture,那么利用get()方法就可以阻塞。

public class CompletableFutureParallel {    private static final int CORE_POOL_SIZE = 4;    private static final int MAX_POOL_SIZE = 12;    private static final long KEEP_ALIVE_TIME = 5L;    private final static int QUEUE_SIZE = 1600;    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        OrderInfo orderInfo = new OrderInfo();        //CompletableFuture 的List        List<CompletableFuture> futures = new ArrayList<>();        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Customer,线程名字为:" + Thread.currentThread().getName());            orderInfo.setCustomerInfo(new CustomerInfo());        }, THREAD_POOL));        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Discount,线程名字为:" + Thread.currentThread().getName());            orderInfo.setDiscountInfo(new DiscountInfo());        }, THREAD_POOL));        futures.add( CompletableFuture.runAsync(() -> {            System.out.println("当前任务Food,线程名字为:" + Thread.currentThread().getName());            orderInfo.setFoodListInfo(new FoodListInfo());        }, THREAD_POOL));        futures.add(CompletableFuture.runAsync(() -> {            System.out.println("当前任务Other,线程名字为:" + Thread.currentThread().getName());            orderInfo.setOtherInfo(new OtherInfo());        }, THREAD_POOL));        CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));        allDoneFuture.get(10, TimeUnit.SECONDS);        System.out.println(orderInfo);    }}

可以看见我们使用CompletableFuture能很快的完成的需求,当然这还不够。

3.3 Fork/Join

我们上面用CompletableFuture完成了我们对多组任务并行执行,但是其依然是依赖我们的线程池,在我们的线程池中使用的是阻塞队列,也就是当我们某个线程执行完任务的时候需要通过这个阻塞队列进行,那么肯定会发生竞争,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。

ForkJoinPool中每个线程都有自己的工作队列,并且采用Work-Steal算法防止线程饥饿。 Worker线程用LIFO的方法取出任务,但是会用FIFO的方法去偷取别人队列的任务,这样就减少了锁的冲突。

网上这个框架的例子很多,我们看看如何使用代码其完成我们上面的下订单需求:

public class OrderTask extends RecursiveTask<OrderInfo> {    @Override    protected OrderInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        // 定义其他五种并行TasK        CustomerTask customerTask = new CustomerTask();        TenantTask tenantTask = new TenantTask();        DiscountTask discountTask = new DiscountTask();        FoodTask foodTask = new FoodTask();        OtherTask otherTask = new OtherTask();        invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);        OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join());        return orderInfo;    }    public static void main(String[] args) {        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 );        System.out.println(forkJoinPool.invoke(new OrderTask()));    }}class CustomerTask extends RecursiveTask<CustomerInfo>{    @Override    protected CustomerInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new CustomerInfo();    }}class TenantTask extends RecursiveTask<TenantInfo>{    @Override    protected TenantInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new TenantInfo();    }}class DiscountTask extends RecursiveTask<DiscountInfo>{    @Override    protected DiscountInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new DiscountInfo();    }}class FoodTask extends RecursiveTask<FoodListInfo>{    @Override    protected FoodListInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new FoodListInfo();    }}class OtherTask extends RecursiveTask<OtherInfo>{    @Override    protected OtherInfo compute() {        System.out.println("执行"+ this.getClass().getSimpleName() + "线程名字为:" + Thread.currentThread().getName());        return new OtherInfo();    }}

我们定义一个OrderTask并且定义五个获取信息的任务,在compute中分别fork执行这五个任务,最后在将这五个任务的结果通过Join获得,最后完成我们的并行化的需求。

3.4 parallelStream

在jdk1.8中提供了并行流的API,当我们使用集合的时候能很好的进行并行处理,下面举了一个简单的例子从1加到100:

public class ParallelStream {    public static void main(String[] args) {        ArrayList<Integer> list = new ArrayList<Integer>();        for (int i = 1; i <= 100; i++) {            list.add(i);        }        LongAdder sum = new LongAdder();        list.parallelStream().forEach(integer -> {//            System.out.println("当前线程" + Thread.currentThread().getName());            sum.add(integer);        });        System.out.println(sum);    }}

parallelStream中底层使用的那一套也是Fork/Join的那一套,默认的并发程度是可用CPU数-1。

3.5 分片

可以想象有这么一个需求,每天定时对id在某个范围之间的用户发券,比如这个范围之间的用户有几百万,如果给一台机器发的话,可能全部发完需要很久的时间,所以分布式调度框架比如:elastic-job。都提供了分片的功能,比如你用50台机器,那么id%50=0的在第0台机器上,=1的在第1台机器上发券,那么我们的执行时间其实就分摊到了不同的机器上了。

4.并行化注意事项

  • 线程安全:在parallelStream中我们列举的代码中使用的是LongAdder,并没有直接使用我们的Integer和Long,这个是因为在多线程环境下Integer和Long线程不安全。所以线程安全我们需要特别注意。
  • 合理参数配置:可以看见我们需要配置的参数比较多,比如我们的线程池的大小,等待队列大小,并行度大小以及我们的等待超时时间等等,我们都需要根据自己的业务不断的调优防止出现队列不够用或者超时时间不合理等等。

5.最后

本文介绍了什么是并行化,并行化的各种历史,在Java中如何实现并行化,以及并行化的注意事项。希望大家对并行化有个比较全面的认识。最后给大家提个两个小问题:

  1. 在我们并行化当中有某个任务如果某个任务出现了异常应该怎么办?
  2. 在我们并行化当中有某个任务的信息并不是强依赖,也就是如果出现了问题这部分信息我们也可以不需要,当并行化的时候,这种任务出现了异常应该怎么办?

最后这篇文章被我收录于JGrowing,一个全面,优秀,由社区一起共建的Java学习路线,如果您想参与开源项目的维护,可以一起共建,github地址为:https://github.com/javagrowing/JGrowing麻烦给个小星星哟。

如果你觉得这篇文章对你有文章,可以关注我的技术公众号,你的关注和转发是对我最大的支持,O(∩_∩)O

支撑百万并发的数据库架构如何设计?

mumupudding阅读(11)


前言

        作为一个全球人数最多的国家,一个再怎么凄惨的行业,都能找出很多的人为之付出。而在这个互联网的时代,IT公司绝对比牛毛还多很多。但是大多数都是创业公司,长期存活的真的不多。大多数的IT项目在注册量从0-100万,日活跃1-5万,说实话就这种系统随便找一个有几年工作经验的高级工程师,然后带几个年轻工程师,随便干干都可以做出来。
        因为这样的系统,实际上主要就是在前期快速的进行业务功能的开发,搞一个单块系统部署在一台服务器上,然后连接一个数据库就可以了。接着大家就是不停的在一个工程里填充进去各种业务代码,尽快把公司的业务支撑起来。

        但是如果真的发展的还可以,可能就会遇到如下问题:
        在运行的过程中系统访问数据库的性能越来越差,单表数据量越来越大,一些复杂查询 SQL直接拖垮!
        这种时候就不得不考虑的解决方案:缓存,负载均衡,项目分块(微服务);数据库:读写分离,分库分表等技术

如果说此时你还是一台数据库服务器在支撑每秒上万的请求,负责任的告诉你,每次高峰期会出现下述问题:

  • 数据库服务器的磁盘 IO、网络带宽、CPU 负载、内存消耗,都会达到非常高的情况,数据库所在服务器的整体负载会非常重,甚至都快不堪重负了。
  • 高峰期时,本来你单表数据量就很大,SQL 性能就不太好,这时加上你的数据库服务器负载太高导致性能下降,就会发现你的 SQL 性能更差了。
  • 最明显的一个感觉,就是你的系统在高峰期各个功能都运行的很慢,用户体验很差,点一个按钮可能要几十秒才出来结果。
  • 如果你运气不太好,数据库服务器的配置不是特别的高的话,弄不好你还会经历数据库宕机的情况,因为负载太高对数据库压力太大了。

其实大多数公司的瓶颈都在数据库,其实如果把上面的解决方案,都实现了,基本上就没的什么问题了,举例
        如果订单一年有 1 亿条数据,可以把订单表一共拆分为 1024 张表,分散在5个库中,这样 1 亿数据量的话,分散到每个表里也就才 10 万量级的数据量,然后这上千张表分散在 5 台数据库里就可以了。
        在写入数据的时候,需要做两次路由,先对订单 id hash 后对数据库的数量取模,可以路由到一台数据库上,然后再对那台数据库上的表数量取模,就可以路由到数据库上的一个表里了。
        通过这个步骤,就可以让每个表里的数据量非常小,每年 1 亿数据增长,但是到每个表里才 10 万条数据增长,这个系统运行 10 年,每个表里可能才百万级的数据量。

全局唯一ID

在分库分表之后你必然要面对的一个问题,就是 id 咋生成?因为要是一个表分成多个表之后,每个表的 id 都是从 1 开始累加自增长,那肯定不对啊。

举个例子,你的订单表拆分为了 1024 张订单表,每个表的 id 都从 1 开始累加,这个肯定有问题了!

你的系统就没办法根据表主键来查询订单了,比如 id = 50 这个订单,在每个表里都有!

所以此时就需要分布式架构下的全局唯一 id 生成的方案了,在分库分表之后,对于插入数据库中的核心 id,不能直接简单使用表自增 id,要全局生成唯一 id,然后插入各个表中,保证每个表内的某个 id,全局唯一。

比如说订单表虽然拆分为了 1024 张表,但是 id = 50 这个订单,只会存在于一个表里。

那么如何实现全局唯一 id 呢?有以下几种方案:

方案一:独立数据库自增 id

这个方案就是说你的系统每次要生成一个 id,都是往一个独立库的一个独立表里插入一条没什么业务含义的数据,然后获取一个数据库自增的一个 id。拿到这个 id 之后再往对应的分库分表里去写入。

比如说你有一个 auto_id 库,里面就一个表,叫做 auto_id 表,有一个 id 是自增长的。

那么你每次要获取一个全局唯一 id,直接往这个表里插入一条记录,获取一个全局唯一 id 即可,然后这个全局唯一 id 就可以插入订单的分库分表中。

这个方案的好处就是方便简单,谁都会用。缺点就是单库生成自增 id,要是高并发的话,就会有瓶颈的,因为 auto_id 库要是承载个每秒几万并发,肯定是不现实的了。

方案二:UUID

这个每个人都应该知道吧,就是用 UUID 生成一个全局唯一的 id。

好处就是每个系统本地生成,不要基于数据库来了。不好之处就是,UUID 太长了,作为主键性能太差了,不适合用于主键。

如果你是要随机生成个什么文件名了,编号之类的,你可以用 UUID,但是作为主键是不能用 UUID 的。

方案三:获取系统当前时间

这个方案的意思就是获取当前时间作为全局唯一的 id。但是问题是,并发很高的时候,比如一秒并发几千,会有重复的情况,这个肯定是不合适的。

一般如果用这个方案,是将当前时间跟很多其他的业务字段拼接起来,作为一个 id,如果业务上你觉得可以接受,那么也是可以的。

你可以将别的业务字段值跟当前时间拼接起来,组成一个全局唯一的编号,比如说订单编号:时间戳 + 用户 id + 业务含义编码。

方案四:SnowFlake 算法的思想分析

SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。这 64 个 bit 中,其中 1 个 bit 是不用的,然后用其中的 41 bit 作为毫秒数,用 10 bit 作为工作机器 id,12 bit 作为序列号。

给大家举个例子吧,比如下面那个 64 bit 的 long 型数字:

  • 第一个部分,是 1 个 bit:0,这个是无意义的。

  • 第二个部分是 41 个 bit:表示的是时间戳。

  • 第三个部分是 5 个 bit:表示的是机房 id,10001。

  • 第四个部分是 5 个 bit:表示的是机器 id,1 1001。

  • 第五个部分是 12 个 bit:表示的序号,就是某个机房某台机器上这一毫秒内同时生成的 id 的序号,0000 00000000。

①1 bit:是不用的,为啥呢?

        因为二进制里第一个 bit 为如果是 1,那么都是负数,但是我们生成的 id 都是正数,所以第一个 bit 统一都是 0。

②41 bit:表示的是时间戳,单位是毫秒。

        41 bit 可以表示的数字多达 2^41 – 1,也就是可以标识 2 ^ 41 – 1 个毫秒值,换算成年就是表示 69 年的时间。

③10 bit:记录工作机器 id,代表的是这个服务最多可以部署在 2^10 台机器上,也就是 1024 台机器。

        但是 10 bit 里 5 个 bit 代表机房 id,5 个 bit 代表机器 id。意思就是最多代表 2 ^ 5 个机房(32 个机房),每个机房里可以代表 2 ^ 5 个机器(32 台机器)。

④12 bit:这个是用来记录同一个毫秒内产生的不同 id。

        12 bit 可以代表的最大正整数是 2 ^ 12 – 1 = 4096,也就是说可以用这个 12 bit 代表的数字来区分同一个毫秒内的 4096 个不同的 id。简单来说,你的某个服务假设要生成一个全局唯一 id,那么就可以发送一个请求给部署了 SnowFlake 算法的系统,由这个 SnowFlake 算法系统来生成唯一 id。

        这个 SnowFlake 算法系统首先肯定是知道自己所在的机房和机器的,比如机房 id = 17,机器 id = 12。

        接着 SnowFlake 算法系统接收到这个请求之后,首先就会用二进制位运算的方式生成一个 64 bit 的 long 型 id,64 个 bit 中的第一个 bit 是无意义的。

        接着 41 个 bit,就可以用当前时间戳(单位到毫秒),然后接着 5 个 bit 设置上这个机房 id,还有 5 个 bit 设置上机器 id。

        最后再判断一下,当前这台机房的这台机器上这一毫秒内,这是第几个请求,给这次生成 id 的请求累加一个序号,作为最后的 12 个 bit。

        最终一个 64 个 bit 的 id 就出来了,类似于:

这个算法可以保证说,一个机房的一台机器上,在同一毫秒内,生成了一个唯一的 id。可能一个毫秒内会生成多个 id,但是有最后 12 个 bit 的序号来区分开来。

下面我们简单看看这个 SnowFlake 算法的一个代码实现,这就是个示例,大家如果理解了这个意思之后,以后可以自己尝试改造这个算法。

总之就是用一个 64 bit 的数字中各个 bit 位来设置不同的标志位,区分每一个 id。

SnowFlake 算法JAVA版(含测试方法):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.ToString;

/**   
* Copyright: Copyright (c) 2019 
* 
* @ClassName: IdWorker.java
* @Description: <p>SnowFlake 算法,是 Twitter 开源的分布式 id 生成算法。
*      其核心思想就是:使用一个 64 bit 的 long 型的数字作为全局唯一 id。
*      这 64 个 bit 中,其中 1 个 bit 是不用的,然后用其中的 41 bit 作为毫秒数,
*      用 10 bit 作为工作机器 id,12 bit 作为序列号
*    </p>
* @version: v1.0.0
* @author: BianPeng
* @date: 2019年4月11日 下午3:13:41 
*
* Modification History:
* Date           Author          Version          Description
*---------------------------------------------------------------*
* 2019年4月11日        BianPeng        v1.0.0           initialize
*/
@ToString
public class SnowflakeIdFactory {
 
 static Logger log = LoggerFactory.getLogger(SnowflakeIdFactory.class);
 
    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
 
    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;
 
 
 
    public SnowflakeIdFactory(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }
 
    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            //服务器时钟被调整了,ID生成器停止服务.
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
 
        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }
 
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
 
    protected long timeGen() {
        return System.currentTimeMillis();
    }
 
    public static void testProductIdByMoreThread(int dataCenterId, int workerId, int n) throws InterruptedException {
        List<Thread> tlist = new ArrayList<>();
        Set<Long> setAll = new HashSet<>();
        CountDownLatch cdLatch = new CountDownLatch(10);
        long start = System.currentTimeMillis();
        int threadNo = dataCenterId;
        Map<String,SnowflakeIdFactory> idFactories = new HashMap<>();
        for(int i=0;i<10;i++){
            //用线程名称做map key.
            idFactories.put("snowflake"+i,new SnowflakeIdFactory(workerId, threadNo++));
        }
        for(int i=0;i<10;i++){
            Thread temp =new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<Long> setId = new HashSet<>();
                    SnowflakeIdFactory idWorker = idFactories.get(Thread.currentThread().getName());
                    for(int j=0;j<n;j++){
                        setId.add(idWorker.nextId());
                    }
                    synchronized (setAll){
                        setAll.addAll(setId);
                        log.info("{}生产了{}个id,并成功加入到setAll中.",Thread.currentThread().getName(),n);
                    }
                    cdLatch.countDown();
                }
            },"snowflake"+i);
            tlist.add(temp);
        }
        for(int j=0;j<10;j++){
            tlist.get(j).start();
        }
        cdLatch.await();
 
        long end1 = System.currentTimeMillis() - start;
 
        log.info("共耗时:{}毫秒,预期应该生产{}个id, 实际合并总计生成ID个数:{}",end1,10*n,setAll.size());
 
    }
 
    public static void testProductId(int dataCenterId, int workerId, int n){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(workerId, dataCenterId);
        SnowflakeIdFactory idWorker2 = new SnowflakeIdFactory(workerId+1, dataCenterId);
        Set<Long> setOne = new HashSet<>();
        Set<Long> setTow = new HashSet<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            setOne.add(idWorker.nextId());//加入set
        }
        long end1 = System.currentTimeMillis() - start;
        log.info("第一批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setOne.size(),end1);
 
        for (int i = 0; i < n; i++) {
            setTow.add(idWorker2.nextId());//加入set
        }
        long end2 = System.currentTimeMillis() - start;
        log.info("第二批ID预计生成{}个,实际生成{}个<<<<*>>>>共耗时:{}",n,setTow.size(),end2);
 
        setOne.addAll(setTow);
        log.info("合并总计生成ID个数:{}",setOne.size());
 
    }
 
    public static void testPerSecondProductIdNums(){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(1, 2);
        long start = System.currentTimeMillis();
        int count = 0;
        for (int i = 0; System.currentTimeMillis()-start<1000; i++,count=i) {
            /**  测试方法一: 此用法纯粹的生产ID,每秒生产ID个数为400w+ */
         //idWorker.nextId();
            /**  测试方法二: 在log中打印,同时获取ID,此用法生产ID的能力受限于log.error()的吞吐能力.
             * 每秒徘徊在10万左右. */
         log.info(""+idWorker.nextId());
        }
        long end = System.currentTimeMillis()-start;
        System.out.println(end);
        System.out.println(count);
    }
 
    public static void main(String[] args) {
        /** case1: 测试每秒生产id个数?
         *   结论: 每秒生产id个数400w+ 
         */
        //testPerSecondProductIdNums();
 
        /** case2: 单线程-测试多个生产者同时生产N个id,验证id是否有重复?
         *   结论: 验证通过,没有重复. 
         */
        //testProductId(1,2,10000);//验证通过!
        //testProductId(1,2,20000);//验证通过!
 
        /** case3: 多线程-测试多个生产者同时生产N个id, 全部id在全局范围内是否会重复?
         *   结论: 验证通过,没有重复.
         */
        try {
            testProductIdByMoreThread(1,2,100000);//单机测试此场景,性能损失至少折半!
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
    }
}

这个算法也叫雪花算法我使用的类源码:https://gitee.com/flying-cattle/earn_knife/blob/master/item-common/src/main/java/com/item/util/SnowflakeIdWorker.java

项目是一个递进的过程,优先考虑缓存,其次读写分离,再分表分库。当然这只是个人想法,各位伙伴还是根据自己的项目和业务来综合考虑实行方案。

防火墙API,全新升级,支持iptables、ipset、pf

mumupudding阅读(16)

  在上一篇“使用redis来调用iptables,封禁恶意IP” 一文中已经讲解RedisPushIptables的用法,在经过几个版本迭代后,该模块功能更强大了,所以需要新开篇幅来讲解下。

   RedisPushIptables-6.1.tar.gz为最新版本,已经支持iptables、ipset、pf防火墙,意味着它已经跨平台支持,Linux、BSD、MacOS。最重要的是添加了动态删除防火墙规则的功能,当然这个功能ipset也有。虽然fail2ban也有此功能,但是极其消耗资源因为它是轮询来获取任务的。

 本篇主要介绍RedisPushIptables模块的实现原理、安装方法、API使用方法以及适用范围。并与Fail2ban作了对比以便读者了解二者的区别,RedisPushIptables不受编程语言的限制。意味着开发者都可以使用它来进行业务防护,接着讲解了怎样重新封装lib库从而支持API调用,最后给出了部分编程语言调用API的示例,供读者参阅。

简介

  RedisPushIptables是Redis的一个模块, 也可以把它理解为防火墙API调用库,该模块可以通过 redis 来操作 iptables 的 filter表INPUT链规则ACCEPT和DROP,而相对于BSD系统该模块则是对应pf防火墙。RedisPushIptables更新防火墙规则以在指定的时间内拒绝IP地址或永远拒绝。比如用来防御攻击。自此普通开发者也可以使用iptables或者PF,而不必再理会复杂的防火墙语法。

与Fail2Ban比较

  主要从两个方面,实现原理和实用性。Fail2Ban倾向于事后分析,需要监控日志,支持的应用也比较多,只需简单配置即可。而RedisPushIptables倾向于实效性,不需要监控日志,但是,需要程序编码时调用API,使用门槛较高,并不适用所有人。

 Fail2Ban

  Fail2Ban是一种入侵防御软件框架,可以保护计算机服务器免受暴力攻击。用Python编程语言编写,它能够在POSIX系统上运行,该系统具有本地安装的数据包控制系统或防火墙的接口,例如iptablesTCP Wrapper

  fail2ban通过监控操作日志文件(如/var/log/auth.log,/var/log/apache/access.log等)选中的条目并运行基于他们的脚本。最常用于阻止可能属于试图破坏系统安全性的主机的所选IP地址。它可以禁止在管理员定义的时间范围内进行过多登录尝试或执行任何其他不需要的操作的任何主机IP地址。包括对IPv4和IPv6的支持。可选择更长时间的禁令可以为不断回来的滥用者进行定制配置。Fail2Ban通常设置为在一定时间内取消阻止被阻止的主机,以便不“锁定”任何可能暂时错误配置的真正连接。但是,几分钟的unban时间通常足以阻止网络连接被恶意连接淹没,并降低字典攻击成功的可能性。

每当检测到滥用的IP地址时,Fail2Ban都可以执行多个操作:更新Netfilter / iptablesPF防火墙规则,TCP Wrapper的hosts.deny表,拒绝滥用者的IP地址; 邮件通知; 或者可以由Python脚本执行的任何用户定义的操作。

标准配置附带ApacheLighttpdsshdvsftpdqmailPostfixCourier Mail Server的过滤器。过滤器是被Python定义的正则表达式,其可以由熟悉正则表达式的管理员可以方便地定制。过滤器和操作的组合称为“jail”,是阻止恶意主机访问指定网络服务的原因。除了随软件一起分发的示例之外,还可以为任何创建访问日志文件的面向网络的进程创建“jail”。

Fail2Ban类似于DenyHosts […],但与专注于SSH的DenyHosts不同,Fail2Ban可以配置为监视将登录尝试写入日志文件的任何服务,而不是仅使用/etc/hosts.deny来阻止IP地址/ hosts,Fail2Ban可以使用Netfilter / iptables和TCP Wrappers /etc/hosts.deny。

 

缺点

  • Fail2Ban无法防范分布式暴力攻击。
  • 没有与特定于应用程序的API的交互。
  • 太过依赖正则表达式,不同的程序需要各自对应的正则。
  • 效率低下,性能受日志数量影响
  • IP列表很多时,内存消耗很高

 RedisPushIptables

  虽然与Fail2Ban比较起来,RedisPushIptables支持还不是很完善,但是,术业有专攻,它的优势在于高性能,用C语言实现,同样支持跨平台LinuxBSDMacOS。可以通过API调用,意味着redis官方支持的编程语言都可以使用,应用范围不受限。Fail2Ban是被动防御的需要根据关键字实时获取应用程序日志,匹配字符串再计算阈值达到就封禁IP地址。而RedisPushIptables业务主动调用,不需要分析日志。同样支持动态删除iptables或者PF规则,比fail2ban更省资源。

 

缺点

  • 需要开发者编码时调用API
  • 无法防范分布式暴力攻击。
  • 目前IPv6在我国还没普及所以不支持

安装

在安装RedisPushIptables之前,需要先安装redis。下面为版本redis-5.0.3.tar.gz

root@debian:~/bookscode# git clone https://github.com/limithit/RedisPushIptables.git
root@debian:~/bookscode# wget http://download.redis.io/releases/redis-5.0.3.tar.gz
root@debian:~/bookscode# tar zxvf redis-5.0.3.tar.gz
root@debian:~/bookscode#cd redis-5.0.3&& make   
root@debian:~/bookscode/redis-5.0.3# make test    
root@debian:~/bookscode/redis-5.0.3/deps/hiredis#make&& make install  
root@debian:~/bookscode/redis-5.0.3/src#cp redis-server redis-sentinel redis-cliredis-benchmark redis-check-rdb redis-check-aof /usr/local/bin/ 
root@debian:~/bookscode/redis-5.0.3/utils# ./install_server.sh    
root@debian:~/bookscode/redis-5.0.3# cd deps/hiredis
root@debian:~/bookscode/redis-5.0.3/deps/hiredis# make && make install
root@debian:~/bookscode/redis-5.0.3# echo /usr/local/lib >> /etc/ld.so.conf  
root@debian:~/bookscode/redis-5.0.3# ldconfig          
root@debian:~/bookscode# cd RedisPushIptables
root@debian:~/bookscode/ RedisPushIptables # make  && make install  

 

注意

编译时有三个选项 make、make CFLAGS=-DWITH_IPSET和make CFLGAS=-DBSD

默认是make选项,make CFLAGS=-DWITH_IPSET则是使用ipset更快地管理规则

make CFLGAS=-DBSD则是在BSD和MacOS系统上编译使用

 

可以使用以下redis.conf配置指令加载模块:

loadmodule /path/to/iptablespush.so

也可以使用以下命令在运行时加载模块:

MODULE LOAD /path/to/iptablespush.so

可以使用以下命令卸载模块:

MODULE unload iptables-input-filter

动态删除配置

默认情况下,禁用键空间事件通知,虽然不太明智,但该功能会使用一些CPU。使用redis.confnotify-keyspace-eventsCONFIG SET启用通知。将参数设置为空字符串会禁用通知。为了启用该功能,使用了一个非空字符串,由多个字符组成,其中每个字符都具有特殊含义,如下所示:

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.

字符串中至少应存在KE,否则无论字符串的其余部分如何都不会传递任何事件。例如,只为列表启用键空间事件,配置参数必须设置为Kl,依此类推。字符串KEA可用于启用每个可能的事件。

# redis-cli config set notify-keyspace-events Ex

也可以使用以下redis.conf配置指令加载模块:

notify-keyspace-events Ex
#notify-keyspace-events ""  #注释掉这行

使用root用户运行ttl_iptables守护程序

root@debian:~/RedisPushIptables# /etc/init.d/ttl_iptables start

日志在/var/log/ttl_iptables.log中查看

root@debian:~# redis-cli TTL_DROP_INSERT 192.168.18.5 60  
(integer) 12
root@debian:~# date
Fri Mar 15 09:38:49 CST 2019                                 
root@debian:~/RedisPushIptables# tail -f /var/log/ttl_iptables.log 
2019/03/15 09:39:48 pid=5670 iptables -D INPUT -s 192.168.18.5 -j DROP

指令

  RedisPushIptables目前有五个指令,管理filter表中的INPUT链。为了保证规则生效,采用插入规则而不是按序添加规则,这么做的原因是,因为iptables是按顺序执行的。此外加入了自动去重功能(ipset和pfctl自带去重)。使用者不必担心会出现重复的规则,只需要添加即可。

accept_insert

等同iptables -I INPUT -s x.x.x.x -j ACCEPT

  • accept_delete

等同iptables -D INPUT -s x.x.x.x -j ACCEPT

  • drop_insert

等同iptables -I INPUT -s x.x.x.x -j DROP

  • drop_delete

等同iptables -D INPUT -s x.x.x.x -j DROP

  • ttl_drop_insert

例ttl_drop_insert 192.168.18.5 60

等同iptables -I INPUT -s x.x.x.x -j DROP 60秒后ttl_iptables守护进程自动删除iptables -D INPUT -s x.x.x.x -j DROP

客户端API示例

  理论上除了C语言原生支持API调用,其他语言API调用前对应的库都要重新封装,因为第三方模块并不被其他语言支持。这里只示范C、Python、Bash、Lua其他编程语言同理。

 C编程

  C只需要编译安装hiredis即可。步骤如下:

root@debian:~/bookscode/redis-5.0.3/deps/hiredis#make install
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <hiredis.h>
int main(int argc, char **argv) {
    unsigned int j;                
    redisContext *c;
    redisReply *reply;
    const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";    
    int port = (argc > 2) ? atoi(argv[2]) : 6379;   
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds   
    c = redisConnectWithTimeout(hostname, port, timeout);
    if (c == NULL || c->err) {
        if (c) {
            printf("Connection error: %s\n", c->errstr);    
            redisFree(c);
        } else {
            printf("Connection error: can't allocate redis context\n");
        }
        exit(1);
}
    reply = redisCommand(c,"drop_insert 192.168.18.3");  
    printf("%d\n", reply->integer);
    freeReplyObject(reply);  

    reply = redisCommand(c,"accept_insert 192.168.18.4");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    reply = redisCommand(c,"drop_delete 192.168.18.3");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    reply = redisCommand(c,"accept_delete 192.168.18.5");
    printf("%d\n", reply->integer);
    freeReplyObject(reply);

    redisFree(c);

    return 0;

}

gcc example.c -I/usr/local/include/hiredis -lhiredis

编译即可

Python编程

root@debian:~/bookscode# git clone https://github.com/andymccurdy/redis-py.git  #下载Python lib库

下载好之后不要急着编译安装,先编辑redis-py/redis/client.py文件,添加代码如下: 

   # COMMAND EXECUTION AND PROTOCOL PARSING
      def execute_command(self, *args, **options):
          "Execute a command and return a parsed response"
           .....
           .....

     def drop_insert(self, name):

         """
         Return the value at key ``name``, or None if the key doesn't exist
          """
         return self.execute_command('drop_insert', name)

     def accept_insert(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
        """
         return self.execute_command('accept_insert', name)

     def drop_delete(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('drop_delete', name)

     def accept_delete(self, name):
         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('accept_delete', name)

     def ttl_drop_insert(self, name, blocktime):

         """
         Return the value at key ``name``, or None if the key doesn't exist
         """
         return self.execute_command('ttl_drop_insert', name, blocktime)


为了不误导读者,上述代码不加注释了,只是在类里添加几个函数而已,不需要解释

root@debian:~/bookscode/redis-py# python setup.py build        
root@debian:~/bookscode/redis-py# python setup.py install       
root@debian:~/bookscode/8/redis-py# python
Python 2.7.3 (default, Nov 19 2017, 01:35:09)
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=0)
>>> r.drop_insert('192.168.18.7')
12L
>>> r.accept_insert('192.168.18.7')
12L
>>> r.accept_delete('192.168.18.7')
0L
>>> r.drop_delete('192.168.18.7')
0L
>>> r.ttl_drop_insert('192.168.18.7', 600)
12L
>>>

 Bash编程

examples.sh

#!/bin/bash
for ((i=1; i<=254; i++))
 do
redis-cli TTL_DROP_INSERT 192.168.17.$i 60
done 
redis-cli DROP_INSERT 192.168.18.5
redis-cli DROP_DELETE 192.168.18.5
redis-cli ACCEPT_INSERT 192.168.18.5
redis-cli ACCEPT_DELETE 192.168.18.5

 Lua编程

git clone https://github.com/nrk/redis-lua.git    #下载Lua lib库

下载后编辑redis-lua/src/redis.lua 添加以下代码:

redis.commands = {
    .....
    ttl              = command('TTL'),
    drop_insert     = command('drop_insert'),
    drop_delete     = command('drop_delete'),
    accept_insert    = command('accept_insert'),
    accept_delete    = command('accept_delete'),
    ttl_drop_insert  = command('ttl_drop_insert'),
    pttl             = command('PTTL'),         -- >= 2.6
     .....

示例代码examples.lua 

package.path = "../src/?.lua;src/?.lua;" .. package.path
pcall(require, "luarocks.require")             --不要忘记安装Luasocket库
local redis = require 'redis'
local params = {
    host = '127.0.0.1',
    port = 6379,
}
local client = redis.connect(params)                    
client:select(0) -- for testing purposes                   
client:drop_insert('192.168.1.1')          
client:drop_delete('192.168.1.1')          
client:ttl_drop_insert('192.168.1.2', '60')      --加入规则后60秒后自动删除添加的规则
local value = client:get('192.168.1.2')     
print(value)

最后,目前还缺少Java、php常用语言的驱动,由于我不太擅长太多语言,有兴趣的朋友可以提交PR来补充。

程序员随想-快速熟悉业务

mumupudding阅读(6)


前言

作为一名开发,经常面临着主动或被动切换业务做,有些时候切换至有一定相关联的另一个业务,本来做余额宝的被调到做证券。一些情况下是切换至完全相关的业务,如从商品切换到交易,甚至从电商业务切换至金融业务。在经常遇到这种的情况下,建立一个“快速熟悉陌生业务”的方法论就很重要了。下面经过个人思考,提出的一些不完善的想法。

一、划分模块

人的记忆和关系型数据库有些类型,较容易记住的大的关键点,而具体的细节可能想很久,甚至记不住。根据人的这种记忆方式,我们接触一个新的事务最好的方式应该就是分层,按照一定逻辑分几层,每层分多个模块,了解每层各个模块概念,然后再细分下一层子模块概念。这种思维方式有点像数据库的索引,符合人的思维习惯。

划分模块方式

模块是基于一定逻辑组织形式来划分的,只要按照逻辑来划分的话,划分模块的方式肯定有很多种,这里我提两种最容易的方式:业务功能,应用架构。

业务功能

按照业务功能性,对业务进行拆解,把复杂的业务进行拆分成功能单元,各功能单元再根据场景进行更细粒度的拆分。拆分有一个原则,要做到高内聚,低耦合。

image.png

应用架构

基本上应用都是分层的,下图就是最常见的应用的组织形式,当然很多应用组织形式会根据具体情况进行变化

image.png

二、梳理业务领域模型

对于大部分人结束新的业务的时候,相信最麻烦的就是周围人谈论的“专业词汇”,像之前听得什么,直销、代销,转托管,卡账,户帐这类词汇对于第一次听到人肯定蒙蔽。对于这一类词汇其实很多都是特定业务领域形成的简化术语,如果连这些业务领域内的概念的不清楚,那么根本不知道在讨论一个什么事。这类词汇存在于特定业务领域中,了解业务领域是相当重要的,统一大家的概念认知,统一大家在讨论的是一件什么事,在做的是一件什么事!!

所以我说的要了解业务领域,并不是指得DDD,充血、贫血模型。而是对业务进行一定抽象成一个或多个有关联的业务模型,对于这个模型有一定认知是需求讨论和方案设计的前提,很多时候某个业务域用到了其他业务域的模型,那么相对于相关联业务域的模型也要有一定认知。

如何梳理业务领域模型?

业务领域模型最底层的是数据库表极其字段,把表和表之间的关系,字段含义理清楚后,会有大概的业务轮廓。接着通过熟悉模块中核心的POJO的类极其字段能更对业务有更清晰一点的认知。当然这还不够,还需要花时间去看一些集团,网上的文章,关于其他人对业务的理解,其他人在技术如何设计和实现的。这个过程不是一蹴而就的,慢慢的会找到更深的理解,等到了能发现当前这个业务领域模型优点缺点,并构思出改进的方向,基本上就意味已经进入这块业务领域的专家了。

三、梳理业务调用链路

寻找业务入口,然后看代码。一般来说业务代码迭代很快,很多在写代码的时候没有考虑以后,经常被之后的需求给推翻调,亦或者经历过多个团队多人接手过,每个人命名、设计等等习惯不一样,所以这个过程比较痛苦。

如何梳理业务调用链路?

这里必须要提一下,traceId是个神器,自己实际操作一笔拿到traceId,在集团鹰眼或者蚂蚁云图上应用的调用链路基本上呼之欲出,但是具体逻辑细节还是要自己去看代码。这里看代码我理解两种模式,一种是自下而上,另一种是自上而下。我比较喜欢自上而下的看,这样顺着业务链路来理解比较简单,大部分情况下两者结合效果比较好。

自上而下

前后端不分离的PC页面,顺着页面实际操作入口去看,很简单前后端分离的PC页面、Ajax接口、无线端,和前端同学打好关系,多聊聊,喝喝咖啡,让前端同学帮忙去找入口,另外就是通过网络抓包找到调用入口,顺着代码往下看吧,少年。通过日志里面的traceId可以查到调用链路甚至到接口级别,这就更方便找到入口。一般情况下相同功能模块、对外的接口会放在一个大包或者pom模块下面。

自下而上

从数据库表入手,理解表模型,各字段含义,建立各表和字段之间的关系。

四、需求入手

一般情况下,公司不太会给太多空闲时间去看代码,且直接看代码多数情况下也没有那么直观感受,这时候结合一些比较完整的需求去搞,效果反而会更好。所谓完整的需求是指:尽量是完整模块的需求,或者能串联部分或整个链路的需求。切身体验零散的需求没有鸟用。

IDEA+Springboot+JRebel热部署实现

mumupudding阅读(12)

步骤一:在IDEA中安装JRebel插件(File->settings->plugins->search in repositories),如下图

IDEA+Springboot+JRebel热部署实现

步骤二:安装完成之后,重启idea,破解JRebel插件(可以在help>JRebel>Activaction打开激活页面,也可在重启之后,直接点击右侧指导中进入激活页面)

IDEA+Springboot+JRebel热部署实现

选择License server方式(url可能失效,可以自行网上搜索)

Url:    http://139.199.89.239:1008/88414687-3b91-4286-89ba-2dc813b107ce

email:随便输入

 jrebel激活之后默认是联网使用的 , 在该模式下 , jrebel会一直联网监测激活信息 . 所以要调为离线使用的,步骤见下图

IDEA+Springboot+JRebel热部署实现

步骤三:设置IDEA为自动编译

IDEA+Springboot+JRebel热部署实现

步骤四:按住 Ctrl+Alt+Shift+/ 弹出,  选择Registry后勾选 

IDEA+Springboot+JRebel热部署实现

IDEA+Springboot+JRebel热部署实现

步骤五:选择springboot的入口类,右键选择debug with JRebel,等待启动完成即可。

IDEA+Springboot+JRebel热部署实现

IDEA+Springboot+JRebel热部署实现

java使用okhttp库实现Authorization认证请求

mumupudding阅读(7)

<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>3.9.1</version>
</dependency>
import okhttp3.*;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class HttpUtil {
    static public OkHttpClient client;

    static private Logger logger = Logger.getLogger(HttpUtil.class);

    static {

        client = new OkHttpClient.Builder()

                .connectTimeout(10000L, TimeUnit.MILLISECONDS)

                .readTimeout(10000L, TimeUnit.MILLISECONDS)

                .build();

    }

    /**
     47
     * 同步GET请求 带Authorization认证
     48
     */

    public static String okhttp_get(String get_url, HashMap<String, Object> get_data, String[] auth_base){


        final String credential = Credentials.basic(auth_base[0], auth_base[1]);

        String result = "";

        String data_params = generateParameters(get_data);

        String data_url = get_url + data_params;

        Request request = new Request.Builder()

                .url(data_url)

                .header("Authorization", credential)

                .get()

                .build();

        Call call = client.newCall(request);

        try {

            Response response = call.execute();

            //判断是否成功
            if (response.isSuccessful()){
                result = response.body().string();
            }else {
                return "请求失败";
            }
            logger.debug(result);


        } catch (Exception e) {

            logger.error("网络GET请求失败!提示信息:"+e.getMessage());

        }

        return result;

    }
    /**
     * Post请求 带Authorization认证
     * */
    public static String okhttp_post(String get_url, HashMap<String, Object> get_data, String[] auth_base){

        final String credential = Credentials.basic(auth_base[0], auth_base[1]);

        String result = "";

        RequestBody fromBody = generateParametersForPost(get_data).build();


        Request request = new Request.Builder()

                .url(get_url)

                .header("Authorization", credential)

                .post(fromBody)

                .build();

        Call call = client.newCall(request);

        try {

            Response response = call.execute();

            //判断是否成功
            if (response.isSuccessful()){
                result = response.body().string();
            }else {
                return "请求失败";
            }
            logger.debug(result);


        } catch (Exception e) {

            logger.error("网络GET请求失败!提示信息:"+e.getMessage());

        }

        return result;

    }

    //拼接参数

    private static String generateParameters(HashMap<String, Object> parameters) {

        String urlAttachment = "";

        if(parameters.size()>0){

            urlAttachment = "?";

            Object[] keys = parameters.keySet().toArray();

            for(Object key : keys)

            urlAttachment += key.toString() + "=" + parameters.get(key).toString() + "&";

            urlAttachment = urlAttachment.substring(0,urlAttachment.length()-1);
        }

        return urlAttachment;
    }

    //拼接参数用于POST请求

    private static FormBody.Builder generateParametersForPost(HashMap<String, Object> parameters) {

        FormBody.Builder builder = new FormBody.Builder();

        if(parameters.size()>0){
            Object[] keys = parameters.keySet().toArray();

            for(Object key : keys){
                Object ff = parameters.get(key);
                String aa = parameters.get(key).toString();

                builder.add(key.toString(),parameters.get(key).toString());
            }
        }

        return builder;
    }

    /**
     * 发送私密API请求
     *
     * @paramget_url请求地址
     * @paramget_data 请求参数列表
     * @return          返回JSON数据
     */
    private String auth_get(){
        String result = "";
        String api_key = "apikey";
        String api_secret = "apisecret";
        //认证信息
        String[] baseauth = {api_key,api_secret};
        //请求的URL的参数
        HashMap<String, Object> get_data = new HashMap<>();
        get_data.put("page","1");       //page参数
        get_data.put("name","test");  //name参数
        result = HttpUtil.okhttp_get("http://www.superl.org/page-about.html", get_data, baseauth);
        return result;
    }


}

Objc Block实现分析

mumupudding阅读(9)


Objc Block实现分析

Block在iOS开发中使用的频率是很高的,使用的场景包括接口异步数据的回调(AFN)、UI事件的回调(BlockKits)、链式调用(Masonry)、容器数据的遍历回调(NSArray、NSDictionary),具体的用法以及使用Block的一些坑这里就不一一赘述了,本文会从源代码的层面分析我们常用的Block的底层实现原理,做到知其然知其所以然。

本文会从如下几个主题切入,层层递进分析Block的底层原理,还原Block本来的面目

  • 无参数无返回值,不使用变量的Block分析
  • 使用自动变量的Block分析
  • 使用__block修饰的变量的Block分析
  • Block引用分析

无参数无返回值,不使用变量的Block分析

有如下的源代码,创建一个简单的block,在block做的处理是打印一个字符串,然后执行这个block。接下来会从源码入手对此进行分析:block是如何执行的

// 无参数无返回值的Block分析int main(int argc, const char * argv[]) {    void (^blk) (void) = ^{        printf("block invoke");    };    blk();    return 0;}// 输出:block invoke

使用clang -rewrite-objc main.m命令重写为C++语法的源文件。从重写后的代码中看到,一个简单block定义和调用的代码变成了大几十行,不过该源代码还算是相对简单的,我们从main函数入口逐步的进行分析:

main函数中创建__main_block_impl_0类型的实例

__main_block_impl_0结构体是什么鬼呢?我们看__main_block_impl_0结构体的实现,包含了__block_impl结构体和__main_block_desc_0结构体指针,为了方便,现在页先不用管__main_block_desc_0结构体,目前他还没有真正的使用到,后面讲到的__block修饰自动变量以及Block对对象的引用关系,设计到内存的拷贝和释放的时候会使用到该变量。__block_impl结构体包含了4个成员,为了简单分析,我们只关注其中的FuncPtr成员的FuncPtr成员,这个也是最重要的成员,其它的先忽略不计。

__main_block_impl_0实例的初始化参数

第一个是__main_block_func_0函数的地址,__main_block_func_0函数是什么呢,其实就是Block执行的方法,可以看到里面的实现就是一个简单的打印而已,和我们定义在block中的实现一样的,该参数用于初始化impl成员的。至于第二个参数先不管,在这个例子木有用到。

__main_block_impl_0实例的调用

创建了__main_block_impl_0结构体类型的实例之后,接下来就是获取到里面的FuncPtr指针指向的方法进行调用,参数是__main_block_impl_0结构体类型的实例本身,上一步创建步骤可知,FuncPtr指针是一个函数指针指向__main_block_func_0函数的地址,所以这里本质上就是__main_block_func_0函数的调用,结构是打印字符串"block invoke",一个简单的block执行孙然在代码量上增加了,其实也不算复杂。

注意点:(__block_impl *)blk这里做了一个强制转换,blk是__main_block_impl_0类型的实例的指针,根据结构体的内存布局规则,该结构体的第一个成员为__block_impl 类型,所以可以强制转换为__block_impl *类型。

由上面的分析,可以得出如下的结论:使用变量的Block调用本质上是使用函数指针调用函数

// 使用`clang -rewrite-objc main.m`命令重写为C++语法的源文件,对结果分析如下// __block_impl结构体struct __block_impl {  void *isa;  int Flags;  int Reserved;  void *FuncPtr;};// `__main_block_impl_0`是block的结构体,包含了两个成员__block_impl类型的impl成员以及__main_block_desc_0类型的Desc成员;一个构造方法__main_block_impl_0,构造方法中初始化impl成员和Desc成员struct __main_block_impl_0 {  struct __block_impl impl;  struct __main_block_desc_0* Desc;  __main_block_impl_0(void *fp, struct __main_block_desc_0 *desc, int flags=0) {    impl.isa = &_NSConcreteStackBlock;    impl.Flags = flags;    impl.FuncPtr = fp;    Desc = desc;  }};// Block执行的方法static void __main_block_func_0(struct __main_block_impl_0 *__cself) {        printf("block invoke");}static struct __main_block_desc_0 {  size_t reserved;  size_t Block_size;} __main_block_desc_0_DATA = { 0, sizeof(struct __main_block_impl_0)};// 重写后的入口函数mainint main(int argc, const char * argv[]) {    // 创建__main_block_impl_0类型的实例,名称为blk,这里把改实例强制转换为`void(funcPtr *)(void)`型的方法指针,不懂为何,因为在下一步会把该方法指针强制转换为对应的结构体,也就是说`void(funcPtr *)(void)`型的方法指针并没有真实的使用到。    void (*blk) (void) =    ((void (*)())&__main_block_impl_0(                                      (void *)__main_block_func_0,                                      &__main_block_desc_0_DATA));    // blk是`__main_block_impl_0`类型的实例的指针,根绝结构体的内存布局规则,该结构体的第一个成员为`__block_impl` 类型,可以强制转换为`__block_impl *`类型,获取FuncPtr函数指针,强制转换为`(void (*)(__block_impl *))`类型的函数指针,然后执行这个函数指针对应的函数,参数为blk    // 由上面的步骤可知,Block的调用本质上是使用函数指针调用函数    ((void (*)(__block_impl *))((__block_impl *)blk)->FuncPtr)((__block_impl *)blk);    return 0;}

使用自动变量的Block分析

有如下的源代码,创建一个简单的block,在block做的处理是打印一个字符串,使用到外部的自动变量,然后执行这个block。接下来会从源码入手对此进行分析:block是如何使用外部的自动变量的

// 使用变量的Block分析// 源代码int main(int argc, const char * argv[]) {    int age = 100;    const char *name = "zyt";    void (^blk) (void) = ^{        printf("block invoke age = %d age = %s", age, name);    };    age = 101;    blk();    return 0;}// 输出:block invoke age = 100 age = zyt

使用clang -rewrite-objc main.m命令重写为C++语法的源文件。对比上一个的数据结构方法和调用步骤大体上是一样的,只针对变化的地方进行分析

__main_block_impl_0结构体的变化

增加了两个成员:int类型的age成员、char*类型的name成员,用于保存外部变量的值,在初始化的时候就会使用自动变量的值初始化这两个成员的值

调用的变化

__main_block_func_0函数的参数struct __main_block_impl_0 *__cself在这个例子有使用到了,因为两个自动变量对应的值被保存在__main_block_impl_0结构体中了,方法中有使用到这两个变量,直接从__main_block_impl_0结构体中获取这两个值,但是这两个值是独立于自动变量的存在了

由上面的分析,可以得出如下的结论:使用变量的Block调用本质上是使用函数指针调用函数,参数是保存在block的结构体中的,并且保存的值而不是引用

// 使用`clang -rewrite-objc main.m`命令重写为C++语法的源文件,对结果分析如下// __block_impl结构体struct __block_impl {  void *isa;  int Flags;  int Reserved;  void *FuncPtr;};// __main_block_impl_0,包含了四个成员__block_impl类型的impl成员、__main_block_desc_0类型的Desc成员、int类型的age成员、char*类型的name成员;一个构造方法__main_block_impl_0,构造方法中初始化impl成员、Desc成员、age成员和name成员,比起上一个改结构体多了两个成员,用于保存外部变量的值struct __main_block_impl_0 {  struct __block_impl impl;  struct __main_block_desc_0* Desc;  int age;  const char *name;  __main_block_impl_0(void *fp, struct __main_block_desc_0 *desc, int _age, const char *_name, int flags=0) : age(_age), name(_name) {    impl.isa = &_NSConcreteStackBlock;    impl.Flags = flags;    impl.FuncPtr = fp;    Desc = desc;  }};// Block执行的方法static void __main_block_func_0(struct __main_block_impl_0 *__cself) {  int age = __cself->age; // bound by copy  const char *name = __cself->name; // bound by copy        printf("block invoke age = %d age = %s", age, name);    }static struct __main_block_desc_0 {  size_t reserved;  size_t Block_size;} __main_block_desc_0_DATA = { 0, sizeof(struct __main_block_impl_0)};// 重写后的入口函数main// 由上面的步骤可知,使用变量的Block调用本质上是使用函数指针调用函数,参数是保存在block的结构体中的,并且保存的值而不是引用int main(int argc, const char * argv[]) {    int age = 100;    const char *name = "zyt";    void (*blk) (void) = ((void (*)())&__main_block_impl_0((void *)__main_block_func_0, &__main_block_desc_0_DATA, age, name));    age = 101;    ((void (*)(__block_impl *))((__block_impl *)blk)->FuncPtr)((__block_impl *)blk);    return 0;}

使用__block修饰的变量的Block分析

有如下的源代码,有个__block修饰的int类型的自动变量age,在block和变量age的作用域中分别作了修改,从输入的结果看看是两次都生效了,接下来会从源码入手对此进行分析:block中是如何处理__block修饰的自动变量,该自动变量的内存是如何变化的

// 源代码int main(int argc, const char * argv[]) {    __block int age = 100;    const char *name = "zyt";    void (^blk) (void) = ^{        age += 2;        printf("block invoke age = %d age = %s", age, name);    };    age += 1;    blk();}// 输出 :// block invoke age = 103 age = zyt

使用clang -rewrite-objc main.m命令重写为C++语法的源文件,对结果分析如下的注释,该转换后的代码做了些许的调整,包括代码的缩进和添加了结构体声明,使得该代码可以直接运行

// clang改写后的代码如下,以下代码是经过调整可以直接运行的struct __main_block_desc_0;// __block_impl结构体struct __block_impl {    void *isa;    int Flags;    int Reserved;    void *FuncPtr;};// `__block`修饰的变量对应的结构体,里面包含了该变量的原始值也就是age成员,另外还有一个奇怪的`__forwarding`成员,稍后我们会分析它的用处  struct __Block_byref_age_0 {    void *__isa;    __Block_byref_age_0 *__forwarding;    int __flags;    int __size;    int age;};// `__main_block_impl_0`结构体包含了四个成员`__block_impl`类型的impl成员、`__main_block_desc_0`类型的Desc成员、`__Block_byref_age_0 *`类型的age成员、char*类型的name成员;一个构造方法`__main_block_impl_0`,构造方法中初始化impl成员、Desc成员、age成员和name成员,比起上一个结构体的变化是age的类型变为了包装自动变量的结构体了struct __main_block_impl_0 {    __block_impl impl;    __main_block_desc_0* Desc;    const char *name;    __Block_byref_age_0 *age; // by ref    __main_block_impl_0(void *fp,                        struct __main_block_desc_0 *desc,                        const char *_name,                        __Block_byref_age_0 *_age,                        int flags=0) :    name(_name),    age(_age->__forwarding) {        impl.isa = &_NSConcreteStackBlock;        impl.Flags = flags;        impl.FuncPtr = fp;        Desc = desc;    }};// Block执行的方法static void __main_block_func_0(struct __main_block_impl_0 *__cself) {    __Block_byref_age_0 *age = __cself->age; // bound by ref    const char *name = __cself->name; // bound by copy        (age->__forwarding->age) += 2;    printf("block invoke age = %d age = %s", (age->__forwarding->age), name);}// Block拷贝函数static void __main_block_copy_0(struct __main_block_impl_0*dst, struct __main_block_impl_0*src) {    _Block_object_assign((void*)&dst->age, (void*)src->age, 8/*BLOCK_FIELD_IS_BYREF*/);}// Block销毁函数static void __main_block_dispose_0(struct __main_block_impl_0*src) {    _Block_object_dispose((void*)src->age, 8/*BLOCK_FIELD_IS_BYREF*/);}static struct __main_block_desc_0 {    size_t reserved;    size_t Block_size;    void (*copy)(struct __main_block_impl_0*, struct __main_block_impl_0*);    void (*dispose)(struct __main_block_impl_0*);} __main_block_desc_0_DATA = { 0, sizeof(struct __main_block_impl_0), __main_block_copy_0, __main_block_dispose_0};// 重写后的入口函数mainint main(int argc, const char * argv[]) {    __attribute__((__blocks__(byref))) __Block_byref_age_0 age = {        (void*)0,        (__Block_byref_age_0 *)&age,        0,        sizeof(__Block_byref_age_0),        100};    const char *name = "zyt";    struct __main_block_impl_0 blockImpl =    __main_block_impl_0((void *)__main_block_func_0,                        &__main_block_desc_0_DATA,                        name,                        (__Block_byref_age_0 *)&age,                        570425344);    void (*blk) (void) = ((void (*)())&blockImpl);    (age.__forwarding->age) += 1;    ((void (*)(__block_impl *))((__block_impl *)blk)->FuncPtr)((__block_impl *)blk);}

__block修饰自动变量转换为C++代码的结构体关系图:
__block修饰自动变量转换为C++代码的结构体关系图

该重写的代码大部分和上面分析过的例子代码是类似,发现增加了两个处理方法Block拷贝函数__main_block_copy_0和Block销毁函数__main_block_dispose_0,这两个函数会保存在__main_block_desc_0结构体的copydispose成员中;另外添加了一个__Block_byref_age_0结构体类型用户处理__block修饰的自动变量。以下针对这两点从源代码的角度进行一个分析

__main_block_copy_0方法中调用到的_Block_object_assign可以在 runtime.c 这里找到 ,主要看下_Block_object_assign方法里面的处理逻辑

  • flags参数值为8,是BLOCK_FIELD_IS_BYREF枚举对应的值,会走到_Block_byref_assign_copy方法的调用步骤
  • _Block_byref_assign_copy方法会在在堆上创建Block_byref对象,也就是Block对象,并且把栈上和堆上的Block对象的forwarding属性值都修改为指向堆上的Block对象,这样使用两个对象的修改值都会修改为同一个地方

栈上的__block自动变量__forwarding指向关系以及拷贝到堆上之后__forwarding指向关系如下图所示
栈上的自动变量指向关系以及拷贝到堆上之后指向关系如下图所示

具体使用到的代码和对应的注释如下:

/* * When Blocks or Block_byrefs hold objects then their copy routine helpers use this entry point * to do the assignment. */void _Block_object_assign(void *destAddr, const void *object, const int flags) {    //printf("_Block_object_assign(*%p, %p, %x)\n", destAddr, object, flags);    if ((flags & BLOCK_BYREF_CALLER) == BLOCK_BYREF_CALLER) {        if ((flags & BLOCK_FIELD_IS_WEAK) == BLOCK_FIELD_IS_WEAK) {            _Block_assign_weak(object, destAddr);        }        else {            // do *not* retain or *copy* __block variables whatever they are            _Block_assign((void *)object, destAddr);        }    }    // 代码会走到这个分支中,调用方法`_Block_byref_assign_copy`    else if ((flags & BLOCK_FIELD_IS_BYREF) == BLOCK_FIELD_IS_BYREF)  {        // copying a __block reference from the stack Block to the heap        // flags will indicate if it holds a __weak reference and needs a special isa        _Block_byref_assign_copy(destAddr, object, flags);    }    // (this test must be before next one)    else if ((flags & BLOCK_FIELD_IS_BLOCK) == BLOCK_FIELD_IS_BLOCK) {        // copying a Block declared variable from the stack Block to the heap        _Block_assign(_Block_copy_internal(object, flags), destAddr);    }    // (this test must be after previous one)    else if ((flags & BLOCK_FIELD_IS_OBJECT) == BLOCK_FIELD_IS_OBJECT) {        //printf("retaining object at %p\n", object);        _Block_retain_object(object);        //printf("done retaining object at %p\n", object);        _Block_assign((void *)object, destAddr);    }}/* * Runtime entry points for maintaining the sharing knowledge of byref data blocks. * * A closure has been copied and its fixup routine is asking us to fix up the reference to the shared byref data * Closures that aren't copied must still work, so everyone always accesses variables after dereferencing the forwarding ptr. * We ask if the byref pointer that we know about has already been copied to the heap, and if so, increment it. * Otherwise we need to copy it and update the stack forwarding pointer * XXX We need to account for weak/nonretained read-write barriers. */static void _Block_byref_assign_copy(void *dest, const void *arg, const int flags) {    struct Block_byref **destp = (struct Block_byref **)dest;    struct Block_byref *src = (struct Block_byref *)arg;            //printf("_Block_byref_assign_copy called, byref destp %p, src %p, flags %x\n", destp, src, flags);    //printf("src dump: %s\n", _Block_byref_dump(src));    if (src->forwarding->flags & BLOCK_IS_GC) {        ;   // don't need to do any more work    }    else if ((src->forwarding->flags & BLOCK_REFCOUNT_MASK) == 0) {        //printf("making copy\n");        // src points to stack        bool isWeak = ((flags & (BLOCK_FIELD_IS_BYREF|BLOCK_FIELD_IS_WEAK)) == (BLOCK_FIELD_IS_BYREF|BLOCK_FIELD_IS_WEAK));        // if its weak ask for an object (only matters under GC)        struct Block_byref *copy = (struct Block_byref *)_Block_allocator(src->size, false, isWeak);        copy->flags = src->flags | _Byref_flag_initial_value; // non-GC one for caller, one for stack        // copy是拷贝到堆上的Block_byref类型对象,scr是原来的Block_byref类型对象,两者的forwarding成员都指向到堆上的Block_byref类型对象也就是copy,这样不管是在栈上修改__block修饰的变量(age.age = 102调用)还是在堆上修改__block修饰的变量()        copy->forwarding = copy; // patch heap copy to point to itself (skip write-barrier)        src->forwarding = copy;  // patch stack to point to heap copy        copy->size = src->size;        if (isWeak) {            copy->isa = &_NSConcreteWeakBlockVariable;  // mark isa field so it gets weak scanning        }        if (src->flags & BLOCK_HAS_COPY_DISPOSE) {            // Trust copy helper to copy everything of interest            // If more than one field shows up in a byref block this is wrong XXX            copy->byref_keep = src->byref_keep;            copy->byref_destroy = src->byref_destroy;            (*src->byref_keep)(copy, src);        }        else {            // just bits.  Blast 'em using _Block_memmove in case they're __strong            _Block_memmove(                (void *)&copy->byref_keep,                (void *)&src->byref_keep,                src->size - sizeof(struct Block_byref_header));        }    }    // already copied to heap    else if ((src->forwarding->flags & BLOCK_NEEDS_FREE) == BLOCK_NEEDS_FREE) {        latching_incr_int(&src->forwarding->flags);    }    // assign byref data block pointer into new Block    _Block_assign(src->forwarding, (void **)destp);}

Block引用分析

Block强引用分析

// 定义类YTObject@interface YTObject : NSObject@property (nonatomic, strong) NSString *name;@property (nonatomic, copy) void (^blk)(void);- (void)testReferenceSelf;@end@implementation YTObject- (void)testReferenceSelf {    self.blk = ^ {        // 这里不管是使用self.name还是_name,从clang重写的代码上看,处理方式是一样的        printf("self.name = %s", self.name.UTF8String);    };    self.blk();}- (void)dealloc {    NSLog(@"==dealloc==");}@end// 使用YTObjectint main(int argc, const char * argv[]) {    YTObject *obj = [YTObject new];    obj.name = @"hello";    [obj testReferenceSelf];        return 0;}// 输出 :// self.name = hello

使用clang -rewrite-objc main.m命令重写为C++语法的源文件如下

static struct /*_method_list_t*/ { unsigned int entsize;  // sizeof(struct _objc_method) unsigned int method_count; struct _objc_method method_list[6];} _OBJC_$_INSTANCE_METHODS_YTObject __attribute__ ((used, section ("__DATA,__objc_const"))) = { sizeof(_objc_method), 6, {{(struct objc_selector *)"testReferenceSelf", "v16@0:8", (void *)_I_YTObject_testReferenceSelf}, {(struct objc_selector *)"dealloc", "v16@0:8", (void *)_I_YTObject_dealloc}, {(struct objc_selector *)"name", "@16@0:8", (void *)_I_YTObject_name}, {(struct objc_selector *)"setName:", "v24@0:8@16", (void *)_I_YTObject_setName_}, {(struct objc_selector *)"blk", "@?16@0:8", (void *)_I_YTObject_blk}, {(struct objc_selector *)"setBlk:", "v24@0:8@?16", (void *)_I_YTObject_setBlk_}}};struct __YTObject__testReferenceSelf_block_impl_0 {  struct __block_impl impl;  struct __YTObject__testReferenceSelf_block_desc_0* Desc;  YTObject *self;  __YTObject__testReferenceSelf_block_impl_0(void *fp, struct __YTObject__testReferenceSelf_block_desc_0 *desc, YTObject *_self, int flags=0) : self(_self) {    impl.isa = &_NSConcreteStackBlock;    impl.Flags = flags;    impl.FuncPtr = fp;    Desc = desc;  }};static void __YTObject__testReferenceSelf_block_func_0(struct __YTObject__testReferenceSelf_block_impl_0 *__cself) {    YTObject *self = __cself->self; // bound by copy    printf("self.name = %s", ((const char * _Nullable (*)(id, SEL))(void *)objc_msgSend)((id)((NSString *(*)(id, SEL))(void *)objc_msgSend)((id)self, sel_registerName("name")), sel_registerName("UTF8String")));}static void __YTObject__testReferenceSelf_block_copy_0(struct __YTObject__testReferenceSelf_block_impl_0*dst, struct __YTObject__testReferenceSelf_block_impl_0*src) {    _Block_object_assign((void*)&dst->self, (void*)src->self, 3/*BLOCK_FIELD_IS_OBJECT*/);}static void __YTObject__testReferenceSelf_block_dispose_0(struct __YTObject__testReferenceSelf_block_impl_0*src) {    _Block_object_dispose((void*)src->self, 3/*BLOCK_FIELD_IS_OBJECT*/);}static struct __YTObject__testReferenceSelf_block_desc_0 {  size_t reserved;  size_t Block_size;  void (*copy)(struct __YTObject__testReferenceSelf_block_impl_0*, struct __YTObject__testReferenceSelf_block_impl_0*);  void (*dispose)(struct __YTObject__testReferenceSelf_block_impl_0*);} __YTObject__testReferenceSelf_block_desc_0_DATA = { 0, sizeof(struct __YTObject__testReferenceSelf_block_impl_0), __YTObject__testReferenceSelf_block_copy_0, __YTObject__testReferenceSelf_block_dispose_0};static void _I_YTObject_testReferenceSelf(YTObject * self, SEL _cmd) {    ((void (*)(id, SEL, void (*)()))(void *)objc_msgSend)((id)self, sel_registerName("setBlk:"), ((void (*)())&__YTObject__testReferenceSelf_block_impl_0((void *)__YTObject__testReferenceSelf_block_func_0, &__YTObject__testReferenceSelf_block_desc_0_DATA, self, 570425344)));    ((void (*(*)(id, SEL))())(void *)objc_msgSend)((id)self, sel_registerName("blk"))();}int main(int argc, const char * argv[]) {    YTObject *obj = ((YTObject *(*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("YTObject"), sel_registerName("new"));    ((void (*)(id, SEL, NSString *))(void *)objc_msgSend)((id)obj, sel_registerName("setName:"), (NSString *)&__NSConstantStringImpl__var_folders_fk_19cr58zj0f7f19001k_mxzlm0000gp_T_main_21b52d_mii_1);    ((void (*)(id, SEL))(void *)objc_msgSend)((id)obj, sel_registerName("testReferenceSelf"));    return 0;}

Block引用对象转换为C++代码的结构体关系图:
Block引用对象转换为C++代码的结构体关系图

从类图上可以明显的看到__YTObject__testReferenceSelf_block_impl_0YTObject之间有循环依赖的关系,这样NSLog(@"==dealloc==");这段代码最终是没有调用到,也就是这里会出现Block循环引用导致内存泄漏问题

Block弱引用分析

Block的循环引用问题其中一种解决方案是可以使用weakself来解除这种强引用关系,防止内存的泄漏,代码的改造如下

@interface YTObject : NSObject@property (nonatomic, strong) NSString *name;@property (nonatomic, copy) void (^blk)(void);- (void)testReferenceSelf;@end@implementation YTObject- (void)testReferenceSelf {    __weak typeof(self) weakself = self;    self.blk = ^ {        __strong typeof(self) strongself = weakself;        // 这里不管是使用self.name还是_name,从clang重写的代码上看,处理方式是一样的        printf("self.name = %s\n", strongself.name.UTF8String);    };    self.blk();}- (void)dealloc {    printf("==dealloc==");}@end// 使用YTObjectint main(int argc, const char * argv[]) {    YTObject *obj = [YTObject new];    obj.name = @"hello";    [obj testReferenceSelf];        return 0;}

添加了weak之后需要使用clang -rewrite-objc -fobjc-arc -fobjc-runtime=macosx-10.14 main.mm这个命令才能够重写为C++语言对应的代码,重写后的代码如下

static struct /*_method_list_t*/ {    unsigned int entsize;  // sizeof(struct _objc_method)    unsigned int method_count;    struct _objc_method method_list[6];} _OBJC_$_INSTANCE_METHODS_YTObject __attribute__ ((used, section ("__DATA,__objc_const"))) = {    sizeof(_objc_method),    6,    {{(struct objc_selector *)"testReferenceSelf", "v16@0:8", (void *)_I_YTObject_testReferenceSelf},        {(struct objc_selector *)"dealloc", "v16@0:8", (void *)_I_YTObject_dealloc},        {(struct objc_selector *)"name", "@16@0:8", (void *)_I_YTObject_name},        {(struct objc_selector *)"setName:", "v24@0:8@16", (void *)_I_YTObject_setName_},        {(struct objc_selector *)"blk", "@?16@0:8", (void *)_I_YTObject_blk},        {(struct objc_selector *)"setBlk:", "v24@0:8@?16", (void *)_I_YTObject_setBlk_}}};struct __YTObject__testReferenceSelf_block_impl_0 {    struct __block_impl impl;    struct __YTObject__testReferenceSelf_block_desc_0* Desc;    YTObject *const __weak weakself;    __YTObject__testReferenceSelf_block_impl_0(void *fp, struct __YTObject__testReferenceSelf_block_desc_0 *desc, YTObject *const __weak _weakself, int flags=0) : weakself(_weakself) {        impl.isa = &_NSConcreteStackBlock;        impl.Flags = flags;        impl.FuncPtr = fp;        Desc = desc;    }};static void __YTObject__testReferenceSelf_block_func_0(struct __YTObject__testReferenceSelf_block_impl_0 *__cself) {    YTObject *const __weak weakself = __cself->weakself; // bound by copy    __attribute__((objc_ownership(strong))) typeof(self) strongself = weakself;    printf("self.name = %s\n", ((const char * _Nullable (*)(id, SEL))(void *)objc_msgSend)((id)((NSString *(*)(id, SEL))(void *)objc_msgSend)((id)strongself, sel_registerName("name")), sel_registerName("UTF8String")));}static void __YTObject__testReferenceSelf_block_copy_0(struct __YTObject__testReferenceSelf_block_impl_0*dst, struct __YTObject__testReferenceSelf_block_impl_0*src) {    _Block_object_assign((void*)&dst->weakself, (void*)src->weakself, 3/*BLOCK_FIELD_IS_OBJECT*/);}static void __YTObject__testReferenceSelf_block_dispose_0(struct __YTObject__testReferenceSelf_block_impl_0*src) {    _Block_object_dispose((void*)src->weakself, 3/*BLOCK_FIELD_IS_OBJECT*/);}static struct __YTObject__testReferenceSelf_block_desc_0 {    size_t reserved;    size_t Block_size;    void (*copy)(struct __YTObject__testReferenceSelf_block_impl_0*, struct __YTObject__testReferenceSelf_block_impl_0*);    void (*dispose)(struct __YTObject__testReferenceSelf_block_impl_0*);} __YTObject__testReferenceSelf_block_desc_0_DATA = { 0, sizeof(struct __YTObject__testReferenceSelf_block_impl_0), __YTObject__testReferenceSelf_block_copy_0, __YTObject__testReferenceSelf_block_dispose_0};static void _I_YTObject_testReferenceSelf(YTObject * self, SEL _cmd) {    __attribute__((objc_ownership(weak))) typeof(self) weakself = self;    __YTObject__testReferenceSelf_block_impl_0 blockImpl =    __YTObject__testReferenceSelf_block_impl_0((void *)__YTObject__testReferenceSelf_block_func_0,                                               &__YTObject__testReferenceSelf_block_desc_0_DATA,                                               weakself,                                               570425344)    ((void (*)(id, SEL, void (*)()))(void *)objc_msgSend)((id)self, s                                                          el_registerName("setBlk:"),                                                          ((void (*)())&blockImpl));    ((void (*(*)(id, SEL))())(void *)objc_msgSend)((id)self, sel_registerName("blk"))();}int main(int argc, const char * argv[]) {    YTObject *obj = ((YTObject *(*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("YTObject"), sel_registerName("new"));    ((void (*)(id, SEL, NSString *))(void *)objc_msgSend)((id)obj, sel_registerName("setName:"), (NSString *)&__NSConstantStringImpl__var_folders_fk_19cr58zj0f7f19001k_mxzlm0000gp_T_main_6f39dd_mii_0);    ((void (*)(id, SEL))(void *)objc_msgSend)((id)obj, sel_registerName("testReferenceSelf"));        return 0;}

从上面的代码中可以看到__YTObject__testReferenceSelf_block_impl_0结构体中weakself成员是一个__weak修饰的YTObject类型对象,也就是说__YTObject__testReferenceSelf_block_impl_0YTObject的依赖是弱依赖。weak修饰变量是在runtime中进行处理的,在YTObject对象的Dealloc方法中会调用weak引用的处理方法,从weak_table中寻找弱引用的依赖对象,进行清除处理,可以查看Runtime源码中objc_object::clearDeallocating该方法的处理逻辑,另外关于__weak修饰的变量的详细处理可以查看Runtime相关的知识

Block弱引用对象转换为C++代码的结构体关系图:
Block弱引用对象转换为C++代码的结构体关系图

关于具体的weakSelf和strongSelf可以参考这篇文章深入研究Block用weakSelf、strongSelf、@weakify、@strongify解决循环引用中的描述

weakSelf 是为了block不持有self,避免Retain Circle循环引用。在 Block 内如果需要访问 self 的方法、变量,建议使用 weakSelf。strongSelf的目的是因为一旦进入block执行,假设不允许self在这个执行过程中释放,就需要加入strongSelf。block执行完后这个strongSelf 会自动释放,没有不会存在循环引用问题。如果在 Block 内需要多次 访问 self,则需要使用 strongSelf。

结束

以上就是关于Block底层实现的一些分析,不妥之处敬请指教

参考

深入研究Block用weakSelf、strongSelf、@weakify、@strongify解决循环引用

探索Java日志的奥秘:底层日志系统-log4j2

mumupudding阅读(12)

前言

log4j2是apache在log4j的基础上,参考logback架构实现的一套新的日志系统(我感觉是apache害怕logback了)。
log4j2的官方文档上写着一些它的优点:

  • 在拥有全部logback特性的情况下,还修复了一些隐藏问题
  • API 分离:现在log4j2也是门面模式使用日志,默认的日志实现是log4j2,当然你也可以用logback(应该没有人会这么做)
  • 性能提升:log4j2包含下一代基于LMAX Disruptor library的异步logger,在多线程场景下,拥有18倍于log4j和logback的性能
  • 多API支持:log4j2提供Log4j 1.2, SLF4J, Commons Logging and java.util.logging (JUL) 的API支持
  • 避免锁定:使用Log4j2 API的应用程序始终可以选择使用任何符合SLF4J的库作为log4j-to-slf4j适配器的记录器实现
  • 自动重新加载配置:与Logback一样,Log4j 2可以在修改时自动重新加载其配置。与Logback不同,它会在重新配置发生时不会丢失日志事件。
  • 高级过滤: 与Logback一样,Log4j 2支持基于Log事件中的上下文数据,标记,正则表达式和其他组件进行过滤。
  • 插件架构: Log4j使用插件模式配置组件。因此,您无需编写代码来创建和配置Appender,Layout,Pattern Converter等。Log4j自动识别插件并在配置引用它们时使用它们。
  • 属性支持:您可以在配置中引用属性,Log4j将直接替换它们,或者Log4j将它们传递给将动态解析它们的底层组件。
  • Java 8 Lambda支持
  • 自定义日志级别
  • 产生垃圾少:在稳态日志记录期间,Log4j 2 在独立应用程序中是无垃圾的,在Web应用程序中是低垃圾。这减少了垃圾收集器的压力,并且可以提供更好的响应时间性能。
  • 和应用server集成:版本2.10.0引入了一个模块log4j-appserver,以改进与Apache Tomcat和Eclipse Jetty的集成。

Log4j2类图:

这次从四个地方去探索源码:启动,配置,异步,插件化

源码探索

启动

log4j2的关键组件

  • LogManager

根据配置指定LogContexFactory,初始化对应的LoggerContext

  • LoggerContext

1、解析配置文件,解析为对应的java对象。
2、通过LoggerRegisty缓存Logger配置
3、Configuration配置信息
4、start方法解析配置文件,转化为对应的java对象
5、通过getLogger获取logger对象

  • Logger

LogManaer

该组件是Log4J启动的入口,后续的LoggerContext以及Logger都是通过调用LogManager的静态方法获得。我们可以使用下面的代码获取Logger

Logger logger = LogManager.getLogger();

可以看出LogManager是十分关键的组件,因此在这个小节中我们详细分析LogManager的启动流程。
LogManager启动的入口是下面的static代码块:

/**
     * Scans the classpath to find all logging implementation. Currently, only one will be used but this could be
     * extended to allow multiple implementations to be used.
     */
    static {
        // Shortcut binding to force a specific logging implementation.
        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
        final String factoryClassName = managerProps.getStringProperty(FACTORY_PROPERTY_NAME);
        if (factoryClassName != null) {
            try {
                factory = LoaderUtil.newCheckedInstanceOf(factoryClassName, LoggerContextFactory.class);
            } catch (final ClassNotFoundException cnfe) {
                LOGGER.error("Unable to locate configured LoggerContextFactory {}", factoryClassName);
            } catch (final Exception ex) {
                LOGGER.error("Unable to create configured LoggerContextFactory {}", factoryClassName, ex);
            }
        }

        if (factory == null) {
            final SortedMap<Integer, LoggerContextFactory> factories = new TreeMap<>();
            // note that the following initial call to ProviderUtil may block until a Provider has been installed when
            // running in an OSGi environment
            if (ProviderUtil.hasProviders()) {
                for (final Provider provider : ProviderUtil.getProviders()) {
                    final Class<? extends LoggerContextFactory> factoryClass = provider.loadLoggerContextFactory();
                    if (factoryClass != null) {
                        try {
                            factories.put(provider.getPriority(), factoryClass.newInstance());
                        } catch (final Exception e) {
                            LOGGER.error("Unable to create class {} specified in provider URL {}", factoryClass.getName(), provider
                                    .getUrl(), e);
                        }
                    }
                }

                if (factories.isEmpty()) {
                    LOGGER.error("Log4j2 could not find a logging implementation. "
                            + "Please add log4j-core to the classpath. Using SimpleLogger to log to the console...");
                    factory = new SimpleLoggerContextFactory();
                } else if (factories.size() == 1) {
                    factory = factories.get(factories.lastKey());
                } else {
                    final StringBuilder sb = new StringBuilder("Multiple logging implementations found: \n");
                    for (final Map.Entry<Integer, LoggerContextFactory> entry : factories.entrySet()) {
                        sb.append("Factory: ").append(entry.getValue().getClass().getName());
                        sb.append(", Weighting: ").append(entry.getKey()).append('\n');
                    }
                    factory = factories.get(factories.lastKey());
                    sb.append("Using factory: ").append(factory.getClass().getName());
                    LOGGER.warn(sb.toString());

                }
            } else {
                LOGGER.error("Log4j2 could not find a logging implementation. "
                        + "Please add log4j-core to the classpath. Using SimpleLogger to log to the console...");
                factory = new SimpleLoggerContextFactory();
            }
        }
    }

这段静态代码段主要分为下面的几个步骤:

  1. 首先根据特定配置文件的配置信息获取loggerContextFactory
  2. 如果没有找到对应的Factory的实现类则通过ProviderUtil中的getProviders()方法载入providers,随后通过provider的loadLoggerContextFactory方法载入LoggerContextFactory的实现类
  3. 如果provider中没有获取到LoggerContextFactory的实现类或provider为空,则使用SimpleLoggerContextFactory作为LoggerContextFactory。

根据配置文件载入LoggerContextFactory

// Shortcut binding to force a specific logging implementation.
        final PropertiesUtil managerProps = PropertiesUtil.getProperties();
        final String factoryClassName = managerProps.getStringProperty(FACTORY_PROPERTY_NAME);
        if (factoryClassName != null) {
            try {
                factory = LoaderUtil.newCheckedInstanceOf(factoryClassName, LoggerContextFactory.class);
            } catch (final ClassNotFoundException cnfe) {
                LOGGER.error("Unable to locate configured LoggerContextFactory {}", factoryClassName);
            } catch (final Exception ex) {
                LOGGER.error("Unable to create configured LoggerContextFactory {}", factoryClassName, ex);
            }
        }

在这段逻辑中,LogManager优先通过配置文件”log4j2.component.properties”通过配置项”log4j2.loggerContextFactory”来获取LoggerContextFactory,如果用户做了对应的配置,通过newCheckedInstanceOf方法实例化LoggerContextFactory的对象,最终的实现方式为:

public static <T> T newInstanceOf(final Class<T> clazz)
            throws InstantiationException, IllegalAccessException, InvocationTargetException {
        try {
            return clazz.getConstructor().newInstance();
        } catch (final NoSuchMethodException ignored) {
            // FIXME: looking at the code for Class.newInstance(), this seems to do the same thing as above
            return clazz.newInstance();
        }
    }

在默认情况下,不存在初始的默认配置文件log4j2.component.properties,因此需要从其他途径获取LoggerContextFactory。

通过Provider实例化LoggerContextFactory对象

代码:

if (factory == null) {
            final SortedMap<Integer, LoggerContextFactory> factories = new TreeMap<>();
            // note that the following initial call to ProviderUtil may block until a Provider has been installed when
            // running in an OSGi environment
            if (ProviderUtil.hasProviders()) {
                for (final Provider provider : ProviderUtil.getProviders()) {
                    final Class<? extends LoggerContextFactory> factoryClass = provider.loadLoggerContextFactory();
                    if (factoryClass != null) {
                        try {
                            factories.put(provider.getPriority(), factoryClass.newInstance());
                        } catch (final Exception e) {
                            LOGGER.error("Unable to create class {} specified in provider URL {}", factoryClass.getName(), provider
                                    .getUrl(), e);
                        }
                    }
                }

                if (factories.isEmpty()) {
                    LOGGER.error("Log4j2 could not find a logging implementation. "
                            + "Please add log4j-core to the classpath. Using SimpleLogger to log to the console...");
                    factory = new SimpleLoggerContextFactory();
                } else if (factories.size() == 1) {
                    factory = factories.get(factories.lastKey());
                } else {
                    final StringBuilder sb = new StringBuilder("Multiple logging implementations found: \n");
                    for (final Map.Entry<Integer, LoggerContextFactory> entry : factories.entrySet()) {
                        sb.append("Factory: ").append(entry.getValue().getClass().getName());
                        sb.append(", Weighting: ").append(entry.getKey()).append('\n');
                    }
                    factory = factories.get(factories.lastKey());
                    sb.append("Using factory: ").append(factory.getClass().getName());
                    LOGGER.warn(sb.toString());

                }
            } else {
                LOGGER.error("Log4j2 could not find a logging implementation. "
                        + "Please add log4j-core to the classpath. Using SimpleLogger to log to the console...");
                factory = new SimpleLoggerContextFactory();
            }
        }

这里比较有意思的是hasProviders和getProviders都会通过线程安全的方式去懒加载ProviderUtil这个对象。跟进lazyInit方法:

protected static void lazyInit() {
        //noinspection DoubleCheckedLocking
        if (INSTANCE == null) {
            try {
                STARTUP_LOCK.lockInterruptibly();
                if (INSTANCE == null) {
                    INSTANCE = new ProviderUtil();
                }
            } catch (final InterruptedException e) {
                LOGGER.fatal("Interrupted before Log4j Providers could be loaded.", e);
                Thread.currentThread().interrupt();
            } finally {
                STARTUP_LOCK.unlock();
            }
        }
    }

再看构造方法:

private ProviderUtil() {
        for (final LoaderUtil.UrlResource resource : LoaderUtil.findUrlResources(PROVIDER_RESOURCE)) {
            loadProvider(resource.getUrl(), resource.getClassLoader());
        }
    }

这里的懒加载其实就是懒加载Provider对象。在创建新的providerUtil实例的过程中就会直接实例化provider对象,其过程是先通过getClassLoaders方法获取provider的类加载器,然后通过loadProviders(classLoader);加载类。在providerUtil实例化的最后,会统一查找”META-INF/log4j-provider.properties”文件中对应的provider的url,会考虑从远程加载provider。而loadProviders方法就是在ProviderUtil的PROVIDERS列表中添加对一个的provider。可以看到默认的provider是org.apache.logging.log4j.core.impl.Log4jContextFactory

LoggerContextFactory = org.apache.logging.log4j.core.impl.Log4jContextFactory
Log4jAPIVersion = 2.1.0
FactoryPriority= 10

很有意思的是这里懒加载加上了锁,而且使用的是
lockInterruptibly这个方法。lockInterruptibly和lock的区别如下:

lock 与 lockInterruptibly比较区别在于:

lock 优先考虑获取锁,待获取锁成功后,才响应中断。
lockInterruptibly 优先考虑响应中断,而不是响应锁的普通获取或重入获取。
ReentrantLock.lockInterruptibly允许在等待时由其它线程调用等待线程的
Thread.interrupt 方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个InterruptedException。 ReentrantLock.lock方法不允许Thread.interrupt中断,即使检测到Thread.isInterrupted,一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功后再把当前线程置为interrupted状态,然后再中断线程。

上面有一句注释值得注意:

/**
     * Guards the ProviderUtil singleton instance from lazy initialization. This is primarily used for OSGi support.
     *
     * @since 2.1
     */
    protected static final Lock STARTUP_LOCK = new ReentrantLock();
    // STARTUP_LOCK guards INSTANCE for lazy initialization; this allows the OSGi Activator to pause the startup and
    // wait for a Provider to be installed. See LOG4J2-373
    private static volatile ProviderUtil INSTANCE;

原来这里是为了让osgi可以阻止启动。
再回到logManager:
可以看到在加载完Provider之后,会做factory的绑定:

if (factories.isEmpty()) {
                    LOGGER.error("Log4j2 could not find a logging implementation. "
                            + "Please add log4j-core to the classpath. Using SimpleLogger to log to the console...");
                    factory = new SimpleLoggerContextFactory();
                } else if (factories.size() == 1) {
                    factory = factories.get(factories.lastKey());
                } else {
                    final StringBuilder sb = new StringBuilder("Multiple logging implementations found: \n");
                    for (final Map.Entry<Integer, LoggerContextFactory> entry : factories.entrySet()) {
                        sb.append("Factory: ").append(entry.getValue().getClass().getName());
                        sb.append(", Weighting: ").append(entry.getKey()).append('\n');
                    }
                    factory = factories.get(factories.lastKey());
                    sb.append("Using factory: ").append(factory.getClass().getName());
                    LOGGER.warn(sb.toString());

                }

到这里,logmanager的启动流程就结束了。

配置

在不使用slf4j的情况下,我们获取logger的方式是这样的:

Logger logger = logManager.getLogger(xx.class)

跟进getLogger方法:

    public static Logger getLogger(final Class<?> clazz) {
        final Class<?> cls = callerClass(clazz);
        return getContext(cls.getClassLoader(), false).getLogger(toLoggerName(cls));
    }

这里有一个getContext方法,跟进,

public static LoggerContext getContext(final ClassLoader loader, final boolean currentContext) {
        try {
            return factory.getContext(FQCN, loader, null, currentContext);
        } catch (final IllegalStateException ex) {
            LOGGER.warn(ex.getMessage() + " Using SimpleLogger");
            return new SimpleLoggerContextFactory().getContext(FQCN, loader, null, currentContext);
        }
    }

上文提到factory的具体实现是Log4jContextFactory,跟进getContext
方法:

public LoggerContext getContext(final String fqcn, final ClassLoader loader, final Object externalContext,
                                    final boolean currentContext) {
        final LoggerContext ctx = selector.getContext(fqcn, loader, currentContext);
        if (externalContext != null && ctx.getExternalContext() == null) {
            ctx.setExternalContext(externalContext);
        }
        if (ctx.getState() == LifeCycle.State.INITIALIZED) {
            ctx.start();
        }
        return ctx;
    }

直接看start:

public void start() {
        LOGGER.debug("Starting LoggerContext[name={}, {}]...", getName(), this);
        if (PropertiesUtil.getProperties().getBooleanProperty("log4j.LoggerContext.stacktrace.on.start", false)) {
            LOGGER.debug("Stack trace to locate invoker",
                    new Exception("Not a real error, showing stack trace to locate invoker"));
        }
        if (configLock.tryLock()) {
            try {
                if (this.isInitialized() || this.isStopped()) {
                    this.setStarting();
                    reconfigure();
                    if (this.configuration.isShutdownHookEnabled()) {
                        setUpShutdownHook();
                    }
                    this.setStarted();
                }
            } finally {
                configLock.unlock();
            }
        }
        LOGGER.debug("LoggerContext[name={}, {}] started OK.", getName(), this);
    }

发现其中的核心方法是reconfigure方法,继续跟进:

private void reconfigure(final URI configURI) {
        final ClassLoader cl = ClassLoader.class.isInstance(externalContext) ? (ClassLoader) externalContext : null;
        LOGGER.debug("Reconfiguration started for context[name={}] at URI {} ({}) with optional ClassLoader: {}",
                contextName, configURI, this, cl);
        final Configuration instance = ConfigurationFactory.getInstance().getConfiguration(this, contextName, configURI, cl);
        if (instance == null) {
            LOGGER.error("Reconfiguration failed: No configuration found for '{}' at '{}' in '{}'", contextName, configURI, cl);
        } else {
            setConfiguration(instance);
            /*
             * instance.start(); Configuration old = setConfiguration(instance); updateLoggers(); if (old != null) {
             * old.stop(); }
             */
            final String location = configuration == null ? "?" : String.valueOf(configuration.getConfigurationSource());
            LOGGER.debug("Reconfiguration complete for context[name={}] at URI {} ({}) with optional ClassLoader: {}",
                    contextName, location, this, cl);
        }
    }

可以看到每一个configuration都是从ConfigurationFactory拿出来的,我们先看看这个类的getInstance看看:

public static ConfigurationFactory getInstance() {
        // volatile works in Java 1.6+, so double-checked locking also works properly
        //noinspection DoubleCheckedLocking
        if (factories == null) {
            LOCK.lock();
            try {
                if (factories == null) {
                    final List<ConfigurationFactory> list = new ArrayList<ConfigurationFactory>();
                    final String factoryClass = PropertiesUtil.getProperties().getStringProperty(CONFIGURATION_FACTORY_PROPERTY);
                    if (factoryClass != null) {
                        addFactory(list, factoryClass);
                    }
                    final PluginManager manager = new PluginManager(CATEGORY);
                    manager.collectPlugins();
                    final Map<String, PluginType<?>> plugins = manager.getPlugins();
                    final List<Class<? extends ConfigurationFactory>> ordered =
                        new ArrayList<Class<? extends ConfigurationFactory>>(plugins.size());
                    for (final PluginType<?> type : plugins.values()) {
                        try {
                            ordered.add(type.getPluginClass().asSubclass(ConfigurationFactory.class));
                        } catch (final Exception ex) {
                            LOGGER.warn("Unable to add class {}", type.getPluginClass(), ex);
                        }
                    }
                    Collections.sort(ordered, OrderComparator.getInstance());
                    for (final Class<? extends ConfigurationFactory> clazz : ordered) {
                        addFactory(list, clazz);
                    }
                    // see above comments about double-checked locking
                    //noinspection NonThreadSafeLazyInitialization
                    factories = Collections.unmodifiableList(list);
                }
            } finally {
                LOCK.unlock();
            }
        }

        LOGGER.debug("Using configurationFactory {}", configFactory);
        return configFactory;
    }

这里可以看到ConfigurationFactory中利用了PluginManager来进行初始化,PluginManager会将ConfigurationFactory的子类加载进来,默认使用的XmlConfigurationFactory,JsonConfigurationFactory,YamlConfigurationFactory这三个子类,这里插件化加载暂时按下不表。
回到reconfigure这个方法,我们看到获取ConfigurationFactory实例之后会去调用getConfiguration方法:

public Configuration getConfiguration(final String name, final URI configLocation, final ClassLoader loader) {
        if (!isActive()) {
            return null;
        }
        if (loader == null) {
            return getConfiguration(name, configLocation);
        }
        if (isClassLoaderUri(configLocation)) {
            final String path = extractClassLoaderUriPath(configLocation);
            final ConfigurationSource source = getInputFromResource(path, loader);
            if (source != null) {
                final Configuration configuration = getConfiguration(source);
                if (configuration != null) {
                    return configuration;
                }
            }
        }
        return getConfiguration(name, configLocation);
    }

跟进getConfiguration,这里值得注意的是有很多个getConfiguration,注意甄别,如果不确定的话可以通过debug的方式来确定。

public Configuration getConfiguration(final String name, final URI configLocation) {

            if (configLocation == null) {
                final String config = this.substitutor.replace(
                    PropertiesUtil.getProperties().getStringProperty(CONFIGURATION_FILE_PROPERTY));
                if (config != null) {
                    ConfigurationSource source = null;
                    try {
                        source = getInputFromUri(FileUtils.getCorrectedFilePathUri(config));
                    } catch (final Exception ex) {
                        // Ignore the error and try as a String.
                        LOGGER.catching(Level.DEBUG, ex);
                    }
                    if (source == null) {
                        final ClassLoader loader = LoaderUtil.getThreadContextClassLoader();
                        source = getInputFromString(config, loader);
                    }
                    if (source != null) {
                        for (final ConfigurationFactory factory : factories) {
                            final String[] types = factory.getSupportedTypes();
                            if (types != null) {
                                for (final String type : types) {
                                    if (type.equals("*") || config.endsWith(type)) {
                                        final Configuration c = factory.getConfiguration(source);
                                        if (c != null) {
                                            return c;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            } else {
                for (final ConfigurationFactory factory : factories) {
                    final String[] types = factory.getSupportedTypes();
                    if (types != null) {
                        for (final String type : types) {
                            if (type.equals("*") || configLocation.toString().endsWith(type)) {
                                final Configuration config = factory.getConfiguration(name, configLocation);
                                if (config != null) {
                                    return config;
                                }
                            }
                        }
                    }
                }
            }

            Configuration config = getConfiguration(true, name);
            if (config == null) {
                config = getConfiguration(true, null);
                if (config == null) {
                    config = getConfiguration(false, name);
                    if (config == null) {
                        config = getConfiguration(false, null);
                    }
                }
            }
            if (config != null) {
                return config;
            }
            LOGGER.error("No log4j2 configuration file found. Using default configuration: logging only errors to the console.");
            return new DefaultConfiguration();
        }

这里就会根据之前加载进来的factory进行配置的获取,具体的不再解析。
回到reconfigure,之后的步骤就是setConfiguration,入参就是刚才获取的config

private synchronized Configuration setConfiguration(final Configuration config) {
        Assert.requireNonNull(config, "No Configuration was provided");
        final Configuration prev = this.config;
        config.addListener(this);
        final ConcurrentMap<String, String> map = config.getComponent(Configuration.CONTEXT_PROPERTIES);

        try { // LOG4J2-719 network access may throw android.os.NetworkOnMainThreadException
            map.putIfAbsent("hostName", NetUtils.getLocalHostname());
        } catch (final Exception ex) {
            LOGGER.debug("Ignoring {}, setting hostName to 'unknown'", ex.toString());
            map.putIfAbsent("hostName", "unknown");
        }
        map.putIfAbsent("contextName", name);
        config.start();
        this.config = config;
        updateLoggers();
        if (prev != null) {
            prev.removeListener(this);
            prev.stop();
        }

        firePropertyChangeEvent(new PropertyChangeEvent(this, PROPERTY_CONFIG, prev, config));

        try {
            Server.reregisterMBeansAfterReconfigure();
        } catch (final Throwable t) {
            // LOG4J2-716: Android has no java.lang.management
            LOGGER.error("Could not reconfigure JMX", t);
        }
        return prev;
    }

这个方法最重要的步骤就是config.start,这才是真正做配置解析的

public void start() {
        LOGGER.debug("Starting configuration {}", this);
        this.setStarting();
        pluginManager.collectPlugins(pluginPackages);
        final PluginManager levelPlugins = new PluginManager(Level.CATEGORY);
        levelPlugins.collectPlugins(pluginPackages);
        final Map<String, PluginType<?>> plugins = levelPlugins.getPlugins();
        if (plugins != null) {
            for (final PluginType<?> type : plugins.values()) {
                try {
                    // Cause the class to be initialized if it isn't already.
                    Loader.initializeClass(type.getPluginClass().getName(), type.getPluginClass().getClassLoader());
                } catch (final Exception e) {
                    LOGGER.error("Unable to initialize {} due to {}", type.getPluginClass().getName(), e.getClass()
                            .getSimpleName(), e);
                }
            }
        }
        setup();
        setupAdvertisement();
        doConfigure();
        final Set<LoggerConfig> alreadyStarted = new HashSet<LoggerConfig>();
        for (final LoggerConfig logger : loggers.values()) {
            logger.start();
            alreadyStarted.add(logger);
        }
        for (final Appender appender : appenders.values()) {
            appender.start();
        }
        if (!alreadyStarted.contains(root)) { // LOG4J2-392
            root.start(); // LOG4J2-336
        }
        super.start();
        LOGGER.debug("Started configuration {} OK.", this);
    }

这里面有如下步骤:

  1. 获取日志等级的插件
  2. 初始化
  3. 初始化Advertiser
  4. 配置

先看一下初始化,也就是setup这个方法,setup是一个需要被复写的方法,我们以XMLConfiguration作为例子,

@Override
    public void setup() {
        if (rootElement == null) {
            LOGGER.error("No logging configuration");
            return;
        }
        constructHierarchy(rootNode, rootElement);
        if (status.size() > 0) {
            for (final Status s : status) {
                LOGGER.error("Error processing element {}: {}", s.name, s.errorType);
            }
            return;
        }
        rootElement = null;
    }

发现这里面有一个比较重要的方法constructHierarchy,跟进:

private void constructHierarchy(final Node node, final Element element) {
        processAttributes(node, element);
        final StringBuilder buffer = new StringBuilder();
        final NodeList list = element.getChildNodes();
        final List<Node> children = node.getChildren();
        for (int i = 0; i < list.getLength(); i++) {
            final org.w3c.dom.Node w3cNode = list.item(i);
            if (w3cNode instanceof Element) {
                final Element child = (Element) w3cNode;
                final String name = getType(child);
                final PluginType<?> type = pluginManager.getPluginType(name);
                final Node childNode = new Node(node, name, type);
                constructHierarchy(childNode, child);
                if (type == null) {
                    final String value = childNode.getValue();
                    if (!childNode.hasChildren() && value != null) {
                        node.getAttributes().put(name, value);
                    } else {
                        status.add(new Status(name, element, ErrorType.CLASS_NOT_FOUND));
                    }
                } else {
                    children.add(childNode);
                }
            } else if (w3cNode instanceof Text) {
                final Text data = (Text) w3cNode;
                buffer.append(data.getData());
            }
        }

        final String text = buffer.toString().trim();
        if (text.length() > 0 || (!node.hasChildren() && !node.isRoot())) {
            node.setValue(text);
        }
    }

发现这个就是一个树遍历的过程。诚然,配置文件是以xml的形式给出的,xml的结构就是一个树形结构。回到start方法,跟进doConfiguration:

protected void doConfigure() {
        if (rootNode.hasChildren() && rootNode.getChildren().get(0).getName().equalsIgnoreCase("Properties")) {
            final Node first = rootNode.getChildren().get(0);
            createConfiguration(first, null);
            if (first.getObject() != null) {
                subst.setVariableResolver((StrLookup) first.getObject());
            }
        } else {
            final Map<String, String> map = this.getComponent(CONTEXT_PROPERTIES);
            final StrLookup lookup = map == null ? null : new MapLookup(map);
            subst.setVariableResolver(new Interpolator(lookup, pluginPackages));
        }

        boolean setLoggers = false;
        boolean setRoot = false;
        for (final Node child : rootNode.getChildren()) {
            if (child.getName().equalsIgnoreCase("Properties")) {
                if (tempLookup == subst.getVariableResolver()) {
                    LOGGER.error("Properties declaration must be the first element in the configuration");
                }
                continue;
            }
            createConfiguration(child, null);
            if (child.getObject() == null) {
                continue;
            }
            if (child.getName().equalsIgnoreCase("Appenders")) {
                appenders = child.getObject();
            } else if (child.isInstanceOf(Filter.class)) {
                addFilter(child.getObject(Filter.class));
            } else if (child.getName().equalsIgnoreCase("Loggers")) {
                final Loggers l = child.getObject();
                loggers = l.getMap();
                setLoggers = true;
                if (l.getRoot() != null) {
                    root = l.getRoot();
                    setRoot = true;
                }
            } else if (child.getName().equalsIgnoreCase("CustomLevels")) {
                customLevels = child.getObject(CustomLevels.class).getCustomLevels();
            } else if (child.isInstanceOf(CustomLevelConfig.class)) {
                final List<CustomLevelConfig> copy = new ArrayList<CustomLevelConfig>(customLevels);
                copy.add(child.getObject(CustomLevelConfig.class));
                customLevels = copy;
            } else {
                LOGGER.error("Unknown object \"{}\" of type {} is ignored.", child.getName(),
                        child.getObject().getClass().getName());
            }
        }

        if (!setLoggers) {
            LOGGER.warn("No Loggers were configured, using default. Is the Loggers element missing?");
            setToDefault();
            return;
        } else if (!setRoot) {
            LOGGER.warn("No Root logger was configured, creating default ERROR-level Root logger with Console appender");
            setToDefault();
            // return; // LOG4J2-219: creating default root=ok, but don't exclude configured Loggers
        }

        for (final Map.Entry<String, LoggerConfig> entry : loggers.entrySet()) {
            final LoggerConfig l = entry.getValue();
            for (final AppenderRef ref : l.getAppenderRefs()) {
                final Appender app = appenders.get(ref.getRef());
                if (app != null) {
                    l.addAppender(app, ref.getLevel(), ref.getFilter());
                } else {
                    LOGGER.error("Unable to locate appender {} for logger {}", ref.getRef(), l.getName());
                }
            }

        }

        setParents();
    }

发现就是对刚刚获取的configuration进行解析,然后塞进正确的地方。回到start方法,可以看到昨晚配置之后就是开启logger和appender了。

异步

AsyncAppender

log4j2突出于其他日志的优势,异步日志实现。我们先从日志打印看进去。找到Logger,随便找一个log日志的方法。

public void debug(final Marker marker, final Message msg) {
        logIfEnabled(FQCN, Level.DEBUG, marker, msg, msg != null ? msg.getThrowable() : null);
    }

一路跟进

@PerformanceSensitive
    // NOTE: This is a hot method. Current implementation compiles to 29 bytes of byte code.
    // This is within the 35 byte MaxInlineSize threshold. Modify with care!
    private void logMessageTrackRecursion(final String fqcn,
                                          final Level level,
                                          final Marker marker,
                                          final Message msg,
                                          final Throwable throwable) {
        try {
            incrementRecursionDepth(); // LOG4J2-1518, LOG4J2-2031
            tryLogMessage(fqcn, level, marker, msg, throwable);
        } finally {
            decrementRecursionDepth();
        }
    }

可以看出这个在打日志之前做了调用次数的记录。跟进tryLogMessage,

@PerformanceSensitive
    // NOTE: This is a hot method. Current implementation compiles to 26 bytes of byte code.
    // This is within the 35 byte MaxInlineSize threshold. Modify with care!
    private void tryLogMessage(final String fqcn,
                               final Level level,
                               final Marker marker,
                               final Message msg,
                               final Throwable throwable) {
        try {
            logMessage(fqcn, level, marker, msg, throwable);
        } catch (final Exception e) {
            // LOG4J2-1990 Log4j2 suppresses all exceptions that occur once application called the logger
            handleLogMessageException(e, fqcn, msg);
        }
    }

继续跟进:

@Override
    public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
            final Throwable t) {
        final Message msg = message == null ? new SimpleMessage(Strings.EMPTY) : message;
        final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
        strategy.log(this, getName(), fqcn, marker, level, msg, t);
    }

这里可以看到在实际打日志的时候,会从config中获取打日志的策略,跟踪ReliabilityStrategy的创建,发现默认的实现类为DefaultReliabilityStrategy,跟进看实际打日志的方法

@Override
    public void log(final Supplier<LoggerConfig> reconfigured, final String loggerName, final String fqcn, final Marker marker, final Level level,
            final Message data, final Throwable t) {
        loggerConfig.log(loggerName, fqcn, marker, level, data, t);
    }

这里实际打日志的方法居然是交给一个config去实现的。。。感觉有点奇怪。。跟进看看

@PerformanceSensitive("allocation")
    public void log(final String loggerName, final String fqcn, final Marker marker, final Level level,
            final Message data, final Throwable t) {
        List<Property> props = null;
        if (!propertiesRequireLookup) {
            props = properties;
        } else {
            if (properties != null) {
                props = new ArrayList<>(properties.size());
                final LogEvent event = Log4jLogEvent.newBuilder()
                        .setMessage(data)
                        .setMarker(marker)
                        .setLevel(level)
                        .setLoggerName(loggerName)
                        .setLoggerFqcn(fqcn)
                        .setThrown(t)
                        .build();
                for (int i = 0; i < properties.size(); i++) {
                    final Property prop = properties.get(i);
                    final String value = prop.isValueNeedsLookup() // since LOG4J2-1575
                            ? config.getStrSubstitutor().replace(event, prop.getValue()) //
                            : prop.getValue();
                    props.add(Property.createProperty(prop.getName(), value));
                }
            }
        }
        final LogEvent logEvent = logEventFactory.createEvent(loggerName, marker, fqcn, level, data, props, t);
        try {
            log(logEvent, LoggerConfigPredicate.ALL);
        } finally {
            // LOG4J2-1583 prevent scrambled logs when logging calls are nested (logging in toString())
            ReusableLogEventFactory.release(logEvent);
        }
    }

可以清楚的看到try之前是在创建LogEvent,try里面做的才是真正的log(好tm累),一路跟进。

private void processLogEvent(final LogEvent event, LoggerConfigPredicate predicate) {
        event.setIncludeLocation(isIncludeLocation());
        if (predicate.allow(this)) {
            callAppenders(event);
        }
        logParent(event, predicate);
    }

接下来就是callAppender了,我们直接开始看AsyncAppender的append方法:

/**
     * Actual writing occurs here.
     *
     * @param logEvent The LogEvent.
     */
    @Override
    public void append(final LogEvent logEvent) {
        if (!isStarted()) {
            throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
        }
        final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
        InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
        if (!transfer(memento)) {
            if (blocking) {
                if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
                    // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
                    AsyncQueueFullMessageUtil.logWarningToStatusLogger();
                    logMessageInCurrentThread(logEvent);
                } else {
                    // delegate to the event router (which may discard, enqueue and block, or log in current thread)
                    final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
                    route.logMessage(this, memento);
                }
            } else {
                error("Appender " + getName() + " is unable to write primary appenders. queue is full");
                logToErrorAppenderIfNecessary(false, memento);
            }
        }
    }

这里主要的步骤就是:

  1. 生成logEvent
  2. 将logEvent放入BlockingQueue,就是transfer方法
  3. 如果BlockingQueue满了则启用相应的策略

同样的,这里也有一个线程用来做异步消费的事情

private class AsyncThread extends Log4jThread {

        private volatile boolean shutdown = false;
        private final List<AppenderControl> appenders;
        private final BlockingQueue<LogEvent> queue;

        public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
            super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
            this.appenders = appenders;
            this.queue = queue;
            setDaemon(true);
        }

        @Override
        public void run() {
            while (!shutdown) {
                LogEvent event;
                try {
                    event = queue.take();
                    if (event == SHUTDOWN_LOG_EVENT) {
                        shutdown = true;
                        continue;
                    }
                } catch (final InterruptedException ex) {
                    break; // LOG4J2-830
                }
                event.setEndOfBatch(queue.isEmpty());
                final boolean success = callAppenders(event);
                if (!success && errorAppender != null) {
                    try {
                        errorAppender.callAppender(event);
                    } catch (final Exception ex) {
                        // Silently accept the error.
                    }
                }
            }
            // Process any remaining items in the queue.
            LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
                queue.size());
            int count = 0;
            int ignored = 0;
            while (!queue.isEmpty()) {
                try {
                    final LogEvent event = queue.take();
                    if (event instanceof Log4jLogEvent) {
                        final Log4jLogEvent logEvent = (Log4jLogEvent) event;
                        logEvent.setEndOfBatch(queue.isEmpty());
                        callAppenders(logEvent);
                        count++;
                    } else {
                        ignored++;
                        LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
                    }
                } catch (final InterruptedException ex) {
                    // May have been interrupted to shut down.
                    // Here we ignore interrupts and try to process all remaining events.
                }
            }
            LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
                + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
        }

        /**
         * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
         * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
         * exceptions are silently ignored.
         *
         * @param event the event to forward to the registered appenders
         * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
         */
        boolean callAppenders(final LogEvent event) {
            boolean success = false;
            for (final AppenderControl control : appenders) {
                try {
                    control.callAppender(event);
                    success = true;
                } catch (final Exception ex) {
                    // If no appender is successful the error appender will get it.
                }
            }
            return success;
        }

        public void shutdown() {
            shutdown = true;
            if (queue.isEmpty()) {
                queue.offer(SHUTDOWN_LOG_EVENT);
            }
            if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
                this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
            }
        }
    }

直接看run方法:

  1. 阻塞获取logEvent
  2. 将logEvent分发出去
  3. 如果线程要退出了,将blockingQueue里面的event消费完在退出。

AsyncLogger

直接从AsyncLogger的logMessage看进去:

public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
            final Throwable thrown) {

        if (loggerDisruptor.isUseThreadLocals()) {
            logWithThreadLocalTranslator(fqcn, level, marker, message, thrown);
        } else {
            // LOG4J2-1172: avoid storing non-JDK classes in ThreadLocals to avoid memory leaks in web apps
            logWithVarargTranslator(fqcn, level, marker, message, thrown);
        }
    }

跟进logWithThreadLocalTranslator,

private void logWithThreadLocalTranslator(final String fqcn, final Level level, final Marker marker,
            final Message message, final Throwable thrown) {
        // Implementation note: this method is tuned for performance. MODIFY WITH CARE!

        final RingBufferLogEventTranslator translator = getCachedTranslator();
        initTranslator(translator, fqcn, level, marker, message, thrown);
        initTranslatorThreadValues(translator);
        publish(translator);
    }

这里的逻辑很简单,就是将日志相关的信息转换成RingBufferLogEvent(RingBuffer是Disruptor的无所队列),然后将其发布到RingBuffer中。发布到RingBuffer中,那肯定也有消费逻辑。这时候有两种方式可以找到这个消费的逻辑。

  • 找disruptor被使用的地方,然后查看,但是这样做会很容易迷惑
  • 按照Log4j2的尿性,这种Logger都有对应的start方法,我们可以从start方法入手寻找

在start方法中,我们找到了一段代码:

final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
        disruptor.handleEventsWith(handlers);

直接看看这个RingBufferLogEventHandler的实现:

public class RingBufferLogEventHandler implements
        SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {

    private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
    private Sequence sequenceCallback;
    private int counter;
    private long threadId = -1;

    @Override
    public void setSequenceCallback(final Sequence sequenceCallback) {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(final RingBufferLogEvent event, final long sequence,
            final boolean endOfBatch) throws Exception {
        event.execute(endOfBatch);
        event.clear();

        // notify the BatchEventProcessor that the sequence has progressed.
        // Without this callback the sequence would not be progressed
        // until the batch has completely finished.
        if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
            sequenceCallback.set(sequence);
            counter = 0;
        }
    }

    /**
     * Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
     * yet.
     * @return the thread ID of the background consumer thread, or {@code -1}
     */
    public long getThreadId() {
        return threadId;
    }

    @Override
    public void onStart() {
        threadId = Thread.currentThread().getId();
    }

    @Override
    public void onShutdown() {
    }
}

顺着接口找上去,发现一个接口:

/**
 * Callback interface to be implemented for processing events as they become available in the {@link RingBuffer}
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
 * @see BatchEventProcessor#setExceptionHandler(ExceptionHandler) if you want to handle exceptions propagated out of the handler.
 */
public interface EventHandler<T>
{
    /**
     * Called when a publisher has published an event to the {@link RingBuffer}
     *
     * @param event      published to the {@link RingBuffer}
     * @param sequence   of the event being processed
     * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
     * @throws Exception if the EventHandler would like the exception handled further up the chain.
     */
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

通过注释可以发现,这个onEvent就是处理逻辑,回到RingBufferLogEventHandler的onEvent方法,发现里面有一个execute方法,跟进:

public void execute(final boolean endOfBatch) {
        this.endOfBatch = endOfBatch;
        asyncLogger.actualAsyncLog(this);
    }

这个方法就是实际打日志了,AsyncLogger看起来还是比较简单的,只是使用了一个Disruptor。

插件化

之前在很多代码里面都可以看到

final PluginManager manager = new PluginManager(CATEGORY);
manager.collectPlugins(pluginPackages);

其实整个log4j2为了获得更好的扩展性,将自己的很多组件都做成了插件,然后在配置的时候去加载plugin。
跟进collectPlugins。

 public void collectPlugins(final List<String> packages) {
        final String categoryLowerCase = category.toLowerCase();
        final Map<String, PluginType<?>> newPlugins = new LinkedHashMap<>();

        // First, iterate the Log4j2Plugin.dat files found in the main CLASSPATH
        Map<String, List<PluginType<?>>> builtInPlugins = PluginRegistry.getInstance().loadFromMainClassLoader();
        if (builtInPlugins.isEmpty()) {
            // If we didn't find any plugins above, someone must have messed with the log4j-core.jar.
            // Search the standard package in the hopes we can find our core plugins.
            builtInPlugins = PluginRegistry.getInstance().loadFromPackage(LOG4J_PACKAGES);
        }
        mergeByName(newPlugins, builtInPlugins.get(categoryLowerCase));

        // Next, iterate any Log4j2Plugin.dat files from OSGi Bundles
        for (final Map<String, List<PluginType<?>>> pluginsByCategory : PluginRegistry.getInstance().getPluginsByCategoryByBundleId().values()) {
            mergeByName(newPlugins, pluginsByCategory.get(categoryLowerCase));
        }

        // Next iterate any packages passed to the static addPackage method.
        for (final String pkg : PACKAGES) {
            mergeByName(newPlugins, PluginRegistry.getInstance().loadFromPackage(pkg).get(categoryLowerCase));
        }
        // Finally iterate any packages provided in the configuration (note these can be changed at runtime).
        if (packages != null) {
            for (final String pkg : packages) {
                mergeByName(newPlugins, PluginRegistry.getInstance().loadFromPackage(pkg).get(categoryLowerCase));
            }
        }

        LOGGER.debug("PluginManager '{}' found {} plugins", category, newPlugins.size());

        plugins = newPlugins;
    }

处理逻辑如下:

  1. 从Log4j2Plugin.dat中加载所有的内置的plugin
  2. 然后将OSGi Bundles中的Log4j2Plugin.dat中的plugin加载进来
  3. 再加载传入的package路径中的plugin
  4. 最后加载配置中的plugin

逻辑还是比较简单的,但是我在看源码的时候发现了一个很有意思的东西,就是在加载log4j2 core插件的时候,也就是

PluginRegistry.getInstance().loadFromMainClassLoader()

这个方法,跟进到decodeCacheFiles:

private Map<String, List<PluginType<?>>> decodeCacheFiles(final ClassLoader loader) {
        final long startTime = System.nanoTime();
        final PluginCache cache = new PluginCache();
        try {
            final Enumeration<URL> resources = loader.getResources(PluginProcessor.PLUGIN_CACHE_FILE);
            if (resources == null) {
                LOGGER.info("Plugin preloads not available from class loader {}", loader);
            } else {
                cache.loadCacheFiles(resources);
            }
        } catch (final IOException ioe) {
            LOGGER.warn("Unable to preload plugins", ioe);
        }
        final Map<String, List<PluginType<?>>> newPluginsByCategory = new HashMap<>();
        int pluginCount = 0;
        for (final Map.Entry<String, Map<String, PluginEntry>> outer : cache.getAllCategories().entrySet()) {
            final String categoryLowerCase = outer.getKey();
            final List<PluginType<?>> types = new ArrayList<>(outer.getValue().size());
            newPluginsByCategory.put(categoryLowerCase, types);
            for (final Map.Entry<String, PluginEntry> inner : outer.getValue().entrySet()) {
                final PluginEntry entry = inner.getValue();
                final String className = entry.getClassName();
                try {
                    final Class<?> clazz = loader.loadClass(className);
                    final PluginType<?> type = new PluginType<>(entry, clazz, entry.getName());
                    types.add(type);
                    ++pluginCount;
                } catch (final ClassNotFoundException e) {
                    LOGGER.info("Plugin [{}] could not be loaded due to missing classes.", className, e);
                } catch (final LinkageError e) {
                    LOGGER.info("Plugin [{}] could not be loaded due to linkage error.", className, e);
                }
            }
        }

        final long endTime = System.nanoTime();
        final DecimalFormat numFormat = new DecimalFormat("#0.000000");
        final double seconds = (endTime - startTime) * 1e-9;
        LOGGER.debug("Took {} seconds to load {} plugins from {}",
            numFormat.format(seconds), pluginCount, loader);
        return newPluginsByCategory;
    }

可以发现加载时候是从一个文件(PLUGIN_CACHE_FILE)获取所有要获取的plugin。看到这里的时候我有一个疑惑就是,为什么不用反射的方式直接去扫描,而是要从文件中加载进来,而且文件是写死的,很不容易扩展啊。然后我找了一下PLUGIN_CACHE_FILE这个静态变量的用处,发现了PluginProcessor这个类,这里用到了注解处理器

/**
 * Annotation processor for pre-scanning Log4j 2 plugins.
 */
@SupportedAnnotationTypes("org.apache.logging.log4j.core.config.plugins.*")
public class PluginProcessor extends AbstractProcessor {

    // TODO: this could be made more abstract to allow for compile-time and run-time plugin processing

    /**
     * The location of the plugin cache data file. This file is written to by this processor, and read from by
     * {@link org.apache.logging.log4j.core.config.plugins.util.PluginManager}.
     */
    public static final String PLUGIN_CACHE_FILE =
            "META-INF/org/apache/logging/log4j/core/config/plugins/Log4j2Plugins.dat";

    private final PluginCache pluginCache = new PluginCache();

    @Override
    public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnv) {
        System.out.println("Processing annotations");
        try {
            final Set<? extends Element> elements = roundEnv.getElementsAnnotatedWith(Plugin.class);
            if (elements.isEmpty()) {
                System.out.println("No elements to process");
                return false;
            }
            collectPlugins(elements);
            writeCacheFile(elements.toArray(new Element[elements.size()]));
            System.out.println("Annotations processed");
            return true;
        } catch (final IOException e) {
            e.printStackTrace();
            error(e.getMessage());
            return false;
        } catch (final Exception ex) {
            ex.printStackTrace();
            error(ex.getMessage());
            return false;
        }
    }
}

(不太重要的方法省略)

我们可以看到在process方法中,PluginProcessor会先收集所有的Plugin,然后在写入文件。这样做的好处就是可以省去反射时候的开销。
然后我又看了一下Plugin这个注解,发现它的RetentionPolicy是RUNTIME,一般来说PluginProcessor是搭配RetentionPolicy.SOURCE,CLASS使用的,而且既然你把自己的Plugin扫描之后写在文件中了,RetentionPolicy就没有必要是RUNTIME了吧,这个是一个很奇怪的地方。

小结

总算是把Log4j2的代码看完了,发现它的设计理念很值得借鉴,为了灵活性,所有的东西都设计成插件式。互联网技术日益发展,各种中间件层出不穷,而作为工程师的我们更需要做的是去思考代码与代码之间的关系,毫无疑问的是,解耦是最具有美感的关系。

作者:netflix

原文链接

本文为云栖社区原创内容,未经允许不得转载。