JedisSentinelAPI的使用和源码解读

  |   0 评论   |   8,891 浏览

    Sentinel的的部署和实践已经结束了,本篇文章是Redis的Java客户端端JedisSentinelAPI的使用和源码解读。

    简单例子:

    public class RedisSentinelTest {
    	public static void main(String[] args) throws Exception {
    		
    		//Sentinel的配置
    		Set<String> sentinels = new HashSet<String>();
    		sentinels.add("10.10.10.126:26379");
    		sentinels.add("10.10.10.126:26479");
    		sentinels.add("10.10.10.126:26579");
    
    		//JedisPool 连接池的配置
    		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    		config.setMaxTotal(10000);
    		config.setMaxIdle(200);
    		config.setTestOnBorrow(true);
    		config.setMaxWaitMillis(1000);
    		
    		//创建JedisPool
    		JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config);
    		
    		//获取Jedis连接
    		Jedis jedis = pool.getResource();
    
    		//从Redis中获取k1的值
    		String v1 = jedis.get("k1");
    		System.out.println(v1);
    
    		//关闭Jedis,返回pool中
    		jedis.close();
    	}
    }

    注释写的应该能看懂了,就不啰嗦了 ^_^^_^


    线上使用的例子看下面:

    1)spring配置

     <bean id="jedisPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
      <property name="maxTotal" value="10000" />
      <property name="maxIdle"  value="200" />
      <property name="maxWaitMillis" value="1000" />
     </bean>
     <bean id="redisSentinel" class="redis.clients.jedis.JedisSentinelPool">
      <constructor-arg index="0" value="mymaster" />
      <constructor-arg index="1">
       <set>  
        <value>10.10.10.126:26379</value>
        <value>10.10.10.126:26479</value>  
        <value>10.10.10.126:26579</value>    
       </set>
      </constructor-arg>
      <constructor-arg index="2" ref="jedisPoolConfig"/>
      <!--constructor-arg index="3" value="tiger"/--> <!-- password configuration-->
     </bean>

    2)连接基础类:RedisUtil.java

    public class RedisUtil {
    	private final static RedisUtil POOL_UTIL = new RedisUtil();
    
    	public static RedisUtil getInstance() {
    		return POOL_UTIL;
    	}
    
    	private RedisUtil() {
    	}
    	
    	private JedisSentinelPool pool;
    
    	private JedisSentinelPool getJedisPool() {
    		if (pool == null) {			
    			synchronized (this) {
    				if (pool == null) {
    					init();
    				}
    			}
    		}
    		return pool;
    	}
    
    	private void init() {
    		try {
    			pool = (JedisSentinelPool) Config.ctx.getBean("redisSentinel");
    		} catch (Exception e) {
    			e.printStackTrace();
    			throw new ExceptionInInitializerError("启动系统异常,系统无法启动,请检查配置文件是否存在!");
    		}
    	}
    
    	//获取Redis连接
    	public Jedis getConnection() {
    		Jedis jedis = null;
    		try {
    			jedis = getJedisPool().getResource();
    			jedis.select(0);
    		} catch (Exception e) {
    			if (jedis != null)
    				jedis.close();
    			e.printStackTrace();
    			throw new RuntimeException("连接Redis异常.", e);
    		}
    		return jedis;
    	}
    
    	//关闭Redis连接
    	public void closeConnection(Jedis jedis) {
    		if (jedis != null) {
    			try {
    				jedis.close();
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }

    3)使用例子

    	public String getValue() {
    		Jedis jedis = RedisUtil.getInstance().getConnection();
    		try {
    			return jedis.get("k100");
    		} catch (Exception e) {
    			;
    		} finally {
    			RedisUtil.getInstance().closeConnection(jedis);
    		}
    		return "";
    	}

    JedisSentinelAPI源码阅读

    其实源码也比较简单,就几个类,还是看文章开头的代码例子

    顺着这一句 JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config); 是创建了一个Jedis的连接池,

    "mymaster" 是我们配置监视Master时的名字; sentinels 则是一个Set集合,里面是所有Sentinel的信息,用IP:PORT组成字符串;config 是连接池的一些信息

    JedisSentinelPool extends Pool , JedisSentinelPool 有几个构造方法,根据实际情况来使用不同,但最终会调用下面的构造方法

      public JedisSentinelPool(String masterName, Set<String> sentinels,
          final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
          final String password, final int database, final String clientName) {
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;

        HostAndPort master = initSentinels(sentinels, masterName);
        initPool(master);
      }

    注意红色的部分 initSentinels,其实是从Sentinel里面获取当前Redis Master信息,看源码,里面我已经加了注释

      private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
    
        HostAndPort master = null;          //构造一个Redis服务配置对象,里面包括ip和port
        boolean sentinelAvailable = false;  //sentinel服务是否正常
    
        log.info("Trying to find master from available Sentinels...");
    
        for (String sentinel : sentinels) {//遍历所有配置的Sentinel的信息
          final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); //把配置的Sentinel的信息组合成个一个对象,我们当初配置就是 ip:port 的字符串
    
          log.fine("Connecting to Sentinel " + hap);
    
          Jedis jedis = null;
          try {
            jedis = new Jedis(hap.getHost(), hap.getPort()); //和Sentinel服务建立连接
    
            List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); //这里其实是 sentinel get-master-addr-by-name mymaster  的实现,来获取Master的 ip和port 信息
            // connected to sentinel...
            sentinelAvailable = true;//执行此,则认为sentinel的服务可用。但这里只是当前连接的sentienl服务可用,并不能说明sentinel集群可用
    
            if (masterAddr == null || masterAddr.size() != 2) { //没有从当前sentinel服务里面获取master信息,可能sentinel的配置信息有误导致,继续访问下一个sentinel服务
              log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
                  + ".");
              continue;
            }
    
            master = toHostAndPort(masterAddr); //构建Redis Master的配置对象
            log.fine("Found Redis master at " + master);
            break; //跳出遍历,只要从一个Sentinel来获取Master信息正常就可以了
          } catch (JedisException e) {
            // resolves #1036, it should handle JedisException there's another chance
            // of raising JedisDataException
            log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
                + ". Trying next one.");
          } finally {
            if (jedis != null) {
              jedis.close();
            }
          }
        }
    
        if (master == null) { // master的信息没有取到
          if (sentinelAvailable) { // sentinel服务正常,此时说明sentinel的配置有问题,没有配置监视master的信息
            // can connect to sentinel, but master name seems to not
            // monitored
            throw new JedisException("Can connect to sentinel, but " + masterName
                + " seems to be not monitored...");
          } else {
            throw new JedisConnectionException("All sentinels down, cannot determine where is "
                + masterName + " master is running...");
          }
        }
    
        log.info("Redis master running at " + master + ", starting Sentinel listeners...");
    
        for (String sentinel : sentinels) { //这里很重要! 客户端对每个Sentinel服务建立监听,当发生failover时,获取新的Master信息,重新建立pool连接
          final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
          MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
          // whether MasterListener threads are alive or not, process can be stopped
          masterListener.setDaemon(true);
          masterListeners.add(masterListener);
          masterListener.start();
        }
    
        return master;
      } 

    我们看下 MasterListener 这个线程类,它订阅了"+switch-master" 这个channel,当发生failover时会收到Sentinel发来的消息,消息如下

    "mymaster 10.10.10.126 6379 10.10.10.118 6379"  ,格式是“监视master名称 旧Master的ip 旧Master的port 新Master的ip 新Master的port

        public void run() {
    
          running.set(true); // AtomicBoolean 类型,此类提供了一些原子操作
    
          while (running.get()) {
    
            j = new Jedis(host, port); // 和Sentinel建立连接。j前面修饰符 volatile, 多线程间值可见性,禁止编译器对指令的重排序
    
            try {
              // double check that it is not being shutdown
              if (!running.get()) { //二次check,防止线程destroy掉
                break;
              }
    
              j.subscribe(new JedisPubSub() { //订阅服务 channel是"+switch-master"
                @Override
                public void onMessage(String channel, String message) {
                  log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
    
                  String[] switchMasterMsg = message.split(" "); //解析sentinel发过来的消息,以空格分隔
    
                  if (switchMasterMsg.length > 3) { //check解析出来的消息的长度是否满足
    
                    if (masterName.equals(switchMasterMsg[0])) { //由于sentinel可以监视多个master信息,所以这里要确认是否是本身的master
                      initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); //重新初始化pool
                    } else {
                      log.fine("Ignoring message on +switch-master for master name "
                          + switchMasterMsg[0] + ", our master name is " + masterName);
                    }
    
                  } else {
                    log.severe("Invalid message received on Sentinel " + host + ":" + port
                        + " on channel +switch-master: " + message);
                  }
                }
              }, "+switch-master");
            } catch (JedisConnectionException e) { ;} finally { j.close(); } } }

    评论

    发表评论

    validate