04案例

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!基本架构

架构#

ly-20241212142200977

流程图#

下单流程#

ly-20241212142201029

支付流程#

ly-20241212142201068

SpringBoot整合RocketMQ#

依赖包#

	   <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

生产者#

yaml#

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  producer:
    group: my-group

使用#

    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("rocketmq")
    public String rocketmq(){
        log.info("我被调用了-rocketmq");
        //主题+内容
        template.convertAndSend("mytopic-ly","hello1231");
        return "hello world"+serverPort;
    }

消费者#

yaml#

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  consumer:
    group: my-group2

使用#

创建监听器

@RocketMQMessageListener(topic = "mytopic-ly",
        consumeMode = ConsumeMode.CONCURRENTLY,consumerGroup = "${rocketmq.producer.group}")
@Slf4j
@Component
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        log.info("消费了"+s);
    }
}

下单流程利用MQ进行回退处理,保证数据一致性#

ly-20241212142201106

库存回退的消费者,代码如下:

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup =
 "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{


    @Value("${mq.order.consumer.group.name}")
    private String groupName;

    @Autowired
    private TradeGoodsMapper goodsMapper;

    @Autowired
    private TradeMqConsumerLogMapper mqConsumerLogMapper;

    @Autowired
    private TradeGoodsNumberLogMapper goodsNumberLogMapper;

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId=null;
        String tags=null;
        String keys=null;
        String body=null;
        try {
            //1. 解析消息内容
            msgId = messageExt.getMsgId();
            tags= messageExt.getTags();
            keys= messageExt.getKeys();
            body= new String(messageExt.getBody(),"UTF-8");

            log.info("接受消息成功");

            //2. 查询消息消费记录
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);

            if(mqConsumerLog!=null){
                //3. 判断如果消费过...
                //3.1 获得消息处理状态
                Integer status = mqConsumerLog.getConsumerStatus();
                //处理过...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",已经处理过");
                    return;
                }

                //正在处理...返回
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){
                    log.info("消息:"+msgId+",正在处理");
                    return;
                }

                //处理失败
                if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){
                    //获得消息处理次数
                    Integer times = mqConsumerLog.getConsumerTimes();
                    if(times>3){
                        log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");
                        return;
                    }
                    mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());

                    //使用数据库乐观锁更新
                    TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
                    TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
                    criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());
                    criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());
                    criteria.andGroupNameEqualTo(groupName);
                    criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
                    int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
                    if(r<=0){
                        //未修改成功,其他线程并发修改
                        log.info("并发修改,稍后处理");
                    }
                }

            }else{
                //4. 判断如果没有消费过...
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(0);

                //将消息处理信息添加到数据库
                mqConsumerLogMapper.insert(mqConsumerLog);
            }
            //5. 回退库存
            MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
            Long goodsId = mqEntity.getGoodsId();
            TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
            goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());
            goodsMapper.updateByPrimaryKey(goods);


            //6. 将消息的处理状态改为成功
            mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
            mqConsumerLog.setConsumerTimestamp(new Date());
            mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
            log.info("回退库存成功");
        } catch (Exception e) {
            e.printStackTrace();
            TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
            primaryKey.setMsgTag(tags);
            primaryKey.setMsgKey(keys);
            primaryKey.setGroupName(groupName);
            TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
            if(mqConsumerLog==null){
                //数据库未有记录
                mqConsumerLog = new TradeMqConsumerLog();
                mqConsumerLog.setMsgTag(tags);
                mqConsumerLog.setMsgKey(keys);
                mqConsumerLog.setGroupName(groupName);
                mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
                mqConsumerLog.setMsgBody(body);
                mqConsumerLog.setMsgId(msgId);
                mqConsumerLog.setConsumerTimes(1);
                mqConsumerLogMapper.insert(mqConsumerLog);
            }else{
                mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);
                mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
            }
        }

    }
}

03收发消息

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!前提

依赖包#

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

消息生产者步骤#

