博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
我的WCF之旅(13):创建基于MSMQ的Responsive Service
阅读量:6843 次
发布时间:2019-06-26

本文共 12588 字,大约阅读时间需要 41 分钟。

一、One-way MEP V.S. Responsible Service

我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。

但是在有些场景 中,这是无法容忍的。再拿我在上一篇文章的Order Delivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受的。我们今天就来讨论一下,如何创建一个Responsive Service来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。

二、 Solution

虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:

  • Order No.: 被处理的Order的一个能够为一标志它的ID。
  • Exception: 如果处理失败的Exception,如果成功处理为null。

要在WCF中实现这样的目的,对于Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。

我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:

三、Implementation

了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式:

方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数

这是我们最容易想到的,比如我们原来的Operation这样定义:

namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public 
interface IOrderProcessor
ExpandedSubBlockStart.gif    {
        [OperationContract(IsOneWay = 
true)]
        
void Submit(Order order);
    }
}

现在变成:

namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public 
interface IOrderProcessor
ExpandedSubBlockStart.gif    {
        [OperationContract(IsOneWay = 
true)]
        
void Submit(Order order, OrderResponseContext responseContext);
    }
}

虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和Service Contract无关的解决方式:

方式二、将OrderResponseContext放到Soap Message 的Header中

其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取:

我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers

OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders

如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现:

OrderResponseContext context = 
new OrderResponseContext();
MessageHeader<OrderResponseContext> header = 
new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));

相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容:

OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace"));

四、Sample

我们照例给出一个完整的Sample,下面是整个Solution的结构:

除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:

  • Localservice: 作为Client Domain的ResponseService。
  • LocalHosting:Host Localservice。

1.Contract:  Artech.ResponsiveQueuedService.Contract

Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {
    [ServiceContract]
    [ServiceKnownType(
typeof(Order))]
    
public 
interface IOrderProcessor
ExpandedSubBlockStart.gif    {
        [OperationContract(IsOneWay = 
true)]
        
void Submit(Order order);
    }
}

Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {
    [ServiceContract]
    
public 
interface  IOrderRessponse
ExpandedSubBlockStart.gif    {
        [OperationContract(IsOneWay =
true)]
        
void SubmitOrderResponse(Guid orderNo,FaultException exception);
    }
}

接收来自Order processing端的Response:Order No.和Exception。

Data Contract: Artech.ResponsiveQueuedService.Contract.Order

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {
    [DataContract]
    
public 
class Order
ExpandedSubBlockStart.gif    {
ContractedSubBlock.gif        
Private Fields
ContractedSubBlock.gif        
Constructors
ContractedSubBlock.gif        
Public Properties
ContractedSubBlock.gif        
Public Methods
    }
}

对Order的封装。

Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
ExpandedBlockStart.gif {    
    [DataContract]
    
public 
class OrderResponseContext
ExpandedSubBlockStart.gif    {
        
private Uri _responseAddress;
        [DataMember]
        
public Uri ResponseAddress
ExpandedSubBlockStart.gif        {
ExpandedSubBlockStart.gif            
get { 
return _responseAddress; }
ExpandedSubBlockStart.gif            
set { _responseAddress = value; }
        }
        
public 
static OrderResponseContext Current
ExpandedSubBlockStart.gif        {
            
get
ExpandedSubBlockStart.gif            {
                
if (OperationContext.Current == 
null)
ExpandedSubBlockStart.gif                {
                    
return 
null;
                }
                
return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract");
            }
            
set
ExpandedSubBlockStart.gif            {
                MessageHeader<OrderResponseContext> header = 
new MessageHeader<OrderResponseContext>(value);
                OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"));
            }
        }
    }
}

ResponseAddress代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers中、以及从Ingoing Message Headers取出OrderResponseContext对象。

2.Order Processing Service:Artech.ResponsiveQueuedService.Service

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;
namespace Artech.ResponsiveQueuedService.Service
ExpandedBlockStart.gif {
    
public 
class OrderProcessorService:IOrderProcessor
ExpandedSubBlockStart.gif    {
        
private 
void ProcessOrder(Order order)
ExpandedSubBlockStart.gif        {
            
if (order.OrderDate < DateTime.Today)
ExpandedSubBlockStart.gif            {
                
throw 
new Exception();
            }
        }
ContractedSubBlock.gif        
IOrderProcessor Members
    }
}

