海安零距离 海安论坛 海安新闻 海安

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 2142|回复: 0

Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

[复制链接]

6234

主题

6234

帖子

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
18716
发表于 2019-12-27 15:08 | 显示全部楼层 |阅读模式
前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》

  • 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务
      1. docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
      复制代码
    • 创建 rabbiqmq.yaml
      1. apiVersion: dapr.io/v1alpha1kind: Componentmetadata:name: messagebusspec:type: pubsub.rabbitmqmetadata:- name: host    value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"- name: consumerID    value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"- name: durable    value: "true" # Optional. Default: "false"- name: deletedWhenUnused    value: "false" # Optional. Default: "false"- name: autoAck    value: "false" # Optional. Default: "false"- name: deliveryMode    value: "2" # Optional. Default: "0". Values between 0 - 2.- name: requeueInFailure    value: "true" # Optional. Default: "false".
      复制代码

  • 改造 StorageService.Api
    目标:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代码为
      1. public static IHostBuilder CreateHostBuilder(string[] args){    return Host.CreateDefaultBuilder(args)        .ConfigureWebHostDefaults(webBuilder =>        {            webBuilder.ConfigureKestrel(options =>            {                options.Listen(IPAddress.Loopback, 5003, listenOptions =>                {                    listenOptions.Protocols = HttpProtocols.Http2;                });            });            webBuilder.UseStartup();        });}
      复制代码
    • 添加 DaprClientService
      1. public sealed class DaprClientService : DaprClient.DaprClientBase{    public override Task GetTopicSubscriptions(Empty request, ServerCallContext context)    {        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");        return Task.FromResult(topicSubscriptionsEnvelope);    }}
      复制代码
      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表
    • 修改 Startup.cs
      1. /// /// This method gets called by the runtime. Use this method to add services to the container./// /// Services.public void ConfigureServices(IServiceCollection services){    services.AddGrpc();    services.AddDbContextPool(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });}
      复制代码
      1. /// /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline./// /// app./// env.public void Configure(IApplicationBuilder app, IWebHostEnvironment env){    if (env.IsDevelopment())    {        app.UseDeveloperExceptionPage();    }    app.UseRouting();    app.UseEndpoints(endpoints =>    {        endpoints.MapSubscribeHandler();        endpoints.MapGrpcService();    });}
      复制代码
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件
    • 启动 StorageService 服务
      1. dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
      复制代码

  • 利用 Java 开发一个 Order 服务端,Order 服务提供的功能为

    • 下单
    • 检察订单详情
    • 获取订单列表
    在当前上下文中着重处理的是下单功能,以及下单乐成后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即镌汰库存。

    • 创建 CreateOrder.proto 文件
      1. syntax = "proto3";package daprexamples;option java_outer_classname = "CreateOrderProtos";option java_package = "generate.protos";service OrderService {    rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);    rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);    rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);}message CreateOrderRequest {    string ProductID = 1; //Product ID    int32 Amount=2; //Product Amount    string CustomerID=3; //Customer ID}message CreateOrderResponse {    bool Succeed = 1; //Create Order Result,true:success,false:fail}message RetrieveOrderRequest{    string OrderID=1;}message RetrieveOrderResponse{    Order Order=1;}message GetOrderListRequest{    string CustomerID=1;}message GetOrderListResponse{    repeated Order Orders=1;}message Order{    string ID=1;    string ProductID=2;    int32 Amount=3;    string CustomerID=4;}
      复制代码
    • 利用 protoc 天生 Java 代码
      1. protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
      复制代码
    • 引用 MyBatis 做为 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder() 、 getOrderList() 、 retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 服务
      1. dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
      复制代码

  • 创建 Golang Grpc 客户端,该客户端须要完成创建订单 Grpc 调用,订单创建乐成发布扣除库存事件

    • 引用 CreateOrder.proto 文件,并天生 CreateOrder.pb.go 文件
      如未安装 protoc-gen-gogo ,通过一下下令获取并安装
      1. go get github.com/gogo/protobuf/gogoproto
      复制代码
      安装 protoc-gen-gogo
      1. go install github.com/gogo/protobuf/gogoproto
      复制代码
      根据 proto 文件天生代码
      1. protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
      复制代码
    • 客户端代码,创建订单
      1. ... response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{    Id:     "OrderService",    Data:   createOrderRequestData,    Method: "createOrder",    })    if err != nil {        fmt.Println(err)        return    }...
      复制代码
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构
      1. syntax = "proto3";package daprexamples;option java_outer_classname = "DataToPublishProtos";option java_package = "generate.protos";message StorageReduceData {    string ProductID = 1;    int32 Amount=2;}
      复制代码
    • 天生 DataToPublish 代码
      1. protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
      复制代码
    • 修改 main.go 代码,根据 createOrder 效果判定是否要发布信息到消息队列
      1. ...createOrderResponse := &daprexamples.CreateOrderResponse{}if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {    fmt.Println(err)    return}fmt.Println(createOrderResponse.Succeed)if !createOrderResponse.Succeed {    //下单失败    return}storageReduceData := &daprexamples.StorageReduceData{    ProductID: createOrderRequest.ProductID,    Amount:    createOrderRequest.Amount,}storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)if err != nil {    fmt.Println(err)    return}_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{    Topic: "Storage.Reduce",    Data:  &any.Any{Value: storageReduceDataData},})fmt.Println(storageReduceDataData)if err != nil {    fmt.Println(err)} else {    fmt.Println("Published message!")}...
      复制代码
      留意: 发送数据前,利用 jsoniter 转换数据为 json 字符串,缘故因由是假如直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时利用 Json 打包,解包利用 String ,导致数据不同等。
    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端
      1. dapr run --app-id client go run main.go
      复制代码
      输出
      1. == APP == true== APP == Published message!
      复制代码

  • RabbitMQ

    • 在欣赏器中输入 http://localhost:15672/ ,账号和暗码均为 guest
    • 检察 Connections ,有3个毗连

      • 这个3个毗连来自配置了 messagebus.yaml 组件的三个服务

    • 检察 Exchanges
      1. Name            Type    Features    Message rate in Message rate out(AMQP default)  direct  DStorage.Reduce  fanout  Damq.direct      direct  Damq.fanout      fanout  D...
      复制代码
      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 范例的 Exchange ,这表明该 Exhange 中的数据是广播的。
    • 检察 Queues
      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,以是可以收到 Storage.Reduce 的广播数据。

  • DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打开 DaprClientService.cs 文件,更改内容为
      1. public sealed class DaprClientService : DaprClient.DaprClientBase{    private readonly StorageContext _storageContext;    public DaprClientService(StorageContext storageContext)    {        _storageContext = storageContext;    }    public override Task GetTopicSubscriptions(Empty request, ServerCallContext context)    {        var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();        topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");        return Task.FromResult(topicSubscriptionsEnvelope);    }    public override async Task OnTopicEvent(CloudEventEnvelope request, ServerCallContext context)    {        if (request.Topic.Equals("Storage.Reduce"))        {            StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());            Console.WriteLine("ProductID:" + storageReduceData.ProductID);            Console.WriteLine("Amount:" + storageReduceData.Amount);            await HandlerStorageReduce(storageReduceData);        }        return new Empty();    }    private async Task HandlerStorageReduce(StorageReduceData storageReduceData)    {        Guid productID = Guid.Parse(storageReduceData.ProductID);        Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));        if (storageFromDb == null)        {            return;        }        if (storageFromDb.Amount < storageReduceData.Amount)        {            return;        }        storageFromDb.Amount -= storageReduceData.Amount;        Console.WriteLine(storageFromDb.Amount);        await _storageContext.SaveChangesAsync();    }
      复制代码
    • 分析

      • 添加 GetTopicSubscriptions() 将完成对主题的关注

        • 当应用制止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理

      • HandlerStorageReduce 用于镌汰库存


  • 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

    • DotNet Core
      1. dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
      复制代码
    • Java
      1. dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
      复制代码
    • go
      1. dapr run --app-id client  go run main.go
      复制代码
      go grpc 输出为
      1. == APP == true== APP == Published message!
      复制代码
    检察 MySql Storage 数据库,对应产物库存镌汰 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub
源码地点

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|深圳论坛-深圳人的网上家园  

GMT+8, 2020-6-6 05:28 , Processed in 0.118099 second(s), 29 queries .

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表