学习

算法红皮书 2.1.2-2.1.3

排序 #

初级排序算法 #

选择排序 #

  • 命题A。对于长度为N 的数组,选择排序需要大约 N^2/2 次比较和N 次交换。

  • 代码

    public class Selection
    {
    	public static void sort(Comparable[] a)
    	{
    		// 将a[]按升序排列
    		int N = a.length;
    		// 数组长度
    		for (int i = 0; i < N; i++)
    		{
    			// 将a[i]和a[i+1..N]中最小的元素交换
    			int min = i;
    			// 最小元素的索引
    			for (int j = i+1; j < N; j++)
    			if (less(a[j], a[min])) min = j;
    			exch(a, i, min);
    		}
    	}
    	// less()、exch()、isSorted()和main()方法见“排序算法类模板”
    }
    
  • 特点

    • 运行时间与输入无关,即输入数据的初始状态(比如是否已排序好等等)不影响排序时间
    • 数据移动是最少的(只使用了N次交换,交换次数和数组的大小是线性关系

插入排序 #

  • 命题B。对于随机排列的长度为N 且主键不重复的数组,平均情况下插入排序需要~ N2/4 次比较以及~ N2/4 次交换。最坏情况下需要~ N2/2 次比较和~ N2/2 次交换,最好情况下需要N-1次比较和0 次交换。

    ...

算法红皮书 2.1.1

排序 #

排序就是将一组对象按照某种逻辑顺序重新排序的过程

  • 对排序算法的分析有助于理解本书中比较算法性能的方法
  • 类似技术能解决其他类型问题
  • 排序算法常常是我们解决其他问题的第一步

初级排序算法 #

  • 熟悉术语及技巧
  • 某些情况下初级算法更有效
  • 有助于改进复杂算法的效率

游戏规则 #

  • 主要关注重新排序数组元素的算法,每个元素都会有一个主键

  • 排序后索引较大的主键大于索引较小的主键

  • 一般情况下排序算法通过两个方法操作数据,less()进行比较,exch()进行交换

  • 排序算法类的模板

    public class Example
    {
    	public static void sort(Comparable[] a)
    	{
    		/* 请见算法2.1、算法2.2、算法2.3、算法2.4、算法2.5或算法2.7*/
    	}
    	private static Boolean less(Comparable v, Comparable w)
    	{
    		return v.compareTo(w) < 0;
    	}
    	private static void exch(Comparable[] a, int i, int j)
    	{
    		Comparable t = a[i];
    		a[i] = a[j];
    		a[j] = t;
    	}
    	private static void show(Comparable[] a)
    	{
    		// 在单行中打印数组
    		for (int i = 0; i < a.length; i++)
    		StdOut.print(a[i] + " ");
    		StdOut.println();
    	}
    	public static Boolean isSorted(Comparable[] a)
    	{
    		// 测试数组元素是否有序
    		for (int i = 1; i < a.length; i++)
    		if (less(a[i], a[i-1])) return false;
    		return true;
    	}
    	public static void main(String[]
    	args)
    	{
    		// 从标准输入读取字符串,将它们排序并输出
    		String[] a = In.readStrings();
    		sort(a);
    		assert isSorted(a);
    		show(a);
    	}
    }
    

05高级功能

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

消息存储 #

流程 #

ly-20241212142201154

存储介质 #

关系型数据库DB #

适合数据量不够大,比如ActiveMQ可选用JDBC方式作为消息持久化

文件系统 #

  1. 关系型数据库最终也是要存到文件系统中的,不如直接存到文件系统,绕过关系型数据库
  2. 常见的RocketMQ/RabbitMQ/Kafka都是采用消息刷盘到计算机的文件系统来做持久化(同步刷盘/异步刷盘)

消息发送 #

  1. 顺序写:600MB/s,随机写:100KB/s

    • 系统运行一段时间后,我们对文件的增删改会导致磁盘上数据无法连续,非常的分散。

    • 顺序读也只是逻辑上的顺序,也就是按照当前文件的相对偏移量顺序读取,并非磁盘上连续空间读取

    • 对于磁盘的读写分为两种模式,顺序IO随机IO。 随机IO存在一个寻址的过程,所以效率比较低。而顺序IO,相当于有一个物理索引,在读取的时候不需要寻找地址,效率很高。

    • 来源: https://www.cnblogs.com/liuche/p/15455808.html

  2. 数据网络传输

    零拷贝技术MappedByteBuffer,省去了用户态,由内核态直接拷贝到网络驱动内核
    RocketMQ默认设置单个CommitLog日志数据文件为1G

    ly-20241212142201208

消息存储 #

三个概念:commitLog、ConsumerQueue、index

CommitLog #

  1. 默认大小1G
    ly-20241212142201240
  2. 存储消息的元数据,包括了Topic、QueueId、Message
  3. 还存储了ConsumerQueue相关信息,所以ConsumerQueue丢了也没事

ConsumerQueue #

  1. 存储了消息在CommitLog的索引(几百K,Linux会事先加载到内存中)
  2. 包括最小/最大偏移量、已经消费的偏移量
  3. 一个Topic多个队列,每个队列对应一个ConsumerQueue
    ly-20241212142201276

Index #

也是索引文件,为消息查询服务,通过key或时间区间查询消息

总结 #

ly-20241212142201312

刷盘机制 #

ly-20241212142201340

  1. 同步刷盘
  2. 异步刷盘

高可用性机制 #

消费高可用及发送高可用 #

ly-20241212142201372

...

算法红皮书 1.5.1-1.5.3

案例研究:union-find 算法 #

  • 设计和分析算法的基本方法
    • 优秀的算法能解决实际问题
    • 高效的算法也可以很简单
    • 理解某个实现的性能特点是一项有趣的挑战
    • 在解决同一个问题的多种算法间选择,科学方法是一种重要工具
    • 迭代式改进能让算法效率越来越高

动态连通性 #

  • 从输入中读取整数对p q,如果已知的所有整数对都不能说明p,q相连,就打印出pq
  • 网络:整个程序能够判定是否需要在pq之间架设一条新的连接才能进行通信
  • 变量名等价性(即指向同一个对象的多个引用)
  • 数学集合:在处理一个整数对pq时,我们是在判断它们是否属于相同的集合
  • 本节中,将对象称为触点,整数对称为连接,等价类称为连通分量或是简称分量
  • 连通性 问题只要求我们的程序能够判别给定的整数对pq是否相连,并没有要求给两者之间的通路上的所有连接
  • union-find算法的API
    ly-20241212142056628
  • 数据结构和算法的设计影响到算法的效率

实现 #

public class UF
{
	private int[]	id;
	/* 分量id(以触点作为索引) */
	private int	count;
	/* 分量数量 */
	public UF( int N )
		{
		/* 初始化分量id数组 */
		count	= N;
		id	= new int[N];
		for ( int i = 0; i < N; i++ )
					id[i] = i;
	}
	public int count()
		{
		return(count);
	}
	public Boolean connected( int p, int q )
		{
		return(find( p ) == find( q ) );
	}
	public int find( int p )
		public void union( int p, int q )
	/* 请见1.5.2.1节用例(quick-find)、1.5.2.3节用例(quick-union)和算法1.5(加权quick-union) */
	public static void main( String[] args )
		{
		/* 解决由StdIn得到的动态连通性问题 */
		int	N	= StdIn.readint();
		/* 读取触点数量 */
		UF	uf	= new UF( N );
		/* 初始化N个分量 */
		while ( !StdIn.isEmpty() )
				{
			int	p	= StdIn.readint();
			int	q	= StdIn.readint();
			/* 读取整数对 */
			if ( uf.connected( p, q ) )
							continue;
			/* 如果已经连通则忽略 */
			uf.union( p, q );
			/* 归并分量 */
			StdOut.println( p + " " + q );
			/* 打印连接 */
		}
		StdOut.println( uf.count() + "components" );
	}
}

union-find的成本模型:union-find API的各种算法,统计的是数组的访问次数,不论读写

...

04案例

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!基本架构

架构 #

ly-20241212142200977

流程图 #

下单流程 #

ly-20241212142201029

支付流程 #

ly-20241212142201068

SpringBoot整合RocketMQ #

依赖包 #

	   <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

生产者 #

yaml #

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  producer:
    group: my-group

使用 #

    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("rocketmq")
    public String rocketmq(){
        log.info("我被调用了-rocketmq");
        //主题+内容
        template.convertAndSend("mytopic-ly","hello1231");
        return "hello world"+serverPort;
    }

消费者 #

yaml #

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  consumer:
    group: my-group2

使用 #

创建监听器

...

03收发消息

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!前提

依赖包 #

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

消息生产者步骤 #

创建生产者,生产者组名–>指定nameserver地址–>启动producer–>

创建消息对象(Topic、Tag、消息体)

发送消息、关闭生产者producer

消息消费者步骤 #

创建消费者,制定消费者组名–>指定nameserver地址

订阅Topic和Tag,设置回调函数处理消息

启动消费者consumer

消息发送 #

同步消息 #

发送消息后客户端会进行阻塞,直到得到结果后,客户端才会继续执行

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        //创建Producer,并指定生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.1.135:9876;192.168.1.138:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic("base");
            msg.setTags("Tag1");
            msg.setBody(("hello world" + i).getBytes());
            //发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus sendStatus = result.getSendStatus();
            //消息id
            String msgId = result.getMsgId();
            //消息接收队列id
            MessageQueue messageQueue = result.getMessageQueue();
            int queueId = messageQueue.getQueueId();
            log.info(result.toString());
            log.info(messageQueue.toString());
            log.info("status:" + sendStatus +
                    "msgId:" + msgId + "queueId" + queueId);
            TimeUnit.SECONDS.sleep(1);
        }
        log.info("发送结束===================");
        producer.shutdown();
    }