在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service.

3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

Configuration

<?
xml version="1.0" encoding="utf-8" 
?>
<
configuration
>
  
<
appSettings
>
    
<
add 
key
="msmqPath"
 value
=".\private$\orderprocessor"
/>
  
</
appSettings
>
  
<
system
.serviceModel
>
    
<
bindings
>
      
<
netMsmqBinding
>
        
<
binding 
name
="MsmqBinding"
 exactlyOnce
="false"
 useActiveDirectory
="false"
>
          
<
security
>
            
<
transport 
msmqAuthenticationMode
="None"
 msmqProtectionLevel
="None"
 
/>
          
</
security
>
        
</
binding
>
      
</
netMsmqBinding
>
    
</
bindings
>
    
<
services
>
      
<
service 
name
="Artech.ResponsiveQueuedService.Service.OrderProcessorService"
>
        
<
endpoint 
address
="net.msmq://localhost/private/orderprocessor"
 binding
="netMsmqBinding"
            bindingConfiguration
="MsmqBinding"
 contract
="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"
 
/>
      
</
service
>
    
</
services
>
  
</
system.serviceModel
>
</
configuration
>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Hosting
ExpandedBlockStart.gif {
    
class Program
ExpandedSubBlockStart.gif    {
        
static 
void Main(
string[] args)
ExpandedSubBlockStart.gif        {
            
string path = ConfigurationManager.AppSettings["msmqPath"];
            
if (!MessageQueue.Exists(path))
ExpandedSubBlockStart.gif            {
                MessageQueue.Create(path);
            }
            
using (ServiceHost host = 
new ServiceHost(
typeof(OrderProcessorService)))
ExpandedSubBlockStart.gif            {
                host.Opened += 
delegate
ExpandedSubBlockStart.gif                {
                    Console.WriteLine("The Order Processor service has begun to listen
");
                };
                host.Open();
                Console.Read();
            }
        }
    }
}

4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.LocalService
ExpandedBlockStart.gif {
    
public 
class OrderRessponseService : IOrderRessponse
ExpandedSubBlockStart.gif    {
ContractedSubBlock.gif        
IOrderRessponse Members
    }
}

5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

Configuration

<?
xml version="1.0" encoding="utf-8" 
?>
<
configuration
>
  
<
appSettings
>
    
<
add 
key
="msmqPath"
 value
=".\private$\orderresponse"
/>
  
</
appSettings
>
  
<
system
.serviceModel
>
    
<
bindings
>
      
<
netMsmqBinding
>
        
<
binding 
name
="msmqBinding"
 exactlyOnce
="false"
>
          
<
security
>
            
<
transport 
msmqAuthenticationMode
="None"
 msmqProtectionLevel
="None"
 
/>
          
</
security
>
        
</
binding
>
      
</
netMsmqBinding
>
    
</
bindings
>
    
<
services
>
      
<
service 
name
="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService"
>
        
<
endpoint 
address
="net.msmq://localhost/private/orderresponse"
 binding
="netMsmqBinding"
            bindingConfiguration
="msmqBinding"
 contract
="Artech.ResponsiveQueuedService.Contract.IOrderRessponse"
 
/>
      
</
service
>
    
</
services
>
  
</
system.serviceModel
>
</
configuration
>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.LocalhHosting
ExpandedBlockStart.gif {
    
class Program
ExpandedSubBlockStart.gif    {
        
static 
void Main(
string[] args)
ExpandedSubBlockStart.gif        {
            
string path = ConfigurationManager.AppSettings["msmqPath"];
            
if (!MessageQueue.Exists(path))
ExpandedSubBlockStart.gif            {
                MessageQueue.Create(path);
            }
            
using (ServiceHost host = 
new ServiceHost(
typeof(OrderRessponseService)))
ExpandedSubBlockStart.gif            {
                host.Opened += 
delegate
ExpandedSubBlockStart.gif                {
                    Console.WriteLine("The Order Response service has begun to listen
");
                };
                host.Open();
                Console.Read();
            }
        }
    }
}

6. Client: Artech.ResponsiveQueuedService.Client

Configuration:

<?
xml version="1.0" encoding="utf-8" 
?>
<
configuration
>
  
<
appSettings
>
    
<
add 
key
="msmqPath"
 value
="net.msmq://localhost/private/orderresponse"
/>
  
</
appSettings
>
  
<
system
.serviceModel
>
    
<
bindings
>
      
<
netMsmqBinding
>
        
<
binding 
name
="MsmqBinding"
 exactlyOnce
="false"
 useActiveDirectory
="false"
>
          
<
security
>
            
<
transport 
msmqAuthenticationMode
="None"
 msmqProtectionLevel
="None"
 
/>
          
</
security
>
        
</
binding
>
      
</
netMsmqBinding
>
    
</
bindings
>
    
<
client
>
      
<
endpoint 
address
="net.msmq://localhost/private/orderprocessor"
 binding
="netMsmqBinding"
            bindingConfiguration
="MsmqBinding"
 contract
="Artech.ResponsiveQueuedService.Contract.IOrderProcessor"
 name
="defaultEndpoint"
 
/>
    
</
client
>
  
</
system.serviceModel
>
</
configuration
>

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Clinet
ExpandedBlockStart.gif {
    
class Program
ExpandedSubBlockStart.gif    {
        
static 
void Main(
string[] args)
ExpandedSubBlockStart.gif        {
            Order order1 = 
new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
            Order order2 = 
new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");
            
string path = ConfigurationManager.AppSettings["msmqPath"];
            Uri address = 
new Uri(path);
            OrderResponseContext context = 
new OrderResponseContext();
            context.ResponseAddress = address;
            ChannelFactory<IOrderProcessor> channelFactory = 
new ChannelFactory<IOrderProcessor>("defaultEndpoint");
            IOrderProcessor orderProcessor = channelFactory.CreateChannel();
            
using (OperationContextScope contextScope = 
new OperationContextScope(orderProcessor 
as IContextChannel))
ExpandedSubBlockStart.gif            {
                Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo);
                OrderResponseContext.Current = context;
                orderProcessor.Submit(order1);
            }
            
using (OperationContextScope contextScope = 
new OperationContextScope(orderProcessor 
as IContextChannel))
ExpandedSubBlockStart.gif            {
                Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo);
                OrderResponseContext.Current = context;
                orderProcessor.Submit(order2);
            }
            Console.Read();
        }
    }
}

