批量回调
接收适配器向消息引擎提交的批处理将异步处理。 因此,适配器需要一种机制来将回调绑定到适配器中的某个状态,以便可以通知其成功或失败并执行任何必要的清理操作。 回调语义很灵活,因此适配器可以使用一种或三种方法的组合。 其中包括:
所有回调都是在实现 IBTBatchCallBack 的同一对象实例上进行的。
请求新批处理时传入引擎的 Cookie 用于将回调与适配器中的状态相关联。
每个实现 IBTBatchCallBack 的批处理都有不同的回调对象。 在这里,每个对象都有自己的私有状态。
处理批处理后,适配器将重新调用其 IBTBatchCallBack.BatchComplete 的实现。 批处理的总体状态由第一个参数 HRESULT 状态指示。 如果此值大于或等于零,则批成功,因为引擎拥有数据的所有权,适配器可以自由地从线路中删除该数据。 负状态表示批处理失败:批处理中的作均未成功,适配器负责处理失败。
如果批次处理失败,适配器需要知道哪个操作中的哪个项目失败。 例如,假设适配器正在从磁盘读取 20 个文件,并使用单个批处理将它们提交到 BizTalk Server。 如果第十个文件已损坏,适配器需要挂起该一个文件,然后重新提交剩余的 19 个文件。 此信息通过第二个和第三个参数提供给适配器,
opCount
是 short 类型(操作计数),operationStatus
的类型为 BTBatchOperationStatus[]。
注释
在单个批处理对象上,不应多次提交消息。 在同一批上多次提交同一消息对象将导致引擎错误。
操作计数表示批处理中有多少种操作类型(BTBatchOperationStatus 数组的大小)。 作状态数组中的每个元素都对应于给定的作类型。 通过使用 BTBatchOperationStatus 数组,适配器可以通过查看 BTBatchOperationStatus.MessageStatus 数组来确定给定作中的哪个项失败,以获取表示失败的负 HRESULT 值。 在上述场景中,适配器将创建一个新的批次,其中包含 19 次消息提交动作和一次消息挂起操作。
以下代码片段演示适配器如何通过其传输代理从引擎请求新批,并使用 Cookie 方法将单个消息提交到引擎中。 消息传送引擎在完成处理整个提交的消息批时,将 BatchComplete 方法调用为批处理回调。
using Microsoft.BizTalk.TransportProxy.Interop;
using Microsoft.BizTalk.Message.Interop;
public class MyAdapter :
IBTTransport,
IBTTransportConfig,
IBTTransportControl,
IPersistPropertyBag,
IBaseComponent,
IBTBatchCallBack
{
private IBTTransportProxy _tp;
public void BatchComplete(
Int32 status,
Int16 opCount,
BTBatchOperationStatus[] operationStatus,
System.Object callbackCookie)
{
// Use cookie to correlate callback with work done,
// in this example the batch is to submit a single
// file the name of which will be in the
// callbackCookie
string fileName = (string)callbackCookie;
if ( status >= 0 )
// DeleteFile from disc
File.Delete(fileName);
else
// Rename file to fileName.bad
File.Move(fileName, fileName + ".bad");
}
private void SubmitMessage(
IBaseMessage msg,
string fileName)
{
// Note: Pass in the filename as the cookie
IBTTransportBatch batch =
_tp.GetBatch(this, (object)fileName);
// Add msg to batch for submitting
batch.SubmitMessage(msg);
// Process this batch
batch.Done(null);
}
}
BizTalk Server SDK 包括以下适配器的示例:文件、HTTP、MSMQ 和事务适配器。 所有这些适配器都是基于称为 BaseAdapter 的通用构建基块构建的。 BaseAdapter 版本 1.0.1 包含所有相关代码,用于解析操作状态并重建一个新的批处理以提交。
竞态
解决错误的两个任务和决定提交的批处理的最终结果似乎很简单,但事实上,它们依赖于来自不同线程的信息:
适配器根据 BizTalk Server 传递的信息,通过适配器的 BatchComplete 回调方法处理错误。 此回调在适配器的线程上执行。
DTCCommitConfirm 是 IBTDTCCommitConfirm 对象上的方法。 IBTDTCCommitConfirm 对象的实例由批处理 IBTTransportBatch::Done 调用返回。 此实例与 IBTTransportBatch::Done 调用位于同一线程上,而该调用不同于适配器的回调线程。
对于适配器每次调用 IBTTransportBatch::Done,消息引擎将在单独的线程中调用回调方法 BatchComplete,以报告批处理提交的结果。 在 BatchComplete 中,适配器需要根据批处理是成功还是失败来提交事务或回滚事务。 在任一情况下,适配器都应调用 DTCCommitConfirm 来报告事务的状态。
存在可能的竞争条件,因为适配器的BatchComplete实现可以假定IBTTransportBatch::Done返回的IBTDTCCommitConfirm对象在BatchComplete执行时始终可用。 但是,即使在 IBTTransportBatch::Done 返回之前,也可以在一个单独的消息引擎线程中调用 BatchComplete。 当适配器尝试访问 IBTDTCCommitConfirm 对象作为 BatchComplete 实现的一部分时,可能存在访问冲突,因为调用线程不再存在。 使用以下解决方案来避免这种情况。
在以下示例中,使用事件解决了该问题。 此处通过使用该事件的属性访问接口指针。 在继续前,get始终会等待set完成。
protected IBTDTCCommitConfirm CommitConfirm
{
set
{
this.commitConfirm = value;
this.commitConfirmEvent.Set();
}
get
{
this.commitConfirmEvent.WaitOne();
return this.commitConfirm;
}
}
protected IBTDTCCommitConfirm commitConfirm = null;
private ManualResetEvent commitConfirmEvent = new ManualResetEvent(false);
批处理状态代码
总体批处理状态代码指示批处理的结果。 操作状态提供单个消息项的提交状态代码。 由于各种原因,批处理可能会失败。 可能发生了与安全相关的故障,或者消息可能是在引擎关闭时提交的。 (通常当引擎关闭时,它不接受任何新工作,但允许完成进程内工作。下表指示在批处理状态或作状态中返回的一些常见 HRESULT 值。 它还指示这些代码是成功还是失败代码。 有关这些代码的正确处理,在如何处理适配器故障中有更全面的描述。
代码(在 BTTransportProxy 类中定义) | 成功/失败代码 | DESCRIPTION |
---|---|---|
BTS_S_EPM_SECURITY_CHECK_FAILED | 成功 | 端口配置为执行安全检查并删除身份验证失败的消息。 适配器不应挂起返回此状态代码的批处理。 |
BTS_S_EPM_MESSAGE_暂停 | 成功 | 表示一个或多个消息已挂起,并且引擎对数据拥有所有权。 因为消息引擎已接受消息提交,所以这是一个成功代码。 |
E_BTS_URL_DISALLOWED | 失败 | 已提交一条消息,其中包含无效的InboundTransportLocation。 |
批处理操作
下表详细介绍了适配器用于将工作添加到给定批次的 IBTTransportBatch 对象的成员方法和操作。
方法名称 | 操作类型 | DESCRIPTION |
---|---|---|
SubmitMessage | 提交 | 将消息提交到引擎中。 |
SubmitResponseMessage | 提交 | 将响应消息提交到引擎中。 这是请求-响应对中的响应消息。 |
DeleteMessage | 删除 | 删除适配器使用非阻止发送成功通过网络传输的消息。 使用阻止发送的适配器不需要调用此方法,因为如果适配器从 TransmitMesssage 返回true ,消息引擎将代表适配器将其删除。 |
MoveToSuspendQ | MoveToSuspendQ | 将消息移动到挂起的队列中。 |
重新提交 | 提交 | 请求稍后重试进行消息传输。 这通常在传输尝试失败后调用。 |
MoveToNextTransport | 移动到下一个传输 | 请求使用备份传输发送消息。 通常在所有重试都用尽后调用。 如果没有备份传输,此方法将失败 |
提交请求消息 | 提交请求 | 提交请求消息。 这是请求-响应对中的请求消息。 |
CancelRequestForResponse | CancelRequestForResponse | 通知引擎,适配器不再希望在请求-响应对中等待响应消息。 |
清除 | 暂无 | 删除批处理中的所有工作。 |
已完成 | 暂无 | 将一批消息提交到引擎进行处理。 |
批次管理
批处理在消息引擎的本机代码中实现。 因此,使用托管代码编写的适配器应在批处理完成后释放批处理的运行时可调用包装器(RCW)。 这是通过使用 Marshal.ReleaseComObject API 在托管代码中完成的。 请务必记住,应在 while 循环中调用此 API,直到返回的引用计数达到零。
BizTalk Server 在批处理中同步处理消息。 许多批可以同时处理,这可以通过调整应用程序域中的批大小,为适配器中的某些优化提供机会。 例如,可以处理 FTP 站点上的所有文件(直到达到某些限制)。 对于 SAP,可以将单个流处理成多个消息,然后将这些消息作为批处理提交。
适配器用于传递操作的批处理不需要与 BizTalk Server 提供给适配器的消息列表完全对应。 换句话说,在执行事务性发送时,必须将重新提交的操作 MoveToNextTransport 和 MoveToSuspendQ 拆分为各自独立的批处理。 许多适配器将已为多个终结点提交的一批消息排序为单独的消息列表,以便进一步处理。
要点是,除了与事务关联的规则外,在 BizTalk Server 中没有与消息批处理关联的规则。 批处理只是适配器将消息分入和传出 BizTalk Server 的特定于实现的方式。
适配器可以将 BizTalk Server 提供的多个较小批次的消息合并成一个较大的批次,以此响应 BizTalk Server。 这可能是处理大量非常小消息的事务适配器中一次重要优化。 它将“每条消息的事务总数”比率降到最低。
通常,BizTalk Server 生成 5 到 10 条消息的发送端批。 如果这些消息非常小,则适配器在将事务性删除批提交回 BizTalk Server 之前,最多可以批处理 100 条消息或更多消息。 实施这样的优化并不容易;必须确保消息不会永远卡在适配器内存中,无休止地等待某个阈值的满足。
对于 MQSeries 等高吞吐量适配器,可以在 BizTalk Server 的性能编号中看到批处理的重要性。 在这些适配器中,消息的接收频率至少是发送消息的两倍。 默认情况下,接收适配器使用包含 100 条消息的批处理,而发送适配器使用 BizTalk Server 的默认批处理。
交易批处理
编写创建事务对象的适配器并将其交给 BizTalk Server 时,你负责编写执行以下作的代码:
决定批处理操作的最终结果:是提交还是结束事务处理。 这可能取决于此事务范围内的其他事务分支,这些分支不依赖于 BizTalk Server,例如写入消息队列(MSMQ)或进行事务性数据库操作。
通过调用 IBTDTCCommitConfirm.DTCCommitConfirm 方法通知 BizTalk Server 最终结果。
true
返回表示事务成功提交;返回false
表示事务失败和回滚事务。适配器必须通知 BizTalk Server 事务的最终结果,以维护其内部跟踪数据。 适配器通过调用 DTCCommitConfirm 通知 BizTalk Server 结果。 如果适配器不执行此作,则会发生严重的内存泄漏,并且 MSDTC 事务可能会超时并失败,尽管其作成功完成。