创建生产者,生产者组名–>指定nameserver地址–>启动producer–>

创建消息对象(Topic、Tag、消息体)

发送消息、关闭生产者producer

消息消费者步骤#

创建消费者,制定消费者组名–>指定nameserver地址

订阅Topic和Tag,设置回调函数处理消息

启动消费者consumer

消息发送#

同步消息#

发送消息后客户端会进行阻塞,直到得到结果后,客户端才会继续执行

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        //创建Producer,并指定生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.1.135:9876;192.168.1.138:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic("base");
            msg.setTags("Tag1");
            msg.setBody(("hello world" + i).getBytes());
            //发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus sendStatus = result.getSendStatus();
            //消息id
            String msgId = result.getMsgId();
            //消息接收队列id
            MessageQueue messageQueue = result.getMessageQueue();
            int queueId = messageQueue.getQueueId();
            log.info(result.toString());
            log.info(messageQueue.toString());
            log.info("status:" + sendStatus +
                    "msgId:" + msgId + "queueId" + queueId);
            TimeUnit.SECONDS.sleep(1);
        }
        log.info("发送结束===================");
        producer.shutdown();
    }

异步消息#

发送消息后不会导致阻塞,当broker返回结果时,会调用回调函数进行处理

算法红皮书 1.4.1-1.4.10

算法分析#

使用数学分析为算法成本建立简洁的模型,并使用实验数据验证这些模型

科学方法#

  • 观察、假设、预测、观察并核实预测、反复确认预测和观察
  • 原则:实验可重现

观察#

  • 计算性任务的困难程度可以用问题的规模来衡量

  • 问题规模可以是输入的大小或某个命令行参数的值

  • 研究问题规模和运行时间的关系

  • 使用计时器得到大概的运行时间 ly-20241212142054451

    • 典型用例

      public static void main(String[] args) {
              int N = Integer.parseInt(args[0]);
              int[] a = new int[N];
              for (int i = 0; i < N; i++)
                  a[i] = StdRandom.uniform(-1000000, 1000000);
              Stopwatch timer = new Stopwatch();
              int cnt = ThreeSum.count(a);
              double time = timer.elapsedTime();
              StdOut.println(cnt + " triples " + time + " seconds");
          }
    • 使用方法 ly-20241212142054686

    • 数据类型的实现

      public class Stopwatch {
          private final long start;
      
          public Stopwatch() {
              start = System.currentTimeMillis();
          }
      
          public double elapsedTime() {
              long now = System.currentTimeMillis();
      
              return (now - start) / 1000.0;
          }
      }

数学模型#

  • 程序运行的总时间主要和两点有关:执行每条语句的耗时;执行每条语句的频率

算法红皮书1.3.3.1-1.3.4

背包、队列和栈#

链表#

  • 链表是一种递归的数据结构,它或者为空(null),或者是一个指向一个结点(node)的引用,该节点含有一个泛型的元素和一个指向另一条链表的引用。

结点记录#

  • 使用嵌套类定义结点的抽象数据类型

    private class Node
    {
    	Item item;
    	Node next;
    }
    • 该类没有其它任何方法,且会在代码中直接引用实例变量,这种类型的变量称为记录

构造链表#

  • 需要一个Node类型的变量,保证它的值是null或者指向另一个Node对象的next域指向了另一个链表
  • 如下图 ly-20241212142053630
  • 链表表示的是一列元素
  • 链式结构在本书中的可视化表示 长方形表示对象;实例变量的值写在长方形中;用指向被引用对象的箭头表示引用关系
  • 术语链接表示对结点的引用

在表头插入结点#

  • 在首结点为first 的给定链表开头插入字符串not,我们先将first 保存在oldfirst 中, 然后将一个新结点赋予first,并将它的item 域设为not,next 域设为oldfirst

  • 时间复杂度为O(1)

  • 如图 ly-20241212142053870

从表头删除结点#

  • 将first指向first.next

  • 原先的结点称为孤儿,Java的内存管理系统最终将回收它所占用的内存

  • 如图 ly-20241212142053982

