可以将用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为轮询 Oracle 数据库表或视图,以检测数据更改。 若要执行此类轮询作,适配器会定期针对 Oracle 表或视图执行 SQL 查询,然后执行可选的 PL/SQL 代码块。 然后,Oracle 数据库适配器会在入站 POLLINGSTMT 操作中,将 SQL 查询结果作为强类型结果集返回给您的代码。 有关用于使用 Oracle 数据库适配器配置和执行 Oracle 数据库轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前阅读本主题。
通过在 OracleDBBinding 实例上设置绑定属性,可以将 Oracle 数据库适配器配置为轮询 Oracle 数据库表或视图。 在 WCF 通道模型中,您可以使用此绑定来创建通道侦听器,并从中获取 IInputChannel 通道,以接收来自适配器的 POLLINGSTMT 操作。
关于如何使用 WCF 中的 IInputChannel 接收操作的概述,请参阅 服务 Channel-Level 编程。
本主题各节提供的信息有助于您使用 WCF 通道模型对 Oracle 数据库的表和视图进行轮询。
使用 POLLINGSTMT 请求消息
适配器调用代码中的 POLLINGSTMT 操作来轮询 Oracle 数据库。 也就是说,适配器会通过 IInputChannel 通道形状发送一个 POLLINGSTMT 请求消息,您将接收到该消息。 POLLINGSTMT 请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式中的一种处理 POLLINGSTMT 消息:
若要使用节点值流式处理使用消息,必须在响应消息上调用 WriteBodyContents 方法,并向其传递实现节点值流式处理的 XmlDictionaryWriter 。
若要使用节点流式处理来使用消息,可以在响应消息上调用 GetReaderAtBodyContents 以获取 XmlReader。
通常使用节点值流传输来处理包含 Oracle LOB 数据列的结果集。
有关 POLLINGSTMT操作的消息结构的详细信息,请参阅 轮询操作的消息架构。
有关 Oracle 数据库适配器如何支持对 LOB 数据进行流式处理的详细信息,请参阅 Oracle 数据库适配器中的流式处理大型对象数据类型。
有关在代码中实现节点值流式处理以支持 LOB 数据的端到端流式处理的详细信息,请参阅 使用 WCF 通道模型的流式处理 Oracle Database LOB 数据类型。
关于本主题中使用的示例
本主题中的示例使用 SCOTT.ACCOUNTACTIVITY 表和 SCOTT.ACCOUNT_PKG.PROCESS_ACTIVITY 函数。 提供的示例中附带了一个脚本,用于生成这些构件。 该示例执行以下操作:
作为轮询语句的一部分,从 ACCOUNTACTIVITY 表中选择所有记录,并将其显示在控制台上。
作为 post poll 语句的一部分,该示例调用将所有记录从 ACCOUNTACTIVITY 表移动到 ACTIVITYHISTORY 表的 PROCESS_ACTIVITY 函数。
对 ACCOUNTACTIVITY 表的后续轮询不会返回任何记录。 但是,如果希望示例在轮询操作中返回更多记录,则必须在 ACCOUNTACTIVITY 表中插入一些记录。 为此,可以运行示例提供的more_activity_data.sql脚本。
有关示例的详细信息,请参阅 适配器示例。
如何使用 IInputChannel 轮询 Oracle 数据库?
若要轮询 Oracle 数据库表或视图以使用 WCF 通道模型接收数据更改消息,请执行以下步骤。
使用 IInputChannel 接收数据已更改的消息
在 Visual Studio 中创建 Visual C# 项目。 对于本主题,请创建控制台应用程序。
在解决方案资源管理器中,添加对
Microsoft.Adapters.OracleDB
、Microsoft.ServiceModel.Channels
、System.ServiceModel
和System.Runtime.Serialization
的引用。打开Program.cs文件并添加以下命名空间:
Microsoft.Adapters.OracleDB
Microsoft.ServiceModel.Channels
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
System.Runtime.Serialization
System.IO
Microsoft.ServiceModel.Channels.Common
创建 OracleDBBinding 的实例,并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationType、 PollingStatement 和 PollingInterval 绑定属性。 在此示例中,还设置 PostPollStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 Oracle 数据库适配器中接收基于轮询的数据更改消息。
OracleDBBinding binding = new OracleDBBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PollingInterval = 30; binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE"; binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"
创建绑定参数集合并设置凭据。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "SCOTT"; credentials.UserName.Password = "TIGER"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
创建通道侦听器并打开它。 通过在 OracleDBBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。 可以通过在连接 URI 中设置 PollingId 属性来修改 POLLINGSTMT 操作的目标命名空间。 有关适配器连接 URI 的详细信息,请参阅 “创建 Oracle 数据库连接 URI”。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
通过在侦听器上调用 AcceptChannel 方法并打开它来获取 IInputChannel 通道。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
调用通道上的 Receive ,从适配器获取下一条 POLLINGSTMT 消息。
Message message = channel.Receive();
使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReader 或 XmlDictionaryWriter 来处理消息。
XmlReader reader = message.GetReaderAtBodyContents();
完成处理请求后关闭通道。
channel.Close()
重要
在处理完 POLLINGSTMT 操作后,必须关闭通道。 未能关闭通道可能会影响代码的行为。
在接收完数据更改消息后,关闭侦听器。
listener.Close()
重要
关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。
示例:
以下示例演示如何配置 Oracle 数据库适配器以轮询 Oracle 数据库表和视图,并使用 WCF 通道模型接收 POLLINGSTMT 操作。 POLLINGSTMT操作中返回的结果集是使用XmlReader写入控制台的。
using System;
using System.Collections.Generic;
using System.Text;
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceModel.Channels;
using Microsoft.Adapters.OracleDB;
// Add this namespace for channel model
using System.ServiceModel.Channels;
using System.Xml;
using System.Runtime.Serialization;
using System.IO;
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions
using Microsoft.ServiceModel.Channels.Common;
namespace OraclePollingCM
{
class Program
{
static void Main(string[] args)
{
Uri connectionUri = new Uri("oracleDB://ADAPTER/");
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
// set timeout to receive POLLINGSTMT message
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
Console.WriteLine("Sample Started");
try
{
// Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the
// PollingStatement,and the PostPollStatement.
OracleDBBinding binding = new OracleDBBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PollingInterval = 30;
binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";
binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";
// Create a binding parameter collection and set the credentials
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "SCOTT";
credentials.UserName.Password = "TIGER";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
Console.WriteLine("Opening listener");
// get a listener from the binding
listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);
listener.Open();
Console.WriteLine("Opening channel");
// get a channel from the listener
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel opened -- waiting for polled data");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
// Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console
Console.WriteLine("\nPolling data received for request number {0}", i+1);
Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");
while (reader.Read())
{
if (reader.IsStartElement())
{
switch (reader.Name)
{
case "POLLINGSTMTRECORD":
Console.Write("\n");
break;
case "TID":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "ACCOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "AMOUNT":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
case "TRANSDATE":
reader.Read();
Console.Write(reader.ReadString() + "\t");
break;
default:
break;
}
}
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
// To save the polling data to a file you can REPLACE the code above with the following
//
// XmlDocument doc = new XmlDocument();
// doc.Load(reader);
// using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))
// {
// doc.WriteTo(writer);
// }
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (TargetSystemException tex)
{
Console.WriteLine("Exception occurred on the Oracle Database");
Console.WriteLine(tex.InnerException.Message);
}
catch (ConnectionException cex)
{
Console.WriteLine("Exception occurred connecting to the Oracle Database");
Console.WriteLine(cex.InnerException.Message);
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}