青年IT男

个人从事金融行业,就职过易极付、思建科技、某网约车平台等重庆一流技术团队,目前就职于某银行负责统一支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。同时也热衷于技术分享创立公众号和博客站点对知识体系进行分享。微信公众号:青年IT男。

SpringBoot2.x集成ServiceComb pack

SpringBoot2.x集成ServiceComb pack

事务基本概念

​ 有过后端数据库编程经验的童鞋应该知道事务的基本理论知识同时网上有许多更为规范的文档参考,我在这里大致简单介绍一下。在数据库编程中我们通常知道ACID的基本概念,为什么会存在这个理论知识的,我个人认为人们在实践的经验中总结出来了对数据库的基本范式和编程规范。

本地事务场景

​ 这里简单的那一个业务场景举例,比如我们有一个这样的积分兑换场景系统为单体架构那么这里会设计到用户积分表、商品表、交易订单表,

积分兑换

当用户发起积分兑换操作时步骤,这三个操作步骤需要保证数据的一致性、原子性、持久性 ,那么就需要开启事务,我们知道在同一个数据库会话连接中就相当于一个事务那么这三个操作步骤要么全部成功、要么全部失败(数据的强一致性),在并发的场景下不同连接会话的事务是不会相互影响。

  • 产生交易订单
  • 扣减库存
  • 扣减积分

以上操作就涉及到数据库的ACID基本概念:

A:即Atomic数据操作原子性

  • 在同一个事务中的操作要么全部成功、要么全部失败。

C:即Consistency数据一致性

  • 可以这样理解为什么会出现数据一致性问题,比如在一个事务操作数据的时候,其他的事务对此操作(commit、rollback)的看到的结果数据是一致的。

I:即Isolation事务的隔离性

  • 不同会话事务操作是互不影响

D:即Durability数据的持久性

  • 当事务被commit或者rollback对数据库的操作是持久化的。

分布式事务基本概念

为什么会出现分布式事务?这个问题在当今微服务盛行的今天我想大家应该深有体会,一个单体应用一个DB在业务操作层Service同一个事务可以完成多表操作,但是如果按照领域模型进行服务拆分后不同的领域对于各自的服务以及DB那么在单体应用中同一个场景下,服务化改造后的操作就会涉及到跨服务操作,就会涉及到不同服务有各自本地的事务操作,那么怎么来实现之前的本地事务ACID呢?显示是一个非常困难的事情,那么就出现了分布式事务的一些模式。

分布式事务场景

有个这样一个场景用户在购买商品&支付我们的架构可能是如下:

distributeservice

当用户通过app或者pc打开我们的商城选好商品后下单,这里涉及到产生交易订单商品库存的锁定调用支付系统帐户变更(混合支付=积分+第三方收单)等,由于我们的每一个服务都有自己DB本地事务操作只能保证本地事务的ACID,但对于整个交易场景来说会涉及到多个本地事务所有不能保证有一个统一的协同操作和回滚机制的保证。那么这个时候就出现了分布式事务的解决方案。

