redis集群搭建

转载自https://www.cnblogs.com/Yunya-Cnblogs/p/14608937.html(添加小部分笔记)感谢作者!

部分参考自 https://www.cnblogs.com/ysocean/p/12328088.html

基本准备#

ly-20241212142159186.png

架构#

采用Centos7,Redis版本为6.2,架构如下:

ly-20241212142159238

hosts修改#

vim /etc/hosts
#添加
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3

集群准备#

对每个节点#

  1. 下载redis并解压到 /usr/local/redis-cluster中

    cd /usr/local
    mkdir redis-cluster
    tar -zxvf redis* -C /usr/local/redis*
  2. 进入redis根目录

    make
    make install
  3. 安装完毕

  4. hosts修改

    vim /etc/hosts
    #添加
    192.168.1.101 node1
    192.168.1.102 node2
    192.168.1.103 node3

配置文件修改(6个节点中的每一个)#

创建多级目录

mkdir -p /usr/local/redis_cluster/redis_63{79,80}/{conf,pid,logs}

ly-20241212142159275

编写配置文件

vim /usr/local/redis_cluster/redis_6379/conf/redis.conf
# 命令行状态下输入 :%d  回车,清空文件
# 再输入 :set paste 处理多出的行带#的问题
# 再输入i

####内容#####
# 快速修改::%s/6379/6380/g

# 守护进行模式启动
daemonize yes

# 设置数据库数量,默认数据库为0
databases 16

# 绑定地址,需要修改
# bind 192.168.1.101
bind node1

# 绑定端口,需要修改
port 6379

# pid文件存储位置,文件名需要修改
pidfile /usr/local/redis_cluster/redis_6379/pid/redis_6379.pid

# log文件存储位置,文件名需要修改
logfile /usr/local/redis_cluster/redis_6379/logs/redis_6379.log

# RDB快照备份文件名,文件名需要修改
dbfilename redis_6379.rdb

# 本地数据库存储目录,需要修改
dir /usr/local/redis_cluster/redis_6379

# 集群相关配置
# 是否以集群模式启动
cluster-enabled yes

# 集群节点回应最长时间,超过该时间被认为下线
cluster-node-timeout 15000

# 生成的集群节点配置文件名,文件名需要修改
cluster-config-file nodes_6379.conf

复制粘贴配置文件

05高级功能

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

消息存储#

流程#

ly-20241212142201154

存储介质#

关系型数据库DB#

适合数据量不够大,比如ActiveMQ可选用JDBC方式作为消息持久化

文件系统#

  1. 关系型数据库最终也是要存到文件系统中的,不如直接存到文件系统,绕过关系型数据库
  2. 常见的RocketMQ/RabbitMQ/Kafka都是采用消息刷盘到计算机的文件系统来做持久化(同步刷盘/异步刷盘)

消息发送#

  1. 顺序写:600MB/s,随机写:100KB/s

    • 系统运行一段时间后,我们对文件的增删改会导致磁盘上数据无法连续,非常的分散。

    • 顺序读也只是逻辑上的顺序,也就是按照当前文件的相对偏移量顺序读取,并非磁盘上连续空间读取

    • 对于磁盘的读写分为两种模式,顺序IO随机IO。 随机IO存在一个寻址的过程,所以效率比较低。而顺序IO,相当于有一个物理索引,在读取的时候不需要寻找地址,效率很高。

    • 来源: https://www.cnblogs.com/liuche/p/15455808.html

  2. 数据网络传输

    零拷贝技术MappedByteBuffer,省去了用户态,由内核态直接拷贝到网络驱动内核
    RocketMQ默认设置单个CommitLog日志数据文件为1G

    ly-20241212142201208

消息存储#

三个概念:commitLog、ConsumerQueue、index

CommitLog#

  1. 默认大小1G
    ly-20241212142201240
  2. 存储消息的元数据,包括了Topic、QueueId、Message
  3. 还存储了ConsumerQueue相关信息,所以ConsumerQueue丢了也没事

ConsumerQueue#

  1. 存储了消息在CommitLog的索引(几百K,Linux会事先加载到内存中)
  2. 包括最小/最大偏移量、已经消费的偏移量
  3. 一个Topic多个队列,每个队列对应一个ConsumerQueue
    ly-20241212142201276

Index#

也是索引文件,为消息查询服务,通过key或时间区间查询消息

