发送适配器的 Exchange 模式

发送适配器接收来自 BizTalk 消息引擎的消息,并将其通过网络传输。 可以使用单向或双向消息交换模式发送这些消息。 处理此双向消息交换模式的适配器称为 Solicit-Response 适配器。

阻塞与非阻塞传输

消息传送引擎使用 IBTTransmitter.TransmitMessage 方法或 IBTTransmitterBatch.TransmitMessage 方法将消息传送到发送适配器,具体取决于适配器是批量感知的。 这两个版本的方法都有一个布尔返回值,该值指示适配器如何传输消息。 如果适配器返回true,则表明它在返回之前已完全发送完消息。 在这种情况下,消息引擎代表适配器从 MessageBox 数据库中删除消息。 如果适配器返回 false,它将启动消息传输并在传输完成之前返回。 在这种情况下,适配器不仅负责从其应用程序队列中删除消息,还负责处理那些需要重试传输或挂起的传输失败情况。

返回 false 的适配器是一个非阻止调用,这意味着 TransmitMessage 实现代码不会阻止消息引擎的调用线程。 适配器只是将消息添加到已准备好传输的内存中队列,然后返回。 适配器应有自己的线程池,用于服务内存中队列、传输消息,然后通知引擎传输结果。

消息引擎的线程通常比用于通过网络发送数据的线程更加依赖 CPU。 混合这两种类型的线程会对性能产生负面影响。 非阻塞发送可以解耦这两种类型的线程,并比阻塞调用显著提高性能。

下图显示了适配器的线程池,其线程池可能会受到 I/O 操作的约束。 BizTalk Server 消息传送引擎的线程池更受 CPU 处理的约束。 通过保留两个不同的线程池,而不是混合相同类型的线程,系统可以更有效地运行。

此图展示了适配器的线程池,这些线程池往往容易受到 I/O 操作的约束。

性能提示: 为实现最佳性能,发送适配器应为非阻塞且具备批处理意识。 当 BizTalk 文件适配器从阻止和非批处理感知更改为非阻止和批处理感知时,实现了三倍的性能提升。

故障排除提示: 阻止传输可能会导致整个主机实例的性能下降。 如果适配器在 TransmitMessage 中过度阻塞,则会阻止引擎线程将消息传送到其他适配器。

非批量发送

不支持批量处理的适配器应根据 异步发送适配器的接口 中的详细说明来实现 IBTTransmitter。 对于适配器需要传输的每个消息,消息引擎会调用 IBTTransmitter.TransmitMessage。 下面的对象交互关系图详细介绍了传输消息的典型方法,其中包括以下步骤:

  1. 引擎将消息传送到适配器。

  2. 适配器将消息排入内存中队列,以便传输。

  3. 适配器线程池中的线程从队列中移出消息,读取消息的配置,并通过线路传输消息。

  4. 适配器从引擎获取新的一批数据。

  5. 适配器在批处理上调用 DeleteMessage ,并传入它刚刚传输的消息。

  6. 适配器在批处理上调用 “完成 ”。

  7. 引擎处理批处理并从应用程序队列中删除消息。

  8. 引擎调用适配器以通知其 DeleteMessage 操作的结果。

    显示适配器从应用程序队列中删除单个消息的图像。

    前面的对象交互关系图显示适配器从应用程序队列中删除单个消息。 理想情况下,适配器会对消息进行批量操作,而不是逐个操作单条消息。

批量发送

批量感知的适配器应实现 IBTBatchTransmitterIBTTransmitterBatch ,如 发送适配器接口中详述的那样。 当引擎有需要通过适配器传输的消息时,它通过调用 IBTBatchTransmitter.GetBatch 从适配器获取新批。 适配器返回实现 IBTTransmitterBatch 的新批处理对象。 然后,引擎通过调用 IBTTransmitterBatch.BeginBatch 启动批处理。 此 API 具有一个 out 参数,该参数允许适配器指定它在批处理上接受的最大消息数。 适配器可以选择返回 DTC 事务。 然后,引擎对要添加到批的每个传出消息调用 IBTTransmitterBatch.TransmitMessage 一次。 调用此项的次数大于零,但小于或等于适配器指示的批的最大大小。 将所有消息添加到批处理后,适配器将调用 IBTTransmitterBatch.Done。 此时,适配器通常会将批处理中的所有消息排队到其内存中队列。 适配器像那些不支持批处理的适配器一样,从其自己线程池中的一个或多个线程传输消息。 然后,适配器会通知引擎传输结果。

