成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久

您的位置:首頁技術文章
文章詳情頁

.NET中的MassTransit分布式應用框架詳解

瀏覽:300日期:2022-06-09 11:49:11
目錄
  • 引言
  • 快速體驗
  • 核心概念
  • Message
  • Producer
    • 發送命令
    • 發布事件
  • Consumer
    • 無狀態消費者
    • 有狀態消費者
  • 應用場景

    MassTransit是一款優秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。

    引言

    A free, open-source distributed application framework for .NET.
    一個免費、開源的.NET 分布式應用框架。-- MassTransit 官網

    快速體驗

    空口無憑,創建一個項目快速體驗一下。

    • 基于worker模板創建一個基礎項目:dotnet new worker -n MassTransit.Demo
    • 打開項目,添加NuGet包:MassTransit
    • 定義訂單創建事件消息契約:
    using System;namespace MassTransit.Demo{    public record OrderCreatedEvent    {public Guid OrderId { get; set; }    }}

    4.修改Worker類,發送訂單創建事件:

    namespace MassTransit.Demo;public class Worker : BackgroundService{    readonly IBus _bus;//注冊總線    public Worker(IBus bus)    {_bus = bus;    }    protected override async Task ExecuteAsync(CancellationToken stoppingToken)    {while (!stoppingToken.IsCancellationRequested){    //模擬并發送訂單創建事件    await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken);    await Task.Delay(1000, stoppingToken);}    }}

    5.僅需實現IConsumer<OrderCreatedEvent>泛型接口,即可實現消息的訂閱:

    public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent>{    private readonly ILogger<OrderCreatedEventConsumer> _logger;    public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)    {_logger = logger;    }    public Task Consume(ConsumeContext<OrderCreatedEvent> context)    {_logger.LogInformation($"Received Order:{context.Message.OrderId}");return Task.CompletedTask;    }}

    6.注冊服務:

    using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args)    .ConfigureServices(services =>    {services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{    //注冊消費者    configurator.AddConsumer<OrderCreatedEventConsumer>();    //使用基于內存的消息路由傳輸    configurator.UsingInMemory((context, cfg) =>    {cfg.ConfigureEndpoints(context);    });});    })    .Build();await host.RunAsync();

    7.運行項目,一個簡單的進程內事件發布訂閱的應用就完成了。

    如果需要使用RabbitMQ 消息代理進行消息傳輸,則僅需安裝MassTransit.RabbitMQNuGet包,然后指定使用RabbitMQ 傳輸消息即可。

    using MassTransit;using MassTransit.Demo;IHost host = Host.CreateDefaultBuilder(args)    .ConfigureServices(services =>    {services.AddHostedService<Worker>();services.AddMassTransit(configurator =>{    configurator.AddConsumer<OrderCreatedEventConsumer>();        // configurator.UsingInMemory((context, cfg) =>    // {    //     cfg.ConfigureEndpoints(context);    // });        configurator.UsingRabbitMq((context, cfg) =>    {cfg.Host(    host: "localhost",    port: 5672,    virtualHost: "/",    configure: hostConfig =>    {hostConfig.Username("guest");hostConfig.Password("guest");    });cfg.ConfigureEndpoints(context);    });});    })    .Build();await host.RunAsync();

    運行項目,MassTransit會自動在指定的RabbitMQ上創建一個類型為fanoutMassTransit.Demo.OrderCreatedEventExchange和一個與OrderCreatedEvent同名的隊列進行消息傳輸,如下圖所示。

    核心概念

    MassTranist 為了實現消息代理的透明化和應用間消息的高效傳輸,抽象了以下概念,其中消息流轉流程如下圖所示:

    • Message:消息契約,定義了消息生產者和消息消費者之間的契約。
    • Producer:生產者,發送消息的一方都可以稱為生產者。
    • SendEndpoint:發送端點,用于將消息內容序列化,并發送到傳輸模塊。
    • Transport:傳輸模塊,消息代理透明化的核心,用于和消息代理通信,負責發送和接收消息。
    • ReceiveEndpoint:接收端點,用于從傳輸模塊接收消息,反序列化消息內容,并將消息路由到消費者。
    • Consumer:消費者,用于消息消費。

    從上圖可知,本質上還是發布訂閱模式的實現,接下來就核心概念進行詳解。

    Message

    Message:消息,可以使用class、interface、struct和record來創建,消息作為一個契約,需確保創建后不能篡改,因此應只保留只讀屬性且不應包含方法和行為。MassTransit使用的是包含命名空間的完全限定名即typeof(T).FullName來表示特定的消息類型。因此若在另外的項目中消費同名的消息類型,需確保消息的命名空間相同。另外需注意消息不應繼承,以避免發送基類消息類型造成的不可預期的結果。為避免此類情況,官方建議使用接口來定義消息。在MassTransit中,消息主要分為兩種類型:

    • Command:命令,用于告訴服務做什么,命令被發送到指定端點,僅被一個服務接收并執行。一般以動名詞結構命名,如:UpdateAddress、CancelOrder。
    • Event:事件,用于告訴服務什么發生了,事件被發布到多個端點,可以被多個服務消費。 一般以過去式結構命名,如:AddressUpdated,OrderCanceled。

    經過MassTransit發送的消息,會使用信封包裝,包含一些附加信息,數據結構舉例如下:

    {    "messageId": "6c600000-873b-00ff-9a8f-08da8da85542",    "requestId": null,    "correlationId": null,    "conversationId": "6c600000-873b-00ff-9526-08da8da85544",    "initiatorId": null,    "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true",    "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent",    "responseAddress": null,    "faultAddress": null,    "messageType": ["urn:message:MassTransit.Demo:OrderCreatedEvent"    ],    "message": {"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"    },    "expirationTime": null,    "sentTime": "2022-09-03T12:32:15.0796943Z",    "headers": {},    "host": {"machineName": "THINKPAD","processName": "MassTransit.Demo","processId": 24684,"assembly": "MassTransit.Demo","assemblyVersion": "1.0.0.0","frameworkVersion": "6.0.5","massTransitVersion": "8.0.6.0","operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"    }}

    從以上消息實例中可以看出一個包裝后的消息包含以下核心屬性:

    • messageId:全局唯一的消息ID
    • messageType:消息類型
    • message:消息體,也就是具體的消息實例
    • sourceAddress:消息來源地址
    • destinationAddress:消息目標地址
    • responseAddress:響應地址,在請求響應模式中使用
    • faultAddress:消息異常發送地址,用于存儲異常消費消息
    • headers:消息頭,允許應用自定義擴展信息
    • correlationId:關聯Id,在Saga狀態機中會用到,用來關聯系列事件
    • host:宿主,消息來源應用的宿主信息

    Producer

    Producer,生產者,即用于生產消息。在MassTransit主要借助以下對象進行命令的發送和事件的發布。

    從以上類圖可以看出,消息的發送主要核心依賴于兩個接口:

    • ISendEndpoint:提供了Send方法,用于發送命令。
    • IPublishEndpoint:提供了Publish方法,用于發布事件。

    但基于上圖的繼承體系,可以看出通過IBus、ISendEndpointProviderConsumeContext進行命令的發送;通過IBusIPublishEndpointProvider進行事件的發布。具體舉例如下:

    發送命令

    1.通過IBus發送:

    private readonly IBus _bus;public async Task Post(CreateOrderRequest request){    //通過以下方式配置對應消息類型的目標地址    EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));    await _bus.Send(request);}

    2.通過ISendEndpointProvider發送:

    private readonly ISendEndpointProvider  _sendEndpointProvider;public async Task Post(CreateOrderRequest request){    var serviceAddress = new Uri("queue:create-order");    var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);    await endpoint.Send(request);}

    3.通過ConsumeContext發送:

    public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{        public async Task Consume(ConsumeContext<CreateOrderRequest> context)    {    	//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);       await context.Send<LockStockRequest>(destinationAddress, command); 		// 也可以通過獲取`SendEndpoint`發送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);    	    }}

    發布事件

    1.通過IBus發布:

    private readonly IBus _bus;public async Task Post(CreateOrderRequest request){    //do something    await _bus.Publish(request);}

    2.通過IPublishEndpoint發布:

    private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){    //do something    var order = CreateOrder(request);    await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}

    3.通過ConsumeContext發布:

    public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{        public async Task Consume(ConsumeContext<CreateOrderRequest> context)    {var order = CreateOrder(conext.Message);    	await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));    }}

    Consumer

    Consumer,消費者,即用于消費消息。MassTransit 包括多種消費者類型,主要分為無狀態和有狀態兩種消費者類型。

    無狀態消費者

    無狀態消費者,即消費者無狀態,消息消費完畢,消費者就釋放。主要的消費者類型有:IConsumer<TMessage>、JobConsumer、IActivityRoutingSlip等。其中IConsumer<TMessage>已經在上面的快速體驗部分舉例說明。而JobConsumer<TMessage>主要是對IConsumer<TMessage>的補充,其主要應用場景在于執行耗時任務。
    而對于IActivityRoutingSlip則是MassTransit Courier的核心對象,主要用于實現Saga模式的分布式事務。MassTransit Courier 實現了Routing Slip模式,通過按需有序組合一系列的Activity,得到一個用來限定消息處理順序的Routing Slip。而每個Activity的具體抽象就是IActivityIExecuteActivity。二者的差別在于IActivity定義了ExecuteCompensate兩個方法,而IExecuteActivitiy僅定義了Execute方法。其中Execute代表正向操作,Compensate代表反向補償操作。用一個簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示。而對于具體實現,可參閱文章:AspNetCore&MassTransit Courier實現分布式事務

    有狀態消費者

    有狀態消費者,即消費者有狀態,其狀態會持久化,代表的消費者類型為MassTransitStateMachine。MassTransitStateMachineMassTransit Automatonymous 庫定義的,Automatonymous 是一個.NET 狀態機庫,用于定義狀態機,包括狀態、事件和行為。MassTransitStateMachine就是狀態機的具體抽象,可以用其編排一系列事件來實現狀態的流轉,也可以用來實現Saga模式的分布式事務。并支持與EF Core和Dapper集成將狀態持久化到關系型數據庫,也支持將狀態持久化到MongoDB、Redis等數據庫。MassTransitStateMachine對于Saga模式分布式事務的實現方式與RoutingSlip不同,還是以簡單的下單流程:創建訂單->扣減庫存->支付訂單舉例而言,其示意圖如下所示?;?code>MassTransitStateMachine 實現分布式事務詳參后續文章。

    從上圖可知,通過MassTransitStateMachine可以將事件的執行順序邏輯編排在一個集中的狀態機中,通過發送命令和訂閱事件來推動狀態流轉,而這也正是Saga編排模式的實現。

    應用場景

    了解完MassTransit的核心概念,接下來再來看下MassTransit的核心特性以及應用場景:

    • 基于消息的請求響應模式:可用于同步通信
    • Mediator模式:中間者模式的實現,類似MediatR,但功能更完善
    • 計劃任務:可用于執行定時任務
    • Routing Slip 模式:可用于實現Saga模式的分布式事務
    • Saga 狀態機:可用于實現Saga模式的分布式事務
    • 本地消息表:類似DotNetCore.Cap,用于實現最終一致性

    總體而言,MassTransit是一款優秀的分布式應用框架,可作為分布式應用的消息總線,也可以用作單體應用的事件總線。感興趣的朋友不妨一觀。

    到此這篇關于MassTransit 中的.NET 分布式應用框架的文章就介紹到這了,更多相關.NET 分布式應用框架內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!

    標簽: ASP.NET
    成人在线亚洲_国产日韩视频一区二区三区_久久久国产精品_99国内精品久久久久久久
    精品视频一区三区九区| 欧美一级在线观看| 99久久精品国产导航| 国产成人av一区二区| 国产综合久久久久久鬼色 | 久久久久久久久久电影| 亚洲成人av一区二区三区| 色婷婷国产精品久久包臀| 丁香五精品蜜臀久久久久99网站| 国产麻豆午夜三级精品| 国内精品嫩模私拍在线| 高清不卡在线观看| 99这里都是精品| 欧美人与禽猛交乱配视频| 欧美日本免费| 羞羞答答国产精品www一本| 亚洲一区二区三区精品在线观看| 亚洲一区自拍| 日本道色综合久久| 在线播放视频一区| 国产免费久久精品| 亚洲精品乱码久久久久久日本蜜臀| 国产精品萝li| 国产精品久久久久毛片软件| 亚洲欧洲精品一区二区精品久久久| 亚洲精品乱码久久久久久 | 91福利资源站| 精品一区二区在线看| 中文字幕一区二区三区精华液| 日韩三级精品电影久久久| 一本久久a久久精品亚洲| av成人免费观看| 亚洲高清三级视频| 日本一区二区久久| 亚洲成人免费电影| 亚洲图片激情小说| 国产一区二区免费在线| 麻豆极品一区二区三区| 99久久精品国产网站| 日韩精品影音先锋| 国产成人在线视频网址| 欧美一区二区三区四区在线观看地址 | 美女高潮久久久| 精品成人在线观看| 亚洲成人免费观看| 亚洲欧美偷拍另类a∨色屁股| 中文字幕一区二区在线观看 | 亚洲欧美一区二区久久| 日韩成人精品视频| 色综合天天综合网国产成人综合天 | 午夜精品视频一区| 色综合久久六月婷婷中文字幕| 成人黄色免费短视频| 国内激情久久| 欧美视频一区二区三区四区| 久久蜜桃av一区精品变态类天堂| 国产精品白丝在线| 亚洲人快播电影网| 欧美欧美天天天天操| 日韩视频一区二区在线观看| 色欧美日韩亚洲| 亚洲国产婷婷香蕉久久久久久99| 成人精品在线视频观看| 亚洲欧美日韩另类精品一区二区三区 | 国产91精品一区二区麻豆网站| 国产综合一区二区| 99re成人精品视频| 久久一综合视频| 欧美变态tickle挠乳网站| 91丨九色丨蝌蚪富婆spa| 麻豆国产精品777777在线| 亚洲国产合集| 日韩网站在线| 欧美成人a∨高清免费观看| 日本精品一区二区三区高清 | 欧美午夜免费电影| 欧美日韩一区二区高清| 久久av一区二区三区亚洲| 91精品国产综合久久蜜臀| 日韩欧美一二三| 亚洲天堂免费看| 国模大尺度一区二区三区| 国产欧美日韩一区二区三区在线| 91精品国产综合久久久久| 日本 国产 欧美色综合| 久久精品九九| 男女激情视频一区| 在线观看视频91| 亚洲免费看黄网站| 欧美日韩视频| 中文字幕欧美国产| voyeur盗摄精品| 精品久久一区二区三区| 日韩一区精品视频| 亚洲视频大全| 国产精品久久久久久福利一牛影视| 丁香网亚洲国际| 精品免费国产一区二区三区四区| 精品一区二区三区的国产在线播放 | 一区在线播放视频| 欧美精品一区二区三区四区| 欧美色图一区二区三区| 午夜在线成人av| 在线视频国内自拍亚洲视频| 久久久久久久久久久久久夜| 在线视频国产一区| 日韩色在线观看| 日韩欧美一区二区视频| 欧美网站在线| 亚洲成人av一区二区三区| 欧美一区二区三区在线观看| 国产乱淫av一区二区三区| 91精品国产麻豆国产自产在线 | 久久网站最新地址| 国产一区二区在线免费观看| 欧美日韩免费不卡视频一区二区三区 | 精品国产自在久精品国产| 99久久777色| 中文字幕亚洲一区二区av在线| 亚洲国产一区二区在线| 尤物av一区二区| 欧美亚洲综合色| 不卡电影一区二区三区| 亚洲欧美在线观看| 麻豆av福利av久久av| 七七婷婷婷婷精品国产| 日韩视频在线一区二区| 国产精品播放| 日韩成人午夜电影| 久久伊人蜜桃av一区二区| 在线看无码的免费网站| 日韩国产欧美三级| 精品少妇一区二区三区免费观看| 午夜精品视频| 首页综合国产亚洲丝袜| 日韩三级免费观看| 在线视频免费在线观看一区二区| 蜜臀av性久久久久蜜臀av麻豆| 欧美sm极限捆绑bd| 亚洲视频播放| 免费日本视频一区| 国产精品福利一区| 亚洲福利视频三区| 丝袜亚洲精品中文字幕一区| 国产清纯白嫩初高生在线观看91 | 久久综合九色综合97婷婷| 久久综合狠狠| 欧美精品一区二区三区蜜桃视频 | 一本久道久久久| 国内成人自拍视频| 国产精品色在线观看| 色天使色偷偷av一区二区| 91亚洲大成网污www| 亚洲gay无套男同| 精品不卡一区| 国产一区二区三区四区五区美女| 一色桃子久久精品亚洲| 555www色欧美视频| 国产亚洲精品自拍| 不卡的av在线播放| 天天综合网天天综合色| 国产欧美精品一区二区色综合| 日本韩国欧美在线| 亚洲大胆视频| 97久久精品人人做人人爽50路| 天天色 色综合| 国产精品麻豆视频| 欧美tk丨vk视频| 在线欧美一区二区| 国产视频一区欧美| 欧美xxx在线观看| 国产一区二区三区最好精华液| 亚洲一区二区三区精品在线| 中文字幕免费不卡| 日韩三区在线观看| 欧美日韩欧美一区二区| 新狼窝色av性久久久久久| 午夜久久久久| 成人a区在线观看| 国产一区二区0| 美美哒免费高清在线观看视频一区二区| 国产精品九色蝌蚪自拍| www国产成人免费观看视频 深夜成人网| 欧美日韩一区二区三区免费看| 国产欧美日韩综合一区在线播放 | 色婷婷久久一区二区三区麻豆| 最新亚洲激情| 精品91视频| 伊人久久婷婷色综合98网| 女人天堂亚洲aⅴ在线观看| 成人avav影音| 成av人片一区二区| 成人污视频在线观看| 精品一区二区三区视频| 免费人成精品欧美精品| 日韩高清一区二区| 免费观看在线综合色| 日本午夜精品一区二区三区电影| 亚洲综合一区在线| 久久综合999|