在表尾插入结点#

  • 每个修改链表的操作都需要增加检查是否要修改该变量(以及做出相应修改)的代码

  • 例如,当删除链表首结点时可能改变指向链表的尾结点的引用,因为链表中只有一个结点时它既是首结点又是尾结点

  • 如图 ly-20241212142054097

其他位置的插入和删除操作#

删除指定结点;在指定节点插入新结点

  • 需要将链表尾结点的前一个节点中的链接(它指向的是last)值改为null
  • 为了找到指向last的结点,需要遍历链表,时间复杂度为O(n)
  • 实现任意插入和删除操作的标准解决方案是双向链表

遍历#

  • 将x初始化为链表首结点,然后通过x.item访问和x相关联的元素,并将x设为x.next来访问链表中的下一个结点,知道x=null(没有下一个结点了,到达链表结尾)

    for (Node x = first; x != null; x = x.next)
    {
    // 处理x.item
    }

栈的实现#

  • 使用链表实现栈

  • 将栈保存为一条链表,栈的顶部即为表头,实例变量first 指向栈顶。这样,当使用push() 压入一个元素时,我们会按照1.3.3.3 节所讨论的代码将该元素添加在表头;当使用pop() 删除一个元素时,我们会按照1.3.3.4 节讨论的代码将该元素从表头删除。要实现size() 方法,我们用实例变量N 保存元素的个数,在压入元素时将N 加1,在弹出元素时将N 减1。要实现isEmpty() 方法,只需检查first 是否为null(或者可以检查N 是否为0)

02双主双从集群搭建

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

服务器信息修改#

在.135和.138均进行下面的操作

解压#

rocketmq解压到/usr/local/rocketmq目录下

host添加#

#添加host
vim /etc/hosts
##添加内容
192.168.1.135 rocketmq-nameserver1
192.168.1.138 rocketmq-nameserver2
192.168.1.135 rocketmq-master1
192.168.1.135 rocketmq-slave2
192.168.1.138 rocketmq-master2
192.168.1.138 rocketmq-slave1
## 保存后
systemctl restart network

防火墙#

直接关闭#

## 防火墙关闭
systemctl stop firewalld.service
## 防火墙状态查看
firewall-cmd --state
##禁止开机启动
systemctl disable firewalld.service

或者直接关闭对应端口即可#

ly-20241212142200796

环境变量配置#

为了执行rocketmq命令方便

#添加环境变量
vim /etc/profile
#添加
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
#使配置生效
source /etc/profile

消息存储路径创建#

a#

mkdir /usr/local/rocketmq/store-a
mkdir /usr/local/rocketmq/store-a/commitlog
mkdir /usr/local/rocketmq/store-a/consumequeue
mkdir /usr/local/rocketmq/store-a/index

b#

mkdir /usr/local/rocketmq/store-b
mkdir /usr/local/rocketmq/store-b/commitlog
mkdir /usr/local/rocketmq/store-b/consumequeue
mkdir /usr/local/rocketmq/store-b/index

双主双从配置文件的修改#

master-a#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-b#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

master-b#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

slave-a#

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

修改两台主机的runserver.sh及runbroker.sh修改#

修改runbroker.sh#

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改runserver.sh#

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

两台主机分别启动nameserver和Brocker#

## 在两台主机分别启动nameserver 
nohup sh mqnamesrv &
#135启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
#135启动slave2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
#查看
jps
3478 Jps
3366 BrokerStartup
3446 BrokerStartup
3334 NamesrvStartup
#138启动master2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
#135启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
#查看
jps
3376 Jps
3360 BrokerStartup
3251 NamesrvStartup
3295 BrokerStartup

双主双从集群搭建完毕!

01rocketmq学习

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

基本操作#

下载#

https://rocketmq.apache.org/download/ 选择Binary下载即可,放到Linux主机中

前提java运行环境#