以下对象交互图演示了通过批处理发送适配器传输两条消息。

展示通过批量发送适配器传输两条消息的示意图。

处理传输失败

下图演示了推荐的传输失败语义。 这些只是建议,不会由消息引擎强制执行。 如果这样做有有效原因,则可以开发偏离这些准则的适配器,但在这种情况下应小心。 例如,一般情况下,适配器在所有重试用尽后,应始终将消息移动到备份传输中。

传输通常可能需要使用比配置的重试次数更多。 虽然这稍有不同,但被认为是可以接受的,因为传输层的复原能力正在增加。 通常,消息引擎公开的 API 旨在尽可能为适配器提供最大控制。 有了这种控制,责任就更高了。

显示处理传输失败过程的示意图。

适配器通过检查系统上下文属性 RetryCount 来确定消息上可用的重试次数。 适配器为每次重试尝试调用一次 Resubmit API,并传入要重新提交的消息。 连同消息一起,它会传递时间戳,指示引擎何时应将消息传递回适配器。 此值通常应为当前时间加上 RetryInterval 的值。 RetryInterval 是一个系统上下文属性,其单位为分钟。 消息上下文中的 RetryCountRetryInterval 都是在发送端口上配置的值。 考虑规模化部署,其中同一 BizTalk 主机实例部署在多台计算机上。 如果消息在重试间隔过期后传递,则消息可能会传递到配置为运行的任何计算机上的任一主机实例。 因此,适配器不应保留与重试尝试中使用的消息关联的任何状态,因为不能保证适配器的同一实例将在以后负责传输。

适配器应仅在重试计数小于或等于零之后尝试将消息移动到备份传输。 如果没有为端口配置备份传输,则尝试将消息移动到备份传输将失败。 在这种情况下,应挂起消息。

以下代码片段演示如何从消息上下文中确定重试计数和间隔,以及随后重新提交或移动到备份传输。

using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  
 …  
// RetryCount and RetyInterval are system context properties...  
private static readonly PropertyBase RetryCountProperty =   
 new BTS.RetryCount();  
private static readonly PropertyBase RetryIntervalProperty =   
 new BTS.RetryInterval();  

public void HandleRetry(IBaseMessage msg, IBTTransportBatch batch)  
{  
int retryCount = 0;  
int retryInterval = 0;  

// Get the RetryCount and RetryInterval off the msg ctx...  
 GetMessageRetryState(msg, out retryCount, out retryInterval);  

// If we have retries available resubmit, else move to   
 // backup transport...  
 if ( retryCount > 0 )  
batch.Resubmit(  
 msg, DateTime.Now.AddMinutes(retryInterval));  
else  
batch.MoveToNextTransport(msg);  
}  

public void GetMessageRetryState(  
 IBaseMessage msg,   
 out int retryCount,   
 out int retryInterval )  
{  
retryCount = 0;  
retryInterval = 0;  

object obj =  msg.Context.Read(  
RetryCountProperty.Name.Name,    
RetryCountProperty.Name.Namespace);   

if ( null != obj )  
retryCount = (int)obj;  

obj =  msg.Context.Read(  
RetryIntervalProperty.Name.Name,    
RetryIntervalProperty.Name.Namespace);   

if ( null != obj )  
retryInterval = (int)obj;  
}  

在 TransmitMessage 中抛出异常

如果适配器在任何 API IBTTransmitter.TransmitMessage、IBTTransmitterBatch.TransmitMessageIBTTransmitterBatch.Done 上引发异常,则引擎会将所涉及的消息传输视为传输失败,并为消息采取适当的作,如“如何处理适配器故障”中详述。

对于批量感知发送适配器,在 TransmitMessage API 上引发异常会导致整个批处理被清除,并对该批中的所有消息执行默认传输失败处理措施。

Solicit-Response

双向发送适配器通常支持单向和双向传输。 发送适配器通过检查消息上下文中的 IsSolicitResponse 系统上下文属性来确定消息是作为单向发送还是双向发送。

