2023年4月13日 23:27 周四转载自https://www.cnblogs.com/Yunya-Cnblogs/p/14608937.html(添加小部分笔记)感谢作者!
部分参考自
https://www.cnblogs.com/ysocean/p/12328088.html
基本准备
#
![ly-20241212142159186.png](img/ly-20241212142159186.png)
架构
#
采用Centos7,Redis版本为6.2,架构如下:
![ly-20241212142159238](img/ly-20241212142159238.png)
hosts修改
#
vim /etc/hosts
#添加
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3
集群准备
#
对每个节点
#
下载redis并解压到 /usr/local/redis-cluster中
cd /usr/local
mkdir redis-cluster
tar -zxvf redis* -C /usr/local/redis*
进入redis根目录
安装完毕
hosts修改
vim /etc/hosts
#添加
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3
配置文件修改(6个节点中的每一个)
#
创建多级目录
mkdir -p /usr/local/redis_cluster/redis_63{79,80}/{conf,pid,logs}
![ly-20241212142159275](img/ly-20241212142159275.png)
编写配置文件
...
2022年4月9日 09:20 周六学习来源
https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!
消息存储
#
流程
#
![ly-20241212142201154](img/ly-20241212142201154.png)
存储介质
#
关系型数据库DB
#
适合数据量不够大,比如ActiveMQ可选用JDBC方式作为消息持久化
文件系统
#
- 关系型数据库最终也是要存到文件系统中的,不如直接存到文件系统,绕过关系型数据库
- 常见的RocketMQ/RabbitMQ/Kafka都是采用消息刷盘到计算机的文件系统来做持久化(同步刷盘/异步刷盘)
消息发送
#
顺序写:600MB/s,随机写:100KB/s
数据网络传输
零拷贝技术MappedByteBuffer,省去了用户态,由内核态直接拷贝到网络驱动内核。
RocketMQ默认设置单个CommitLog日志数据文件为1G
![ly-20241212142201208](img/ly-20241212142201208.png)
消息存储
#
三个概念:commitLog、ConsumerQueue、index
CommitLog
#
- 默认大小1G
![ly-20241212142201240](img/ly-20241212142201240.png)
- 存储消息的元数据,包括了Topic、QueueId、Message
- 还存储了ConsumerQueue相关信息,所以ConsumerQueue丢了也没事
ConsumerQueue
#
- 存储了消息在CommitLog的索引(几百K,Linux会事先加载到内存中)
- 包括最小/最大偏移量、已经消费的偏移量
- 一个Topic多个队列,每个队列对应一个ConsumerQueue
![ly-20241212142201276](img/ly-20241212142201276.png)
Index
#
也是索引文件,为消息查询服务,通过key或时间区间查询消息
总结
#
![ly-20241212142201312](img/ly-20241212142201312.png)
刷盘机制
#
![ly-20241212142201340](img/ly-20241212142201340.png)
- 同步刷盘
- 异步刷盘
高可用性机制
#
消费高可用及发送高可用
#
![ly-20241212142201372](img/ly-20241212142201372.png)
...
2022年4月8日 11:00 周五学习来源
https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!基本架构
架构
#
![ly-20241212142200977](img/ly-20241212142200977.png)
流程图
#
下单流程
#
![ly-20241212142201029](img/ly-20241212142201029.png)
支付流程
#
![ly-20241212142201068](img/ly-20241212142201068.png)
SpringBoot整合RocketMQ
#
依赖包
#
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
生产者
#
yaml
#
rocketmq:
name-server: 192.168.1.135:9876;192.168.1.138:9876
producer:
group: my-group
使用
#
@Autowired
private RocketMQTemplate template;
@RequestMapping("rocketmq")
public String rocketmq(){
log.info("我被调用了-rocketmq");
//主题+内容
template.convertAndSend("mytopic-ly","hello1231");
return "hello world"+serverPort;
}
消费者
#
yaml
#
rocketmq:
name-server: 192.168.1.135:9876;192.168.1.138:9876
consumer:
group: my-group2
使用
#
创建监听器
...
2022年4月7日 14:31 周四学习来源
https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!前提
依赖包
#
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
消息生产者步骤
#
创建生产者,生产者组名–>指定nameserver地址–>启动producer–>
创建消息对象(Topic、Tag、消息体)
发送消息、关闭生产者producer
消息消费者步骤
#
创建消费者,制定消费者组名–>指定nameserver地址
订阅Topic和Tag,设置回调函数处理消息
启动消费者consumer
消息发送
#
同步消息
#
发送消息后客户端会进行阻塞,直到得到结果后,客户端才会继续执行
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//创建Producer,并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.1.135:9876;192.168.1.138:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setTopic("base");
msg.setTags("Tag1");
msg.setBody(("hello world" + i).getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus sendStatus = result.getSendStatus();
//消息id
String msgId = result.getMsgId();
//消息接收队列id
MessageQueue messageQueue = result.getMessageQueue();
int queueId = messageQueue.getQueueId();
log.info(result.toString());
log.info(messageQueue.toString());
log.info("status:" + sendStatus +
"msgId:" + msgId + "queueId" + queueId);
TimeUnit.SECONDS.sleep(1);
}
log.info("发送结束===================");
producer.shutdown();
}
异步消息
#
发送消息后不会导致阻塞,当broker返回结果时,会调用回调函数进行处理
...
2022年4月6日 10:55 周三学习来源
https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!
服务器信息修改
#
在.135和.138均进行下面的操作
解压
#
rocketmq解压到/usr/local/rocketmq目录下
host添加
#
#添加host
vim /etc/hosts
##添加内容
192.168.1.135 rocketmq-nameserver1
192.168.1.138 rocketmq-nameserver2
192.168.1.135 rocketmq-master1
192.168.1.135 rocketmq-slave2
192.168.1.138 rocketmq-master2
192.168.1.138 rocketmq-slave1
## 保存后
systemctl restart network
防火墙
#
直接关闭
#
## 防火墙关闭
systemctl stop firewalld.service
## 防火墙状态查看
firewall-cmd --state
##禁止开机启动
systemctl disable firewalld.service
或者直接关闭对应端口即可
#
![ly-20241212142200796](img/ly-20241212142200796.png)
环境变量配置
#
为了执行rocketmq命令方便
#添加环境变量
vim /etc/profile
#添加
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
#使配置生效
source /etc/profile
消息存储路径创建
#
mkdir /usr/local/rocketmq/store-a
mkdir /usr/local/rocketmq/store-a/commitlog
mkdir /usr/local/rocketmq/store-a/consumequeue
mkdir /usr/local/rocketmq/store-a/index
mkdir /usr/local/rocketmq/store-b
mkdir /usr/local/rocketmq/store-b/commitlog
mkdir /usr/local/rocketmq/store-b/consumequeue
mkdir /usr/local/rocketmq/store-b/index
双主双从配置文件的修改
#
master-a
#
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
slave-b
#
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
master-b
#
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
slave-a
#
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
修改两台主机的runserver.sh及runbroker.sh修改
#
修改runbroker.sh
#
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
修改runserver.sh
#
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
两台主机分别启动nameserver和Brocker
#
## 在两台主机分别启动nameserver
nohup sh mqnamesrv &
#135启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
#135启动slave2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
#查看
jps
3478 Jps
3366 BrokerStartup
3446 BrokerStartup
3334 NamesrvStartup
#138启动master2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
#135启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
#查看
jps
3376 Jps
3360 BrokerStartup
3251 NamesrvStartup
3295 BrokerStartup
双主双从集群搭建完毕!
...
2022年4月6日 00:11 周三学习来源
https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!
基本操作
#
下载
#
https://rocketmq.apache.org/download/ 选择Binary下载即可,放到Linux主机中
前提java运行环境
#
yum search java | grep jdk
yum install -y java-1.8.0-openjdk-devel.x86_64
# java -version 正常
# javac -version 正常
启动
#
#nameserver启动
nohup sh bin/mqnamesrv &
#nameserver日志查看
tail -f ~/logs/rocketmqlogs/namesrv.log
#输出
2023-04-06 00:08:34 INFO main - tls.client.certPath = null
2023-04-06 00:08:34 INFO main - tls.client.authServer = false
2023-04-06 00:08:34 INFO main - tls.client.trustCertPath = null
2023-04-06 00:08:35 INFO main - Using OpenSSL provider
2023-04-06 00:08:35 INFO main - SSLContext created for server
2023-04-06 00:08:36 INFO NettyEventExecutor - NettyEventExecutor service started
2023-04-06 00:08:36 INFO main - The Name Server boot success. serializeType=JSON
2023-04-06 00:08:36 INFO FileWatchService - FileWatchService service started
2023-04-06 00:09:35 INFO NSScheduledThread1 - --------------------------------------------------------
2023-04-06 00:09:35 INFO NSScheduledThread1 - configTable SIZE: 0
#broker启动
nohup sh bin/mqbroker -n localhost:9876 &
#查看broker日志
tail -f ~/logs/rocketmqlogs/broker.log
#日志如下
tail: 无法打开"/root/logs/rocketmqlogs/broker.log" 读取数据: 没有那个文件或目录
tail: 没有剩余文件
👇
#jps查看
2465 Jps
2430 NamesrvStartup
#说明没有启动成功,因为默认配置的虚拟机内存较大
vim bin/runbroker.sh 以及 vim runserver.sh
#修改
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
#修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改完毕后启动
#先关闭namesrv后
#按上述启动namesrv以及broker
sh bin/mqshutdown namesrv
# jsp命令查看进程
2612 Jps
2551 BrokerStartup
2524 NamesrvStartup
测试
#
同一台机器上,两个cmd窗口
...