那么在分布式系统中CAP原则和base的基本理论大家可以自行扫盲下,一下介绍几种常见的分布式事务解决方案

  • 强一致模型

  • 2pc 典型的XA模型 拥有三个角色:TM(事务管理者)、RM(资源管理者)、AP(应用), 包括两个阶段:第一是资源的准备 、第二是事务提交,在第一阶段当所有的资源管理者返回预提交成功后才发起第二阶段事务提交,如果在第一阶段存在一个返回预提交失败则回滚。

    • 问题:同步阻塞、事务没有超时机制(存在宕机后事务管理者一致等待资源管理者响应)
    • 3pc 在2pc的基础上增加了一个预备阶段和超时机制

    问题:

    • 极端条件存在数据不一致问题
    • 系统开销大
    • 容易出现单点问题
    • 同步、阻塞性能低
  • 柔性事务
    • TCC 这其实和2pc类似都属于两阶段提交不过这里把过程拆分为:try、confirm、cancel三个阶段,try阶段对资源check和预留,如果成功则进行confirm提交阶段,如果失败则进行cancel补偿阶段。当然这也需要事务的协调者角色参与
    • 问题 对业务入侵比较大 、同步、阻塞
    • sage 把整个分布式事务拆分成多个本地事务,如果所有本地事务都成功那就成功,如果存在失败那就进行补偿,补偿分为:正向补偿和反向补偿。正向补偿:不断的重试失败事务,最大努力尝试保证最终一致性,如果重试多次失败报警人工介入处理,反向补偿:进行反向回滚操作,达到最终一致性。
    • 优点:相比TCC减少try阶段、异步补偿
  • 异步消息(可靠实践模式)
    • 业务方提供本地操作成功回查功能

    在基于异步消息实现分布式事务中当操作本地业务的时候先记录一个消息到本地消息表消息状态为待发送,然后发送预half消息到MQ,此时MQ不会投递消息到消费者,MQ立即返回队列执行结果,如果失败则不执行后面业务同时发送MQ一个rollback消息和修改本地消息状态为 完成,如果返回成功执行本地事务提交和修改本地消息状态已发送并发送MQ一个commit消息表示可以投递。事务回滚则发送MQ一个rollback消息、删除或者修改本地消息表,当收到队列的ack回执后删除或者修改本地消息状态为完成

    • 发送端提供回查
    • 异步操作
    • 业务侵入大
    • 消费端消息去重
    • 消费端消息幂等性

    • 本地消息事务表

    基于消息队列(MQ)+本地事务表的形式, 在基于异步消息实现分布式事务中当操作本地业务的时候同时记录本地事务消息表在同一个事务中进行commit和rollback,然后把本地事务消息发送到MQ,当MQ成功回执后删除本地事务消息,未收到MQ回执需要重新尝试也可以开启一个定时任务去扫描发送MQ。当出现A->B->C 场景中消费者C事务异常则不断重试C,如果重试达到上限还是失败则需报警和人工介入。

    • 异步操作
    • 业务入侵小
    • 消费端消息幂等性

what‘s the ServiceComb pack?

Apache ServiceComb Pack is an eventually data consistency solution for micro-service applications. ServiceComb Pack currently provides TCC and Saga distributed transaction co-ordination solutions by using Alpha as a transaction coordinator and Omega as an transaction agent

也就是说Apache ServiceComb Saga 是一个微服务应用的数据最终一致性解决方案。

特性

  • 高可用。支持集群模式。
  • 高可靠。所有的事务事件都持久存储在数据库中。
  • 高性能。事务事件是通过gRPC来上报的,且事务的请求信息是通过Kyro进行序列化和反序列化的。
  • 低侵入。仅需2-3个注解和编写对应的补偿方法即可进行分布式事务。
  • 部署简单。可通过Docker快速部署。
  • 支持前向恢复(重试)及后向恢复(补偿)。
  • 扩展简单。基于Pack架构很容实现多种协调机制。

架构

Saga Pack 架构是由 alphaomega组成,其中:

  • alpha充当协调者的角色,主要负责对事务进行管理和协调。
  • omega是微服务中内嵌的一个agent,负责对网络请求进行拦截并向alpha上报事务事件。

下图展示了alpha, omega以及微服务三者的关系:

pack

Github:https://github.com/apache/servicecomb-pack

SpringBoot集成ServiceComb pack 案例

服务架构

servicecomb-pack

服务搭建准备工作

  • 使用spring官方生成代码脚手架https://start.spring.io生成springboot代码这里使用springboot2.x,需要依赖SpringWeb、SpringDta JDBC模块,分别生成booking、car、hotel三个项目