异步消息 #

发送消息后不会导致阻塞,当broker返回结果时,会调用回调函数进行处理

...

算法红皮书 1.4.1-1.4.10

算法分析 #

使用数学分析为算法成本建立简洁的模型,并使用实验数据验证这些模型

科学方法 #

  • 观察、假设、预测、观察并核实预测、反复确认预测和观察
  • 原则:实验可重现

观察 #

  • 计算性任务的困难程度可以用问题的规模来衡量

  • 问题规模可以是输入的大小或某个命令行参数的值

  • 研究问题规模和运行时间的关系

  • 使用计时器得到大概的运行时间 ly-20241212142054451

    • 典型用例

      public static void main(String[] args) {
              int N = Integer.parseInt(args[0]);
              int[] a = new int[N];
              for (int i = 0; i < N; i++)
                  a[i] = StdRandom.uniform(-1000000, 1000000);
              Stopwatch timer = new Stopwatch();
              int cnt = ThreeSum.count(a);
              double time = timer.elapsedTime();
              StdOut.println(cnt + " triples " + time + " seconds");
          }
      
    • 使用方法 ly-20241212142054686

    • 数据类型的实现

      public class Stopwatch {
          private final long start;
      
          public Stopwatch() {
              start = System.currentTimeMillis();
          }
      
          public double elapsedTime() {
              long now = System.currentTimeMillis();
      
              return (now - start) / 1000.0;
          }
      }
      