总结#

ly-20241212142201312

刷盘机制#

ly-20241212142201340

  1. 同步刷盘
  2. 异步刷盘

高可用性机制#

消费高可用及发送高可用#

ly-20241212142201372

消息主从复制#

ly-20241212142201400

负载均衡#

ly-20241212142201467

消息重试#

下面都是针对消费失败的重试

顺序消息#

RocketMQ会自动不断重试,且为了保证顺序性,会导致消息消费被阻塞。使用时要及时监控并处理消费失败现象

无序消息(普通、定时、延时、事务)#

  • 通过设置返回状态达到消息重试的结果
  • 重试只对集群消费方式生效,广播方式不提供重试特性
  • 重试次数 ly-20241212142201492 如果16次后还是消费失败,会进入死信队列,不再被消费

配置是否重试#

重试#

ly-20241212142201525

04案例

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!基本架构

架构#

ly-20241212142200977

流程图#

下单流程#

ly-20241212142201029

支付流程#

ly-20241212142201068

SpringBoot整合RocketMQ#

依赖包#

	   <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

生产者#

yaml#

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  producer:
    group: my-group

使用#

    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("rocketmq")
    public String rocketmq(){
        log.info("我被调用了-rocketmq");
        //主题+内容
        template.convertAndSend("mytopic-ly","hello1231");
        return "hello world"+serverPort;
    }

消费者#

yaml#

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  consumer:
    group: my-group2

使用#

创建监听器

@RocketMQMessageListener(topic = "mytopic-ly",
        consumeMode = ConsumeMode.CONCURRENTLY,consumerGroup = "${rocketmq.producer.group}")
@Slf4j
@Component
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("消费了"+s);
    }
}

下单流程利用MQ进行回退处理,保证数据一致性#

ly-20241212142201106

库存回退的消费者,代码如下:

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup =
 "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        try {
            //1. 解析消息内容
            msgId = messageExt.getMsgId();
            tags= messageExt.getTags();
            keys= messageExt.getKeys();
            body= new String(messageExt.getBody(),"UTF-8");

            log.info("接受消息成功");

            //2. 查询消息消费记录
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

            if(mqConsumerLog!=null){
                //3. 判断如果消费过...
                //3.1 获得消息处理状态
                Integer status = mqConsumerLog.getConsumerStatus();
                //处理过...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",已经处理过");
                    return;
                }

                //正在处理...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",正在处理");
                    return;
                }

                //处理失败
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
                    //获得消息处理次数
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if(times>3){
                        log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());

                    //使用数据库乐观锁更新
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if(r<=0){
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                    }
                }

            }else{
                //4. 判断如果没有消费过...
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(0);

                //将消息处理信息添加到数据库
                mqConsumerLogMapper.insert(mqConsumerLog);
            }
            //5. 回退库存
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);


            //6. 将消息的处理状态改为成功
            mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("回退库存成功");
        } catch (Exception e) {
            e.printStackTrace();
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if(mqConsumerLog==null){
                //数据库未有记录
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            }else{
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }

    }
}

03收发消息

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!前提

依赖包#

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

消息生产者步骤#

创建生产者,生产者组名–>指定nameserver地址–>启动producer–>

创建消息对象(Topic、Tag、消息体)

发送消息、关闭生产者producer

消息消费者步骤#

创建消费者,制定消费者组名–>指定nameserver地址

订阅Topic和Tag,设置回调函数处理消息

启动消费者consumer

消息发送#

同步消息#

发送消息后客户端会进行阻塞,直到得到结果后,客户端才会继续执行

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        //创建Producer,并指定生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.1.135:9876;192.168.1.138:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic("base");
            msg.setTags("Tag1");
            msg.setBody(("hello world" + i).getBytes());
            //发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus sendStatus = result.getSendStatus();
            //消息id
            String msgId = result.getMsgId();
            //消息接收队列id
            MessageQueue messageQueue = result.getMessageQueue();
            int queueId = messageQueue.getQueueId();
            log.info(result.toString());
            log.info(messageQueue.toString());
            log.info("status:" + sendStatus +
                    "msgId:" + msgId + "queueId" + queueId);
            TimeUnit.SECONDS.sleep(1);
        }
        log.info("发送结束===================");
        producer.shutdown();
    }

异步消息#

