JedisSentinelAPI的使用和源码解读
Sentinel的的部署和实践已经结束了,本篇文章是Redis的Java客户端端JedisSentinelAPI的使用和源码解读。
简单例子:
public class RedisSentinelTest { public static void main(String[] args) throws Exception { //Sentinel的配置 Set<String> sentinels = new HashSet<String>(); sentinels.add("10.10.10.126:26379"); sentinels.add("10.10.10.126:26479"); sentinels.add("10.10.10.126:26579"); //JedisPool 连接池的配置 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(10000); config.setMaxIdle(200); config.setTestOnBorrow(true); config.setMaxWaitMillis(1000); //创建JedisPool JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config); //获取Jedis连接 Jedis jedis = pool.getResource(); //从Redis中获取k1的值 String v1 = jedis.get("k1"); System.out.println(v1); //关闭Jedis,返回pool中 jedis.close(); } }
注释写的应该能看懂了,就不啰嗦了 ^_^^_^
线上使用的例子看下面:
1)spring配置
<bean id="jedisPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
<property name="maxTotal" value="10000" />
<property name="maxIdle" value="200" />
<property name="maxWaitMillis" value="1000" />
</bean>
<bean id="redisSentinel" class="redis.clients.jedis.JedisSentinelPool">
<constructor-arg index="0" value="mymaster" />
<constructor-arg index="1">
<set>
<value>10.10.10.126:26379</value>
<value>10.10.10.126:26479</value>
<value>10.10.10.126:26579</value>
</set>
</constructor-arg>
<constructor-arg index="2" ref="jedisPoolConfig"/>
<!--constructor-arg index="3" value="tiger"/--> <!-- password configuration-->
</bean>
2)连接基础类:RedisUtil.java
public class RedisUtil { private final static RedisUtil POOL_UTIL = new RedisUtil(); public static RedisUtil getInstance() { return POOL_UTIL; } private RedisUtil() { } private JedisSentinelPool pool; private JedisSentinelPool getJedisPool() { if (pool == null) { synchronized (this) { if (pool == null) { init(); } } } return pool; } private void init() { try { pool = (JedisSentinelPool) Config.ctx.getBean("redisSentinel"); } catch (Exception e) { e.printStackTrace(); throw new ExceptionInInitializerError("启动系统异常,系统无法启动,请检查配置文件是否存在!"); } } //获取Redis连接 public Jedis getConnection() { Jedis jedis = null; try { jedis = getJedisPool().getResource(); jedis.select(0); } catch (Exception e) { if (jedis != null) jedis.close(); e.printStackTrace(); throw new RuntimeException("连接Redis异常.", e); } return jedis; } //关闭Redis连接 public void closeConnection(Jedis jedis) { if (jedis != null) { try { jedis.close(); } catch (Exception e) { e.printStackTrace(); } } } }
3)使用例子
public String getValue() { Jedis jedis = RedisUtil.getInstance().getConnection(); try { return jedis.get("k100"); } catch (Exception e) { ; } finally { RedisUtil.getInstance().closeConnection(jedis); } return ""; }
JedisSentinelAPI源码阅读
其实源码也比较简单,就几个类,还是看文章开头的代码例子
顺着这一句 JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config); 是创建了一个Jedis的连接池,
"mymaster" 是我们配置监视Master时的名字; sentinels 则是一个Set集合,里面是所有Sentinel的信息,用IP:PORT组成字符串;config 是连接池的一些信息
JedisSentinelPool extends Pool , JedisSentinelPool 有几个构造方法,根据实际情况来使用不同,但最终会调用下面的构造方法
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
final String password, final int database, final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
HostAndPort master = initSentinels(sentinels, masterName);
initPool(master);
}
注意红色的部分 initSentinels,其实是从Sentinel里面获取当前Redis Master信息,看源码,里面我已经加了注释
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) { HostAndPort master = null; //构造一个Redis服务配置对象,里面包括ip和port boolean sentinelAvailable = false; //sentinel服务是否正常 log.info("Trying to find master from available Sentinels..."); for (String sentinel : sentinels) {//遍历所有配置的Sentinel的信息 final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); //把配置的Sentinel的信息组合成个一个对象,我们当初配置就是 ip:port 的字符串 log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { jedis = new Jedis(hap.getHost(), hap.getPort()); //和Sentinel服务建立连接 List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); //这里其实是 sentinel get-master-addr-by-name mymaster 的实现,来获取Master的 ip和port 信息 // connected to sentinel... sentinelAvailable = true;//执行此,则认为sentinel的服务可用。但这里只是当前连接的sentienl服务可用,并不能说明sentinel集群可用 if (masterAddr == null || masterAddr.size() != 2) { //没有从当前sentinel服务里面获取master信息,可能sentinel的配置信息有误导致,继续访问下一个sentinel服务 log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } master = toHostAndPort(masterAddr); //构建Redis Master的配置对象 log.fine("Found Redis master at " + master); break; //跳出遍历,只要从一个Sentinel来获取Master信息正常就可以了 } catch (JedisException e) { // resolves #1036, it should handle JedisException there's another chance // of raising JedisDataException log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } if (master == null) { // master的信息没有取到 if (sentinelAvailable) { // sentinel服务正常,此时说明sentinel的配置有问题,没有配置监视master的信息 // can connect to sentinel, but master name seems to not // monitored throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } log.info("Redis master running at " + master + ", starting Sentinel listeners..."); for (String sentinel : sentinels) { //这里很重要! 客户端对每个Sentinel服务建立监听,当发生failover时,获取新的Master信息,重新建立pool连接 final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); // whether MasterListener threads are alive or not, process can be stopped masterListener.setDaemon(true); masterListeners.add(masterListener); masterListener.start(); } return master; }
我们看下 MasterListener 这个线程类,它订阅了"+switch-master" 这个channel,当发生failover时会收到Sentinel发来的消息,消息如下
"mymaster 10.10.10.126 6379 10.10.10.118 6379" ,格式是“监视master名称 旧Master的ip 旧Master的port 新Master的ip 新Master的port”
public void run() {
running.set(true); // AtomicBoolean 类型,此类提供了一些原子操作
while (running.get()) {
j = new Jedis(host, port); // 和Sentinel建立连接。j前面修饰符 volatile, 多线程间值可见性,禁止编译器对指令的重排序
try {
// double check that it is not being shutdown
if (!running.get()) { //二次check,防止线程destroy掉
break;
}
j.subscribe(new JedisPubSub() { //订阅服务 channel是"+switch-master"
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" "); //解析sentinel发过来的消息,以空格分隔
if (switchMasterMsg.length > 3) { //check解析出来的消息的长度是否满足
if (masterName.equals(switchMasterMsg[0])) { //由于sentinel可以监视多个master信息,所以这里要确认是否是本身的master
initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); //重新初始化pool
} else {
log.fine("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0] + ", our master name is " + masterName);
}
} else {
log.severe("Invalid message received on Sentinel " + host + ":" + port
+ " on channel +switch-master: " + message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) { ;} finally { j.close(); } } }
评论
发表评论
|
|