数学模型 #

  • 程序运行的总时间主要和两点有关:执行每条语句的耗时;执行每条语句的频率

    ...

算法红皮书1.3.3.1-1.3.4

背包、队列和栈 #

链表 #

  • 链表是一种递归的数据结构,它或者为空(null),或者是一个指向一个结点(node)的引用,该节点含有一个泛型的元素和一个指向另一条链表的引用。

结点记录 #

  • 使用嵌套类定义结点的抽象数据类型

    private class Node
    {
    	Item item;
    	Node next;
    }
    
    • 该类没有其它任何方法,且会在代码中直接引用实例变量,这种类型的变量称为记录

构造链表 #

  • 需要一个Node类型的变量,保证它的值是null或者指向另一个Node对象的next域指向了另一个链表
  • 如下图 ly-20241212142053630
  • 链表表示的是一列元素
  • 链式结构在本书中的可视化表示 长方形表示对象;实例变量的值写在长方形中;用指向被引用对象的箭头表示引用关系
  • 术语链接表示对结点的引用

在表头插入结点 #

  • 在首结点为first 的给定链表开头插入字符串not,我们先将first 保存在oldfirst 中, 然后将一个新结点赋予first,并将它的item 域设为not,next 域设为oldfirst

  • 时间复杂度为O(1)

  • 如图 ly-20241212142053870

从表头删除结点 #

  • 将first指向first.next

  • 原先的结点称为孤儿,Java的内存管理系统最终将回收它所占用的内存

  • 如图 ly-20241212142053982

在表尾插入结点 #

  • 每个修改链表的操作都需要增加检查是否要修改该变量(以及做出相应修改)的代码

  • 例如,当删除链表首结点时可能改变指向链表的尾结点的引用,因为链表中只有一个结点时它既是首结点又是尾结点

  • 如图 ly-20241212142054097

其他位置的插入和删除操作 #

删除指定结点;在指定节点插入新结点

  • 需要将链表尾结点的前一个节点中的链接(它指向的是last)值改为null
  • 为了找到指向last的结点,需要遍历链表,时间复杂度为O(n)
  • 实现任意插入和删除操作的标准解决方案是双向链表