以下代码片段演示了这一点:

private bool portIsTwoWay = false;  
private static readonly PropertyBase IsSolicitResponseProperty= new BTS.IsSolicitResponse();  

...  

 // Port is one way or two way...  
 object obj =  this.message.Context.Read(  
 IsSolicitResponseProperty.Name.Name,    
 IsSolicitResponseProperty.Name.Namespace);   

if ( null != obj )  
 this.portIsTwoWay = (bool)obj;  

在请求响应传输期间,适配器传输请求消息。 完成后,它会提交关联的响应,并告知消息引擎从 MessageBox 数据库中删除原始请求消息。 从应用程序队列中删除消息的作应在与提交响应消息相同的传输代理批处理中执行。 这可确保删除和提交响应的原子性。 为了完全原子性,适配器应使用 DTC 事务,即将请求消息传输到事务感知资源、提交响应消息和删除请求消息都位于同一 DTC 事务的上下文中。 与往常一样,我们建议使用非阻止发送来传输请求消息。

以下代码片段说明了双向发送的主要方面。 当引擎调用 IBTTransmitter.TransmitMessage 时,适配器会将消息添加到内存队列中以便传输。 适配器返回 false 以指示它正在执行非阻塞发送。 适配器的线程池(WorkerThreadThunk)处理内存队列,并从队列中出队消息,将其交给一个辅助方法。 此方法负责发送请求消息并接收响应消息。 (此帮助程序方法的实现超出了本主题的范围。响应消息提交到引擎中,请求消息将从应用程序队列中删除。

using System.Collections;  
using Microsoft.XLANGs.BaseTypes;  
using Microsoft.BizTalk.Message.Interop;  
using Microsoft.BizTalk.TransportProxy.Interop;  

     static private Queue _transmitQueue = new Queue();  
  static private IBTTransportProxy _transportProxy = null;   
// IBTTransmitter...  
 public bool TransmitMessage(IBaseMessage msg)  
{  
// Add message to the transmit queue...  
lock(_transmitQueue.SyncRoot)  
 {  
_transmitQueue.Enqueue(msg);  
 }  

 return false;  
}  

 // Threadpool worker thread...   
private void WorkerThreadThunk()  
{  
try  
{  
 IBaseMessage solicitMsg = null;  

 lock(_transmitQueue.SyncRoot)  
 {  
 solicitMsg =   
 (IBaseMessage)_transmitQueue.Dequeue();  
}  

 IBaseMessage responseMsg = SendSolicitResponse(   
 solicitMsg );  
Callback cb = new Callback();  

IBTTransportBatch batch = _transportProxy.GetBatch(  
 cb, null);  
batch.SubmitResponseMessage( solicitMsg, responseMsg );  
batch.DeleteMessage( solicitMsg );  
batch.Done(null);  
}  
catch(Exception)  
{  
// Handle failure....  
}  
}  

static private IBaseMessage SendSolicitResponse(  
 IBaseMessage solicitMsg )  
{  
// Helper method to send solicit message and receive   
 // response message...  
IBaseMessage responseMsg = null;  
return responseMsg;  
}  

动态发送

动态发送端口没有与之关联的适配器配置。 相反,他们使用处理程序配置适配器在动态端口上传输消息所需的任何默认属性。 例如,HTTP 适配器可能需要使用代理,并且需要提供凭据。 可以在适配器运行时缓存的处理程序配置中指定用户名、密码和端口。

为了让引擎确定动态消息要通过哪种传输方式发送,OutboundTransportLocation 的前缀设为适配器的别名。 适配器可以在安装时向 BizTalk Server 注册一个或多个别名。 引擎在运行时分析 OutboundTransportLocation 以查找匹配项。 适配器负责处理OutboundTransportLocation,可以选择加上或不加别名。 下表显示了一些为开箱即用的 BizTalk 适配器注册的别名示例。

适配器别名 适配器 OutboundTransportLocation 示例
HTTP:// HTTP http://www. MyCompany.com/bar
HTTPS:// HTTP https://www. MyCompany.com/bar
mailto: SMTP mailto:A.User@MyCompany.com
文件:// 文件 FILE://C:\ MyCompany \%MessageID%.xml