理想论坛_专业20年的财经股票炒股论坛交流社区 - 股票论坛

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 3047|回复: 0

Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

[复制链接]

9650

主题

9650

帖子

2万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
28966
发表于 2019-12-27 14:41 | 显示全部楼层 |阅读模式
前置条件:
《Dapr应用》
《Dapr 应用之 Java gRPC 挪用篇》
《Dapr 应用之集成 Asp.Net Core Grpc 挪用篇》

  • 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 办事
      1. docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
      复制代码
    • 建立 rabbiqmq.yaml
      1. apiVersion: dapr.io/v1alpha1kind: Componentmetadata:name: messagebusspec:type: pubsub.rabbitmqmetadata:- name: host    value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"- name: consumerID    value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"- name: durable    value: "true" # Optional. Default: "false"- name: deletedWhenUnused    value: "false" # Optional. Default: "false"- name: autoAck    value: "false" # Optional. Default: "false"- name: deliveryMode    value: "2" # Optional. Default: "0". Values between 0 - 2.- name: requeueInFailure    value: "true" # Optional. Default: "false".
      复制代码

  • 革新 StorageService.Api
    方针:把 StorageService 从 Grpc 客户端革新为 Grpc 办事端,并 Sub Storage.Reduce 主题,完成减库存操纵。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 点窜 Program.cs 中的 CreateHostBuilder 代码为
      1. public static IHostBuilder CreateHostBuilder(string[] args){    return Host.CreateDefaultBuilder(args)        .ConfigureWebHostDefaults(webBuilder =>        {            webBuilder.ConfigureKestrel(options =>            {                options.Listen(IPAddress.Loopback, 5003, listenOptions =>                {                    listenOptions.Protocols = HttpProtocols.Http2;                });            });            webBuilder.UseStartup();        });}
      复制代码
    • 增加 DaprClientService
      1. public sealed class DaprClientService : DaprClient.DaprClientBase{    public override Task GetTopicSubscriptions(Empty request, ServerCallContext context)    {        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");        return Task.FromResult(topicSubscriptionsEnvelope);    }}
      复制代码
      Dapr 运转时将挪用此方式获得 StorageServcie 关注的主题列表
    • 点窜 Startup.cs
      1. /// /// This method gets called by the runtime. Use this method to add services to the container./// /// Services.public void ConfigureServices(IServiceCollection services){    services.AddGrpc();    services.AddDbContextPool(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });}
      复制代码
      1. /// /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline./// /// app./// env.public void Configure(IApplicationBuilder app, IWebHostEnvironment env){    if (env.IsDevelopment())    {        app.UseDeveloperExceptionPage();    }    app.UseRouting();    app.UseEndpoints(endpoints =>    {        endpoints.MapSubscribeHandler();        endpoints.MapGrpcService();    });}
      复制代码
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件
    • 启动 StorageService 办事
      1. dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
      复制代码

  • 利用 Java 斥地一个 Order 办事端,Order 办事供给的功用为

    • 下单
    • 检察定单详情
    • 获得定单列表
    在当前高低文中偏重处置惩罚的是下单功用,以及下单乐成后 Java 办事端将公布一个事变到 Storage.Reduce 主题,即淘汰库存。

    • 建立 CreateOrder.proto 文件
      1. syntax = "proto3";package daprexamples;option java_outer_classname = "CreateOrderProtos";option java_package = "generate.protos";service OrderService {    rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);    rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);    rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);}message CreateOrderRequest {    string ProductID = 1; //Product ID    int32 Amount=2; //Product Amount    string CustomerID=3; //Customer ID}message CreateOrderResponse {    bool Succeed = 1; //Create Order Result,true:success,false:fail}message RetrieveOrderRequest{    string OrderID=1;}message RetrieveOrderResponse{    Order Order=1;}message GetOrderListRequest{    string CustomerID=1;}message GetOrderListResponse{    repeated Order Orders=1;}message Order{    string ID=1;    string ProductID=2;    int32 Amount=3;    string CustomerID=4;}
      复制代码
    • 利用 protoc 天生 Java 代码
      1. protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
      复制代码
    • 援用 MyBatis 做为 Mapper 工具
    • 点窜 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到零丁的包中,在此文件中增加 createOrder() 、 getOrderList() 、 retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 办事
      1. dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
      复制代码

  • 建立 Golang Grpc 客户端,该客户端必要完成建立定单 Grpc 挪用,定单建立乐成公布扣除库存事变

    • 援用 CreateOrder.proto 文件,并天生 CreateOrder.pb.go 文件
      如未安装 protoc-gen-gogo ,经过一下命令获得并安装
      1. go get github.com/gogo/protobuf/gogoproto
      复制代码
      安装 protoc-gen-gogo
      1. go install github.com/gogo/protobuf/gogoproto
      复制代码
      按照 proto 文件天生代码
      1. protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
      复制代码
    • 客户端代码,建立定单
      1. ... response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{    Id:     "OrderService",    Data:   createOrderRequestData,    Method: "createOrder",    })    if err != nil {        fmt.Println(err)        return    }...
      复制代码
    • 增加 DataToPublish.proto 文件,此文件作为事变公布数据结构
      1. syntax = "proto3";package daprexamples;option java_outer_classname = "DataToPublishProtos";option java_package = "generate.protos";message StorageReduceData {    string ProductID = 1;    int32 Amount=2;}
      复制代码
    • 天生 DataToPublish 代码
      1. protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
      复制代码
    • 点窜 main.go 代码,按照 createOrder 结果判定能否要公布信息到消息行列
      1. ...createOrderResponse := &daprexamples.CreateOrderResponse{}if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {    fmt.Println(err)    return}fmt.Println(createOrderResponse.Succeed)if !createOrderResponse.Succeed {    //下单失利    return}storageReduceData := &daprexamples.StorageReduceData{    ProductID: createOrderRequest.ProductID,    Amount:    createOrderRequest.Amount,}storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)if err != nil {    fmt.Println(err)    return}_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{    Topic: "Storage.Reduce",    Data:  &any.Any{Value: storageReduceDataData},})fmt.Println(storageReduceDataData)if err != nil {    fmt.Println(err)} else {    fmt.Println("Published message!")}...
      复制代码
      留意: 发送数据前,利用 jsoniter 转换数据为 json 字符串,原因起因是假如间接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时利用 Json 打包,解包利用 String ,致使数据不同等。
    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端
      1. dapr run --app-id client go run main.go
      复制代码
      输出
      1. == APP == true== APP == Published message!
      复制代码

  • RabbitMQ

    • 在欣赏器中输入 http://localhost:15672/ ,账号和密码均为 guest
    • 检察 Connections ,有3个毗连

      • 这个3个毗连来自设备了 messagebus.yaml 组件的三个办事

    • 检察 Exchanges
      1. Name            Type    Features    Message rate in Message rate out(AMQP default)  direct  DStorage.Reduce  fanout  Damq.direct      direct  Damq.fanout      fanout  D...
      复制代码
      偏重看 Storage.Reduce ,可以看出 Dapr 运转时建立了一个 fanout 典范的 Exchange ,这表白该 Exhange 中的数据是广播的。
    • 检察 Queues
      Dapr 运转时建立了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  • DotNet Core StorageService.Api 革新以完成 Sub 事变

    • 翻开 DaprClientService.cs 文件,变动内容为
      1. public sealed class DaprClientService : DaprClient.DaprClientBase{    private readonly StorageContext _storageContext;    public DaprClientService(StorageContext storageContext)    {        _storageContext = storageContext;    }    public override Task GetTopicSubscriptions(Empty request, ServerCallContext context)    {        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");        return Task.FromResult(topicSubscriptionsEnvelope);    }    public override async Task OnTopicEvent(CloudEventEnvelope request, ServerCallContext context)    {        if (request.Topic.Equals("Storage.Reduce"))        {            StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());            Console.WriteLine("ProductID:" + storageReduceData.ProductID);            Console.WriteLine("Amount:" + storageReduceData.Amount);            await HandlerStorageReduce(storageReduceData);        }        return new Empty();    }    private async Task HandlerStorageReduce(StorageReduceData storageReduceData)    {        Guid productID = Guid.Parse(storageReduceData.ProductID);        Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));        if (storageFromDb == null)        {            return;        }        if (storageFromDb.Amount < storageReduceData.Amount)        {            return;        }        storageFromDb.Amount -= storageReduceData.Amount;        Console.WriteLine(storageFromDb.Amount);        await _storageContext.SaveChangesAsync();    }
      复制代码
    • 分析

      • 增加 GetTopicSubscriptions() 将完成对主题的关注

        • 当利用制止时,RabbitMQ 中的 Queue 自动删除
        • 增加 OnTopicEvent() 重写,此方式将完成对 Sub 主题的事变处置惩罚

      • HandlerStorageReduce 用于淘汰库存


  • 启动 DotNet Core StorageService.Api Grpc 办事,启动 Java OrderService Grpc 办事,启动 Go Grpc 客户端

    • DotNet Core
      1. dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
      复制代码
    • Java
      1. dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
      复制代码
    • go
      1. dapr run --app-id client  go run main.go
      复制代码
      go grpc 输出为
      1. == APP == true== APP == Published message!
      复制代码
    检察 MySql Storage 数据库,对应产物库存淘汰 20

至此,经过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 挪用,并经过 RabbitMQ 组件完成了 Pub/Sub
源码地址

免责声明:假如加害了您的权益,请联系站长,我们会实时删除侵权内容,感谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|理想论坛_专业20年的财经股票炒股论坛交流社区 - 股票论坛

GMT+8, 2020-7-7 08:40 , Processed in 0.167664 second(s), 28 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表