buildbookingprojiect

  • 去Github https://github.com/apache/servicecomb-pack下载源码编译最新代码使用0.6.0-SNAPSHOT版本当然如果不想编译使用发行版本0.5.0 ,注意在编译源码的使用注意选择相关的profies

    profiesset

    • 引入servicecomb pack依赖
    <dependency>
        <groupId>org.apache.servicecomb.pack</groupId>
        <artifactId>omega-spring-starter</artifactId>
        <version>0.5.0</version>
    </dependency>
    

    或者

    <dependency>
      <groupId>org.apache.servicecomb.pack</groupId>
      <artifactId>omega-spring-starter</artifactId>
      <version>0.6.0-SNAPSHOT</version>
    </dependency>
    
    • 其次这里我们使用到数据操作所以需要引入数据库连接池和相关驱动
    <!-- 数据库连接池 -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.6</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>6.0.6</version>
    </dependency>
    
    • servicecomb实现的resttemplate
            <dependency>
                <groupId>org.apache.servicecomb.pack</groupId>
                <artifactId>omega-transport-resttemplate</artifactId>
                <version>0.6.0-SNAPSHOT</version>
            </dependency>
    
    • 准备alpha-server数据库脚本
    CREATE TABLE IF NOT EXISTS TxEvent (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      type varchar(50) NOT NULL,
      compensationMethod varchar(512) NOT NULL,
      expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      payloads blob,
      retries int(11) NOT NULL DEFAULT '0',
      retryMethod varchar(512) DEFAULT NULL,
      PRIMARY KEY (surrogateId),
      INDEX saga_events_index (surrogateId, globalTxId, localTxId, type, expiryTime),
      INDEX saga_global_tx_index (globalTxId)
    ) DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS Command (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      eventId bigint NOT NULL UNIQUE,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      compensationMethod varchar(512) NOT NULL,
      payloads blob,
      status varchar(12),
      lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      version bigint NOT NULL,
      PRIMARY KEY (surrogateId),
      INDEX saga_commands_index (surrogateId, eventId, globalTxId, localTxId, status)
    ) DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS TxTimeout (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      eventId bigint NOT NULL UNIQUE,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      type varchar(50) NOT NULL,
      expiryTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      status varchar(12),
      version bigint NOT NULL,
      PRIMARY KEY (surrogateId),
      INDEX saga_timeouts_index (surrogateId, expiryTime, globalTxId, localTxId, status)
    ) DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS tcc_global_tx_event (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      txType varchar(12),
      status varchar(12),
      creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (surrogateId),
      UNIQUE INDEX tcc_global_tx_event_index (globalTxId, localTxId, parentTxId, txType)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS tcc_participate_event (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      confirmMethod varchar(512) NOT NULL,
      cancelMethod varchar(512) NOT NULL,
      status varchar(50) NOT NULL,
      creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (surrogateId),
      UNIQUE INDEX tcc_participate_event_index (globalTxId, localTxId, parentTxId)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS tcc_tx_event (
      surrogateId bigint NOT NULL AUTO_INCREMENT,
      globalTxId varchar(36) NOT NULL,
      localTxId varchar(36) NOT NULL,
      parentTxId varchar(36) DEFAULT NULL,
      serviceName varchar(36) NOT NULL,
      instanceId varchar(36) NOT NULL,
      methodInfo varchar(512) NOT NULL,
      txType varchar(12),
      status varchar(12),
      creationTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      lastModified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      PRIMARY KEY (surrogateId),
      UNIQUE INDEX tcc_tx_event_index (globalTxId, localTxId, parentTxId, txType)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    CREATE TABLE IF NOT EXISTS master_lock (
      serviceName varchar(36) not NULL,
      expireTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      lockedTime datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
      instanceId  varchar(255) not NULL,
      PRIMARY KEY (serviceName)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 微服务sql脚本
CREATE TABLE `booking` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `phone` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `price` double DEFAULT NULL,
  `uuid` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;


CREATE TABLE `carbooking` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `amount` int(11) DEFAULT NULL,
  `confirmed` tinyint(1) DEFAULT NULL,
  `cancelled` tinyint(1) DEFAULT NULL,
  `uuid` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

CREATE TABLE `hotelbooking` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `amount` int(11) DEFAULT NULL,
  `confirmed` tinyint(4) DEFAULT NULL,
  `cancelled` tinyint(4) DEFAULT NULL,
  `uuid` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;


booking service

预定相关服务,需要操作car和hotel具有分布式事务使用场景。预定car和hotel结果需要一致性(都成功or都失败)

  • 编写提供外部访问的Controller层
            @SagaStart //标志这里是全局事务的开始
        @PostMapping("/booking/{name}/{rooms}/{cars}")
        public String order(@PathVariable String name, @PathVariable Integer rooms,
                            @PathVariable Integer cars) throws Throwable {
    
            if (cars < 0) {
                throw new Exception("The cars order quantity must be greater than 0");
            }
                    //第一步本地事务方法
            saveBooking();
    
                    //第二步car服务
            postCarBooking(name, cars);
    
            if (rooms < 0) {
                throw new Exception("The rooms order quantity must be greater than 0");
            }
            //调用hotel服务
            postHotelBooking(name, rooms);
            return name + " booking " + rooms + " rooms and " + cars + " cars OK";
        }
    
  • 第一步saveBooking方法操作本地Service
      @Transactional
        @Compensable(compensationMethod = "cancel")//开启子事务,并提供cancel补偿方法
      @Override
        public boolean booking(Booking booking) {
    
            Assert.notNull(booking, "booking is not null");
            Assert.hasLength(booking.getPhone(), "phone is not null");
            //and so on ...
    
            bookingRepository.save(booking);
    
            return true;
        }
    
        @Transactional
        @Override
        public boolean cancel(Booking booking) {
            List<Booking> bookings = bookingRepository.findByUuid(booking.getUuid());
            if (bookings != null && bookings.size() > 0) {
                bookingRepository.delete(bookings.get(0));
            }
            return true;
        }
    

    注意:这里的cancel方法前面必须和booking方法签名一致,被标注@Compensable方法会被omega进行拦截并根据签名和参数产生事务上下文通过grpc发送alpha持久化。当需要进行事务补偿时候alpha异步调用cancel补偿方法进行调用并注入之前的事务上下文。

  • 第二步调用car服务

        private void postCarBooking(String name, Integer cars) {
            template.postForEntity(
                    carServiceUrl + "/order/{name}/{cars}",
                    null, String.class, name, cars);
        }
    
  • 第三步调用hotel服务
     template.postForEntity(
                    hotelServiceUrl + "/order/{name}/{rooms}",
                    null, String.class, name, rooms);
    
    • 配置文件application.yaml
    spring:
    application:
      name: booking
    cloud:
      consul:
        enabled: false
      zookeeper:
        enabled: false
      nacos:
        discovery:
          enabled: false
    alpha:
    cluster:
      address: alpha-server.servicecomb.io:8080
    car:
    service:
      address: http://car.servicecomb.io:8082
    
    hotel:
    service:
      address: http://hotel.servicecomb.io:8083
    
    server:
    port: 8081
    

car service

预定car服务

  • 编写rest api接口Controller
    @PostMapping("/order/{name}/{cars}")
      CarBooking order(@PathVariable String name, @PathVariable Integer cars) {
        CarBooking booking = new CarBooking();
        booking.setId(id.incrementAndGet());
        booking.setName(name);
        booking.setAmount(cars);
        booking.setUuid(UUID.randomUUID().toString());
        carService.bookingCar(booking);
        return booking;
      }
    
  • Service逻辑
    @Transactional
        @Override
        @Compensable(compensationMethod = "cancel")//开启子事务,并提供cancel补偿方法
        public void bookingCar(CarBooking booking) {
            if (booking.getAmount() > 10) {
                throw new IllegalArgumentException("can not order the cars large than ten");
            }
            booking.setId(null);
            booking.confirm();
            carBookingRepository.save(booking);
        }
    
        @Transactional
        @Override
        public void cancel(CarBooking booking) {
            List<CarBooking> cars = carBookingRepository.findByUuid(booking.getUuid());
            if (cars != null && cars.size()>0) {
                CarBooking car = cars.get(0);
                carBookingRepository.delete(car);
            }
        }
    

    注意:这里的cancel方法前面必须和booking方法签名一致,被标注@Compensable方法会被omega进行拦截并根据签名和参数产生事务上下文通过grpc发送alpha持久化。当需要进行事务补偿时候alpha异步调用cancel补偿方法进行调用并注入之前的事务上下文。

  • 配置文件application.yaml

    spring:
    application:
      name: car
    cloud:
      consul:
        enabled: false
      zookeeper:
        enabled: false
      nacos:
        discovery:
          enabled: false
    alpha:
    cluster:
      address: alpha-server.servicecomb.io:8080
    server:
    port: 8082
    

hotel service

预定hotel服务

  • 编写rest api接口Controller
      @PostMapping("/order/{name}/{rooms}")
      HotelBooking order(@PathVariable String name, @PathVariable Integer rooms) {
        HotelBooking booking = new HotelBooking();
        booking.setId(id.incrementAndGet());
        booking.setName(name);
        booking.setAmount(rooms);
        hotelService.order(booking);
        return booking;
      }
    
  • Service本地事务接口
        @Transactional
        @Compensable(compensationMethod = "cancel")//开启子事务,并提供cancel补偿方法
        @Override
        public void order(HotelBooking booking) {
            if (booking.getAmount() > 2) {
                throw new IllegalArgumentException("can not order the rooms large than two");
            }
            booking.setId(null);
            booking.confirm();
            booking.setUuid(UUID.randomUUID().toString());
            hotelRepository.save(booking);
        }
    
        @Transactional
        @Override
        public void cancel(HotelBooking booking) {
            List<HotelBooking> hotelBookings = hotelRepository.findByUuid(booking.getUuid());
            if (hotelBookings != null && hotelBookings.size() > 0) {
                hotelRepository.deleteAll(hotelBookings);
            }
        }
    

    注意:这里的cancel方法前面必须和booking方法签名一致,被标注@Compensable方法会被omega进行拦截并根据签名和参数产生事务上下文通过grpc发送alpha持久化。当需要进行事务补偿时候alpha异步调用cancel补偿方法进行调用并注入之前的事务上下文。

    • 配置文件application.yaml
    spring:
    application:
      name: hotel
    cloud:
      consul:
        enabled: false
      zookeeper:
        enabled: false
      nacos:
        discovery:
          enabled: false
    alpha:
    cluster:
      address: alpha-server.servicecomb.io:8080
    server:
    port: 8083
    

Alpha service

ServiceComb pack 协调服务、事务上下文持久化等

直接在源码中找到启动类启动Alpha service或者使用jar启动,在源码中启动的时候增加一个启动参数

-Dspring.profiles.active=mysql使用mysql数据库

alpha

测试

  • 场景一 car service和hotel service服务以及本地服务调用全部成功

    http://127.0.0.1:8081/booking/ouwen/1/2

  • 场景二 car service服务调用失败,booking service事务回滚

    http://127.0.0.1:8081/booking/ouwen/1/20

  • 场景三 hotel service服务调用失败 bookng service 和car service服务事务回滚

    http://127.0.0.1:8081/booking/ouwen/10/2

相关资料

参考文章:

https://docs.servicecomb.io/saga

示例代码:

https://gitee.com/newitman/itman-blog.git

关注我

在这里插入图片描述

0
青年IT男

个人从事金融行业,就职过易极付、思建科技等重庆一流技术团队,目前就职于某网约车平台负责整个支付系统建设。自身对金融行业有强烈的爱好。同时也实践大数据、数据存储、自动化集成和部署、分布式微服务、响应式编程、人工智能等领域。

评论已关闭。

This site is protected by wp-copyrightpro.com