学习

Flowable-05-spring-boot

入门 #

需要两个依赖

<properties>

        <flowable.version>6.7.2</flowable.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.flowable</groupId>
            <artifactId>flowable-spring-boot-starter</artifactId>
            <version>${flowable.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>2.1.212</version>
        </dependency>

    </dependencies>

结合Spring:

只需将依赖项添加到类路径并使用*@SpringBootApplication*注释,幕后就会发生很多事情:

  • 自动创建内存数据源(因为 H2 驱动程序位于类路径中)并传递给 Flowable 流程引擎配置

  • 已创建并公开了 Flowable ProcessEngine、CmmnEngine、DmnEngine、FormEngine、ContentEngine 和 IdmEngine bean

  • 所有 Flowable 服务都暴露为 Spring bean

  • Spring Job Executor 已创建

  • 将自动部署流程文件夹中的任何 BPMN 2.0 流程定义。创建一个文件夹processes并将一个虚拟进程定义(名为one-task-process.bpmn20.xml)添加到此文件夹。该文件的内容如下所示。

    <?xml version="1.0" encoding="UTF-8"?>
    <definitions
            xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
            xmlns:flowable="http://flowable.org/bpmn"
            targetNamespace="Examples">
    
        <process id="oneTaskProcess" name="The One Task Process">
            <startEvent id="theStart" />
            <sequenceFlow id="flow1" sourceRef="theStart" targetRef="theTask" />
            <userTask id="theTask" name="my task" flowable:assignee="kermit" />
            <sequenceFlow id="flow2" sourceRef="theTask" targetRef="theEnd" />
            <endEvent id="theEnd" />
        </process>
    
    </definitions>
    
  • 案例文件夹中的任何 CMMN 1.1 案例定义都将自动部署。

    ...

Flowable-04-spring

ProcessEngineFactoryBean #

  • 将ProcessEngine配置为常规的SpringBean

    <bean id="processEngineConfiguration" class="org.flowable.spring.SpringProcessEngineConfiguration">
        ...
    </bean>
    
    <bean id="processEngine" class="org.flowable.spring.ProcessEngineFactoryBean">
      <property name="processEngineConfiguration" ref="processEngineConfiguration" />
    </bean>
    
  • 使用transaction

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                                 http://www.springframework.org/schema/beans/spring-beans.xsd
                               http://www.springframework.org/schema/context
                                 http://www.springframework.org/schema/context/spring-context-2.5.xsd
                               http://www.springframework.org/schema/tx
                                 http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
      <bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
        <property name="driverClass" value="org.h2.Driver" />
        <property name="url" value="jdbc:h2:mem:flowable;DB_CLOSE_DELAY=1000" />
        <property name="username" value="sa" />
        <property name="password" value="" />
      </bean>
    
      <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
      </bean>
    
      <bean id="processEngineConfiguration" class="org.flowable.spring.SpringProcessEngineConfiguration">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseSchemaUpdate" value="true" />
        <property name="asyncExecutorActivate" value="false" />
      </bean>
    
      <bean id="processEngine" class="org.flowable.spring.ProcessEngineFactoryBean">
        <property name="processEngineConfiguration" ref="processEngineConfiguration" />
      </bean>
    
      <bean id="repositoryService" factory-bean="processEngine" factory-method="getRepositoryService" />
      <bean id="runtimeService" factory-bean="processEngine" factory-method="getRuntimeService" />
      <bean id="taskService" factory-bean="processEngine" factory-method="getTaskService" />
      <bean id="historyService" factory-bean="processEngine" factory-method="getHistoryService" />
      <bean id="managementService" factory-bean="processEngine" factory-method="getManagementService" />
    
    ...
    
  • 还包括了其他的一些bean

    <beans>
      ...
      <tx:annotation-driven transaction-manager="transactionManager"/>
    
      <bean id="userBean" class="org.flowable.spring.test.UserBean">
        <property name="runtimeService" ref="runtimeService" />
      </bean>
    
      <bean id="printer" class="org.flowable.spring.test.Printer" />
    
    </beans>
    
  • 使用

    ...

Flowable-03-api

流程引擎API和服务 #

引擎API是与Flowable交互的常见方式,主要起点是ProcessEngine,可以通过配置(Configuration章节)中描述的多种方式创建。

从ProcessEngine获取包含工作流/BPM方法的各种服务。ProcessEngine和服务对象是线程安全的

ly-20241212142116684

下面是通过processEngine获取各种服务的方法

ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();

RuntimeService runtimeService = processEngine.getRuntimeService();
RepositoryService repositoryService = processEngine.getRepositoryService();
TaskService taskService = processEngine.getTaskService();
ManagementService managementService = processEngine.getManagementService();
IdentityService identityService = processEngine.getIdentityService();
HistoryService historyService = processEngine.getHistoryService();
FormService formService = processEngine.getFormService();
DynamicBpmnService dynamicBpmnService = processEngine.getDynamicBpmnService();

ProcessEngines.getDefaultProcessEngine()在第一次调用时初始化并构建流程引擎,然后返回相同的流程引擎

ProcessEngines类将扫描所有flowable.cfg.xml和flowable-context.xml文件。

对于所有 flowable.cfg.xml 文件,流程引擎将以典型的 Flowable 方式构建:ProcessEngineConfiguration.createProcessEngineConfigurationFromInputStream(inputStream).buildProcessEngine()。

对于所有 flowable-context.xml 文件,流程引擎将以 Spring 方式构建:首先创建 Spring 应用程序上下文,然后从该应用程序上下文中获取流程引擎。

The RepositoryService is probably the first service needed when working with the Flowable engine.

该服务**(RepositoryService)提供用于管理和操作部署deployments**和流程定义的操作

  • 查询引擎已知的部署和流程定义
  • 暂停和激活作为一个整体或特定流程定义的部署。挂起意味着不能对它们执行进一步的操作,而激活则相反并再次启用操作
  • 检索各种资源,例如引擎自动生成的部署或流程图中包含的文件
  • 检索流程定义的 POJO 版本,该版本可用于使用 Java 而不是 XML 来内省流程

RepositoryService主要是关于静态信息(不会改变的数据,或者至少不会改变太多),而RuntimeService处理启动流程定义的新流程实例

...

Flowable-02-Configuration

创建流程引擎 #

Flowable 流程引擎通过一个名为 flowable.cfg.xml 的 XML 文件进行配置

  • 现在类路径下放置floable.cfg.xml文件

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
      <bean id="processEngineConfiguration" class="org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration">
    
        <property name="jdbcUrl" value="jdbc:h2:mem:flowable;DB_CLOSE_DELAY=1000" />
        <property name="jdbcDriver" value="org.h2.Driver" />
        <property name="jdbcUsername" value="sa" />
        <property name="jdbcPassword" value="" />
    
        <property name="databaseSchemaUpdate" value="true" />
    
        <property name="asyncExecutorActivate" value="false" />
    
        <property name="mailServerHost" value="mail.my-corp.com" />
        <property name="mailServerPort" value="5025" />
      </bean>
    
    </beans>
    
  • 然后使用静态方法进行获取ProcessEngine

    ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine();
    
  • 还有其他配置,这里不一一列举,详见文档地址 https://www.flowable.com/open-source/docs/bpmn/ch03-Configuration

  • 大致目录如下 ly-20241212142116369 ly-20241212142116576

Flowable-01-GettingStarted

入门 #

什么是流动性 #

Flowable 是一个用 Java 编写的轻量级业务流程引擎。Flowable 流程引擎允许您部署 BPMN 2.0 流程定义(用于定义流程的行业 XML 标准)、创建这些流程定义的流程实例、运行查询、访问活动或历史流程实例和相关数据等等。

可以使用 Flowable REST API 通过 HTTP 进行通信。还有几个 Flowable 应用程序(Flowable Modeler、Flowable Admin、Flowable IDM 和 Flowable Task)提供开箱即用的示例 UI,用于处理流程和任务。

Flowable和Activiti #

Flowable是Activiti的一个分支

构建命令行命令 #

创建流程引擎 #

请假流程如下

  • 员工要求休假数次
  • 经理批准或拒绝请求
  • 之后将模拟再某个外部系统中注册请求,并向员工发送一封包含结果的邮件

创建一个空的Mave项目,并添加依赖

    <dependencies>
        <dependency>
            <groupId>org.flowable</groupId>
            <artifactId>flowable-engine</artifactId>
            <version>6.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>1.3.176</version>
        </dependency> 

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version> <!--当版本号>=8.0.22时会报date转字符串的错误-->
        </dependency>
    </dependencies>

添加一个带有Main方法的类

这里实例化一个ProcessEngine实例,一般只需要实例化一次,是通过ProcessEngineConfiguration创建的,用来配置和调整流程引擎的配置

  • ProcessEngineConfiguration也可以使用配置 XML 文件创建
  • ProcessEngineConfiguration需要的最低配置是与数据库的 JDBC 连接
package org.flowable;

import org.flowable.engine.ProcessEngine;
import org.flowable.engine.ProcessEngineConfiguration;
import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration;

public class HolidayRequest {
    public static void main(String[] args) {

        //这里改用mysql,注意后面的nullCatalogMeansCurrent=true
        //注意,pom需要添加mysql驱动依赖
        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:mysql://localhost:3306/flowable_official?useUnicode=true" +
                        "&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&allowMultiQueries=true"
                        +"&nullCatalogMeansCurrent=true"
                )
                .setJdbcUsername("root")
                .setJdbcPassword("123456")
                .setJdbcDriver("com.mysql.cj.jdbc.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
        /* //这是官网,用的h2
        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:flowable;DB_CLOSE_DELAY=-1")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);*/

        ProcessEngine processEngine = cfg.buildProcessEngine();
    }
}

运行后会出现slf4j的警告,添加依赖并编写配置文件即可

...

算法红皮书 2.1.2-2.1.3

排序 #

初级排序算法 #

选择排序 #

  • 命题A。对于长度为N 的数组,选择排序需要大约 N^2/2 次比较和N 次交换。

  • 代码

    public class Selection
    {
    	public static void sort(Comparable[] a)
    	{
    		// 将a[]按升序排列
    		int N = a.length;
    		// 数组长度
    		for (int i = 0; i < N; i++)
    		{
    			// 将a[i]和a[i+1..N]中最小的元素交换
    			int min = i;
    			// 最小元素的索引
    			for (int j = i+1; j < N; j++)
    			if (less(a[j], a[min])) min = j;
    			exch(a, i, min);
    		}
    	}
    	// less()、exch()、isSorted()和main()方法见“排序算法类模板”
    }
    
  • 特点

    • 运行时间与输入无关,即输入数据的初始状态(比如是否已排序好等等)不影响排序时间
    • 数据移动是最少的(只使用了N次交换,交换次数和数组的大小是线性关系

插入排序 #

  • 命题B。对于随机排列的长度为N 且主键不重复的数组,平均情况下插入排序需要~ N2/4 次比较以及~ N2/4 次交换。最坏情况下需要~ N2/2 次比较和~ N2/2 次交换,最好情况下需要N-1次比较和0 次交换。

    ...

算法红皮书 2.1.1

排序 #

排序就是将一组对象按照某种逻辑顺序重新排序的过程

  • 对排序算法的分析有助于理解本书中比较算法性能的方法
  • 类似技术能解决其他类型问题
  • 排序算法常常是我们解决其他问题的第一步

初级排序算法 #

  • 熟悉术语及技巧
  • 某些情况下初级算法更有效
  • 有助于改进复杂算法的效率

游戏规则 #

  • 主要关注重新排序数组元素的算法,每个元素都会有一个主键

  • 排序后索引较大的主键大于索引较小的主键

  • 一般情况下排序算法通过两个方法操作数据,less()进行比较,exch()进行交换

  • 排序算法类的模板

    public class Example
    {
    	public static void sort(Comparable[] a)
    	{
    		/* 请见算法2.1、算法2.2、算法2.3、算法2.4、算法2.5或算法2.7*/
    	}
    	private static Boolean less(Comparable v, Comparable w)
    	{
    		return v.compareTo(w) < 0;
    	}
    	private static void exch(Comparable[] a, int i, int j)
    	{
    		Comparable t = a[i];
    		a[i] = a[j];
    		a[j] = t;
    	}
    	private static void show(Comparable[] a)
    	{
    		// 在单行中打印数组
    		for (int i = 0; i < a.length; i++)
    		StdOut.print(a[i] + " ");
    		StdOut.println();
    	}
    	public static Boolean isSorted(Comparable[] a)
    	{
    		// 测试数组元素是否有序
    		for (int i = 1; i < a.length; i++)
    		if (less(a[i], a[i-1])) return false;
    		return true;
    	}
    	public static void main(String[]
    	args)
    	{
    		// 从标准输入读取字符串,将它们排序并输出
    		String[] a = In.readStrings();
    		sort(a);
    		assert isSorted(a);
    		show(a);
    	}
    }
    

05高级功能

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!

消息存储 #

流程 #

ly-20241212142201154

存储介质 #

关系型数据库DB #

适合数据量不够大,比如ActiveMQ可选用JDBC方式作为消息持久化

文件系统 #

  1. 关系型数据库最终也是要存到文件系统中的,不如直接存到文件系统,绕过关系型数据库
  2. 常见的RocketMQ/RabbitMQ/Kafka都是采用消息刷盘到计算机的文件系统来做持久化(同步刷盘/异步刷盘)

消息发送 #

  1. 顺序写:600MB/s,随机写:100KB/s

    • 系统运行一段时间后,我们对文件的增删改会导致磁盘上数据无法连续,非常的分散。

    • 顺序读也只是逻辑上的顺序,也就是按照当前文件的相对偏移量顺序读取,并非磁盘上连续空间读取

    • 对于磁盘的读写分为两种模式,顺序IO随机IO。 随机IO存在一个寻址的过程,所以效率比较低。而顺序IO,相当于有一个物理索引,在读取的时候不需要寻找地址,效率很高。

    • 来源: https://www.cnblogs.com/liuche/p/15455808.html

  2. 数据网络传输

    零拷贝技术MappedByteBuffer,省去了用户态,由内核态直接拷贝到网络驱动内核
    RocketMQ默认设置单个CommitLog日志数据文件为1G

    ly-20241212142201208

消息存储 #

三个概念:commitLog、ConsumerQueue、index

CommitLog #

  1. 默认大小1G
    ly-20241212142201240
  2. 存储消息的元数据,包括了Topic、QueueId、Message
  3. 还存储了ConsumerQueue相关信息,所以ConsumerQueue丢了也没事

ConsumerQueue #

  1. 存储了消息在CommitLog的索引(几百K,Linux会事先加载到内存中)
  2. 包括最小/最大偏移量、已经消费的偏移量
  3. 一个Topic多个队列,每个队列对应一个ConsumerQueue
    ly-20241212142201276

Index #

也是索引文件,为消息查询服务,通过key或时间区间查询消息

总结 #

ly-20241212142201312

刷盘机制 #

ly-20241212142201340

  1. 同步刷盘
  2. 异步刷盘

高可用性机制 #

消费高可用及发送高可用 #

ly-20241212142201372

...

算法红皮书 1.5.1-1.5.3

案例研究:union-find 算法 #

  • 设计和分析算法的基本方法
    • 优秀的算法能解决实际问题
    • 高效的算法也可以很简单
    • 理解某个实现的性能特点是一项有趣的挑战
    • 在解决同一个问题的多种算法间选择,科学方法是一种重要工具
    • 迭代式改进能让算法效率越来越高

动态连通性 #

  • 从输入中读取整数对p q,如果已知的所有整数对都不能说明p,q相连,就打印出pq
  • 网络:整个程序能够判定是否需要在pq之间架设一条新的连接才能进行通信
  • 变量名等价性(即指向同一个对象的多个引用)
  • 数学集合:在处理一个整数对pq时,我们是在判断它们是否属于相同的集合
  • 本节中,将对象称为触点,整数对称为连接,等价类称为连通分量或是简称分量
  • 连通性 问题只要求我们的程序能够判别给定的整数对pq是否相连,并没有要求给两者之间的通路上的所有连接
  • union-find算法的API
    ly-20241212142056628
  • 数据结构和算法的设计影响到算法的效率

实现 #

public class UF
{
	private int[]	id;
	/* 分量id(以触点作为索引) */
	private int	count;
	/* 分量数量 */
	public UF( int N )
		{
		/* 初始化分量id数组 */
		count	= N;
		id	= new int[N];
		for ( int i = 0; i < N; i++ )
					id[i] = i;
	}
	public int count()
		{
		return(count);
	}
	public Boolean connected( int p, int q )
		{
		return(find( p ) == find( q ) );
	}
	public int find( int p )
		public void union( int p, int q )
	/* 请见1.5.2.1节用例(quick-find)、1.5.2.3节用例(quick-union)和算法1.5(加权quick-union) */
	public static void main( String[] args )
		{
		/* 解决由StdIn得到的动态连通性问题 */
		int	N	= StdIn.readint();
		/* 读取触点数量 */
		UF	uf	= new UF( N );
		/* 初始化N个分量 */
		while ( !StdIn.isEmpty() )
				{
			int	p	= StdIn.readint();
			int	q	= StdIn.readint();
			/* 读取整数对 */
			if ( uf.connected( p, q ) )
							continue;
			/* 如果已经连通则忽略 */
			uf.union( p, q );
			/* 归并分量 */
			StdOut.println( p + " " + q );
			/* 打印连接 */
		}
		StdOut.println( uf.count() + "components" );
	}
}

union-find的成本模型:union-find API的各种算法,统计的是数组的访问次数,不论读写

...

04案例

学习来源 https://www.bilibili.com/video/BV1L4411y7mn(添加小部分笔记)感谢作者!基本架构

架构 #

ly-20241212142200977

流程图 #

下单流程 #

ly-20241212142201029

支付流程 #

ly-20241212142201068

SpringBoot整合RocketMQ #

依赖包 #

	   <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

生产者 #

yaml #

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  producer:
    group: my-group

使用 #

    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("rocketmq")
    public String rocketmq(){
        log.info("我被调用了-rocketmq");
        //主题+内容
        template.convertAndSend("mytopic-ly","hello1231");
        return "hello world"+serverPort;
    }

消费者 #

yaml #

rocketmq:
  name-server: 192.168.1.135:9876;192.168.1.138:9876
  consumer:
    group: my-group2

使用 #

创建监听器

...