我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。

我们现在运行一下整个程序,看看最终的输出结果:

Client:

Order Processing:

Order Response:
Reference:
WCF相关内容:

作者:蒋金楠
微信公众账号:大内老A
微博:
如果你想及时得到个人撰写文章以及著作的消息推送,或者想看看个人推荐的技术资料,可以扫描左边二维码(或者长按识别二维码)关注个人公众号(原来公众帐号
蒋金楠的自媒体将会停用)。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
你可能感兴趣的文章
notepad2正则表达式替换字符串
查看>>
svn安装配置
查看>>
跟我学Spring Cloud(Finchley版)-20-Spring Cloud Config-Git仓库配置详解
查看>>
集合——Hashtable与ConcurrentHashMap区别
查看>>
/bin、/sbin、/usr/bin、/usr/bin的区别
查看>>
手机打开USB调试(OPPO)
查看>>
关于node 符号链接符号过多的问题
查看>>
GBin1分享的8个图片360度旋转展示的jQuery插件
查看>>
分享11个超棒的移动应用(mobile apps)开发解决方案
查看>>
【Spring】依赖注入三种方式
查看>>
【安全牛学习笔记】 本地提权
查看>>
VS2017 创建NET Core 1.1 Web项目,发布后找不到引用的js文件
查看>>
Linux_Mail_Server
查看>>
网络相关排查总结
查看>>
C++错误收集(2)
查看>>
博客园的老虞要搬家罗
查看>>
Data Pump需要用到steam pool
查看>>
异步7月上市新书,送出一本你爱的
查看>>
如何查看、管理npm模块--react-native为例
查看>>
归档及压缩
查看>>