负载均衡和反向代理

负载均衡和反向代理

upstream 配置

  • ip地址和端口
  • 权重:默认是1,配置越高权重越高

负载均衡算法

  • round-robin 基于权重的仑村
  • ip_hash 根据客户ip进行负载均衡
  • hash_key 对某一个key进行hash
  • 哈希算法,根据请求的uri进行负载均衡

失败重试

配置max_fails和fail_timeout 当指定时间内失败指定的次数将摘掉上游服务器,然后fail_timeout之后,会把服务器加入存活的列表中

健康检查

  • TCP心跳检查:upstream 下配置check interval = rise= fail= timeout= type = tcp;进行tcp 心跳检查
  • HTTP心跳检查:对于TCP需要额外的两项配置,check_http_send ,check_http_expect_alive

HTTP动态负载均衡

Consul 实现分布式服务的注册与发现

  • 服务注册:通过http api 将服务注册到consul
  • 服务发现:服务消费者通过http api 从consul 获取服务的ip和端口
  • 故障检测
  • K/V存储
  • 多数据中心
  • rasf算法

Consul+ Consul-template

  • Consul server
  • Consul-template

    每次发现配置变更都需要reload nginx,而reload 会有一定的损耗

Consul + OpenResty

  • 使用Consul注册服务之后,使用OpenResty banlance_by_lua 实现无reload 动态负载均衡

RabbitMQ channel连接池

为什么使用MQ

  • 应用解耦:可以让不同的应用之间能够通过消息队列实现共同协作处理业务
  • 流量销峰:秒杀活动中,可能会因为瞬间的流量过大,导致应用挂掉,为了解决这个问题,可以引入消息队列来平滑的过度流量,保护后端的应用。

rabbitMq简介

Connection

  • ConnectionFactory 是Connection的创建工厂,我们需要Connection的时候,是通过ConnectionFactory获取的;
  • Connection 是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,
  • Channel 通过Connection对象创建的AMQP 信道(Channel),一个Connection可以创建多个Channel,每一个Channel都会被指派一个唯一的ID, rabbitmq的每一条指令都是通过Channel来完成的 ,每个Channel代表一个会话任务。

Broker

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离

交换机的类型

  • Direct交换机:直连交换机,完全更具key进行投递。
  • Topic交换机:在key进行模式匹配后进行投递,例如符号”#”匹配一个或多个字符,符号”*”匹配一串连续的字母字符
  • Fanout交换机:它采取广播模式,消息进来时,将会被投递到与改交换机绑定的所有队列中

rabbitMQ连接池实现

  • 对于connection,channel对象创建时间长,创建时需要消耗大量的资源,且对象创建后可被重复使用,如同数据库库连接池一样,当我们的业务并发量很大的时候,频繁的创建,关闭连接是一项非常损耗系统资源的操作,我们需要将这些对象缓存起来以便重复利用,减少系统开销,提升系统性能。
  • RabbitMQ的指令都是通过channel的来实现的, channel是可以复用的 所以该对象池中也是基于channel对象

配置信息类

配置rabbit连接属性,已经连接池中的对象数信息,更具业务的量来确定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class RabbitMqConfig {
/**
* @desc 用户名
*/
@Value("${rabbitmq.username:guest}")
private String username;
/**
* @desc 密码
*/
@Value("${rabbitmq.password:guest}")
private String password;
/**
* @desc 连接串
*/
private Address[] address;
/**
* @desc vhost
*/
@Value("${rabbitmq.vhost:/}")
private String vhost;
/**
* @desc 对象总数
*/
@Value("${rabbitmq.maxTotal:8}")
private Integer maxTotal;
/**
* @desc 最大空闲对象数
*/
@Value("${rabbitmq.maxIdle:8}")
private Integer maxIdle;
/**
* @desc 最小空闲对象书
*/
@Value("${rabbitmq.minIdle:0}")
private Integer minIdle;
/**
* @desc 获取超时时间
*/
@Value("${rabbitmq.maxWaitMillis:1000}")
private Integer maxWaitMillis;
//此处可能省略set get
/**
* 解析配置参数
*/
@PostConstruct
public void initAddress() {
String connetionString = environment.getProperty("rabbitmq.connectionString", "127.0.0.1:5672");
if (StringUtils.isEmpty(connetionString)) {
throw new UnknownFormatConversionException("非法的配置参数");
}
String[] arrays = connetionString.split(Constant.COMMA);
this.address = new Address[arrays.length];
for (int i = 0; i < arrays.length; i++) {
String[] addressArray = arrays[i].split(":");
address[i] = new Address(addressArray[0], Integer.valueOf(addressArray[1]));
}
}
}

rabbitMQ channel 对象工厂