遍历 #

  • 将x初始化为链表首结点,然后通过x.item访问和x相关联的元素,并将x设为x.next来访问链表中的下一个结点,知道x=null(没有下一个结点了,到达链表结尾)

    ...

02双主双从集群搭建

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

服务器信息修改 #

在.135和.138均进行下面的操作

解压 #

rocketmq解压到/usr/local/rocketmq目录下

host添加 #

#添加host
vim /etc/hosts
##添加内容
192.168.1.135 rocketmq-nameserver1
192.168.1.138 rocketmq-nameserver2
192.168.1.135 rocketmq-master1
192.168.1.135 rocketmq-slave2
192.168.1.138 rocketmq-master2
192.168.1.138 rocketmq-slave1
## 保存后
systemctl restart network

防火墙 #

直接关闭 #

## 防火墙关闭
systemctl stop firewalld.service
## 防火墙状态查看
firewall-cmd --state
##禁止开机启动
systemctl disable firewalld.service

或者直接关闭对应端口即可 #

ly-20241212142200796

环境变量配置 #

为了执行rocketmq命令方便

#添加环境变量
vim /etc/profile
#添加
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
#使配置生效
source /etc/profile

消息存储路径创建 #

a #

mkdir /usr/local/rocketmq/store-a
mkdir /usr/local/rocketmq/store-a/commitlog
mkdir /usr/local/rocketmq/store-a/consumequeue
mkdir /usr/local/rocketmq/store-a/index

b #

mkdir /usr/local/rocketmq/store-b
mkdir /usr/local/rocketmq/store-b/commitlog
mkdir /usr/local/rocketmq/store-b/consumequeue
mkdir /usr/local/rocketmq/store-b/index

双主双从配置文件的修改 #

master-a #

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-b #

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

master-b #

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-a #

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

修改两台主机的runserver.sh及runbroker.sh修改 #

修改runbroker.sh #

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改runserver.sh #

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

两台主机分别启动nameserver和Brocker #

## 在两台主机分别启动nameserver 
nohup sh mqnamesrv &
#135启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
#135启动slave2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
#查看
jps
3478 Jps
3366 BrokerStartup
3446 BrokerStartup
3334 NamesrvStartup
#138启动master2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
#135启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
#查看
jps
3376 Jps
3360 BrokerStartup
3251 NamesrvStartup
3295 BrokerStartup

双主双从集群搭建完毕!

...

01rocketmq学习

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

基本操作 #

下载 #

https://rocketmq.apache.org/download/ 选择Binary下载即可,放到Linux主机中

前提java运行环境 #

yum search java | grep jdk
yum install -y java-1.8.0-openjdk-devel.x86_64
# java -version 正常
# javac -version 正常

启动 #

#nameserver启动
nohup sh bin/mqnamesrv &
#nameserver日志查看
tail -f ~/logs/rocketmqlogs/namesrv.log
#输出
2023-04-06 00:08:34 INFO main - tls.client.certPath = null
2023-04-06 00:08:34 INFO main - tls.client.authServer = false
2023-04-06 00:08:34 INFO main - tls.client.trustCertPath = null
2023-04-06 00:08:35 INFO main - Using OpenSSL provider
2023-04-06 00:08:35 INFO main - SSLContext created for server
2023-04-06 00:08:36 INFO NettyEventExecutor - NettyEventExecutor service started
2023-04-06 00:08:36 INFO main - The Name Server boot success. serializeType=JSON
2023-04-06 00:08:36 INFO FileWatchService - FileWatchService service started
2023-04-06 00:09:35 INFO NSScheduledThread1 - --------------------------------------------------------
2023-04-06 00:09:35 INFO NSScheduledThread1 - configTable SIZE: 0
#broker启动
nohup sh bin/mqbroker -n localhost:9876 &
#查看broker日志
tail -f ~/logs/rocketmqlogs/broker.log
#日志如下
tail: 无法打开"/root/logs/rocketmqlogs/broker.log" 读取数据: 没有那个文件或目录
tail: 没有剩余文件
👇
#jps查看
2465 Jps
2430 NamesrvStartup
#说明没有启动成功,因为默认配置的虚拟机内存较大
vim bin/runbroker.sh  以及 vim runserver.sh
#修改 
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
#修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改完毕后启动
#先关闭namesrv后
#按上述启动namesrv以及broker
sh bin/mqshutdown namesrv
# jsp命令查看进程
2612 Jps
2551 BrokerStartup
2524 NamesrvStartup

测试 #

同一台机器上,两个cmd窗口

...