JedisSentinelAPI的使用和源码解读
Sentinel的的部署和实践已经结束了,本篇文章是Redis的Java客户端端JedisSentinelAPI的使用和源码解读。
简单例子:
public class RedisSentinelTest {
public static void main(String[] args) throws Exception {
//Sentinel的配置
Set<String> sentinels = new HashSet<String>();
sentinels.add("10.10.10.126:26379");
sentinels.add("10.10.10.126:26479");
sentinels.add("10.10.10.126:26579");
//JedisPool 连接池的配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(10000);
config.setMaxIdle(200);
config.setTestOnBorrow(true);
config.setMaxWaitMillis(1000);
//创建JedisPool
JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config);
//获取Jedis连接
Jedis jedis = pool.getResource();
//从Redis中获取k1的值
String v1 = jedis.get("k1");
System.out.println(v1);
//关闭Jedis,返回pool中
jedis.close();
}
}
注释写的应该能看懂了,就不啰嗦了 ^_^^_^
线上使用的例子看下面:
1)spring配置
<bean id="jedisPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
<property name="maxTotal" value="10000" />
<property name="maxIdle" value="200" />
<property name="maxWaitMillis" value="1000" />
</bean>
<bean id="redisSentinel" class="redis.clients.jedis.JedisSentinelPool">
<constructor-arg index="0" value="mymaster" />
<constructor-arg index="1">
<set>
<value>10.10.10.126:26379</value>
<value>10.10.10.126:26479</value>
<value>10.10.10.126:26579</value>
</set>
</constructor-arg>
<constructor-arg index="2" ref="jedisPoolConfig"/>
<!--constructor-arg index="3" value="tiger"/--> <!-- password configuration-->
</bean>
2)连接基础类:RedisUtil.java
public class RedisUtil {
private final static RedisUtil POOL_UTIL = new RedisUtil();
public static RedisUtil getInstance() {
return POOL_UTIL;
}
private RedisUtil() {
}
private JedisSentinelPool pool;
private JedisSentinelPool getJedisPool() {
if (pool == null) {
synchronized (this) {
if (pool == null) {
init();
}
}
}
return pool;
}
private void init() {
try {
pool = (JedisSentinelPool) Config.ctx.getBean("redisSentinel");
} catch (Exception e) {
e.printStackTrace();
throw new ExceptionInInitializerError("启动系统异常,系统无法启动,请检查配置文件是否存在!");
}
}
//获取Redis连接
public Jedis getConnection() {
Jedis jedis = null;
try {
jedis = getJedisPool().getResource();
jedis.select(0);
} catch (Exception e) {
if (jedis != null)
jedis.close();
e.printStackTrace();
throw new RuntimeException("连接Redis异常.", e);
}
return jedis;
}
//关闭Redis连接
public void closeConnection(Jedis jedis) {
if (jedis != null) {
try {
jedis.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3)使用例子
public String getValue() {
Jedis jedis = RedisUtil.getInstance().getConnection();
try {
return jedis.get("k100");
} catch (Exception e) {
;
} finally {
RedisUtil.getInstance().closeConnection(jedis);
}
return "";
}
JedisSentinelAPI源码阅读
其实源码也比较简单,就几个类,还是看文章开头的代码例子
顺着这一句 JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config); 是创建了一个Jedis的连接池,
"mymaster" 是我们配置监视Master时的名字; sentinels 则是一个Set集合,里面是所有Sentinel的信息,用IP:PORT组成字符串;config 是连接池的一些信息
JedisSentinelPool extends Pool , JedisSentinelPool 有几个构造方法,根据实际情况来使用不同,但最终会调用下面的构造方法
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
final String password, final int database, final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
HostAndPort master = initSentinels(sentinels, masterName);
initPool(master);
}
注意红色的部分 initSentinels,其实是从Sentinel里面获取当前Redis Master信息,看源码,里面我已经加了注释
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
HostAndPort master = null; //构造一个Redis服务配置对象,里面包括ip和port
boolean sentinelAvailable = false; //sentinel服务是否正常
log.info("Trying to find master from available Sentinels...");
for (String sentinel : sentinels) {//遍历所有配置的Sentinel的信息
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); //把配置的Sentinel的信息组合成个一个对象,我们当初配置就是 ip:port 的字符串
log.fine("Connecting to Sentinel " + hap);
Jedis jedis = null;
try {
jedis = new Jedis(hap.getHost(), hap.getPort()); //和Sentinel服务建立连接
List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); //这里其实是 sentinel get-master-addr-by-name mymaster 的实现,来获取Master的 ip和port 信息
// connected to sentinel...
sentinelAvailable = true;//执行此,则认为sentinel的服务可用。但这里只是当前连接的sentienl服务可用,并不能说明sentinel集群可用
if (masterAddr == null || masterAddr.size() != 2) { //没有从当前sentinel服务里面获取master信息,可能sentinel的配置信息有误导致,继续访问下一个sentinel服务
log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
+ ".");
continue;
}
master = toHostAndPort(masterAddr); //构建Redis Master的配置对象
log.fine("Found Redis master at " + master);
break; //跳出遍历,只要从一个Sentinel来获取Master信息正常就可以了
} catch (JedisException e) {
// resolves #1036, it should handle JedisException there's another chance
// of raising JedisDataException
log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
+ ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}
if (master == null) { // master的信息没有取到
if (sentinelAvailable) { // sentinel服务正常,此时说明sentinel的配置有问题,没有配置监视master的信息
// can connect to sentinel, but master name seems to not
// monitored
throw new JedisException("Can connect to sentinel, but " + masterName
+ " seems to be not monitored...");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is "
+ masterName + " master is running...");
}
}
log.info("Redis master running at " + master + ", starting Sentinel listeners...");
for (String sentinel : sentinels) { //这里很重要! 客户端对每个Sentinel服务建立监听,当发生failover时,获取新的Master信息,重新建立pool连接
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
return master;
}
我们看下 MasterListener 这个线程类,它订阅了"+switch-master" 这个channel,当发生failover时会收到Sentinel发来的消息,消息如下
"mymaster 10.10.10.126 6379 10.10.10.118 6379" ,格式是“监视master名称 旧Master的ip 旧Master的port 新Master的ip 新Master的port”
public void run() {
running.set(true); // AtomicBoolean 类型,此类提供了一些原子操作
while (running.get()) {
j = new Jedis(host, port); // 和Sentinel建立连接。j前面修饰符 volatile, 多线程间值可见性,禁止编译器对指令的重排序
try {
// double check that it is not being shutdown
if (!running.get()) { //二次check,防止线程destroy掉
break;
}
j.subscribe(new JedisPubSub() { //订阅服务 channel是"+switch-master"
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" "); //解析sentinel发过来的消息,以空格分隔
if (switchMasterMsg.length > 3) { //check解析出来的消息的长度是否满足
if (masterName.equals(switchMasterMsg[0])) { //由于sentinel可以监视多个master信息,所以这里要确认是否是本身的master
initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); //重新初始化pool
} else {
log.fine("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0] + ", our master name is " + masterName);
}
} else {
log.severe("Invalid message received on Sentinel " + host + ":" + port
+ " on channel +switch-master: " + message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) { ;} finally { j.close(); } } }
评论
发表评论
|
|
|