yum search java | grep jdk
yum install -y java-1.8.0-openjdk-devel.x86_64
# java -version 正常
# javac -version 正常

启动#

#nameserver启动
nohup sh bin/mqnamesrv &
#nameserver日志查看
tail -f ~/logs/rocketmqlogs/namesrv.log
#输出
2023-04-06 00:08:34 INFO main - tls.client.certPath = null
2023-04-06 00:08:34 INFO main - tls.client.authServer = false
2023-04-06 00:08:34 INFO main - tls.client.trustCertPath = null
2023-04-06 00:08:35 INFO main - Using OpenSSL provider
2023-04-06 00:08:35 INFO main - SSLContext created for server
2023-04-06 00:08:36 INFO NettyEventExecutor - NettyEventExecutor service started
2023-04-06 00:08:36 INFO main - The Name Server boot success. serializeType=JSON
2023-04-06 00:08:36 INFO FileWatchService - FileWatchService service started
2023-04-06 00:09:35 INFO NSScheduledThread1 - --------------------------------------------------------
2023-04-06 00:09:35 INFO NSScheduledThread1 - configTable SIZE: 0
#broker启动
nohup sh bin/mqbroker -n localhost:9876 &
#查看broker日志
tail -f ~/logs/rocketmqlogs/broker.log
#日志如下
tail: 无法打开"/root/logs/rocketmqlogs/broker.log" 读取数据: 没有那个文件或目录
tail: 没有剩余文件
👇
#jps查看
2465 Jps
2430 NamesrvStartup
#说明没有启动成功,因为默认配置的虚拟机内存较大
vim bin/runbroker.sh  以及 vim runserver.sh
#修改 
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
#修改为
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改完毕后启动
#先关闭namesrv后
#按上述启动namesrv以及broker
sh bin/mqshutdown namesrv
# jsp命令查看进程
2612 Jps
2551 BrokerStartup
2524 NamesrvStartup

测试#

同一台机器上,两个cmd窗口

算法红皮书 1.3.1.1-1.3.2.5

背包、队列和栈#

  • 数据类型的值就是一组对象的集合,所有操作都是关于添加、删除或是访问集合中的对象
  • 本章将学习三种数据类型:背包Bag、队列Queue、栈Stack
    • 对集合中的对象的表示方式直接影响各种操作的效率
    • 介绍泛型和迭代
    • 介绍并说明链式数据结构的重要性(链表)