发送消息后不会导致阻塞,当broker返回结果时,会调用回调函数进行处理

02双主双从集群搭建

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

服务器信息修改#

在.135和.138均进行下面的操作

解压#

rocketmq解压到/usr/local/rocketmq目录下

host添加#

#添加host
vim /etc/hosts
##添加内容
192.168.1.135 rocketmq-nameserver1
192.168.1.138 rocketmq-nameserver2
192.168.1.135 rocketmq-master1
192.168.1.135 rocketmq-slave2
192.168.1.138 rocketmq-master2
192.168.1.138 rocketmq-slave1
## 保存后
systemctl restart network

防火墙#

直接关闭#

## 防火墙关闭
systemctl stop firewalld.service
## 防火墙状态查看
firewall-cmd --state
##禁止开机启动
systemctl disable firewalld.service

或者直接关闭对应端口即可#

ly-20241212142200796

环境变量配置#

为了执行rocketmq命令方便

#添加环境变量
vim /etc/profile
#添加
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
#使配置生效
source /etc/profile

消息存储路径创建#

a#

mkdir /usr/local/rocketmq/store-a
mkdir /usr/local/rocketmq/store-a/commitlog
mkdir /usr/local/rocketmq/store-a/consumequeue
mkdir /usr/local/rocketmq/store-a/index

b#

mkdir /usr/local/rocketmq/store-b
mkdir /usr/local/rocketmq/store-b/commitlog
mkdir /usr/local/rocketmq/store-b/consumequeue
mkdir /usr/local/rocketmq/store-b/index

双主双从配置文件的修改#

master-a#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-b#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

master-b#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-a#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

修改两台主机的runserver.sh及runbroker.sh修改#

修改runbroker.sh#

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改runserver.sh#

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

两台主机分别启动nameserver和Brocker#

## 在两台主机分别启动nameserver 
nohup sh mqnamesrv &
#135启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
#135启动slave2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
#查看
jps
3478 Jps
3366 BrokerStartup
3446 BrokerStartup
3334 NamesrvStartup
#138启动master2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
#135启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
#查看
jps
3376 Jps
3360 BrokerStartup
3251 NamesrvStartup
3295 BrokerStartup

双主双从集群搭建完毕!

01rocketmq学习

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

基本操作#

下载#

https://rocketmq.apache.org/download/ 选择Binary下载即可,放到Linux主机中

前提java运行环境#

yum search java | grep jdk
yum install -y java-1.8.0-openjdk-devel.x86_64
# java -version 正常
# javac -version 正常

启动#

#nameserver启动
nohup sh bin/mqnamesrv &
#nameserver日志查看
tail -f ~/logs/rocketmqlogs/namesrv.log
#输出
2023-04-06 00:08:34 INFO main - tls.client.certPath = null
2023-04-06 00:08:34 INFO main - tls.client.authServer = false
2023-04-06 00:08:34 INFO main - tls.client.trustCertPath = null
2023-04-06 00:08:35 INFO main - Using OpenSSL provider
2023-04-06 00:08:35 INFO main - SSLContext created for server
2023-04-06 00:08:36 INFO NettyEventExecutor - NettyEventExecutor service started
2023-04-06 00:08:36 INFO main - The Name Server boot success. serializeType=JSON
2023-04-06 00:08:36 INFO FileWatchService - FileWatchService service started
2023-04-06 00:09:35 INFO NSScheduledThread1 - --------------------------------------------------------
2023-04-06 00:09:35 INFO NSScheduledThread1 - configTable SIZE: 0
#broker启动
nohup sh bin/mqbroker -n localhost:9876 &
#查看broker日志
tail -f ~/logs/rocketmqlogs/broker.log
#日志如下
tail: 无法打开"/root/logs/rocketmqlogs/broker.log" 读取数据: 没有那个文件或目录
tail: 没有剩余文件
👇
#jps查看
2465 Jps
2430 NamesrvStartup
#说明没有启动成功,因为默认配置的虚拟机内存较大
vim bin/runbroker.sh  以及 vim runserver.sh
#修改 
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
#修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改完毕后启动
#先关闭namesrv后
#按上述启动namesrv以及broker
sh bin/mqshutdown namesrv
# jsp命令查看进程
2612 Jps
2551 BrokerStartup
2524 NamesrvStartup

测试#

同一台机器上,两个cmd窗口