BasePooledObjectFactory是池对象工厂,用于管理池对象的生命周期,我们只需要继承他,并覆并覆写父类相关方法即可控制池对象的生成、初始化、反初始化、校验等;我们在使用对象池的时候,一般是需要基于BasePooledObjectFactory创建我们自己的对象工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Component
public class RabbitMqChannelFactory extends BasePooledObjectFactory<Channel> {
public static final Logger loggr = LoggerFactory.getLogger(RabbitMqChannelFactory.class);
@Autowired
private RabbitMqConfig config;
private ConnectionFactory factory;
private Connection conn;
private int i = 0;
@Override
public PooledObject<Channel> makeObject() throws Exception {
return super.makeObject();
}

@Override
public void destroyObject(PooledObject<Channel> p) throws Exception {
super.destroyObject(p);
}

@Override
public boolean validateObject(PooledObject<Channel> p) {
return p.getObject().isOpen();
}


@Override
public void activateObject(PooledObject<Channel> p) throws Exception {
super.activateObject(p);
}

/**
* 使用完返还对象时
* @param p
* @throws Exception
*/
@Override
public void passivateObject(PooledObject<Channel> p) throws Exception {
super.passivateObject(p);
}

/**
* 创建一个新对象
* @return
* @throws Exception
*/
@Override
public Channel create() throws Exception {
loggr.info("create channel:{}",i);
Channel channel = conn.createChannel();
channel.confirmSelect();
i++;
return channel;
}
/**
* 封装为池化对象
* @param channel
* @throws Exception
*/
@Override
public PooledObject<Channel> wrap(Channel channel) {
return new DefaultPooledObject<>(channel);
}
@PostConstruct
private void init(){
factory = new ConnectionFactory();
factory.setConnectionTimeout(5000);
factory.setVirtualHost(config.getVhost());
//自动重连
factory.setAutomaticRecoveryEnabled(true);
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
try {
conn = factory.newConnection(config.getAddress());
} catch (Exception e) {
loggr.error("rabbitmq create connection error", e);
throw new RuntimeException("rabbitmq create connection error");
}
}

create() 核心方法,表明我们对象池中的对象是如何创建的。

init() 类加载的时候创建rabbitMQ的connection对象,后续初始化对象池的是能,才能通过connection创建channel对象

构建rabbit channel pool

1
2
3
4
5
6
public class RabbitChannelPool extends GenericObjectPool<Channel> {

public RabbitChannelPool(PooledObjectFactory<Channel> factory, GenericObjectPoolConfig<Channel> config) {
super(factory, config);
}
}

GenericObjectPool是Apache Commons Pool实现的一个通用泛型对象池,是一个对象池的完整实现,我们直接构建并使用即可。

初始化channel对象池

1
2
3
4
5
6
7
8
9
@PostConstruct
public void initPool(){
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMinIdle(mqConfig.getMinIdle());
config.setMaxTotal(mqConfig.getMaxTotal());
config.setMaxIdle(mqConfig.getMaxIdle());
config.setMaxWaitMillis(2000);
rabbitChannelPool = new RabbitChannelPool(rabbitMqChannelFactory, config);
}

构建GenericObjectPoolConfig对象配置我们的参数,然后创建RabbitChannelPool对象,我们就可以直接通过RabbitChannelPool对象来池中获取和归还channel对象了。

如果对 Apache Commons Pool对象池还不是很了解的话,可以看一下这篇文章 探索对象池

封装rabbitMqService 服务提供消息的发送和监听

initPool() 初始化时构建好rabbitChannelPool对象,后续就可以使用了rabbitChannelPool来获取对象
类中提供了一个发送的示例方法和一个监听消息的实例方法

注:使用过程中一定要在finally块中归还对象,否则对象池将被耗尽无法提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Service("rabbitMqService")
public class RabbitMqServiceImpl implements RabbitMqService{
public static final Logger logger = LoggerFactory.getLogger(RabbitMqServiceImpl.class);
private RabbitChannelPool rabbitChannelPool;
@Autowired
private RabbitMqChannelFactory rabbitMqChannelFactory;
@Autowired
private RabbitMqConfig mqConfig;
public static final String UTF_8 = "utf-8";

@PostConstruct
public void initPool(){
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMinIdle(mqConfig.getMinIdle());
config.setMaxTotal(mqConfig.getMaxTotal());
config.setMaxIdle(mqConfig.getMaxIdle());
config.setMaxWaitMillis(2000);
rabbitChannelPool = new RabbitChannelPool(rabbitMqChannelFactory, config);
}

@Override
public void sendMessage() {

}

@Override
public boolean sendMessage(String queueName, String exchange, String routingKey, String extype, String message) {
Channel channel = null;
try {
channel = rabbitChannelPool.borrowObject();
logger.info("active:{},Idle:{}",rabbitChannelPool.getNumActive(),rabbitChannelPool.getNumIdle());
//声明队列(durable -> true,持久化,)
channel.queueDeclare(queueName, false, false, false, null);
//声明交换机(durable -> true,持久化,)
channel.exchangeDeclare(exchange, extype,false);
//队列绑定
channel.queueBind(queueName, exchange, routingKey);
channel.basicPublish(exchange, routingKey, MessageProperties.BASIC, message.getBytes(UTF_8) );
return channel.waitForConfirms();
} catch (Exception e) {
logger.error("error", e);
return false;
}finally {
if (channel != null) {
rabbitChannelPool.returnObject(channel);
}
}
}

@Override
public void startConsumer(MqCallService callback, String queueName, String routingKey, String exchange, String extype, boolean durable) {
Channel channel = null;
try {
channel = rabbitChannelPool.borrowObject();
channel.queueDeclare(queueName, durable, false, false, null);
channel.queueBind(queueName, exchange, routingKey);
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String message = new String(body, UTF_8);
callback.call(message, null, 0);
} catch (Exception ex) {
logger.error("消费消息错误", ex);
}
}
};
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}

总结

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×