API#

  • 泛型可迭代的基础集合数据类型的API

    • 背包
      ly-20241212142051025
    • 队列(先进先出FIFO)
      ly-20241212142051259
    • 下压(后进先出,LIFO)栈 ly-20241212142051367
  • 泛型

    • 泛型,参数化类型
    • 在每份API 中,类名后的<Item> 记号将Item 定义为一个类型参数,它是一个象征性的占位符,表示的是用例将会使用的某种具体数据类型
  • 自动装箱

    • 用来处理原始类型
    • Boolean、Byte、Character、Double、Float、Integer、Long 和Short 分别对应着boolean、byte、char、double、float、int、long 和short
    • 自动将一个原始数据类型转换为一个封装类型称为自动装箱,自动将一个封装类型转换为一个原始数据类型被称为自动拆箱
  • 可迭代的集合类型

    • 迭代访问集合中的所有元素
  • 背包是一种不支持从中删除元素的集合数据类型–帮助用例收集元素并迭代遍历所有收集到的元素(无序遍历

    • 典型用例,计算标准差
  • 先进先出队列

    • 是一种基于先进先出(FIFO)策略的集合类型
    • 使用队列的主要原因:集合保存元素的同时保存它们的相对顺序
    • 如图
      ly-20241212142051478
    • Queue用例(先进先出)
      ly-20241212142051591
  • 下压栈

    • 简称栈,是一种基于后进先出LIFO策略的集合类型
    • 比如,收邮件等,如图
      ly-20241212142051703
    • Stack的用例
      ly-20241212142051815
  • 用栈解决算数表达式的问题
    (双栈算数表达式求值算法)
    ly-20241212142051919

集合类数据类型的实现#

  • 定容栈,表示容量固定的字符串栈的抽象数据类型

    • 只能处理String值,支持push和pop

    • 抽象数据类型
      ly-20241212142052029

    • 测试用例

      ly-20241212142052135

    • 使用方法
      ly-20241212142052243

    • 数据类型的实现
      ly-20241212142052353

  • 泛型

    • public class FixedCapacityStack<Item>
    • 由于不允许直接创建泛型数组,所以 a =new Item[cap] 不允许,应该改为
      a=(Item[])new Object[cap];
    • 泛型定容栈的抽象数据类型
      ly-20241212142052474
    • 测试用例
      ly-20241212142052593
    • 使用方法
      ly-20241212142052746
    • 数据类型的实现
      ly-20241212142052865
  • 调整数组大小

算法红皮书 1.2.1-1.2.5

数据抽象#

数据类型指的是一组值和一组对这些值的操作的集合

  • 定义和使用数据类型的过程,也被称为数据抽象
  • Java编程的基础是使用class关键字构造被称为引用类型的数据类型,也称面向对象编程
  • 定义自己的数据类型来抽象任意对象
  • 抽象数据类型(ADT)是一种能够对使用者隐藏数据表示的数据类型
  • 抽象数据类型将数据和函数的实现相关联,将数据的表示方式隐藏起来
  • 抽象数据类型使用时,关注API描述的操作上而不会去关心数据的表示;实现抽象数据类型时,关注数据本身并将实现对数据的各种操作
  • 研究同一个问题的不同算法的主要原因是他们的性能不同

使用抽象数据类型#

  • 使用一种数据类型并不一定非得知道它是如何实现的
  • 使用Counter(计数器)的简单数据类型的程序,操作有
    • 创建对象并初始化为0
    • 当前值加1
    • 获取当前值
  • 场景,用于电子计票
  • 抽象数据类型的API(应用程序编程接口)
    • API用来说明抽象数据类型的行为
    • 将列出所有构造函数和实例方法(即操作)
  • 计算器的API
  • 继承的方法
    • 所有数据类型都会继承toString()方法
    • Java会在用+运算符将任意数据类型的值和String值连接时调用toString()
    • 默认实现:返回该数据类型值的内存地址
  • 用例代码
    • 可以在用例代码中,声明变量、创建对象来保存数据类型的值并允许通过实例方法来操作它们
  • 对象
    • 对象是能够承载数据类型的值的实体
    • 对象三大特性:状态、标识和行为
      • 状态:数据类型中的值
      • 标识:在内存中的地址
      • 行为:数据类型的操作
    • Java使用"引用类型"和原始数据类型区别
  • 创建对象
    • 每种数据类型中的值都存储于一个对象中
    • 构造函数总是返回他的数据类型的对象的引用
    • 使用new(),会为新的对象分配内存空间,调用构造函数初始化对象中的值,返回该对象的一个引用
  • 抽象数据类型向用例隐藏了值的表示细节
  • 实例方法:参数按值传递
  • 方法每次触发都和一个对象相关
  • 静态方法的主要作用是实现函数;非静态(实例)方法的主要作用是实现数据类型的操作
  • 使用对象
    开发某种数据类型的用例
    • 声明该类型的变量,以引用对象
    • 使用new触发能够创建该类型的对象的一个构造函数
    • 使用变量名调用实例方法
  • 赋值语句(对象赋值)
    • 别名:两个变量同时指向同一个对象
  • 将对象作为参数
    • Java将参数值的一个副本从调用端传递给了方法,这种方式称为按值传递
    • 当使用引用类型作为参数时我们创建的都是别名,这种约定会传递引用的值(复制引用),也就是传递对象的引用
    • 虽然无法改变原始的引用(将原变量指向另一个Counter对象),但能够改变该对象的值
  • 将对象作为返回值
    • 由于Java只由一个返回值,有了对象实际上就能返回多个值
  • 数组也是对象
    • 将数组传递给一个方法或是将一个数组变量放在赋值语句的右侧时,我们都是在创建数组引用的一个副本,而非数组的副本
  • 对象的数组
    创建一个对象的数组
    • 使用方括号语法调用数组的构造函数创建数组
    • 对于每个数组元素调用它的构造函数创建相应的对象
      如下图
  • 运用数据抽象的思想编写代码(定义和使用数据类型,将数据类型的值封装在对象中)的方式称为面向对象编程
  • 总结
    • 数据类型指的是一组值和一组对值的操作的集合
    • 我们会在数据类型的Java类中编写用理
    • 对象是能够存储任意该数据类型的值的实体
    • 对象有三个关键性质:状态、标识和行为

抽象数据类型举例#

  • 本书中将会用到或开发的所有数据类型
    • java.lang.*
    • Java标准库中的抽象数据类型,需要import,比如java.io、java.net等
    • I/O处理嘞抽象数据类型,StdIn和StdOut
    • 面向数据类抽象数据类型,计算机和和信息处理
    • 集合类抽象数据类型,主要是为了简化对同一类型的一组数据的操作,包括Bag、Stack和Queue,PQ(优先队列)、ST(符号表)、SET(集合)
    • 面向操作的抽象数据类型(用来分析各种算法)
    • 图算法相关的抽象数据类型,用来封装各种图的表示的面向数据的抽象数据类型,和一些提供图的处理算法的面向操作的抽象数据类型
  • 几何对象(画图(图形)的)[跳过]
  • 信息处理
    • 抽象数据类型是组织信息的一种自然方式
    • 定义和真实世界中的物体相对应的对象
  • 字符串
    • java的String
    • 一个String值是一串可以由索引访问的char值
    • 有了String类型可以写出清晰干净的用例代码而无需关心字符串的表示方式

抽象数据类型的实现#

  • 使用Java的类(class)实现抽象数据类型并将所有代码放入一个和类名相同并带有.java扩展名的文件
  • 如下图
  • 实例变量
    用来定义数据类型的值(每个对象的状态)
  • 构造函数
    • 每个Java类都至少有一个构造函数以创建一个对象的标识
    • 每个构造函数将创建一个对象并向调用者返回一个该对象的引用
  • 实例方法
    • 如图
  • 作用域
    • 参数变量、局部变量、实例变量
    • 范围(如图)
  • API、用例与实现
    • 我们要学习的每个抽象数据类型的实现,都会是一个含有若干私有实例变量、构造函数、实例方法和一个测试用例的Java类
    • 用例和实现分离(一般将用例独立成含有静态方法main()的类)
    • 做法如下
      • 定义一份API,APi的作用是将使用和实现分离,以实现模块化编程
      • 用一个Java类实现API的定义
      • 实现多个测试用例来验证前两步做出的设计决定
    • 例子如下
      • API
      • 典型用例
      • 数据类型的实现
      • 使用方法(执行程序)

更多抽象数据类型的实现#

  • 日期
    • 两种实现方式
    • 本书反复出现的主题,即理解各种实现对空间和时间的需求
  • 维护多个实现
    • 比较同一份API的两种实现在同一个用例中的性能表现,需要下面非正式的命名约定
      • 使用前缀的描述性修饰符,比如BasicDate和SmallDate,以及是否合法的SmartDate
      • 适合大多数用力的需求的实现,比如Date
  • 累加器

数据类型的设计#

  • 抽象数据类型是一种向用例隐藏内部表示的数据类型
    • 封装(数据封装)
    • 设计APi
  • 算法与抽象数据类型
    • 能够准确地说明一个算法的目的及其他程序应该如何使用该算法
    • 每个Java程序都是一组静态方法和(或)一种数据类型的实现的集合
  • 本书中关注的是抽象数据类型的实现中的操作和向用例隐藏其中的数据表示
  • 例子,将二分法封装
    • API
    • 典型的用例
    • 数据类型的实现
  • 接口继承
    • Java语言为定义对象之间的关系提供了支持,称为接口
    • 接口继承使得我们的程序能够通过调用接口中的方法操作实现该接口的任意类型的对象
  • 本书中使用到的接口
  • 继承
    • 由Object类继承得到的方法
    • 继承toString()并自定义
    • 封装类型(内置的引用类型,包括Boolean、Byte、Character、Double、Float、Integer、Long和Short)
  • 等价性
    • 如图
    • 例子,在Date中重写equals
  • 内存管理
    Java具有自动内存管理,通过记录孤儿对象并将它们的内存释放到内存池中
  • 不可变性
    使用final保证数据不可变
    使用final修饰的引用类型,不能再引用(指向)其他对象,但对象本身的值可改变
  • 契约式设计
    • Java语言能够在程序运行时检测程序状态
    • 异常(Exception)+断言(Assertion)
  • 异常与错误
    允许抛出异常或抛出错误
  • 断言
    程序不应该依赖断言

End#

算法红皮书 1.1.6-1.1.11

基础编程模型#

静态方法#

  • 本书中所有的Java程序要么是数据类型的定义,要么是一个静态方法库
  • 当讨论静态方法和实体方法共有的属性时,我们会使用不加定语的方法一词
  • 方法需要参数(某种数据类型的值)并根据参数计算出某种数据类型的返回值(例如数学函数的结果)或者产生某种副作用(例如打印一个值)
  • 静态方法由签名(public static 以及函数的返回值,方法名及一串参数)和函数体组成
  • 调用静态方法(写出方法名并在后面的括号中列出数值)
  • 方法的性质
    • 方法的参数按值传递,方法中使用的参数变量能够引用调用者的参数并改变其内容(只是不能改变原数组变量本身)
    • 方法名可以被重载
    • 方法只能返回一个值,但能包含多个返回语句
    • 方法可以产生副作用
  • 递归:方法可以调用自己 可以使用数学归纳法证明所解释算法的正确性,编写递归重要的三点
    • 递归总有一个最简单的情况(方法第一条总包含return的条件语句)
    • 递归调用总是去尝试解决一个规模更小的子问题
    • 递归调用的父问题和尝试解决的子问题之间不应该由交集 如下图中,两个子问题各自操作的数组部分是不同的
  • 基础编程模型
    • 静态方法库是定义在一个Java类中的一组静态方法
    • Java开发的基本模式是编写一个静态方法库(包含一个main()方法)类完成一个任务
    • 在本书中,当我们提到用于执行一项人物的Java程序时,我们指的就是用这种模式开发的代码(还包括对数据类型的定义)
  • 模块化编程
    • 通过静态方法库实现了模块化编程
    • 一个库中的静态方法也能够调用另一个库中定义的静态方法
  • 单元测试
    • Java编程最佳实践之一就是每个静态方法库中都包含一个main()函数来测试库中所有的方法
    • 本书中使用main()来说明模块的功能并将测试用例留作练习
  • 外部库
    • 系统标准库 java.lang.*:包括Math库;String和StringBuilder库
    • 导入的系统库 java.util.Arrays
    • 本书中其他库
    • 本书使用了作者开发的标准库Std*

API#

  • 模块化编程重要组成部分,记录库方法的用法并供其他人参考的文档
  • 会统一使用应用程序编程接口API的方法列出每个库方法、签名及简述
  • 用例(调用另一个库中的方法的程序),实现(实现了某个API方法的Java代码)
  • 作者自己的两个库,一个扩展Math.random(),一个支持各种统计
    • 随机静态方法库(StdRandom)的API
    • 数据分析方法库(StdStats)的API
    • StdRandom库中的静态方法的实现
  • 编写自己的库
    • 编写用例,实现中将计算过程分解
    • 明确静态方法库和与之对应的API
    • 实现API和一个能够对方法进行独立测试的main()函数
    • API的目的是将调用和实现分离

字符串#

  • 字符串拼接,使用 +
  • 类型转换(将用户从键盘输入的内容转换成相应数据类型的值以及将各种数据类型的值转换成能够在屏幕上显示的值)
  • 如果数字跟在+后面,那么会将数据类型的值自动转换为字符串
  • 命令行参数
    • Java中字符串的存在,使程序能够接收到从命令行传递来的信息
    • 当输入命令java和一个库名及一系列字符串后,Java系统会调用库的main()方法并将后面的一系列字符串变成一个数组作为参数传递给它

输入输出#

  • Java程序可以从命令行参数或者一个名为标准输入流的抽象字符流中获得输入,并将输出写入另一个名为标准输出流的字符流中
  • 默认情况下,命令行参数、标准输入和标准输出是和应用程序绑定的,而应用程序是由能够接受命令输入的操作系统或是开发环境所支持
  • 使用终端来指代这个应用程序提供的供输入和显示的窗口,如图
  • 命令和参数
    • 终端窗口包含一个提示符,通过它我们能够向操作系统输入命令和参数
    • 操作系统常用命令
  • 标准输出
    • StdOut库的作用是支持标准输出
    • 标准输出库的静态方法的API
    • 格式化输出 字符%并紧跟一个字符表示的转换代码(包括d,f和s)。%和转换代码之间可以插入证书表示值的宽度,且转换后会在字符串左边添加空格以达到需要的宽度。负数表示空格从右边加
    • 宽度后用小数点及数值可以指定精度(或String字符串所截取的长度)
    • 格式中转换代码和对应参数的数据类型必须匹配
  • 标准输入
    • StdIn库从标准输入流中获取数据,然后将标准输出定向到终端窗口
    • 标准输入流最重要的特点,这些值会在程序读取后消失
    • 例子
    • 标准输入库中的静态方法API
  • 重定向和管道
    • 将标准输出重定向到一个文件
      java RandomSeq 1000 100.0 200.0 > data.txt
    • 从文件而不是终端应用程序中读取数据
      java Average < data.txt
    • 将一个程序的输出重定向为另一个程序的输入,叫做管道
      java RandomSeq 1000 100.0 200.0 | java Average
      • 突破了我们能够处理的输入输出流的长度限制
      • 即使计算机没有足够的空间来存储十亿个数,
      • 我们仍然可以将例子中的1000 换成1 000 000 000 (当然我们还是需要一些时间来处理它们)。当RandomSeq 调用StdOut.println() 时,它就向输出流的末尾添加了一个字符串;当Average 调用StdIn.readInt() 时,它就从输入流的开头删除了一个字符串。这些动作发生的实际顺序取决于操作系统
    • 命令行的重定向及管道
  • 基于文件的输入输出
  • In和Out库提供了一些静态方法,来实现向文件中写入或从文件中读取一个原始数据类型的数组的抽象
  • 用于读取和写入数组的静态方法的API
  • 标准绘图库(基本方法和控制方法)–这里跳过

二分查找#

  • 如图,在终端接收需要判断的数字,如果不存在于白名单(文件中的int数组)中则输出
  • 开发用例以及使用测试文件(数组长度很大的白名单)
  • 模拟实际情况来展示当前算法的必要性,比如
    • 将客户的账号保存在一个文件中,我们称它为白名单;
    • 从标准输入中得到每笔交易的账号;
    • 使用这个测试用例在标准输出中打印所有与任何客户无关的账号,公司很可能拒绝此类交易。
  • 使用顺序查找
    public static int rank(int key, int[] a)
    {
      for (int i = 0; i < a.length; i++)
        if (a[i] == key) return i;
      return -1;
    }
  • 当处理大量输入的时候,顺序查找的效率极其低

展望#

  • 下一节,鼓励使用数据抽象,或称面向对象编程,而不是操作预定义的数据类型的静态方法
  • 使用数据抽象的好处
    • 复用性
    • 链式数据结构比数组更灵活
    • 可以准确地定义锁面对的算法问题

1.1 End#