可以将 SQL 适配器配置为接收 SQL Server 表或视图的定期数据更改消息。 可以指定让适配器执行的轮询语句,以对数据库进行轮询。 轮询语句可以是 SELECT 语句或返回结果集的存储过程。
有关适配器如何支持轮询的详细信息,请参阅 对使用轮询的入站呼叫的支持。
重要
如果要在单个应用程序中执行多个轮询作,则必须将 InboundID 连接属性指定为连接 URI 的一部分,使其唯一。 指定的入站 ID 将添加到操作命名空间,使其唯一。
本主题如何演示轮询
在本主题中,为演示 SQL 适配器如何支持接收数据更改消息,请创建一个用于轮询操作的 .NET 应用程序。 对于本主题,请将 PolledDataAvailableStatement 指定为:
SELECT COUNT(*) FROM Employee
PolledDataAvailableStatement 必须返回包含正值的第一个单元格的结果集。 如果第一个单元格不包含正值,适配器不会执行轮询语句。
作为轮询语句的一部分,执行以下操作:
从 Employee 表中选择所有行。
执行存储过程(MOVE_EMP_DATA),将所有记录从 Employee 表移动到 EmployeeHistory 表。
执行存储过程(ADD_EMP_DETAILS),将新记录添加到 Employee 表。 此过程采用员工名称、指定和工资作为参数。
若要执行这些作,必须为 PollingStatement 绑定属性指定以下内容:
SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
执行查询语句后,将选择 Employee 表中的所有记录,并接收来自 SQL Server 的消息。 适配器执行MOVE_EMP_DATA存储过程后,所有记录都会移动到 EmployeeHistory 表。 然后,执行ADD_EMP_DETAILS存储过程以向 Employee 表添加新记录。 下一次轮询执行将仅返回单个记录。 此周期将继续,直到关闭通道侦听器。
使用 SQL 适配器的绑定属性来配置轮询查询
下表汇总了用于配置适配器以接收数据更改消息的 SQL 适配器绑定属性。 必须将这些绑定属性指定为用于轮询的 .NET 应用程序的一部分。
Binding 属性 | DESCRIPTION |
---|---|
InboundOperationType | 指定是否要执行 轮询、类型轮询 还是 通知 入站作业。 默认值为 轮询。 |
数据可用性声明 | 指定适配器执行的 SQL 语句,以确定是否有任何数据可用于轮询。 SQL 语句必须返回由行和列组成的结果集。 仅当某行可用时,才会执行 为 PollingStatement 绑定属性指定的 SQL 语句。 |
PollingIntervalInSeconds | 指定 SQL 适配器执行 为 PolledDataAvailableStatement 绑定属性指定的语句的间隔(以秒为单位)。 默认值为 30 秒。 轮询间隔定义了连续轮询之间的时间间隔。 如果语句在指定间隔内执行,适配器将等待间隔中的剩余时间。 |
PollingStatement | 指定要轮询 SQL Server 数据库表的 SQL 语句。 可以为轮询语句指定简单的 SELECT 语句或存储过程。 默认值为 null。 必须为 PollingStatement 指定一个值才能启用轮询。 仅当有数据可用于轮询时,才会执行轮询语句,该数据由 PolledDataAvailableStatement 绑定属性确定。 可以指定用分号分隔的任意数量的 SQL 语句。 |
PollWhileDataFound | 指定 SQL 适配器是否忽略轮询间隔并连续执行 为 PolledDataAvailableStatement 绑定属性指定的 SQL 语句(如果正在轮询的表中的数据可用)。 如果表中没有可用数据,适配器将还原为按指定的轮询间隔执行 SQL 语句。 默认值为 false。 |
有关这些属性的更完整说明,请参阅 有关适用于 SQL Server 适配器绑定属性的 BizTalk 适配器的信息。 有关如何使用 SQL 适配器轮询 SQL Server 的完整说明,请阅读本主题的其余部分。
使用轮询请求消息
适配器调用代码中的轮询操作以轮询 SQL Server 数据库。 也就是说,适配器发送了一个轮询请求消息,该消息是通过 IInputChannel 通道接收到的。 轮询请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式之一处理轮询消息:
若要使用节点值流式处理使用消息,必须在响应消息上调用 WriteBodyContents 方法,并向其传递实现节点值流式处理的 XmlDictionaryWriter 。
若要使用节点流式处理来使用消息,可以在响应消息上调用 GetReaderAtBodyContents 以获取 XmlReader。
关于本主题中使用的示例
本主题中的示例查询 Employee 表。 该示例还使用MOVE_EMP_DATA和ADD_EMP_DETAILS存储过程。 提供的示例中附带了一个脚本,用于生成这些构件。 有关示例的详细信息,请参阅 SQL 适配器的示例。 提供了一个基于本主题的示例 Polling_ChannelModel,还包括在 SQL 适配器示例中。
使用 WCF 通道模型接收用于轮询操作的入站消息
本部分提供有关如何编写 .NET 应用程序(通道模型)以使用 SQL 适配器接收入站轮询消息的说明。
从 SQL 适配器接收查询消息
在 Visual Studio 中创建Microsoft Visual C# 项目。 对于本主题,请创建控制台应用程序。
在解决方案资源管理器中,添加对
Microsoft.Adapters.Sql
、Microsoft.ServiceModel.Channels
、System.ServiceModel
和System.Runtime.Serialization
的引用。打开Program.cs文件并添加以下命名空间:
Microsoft.Adapters.Sql
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
指定连接 URI。 有关适配器连接 URI 的详细信息,请参阅 创建 SQL Server 连接 URI。
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
创建 SqlAdapterBinding 的实例,并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationType、 PolledDataAvailableStatement 和 PollingStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 支持使用轮询的入站呼叫。
SqlAdapterBinding binding = new SqlAdapterBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE"; binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
创建绑定参数集合并设置凭据。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "<Enter user name here>"; credentials.UserName.Password = "<Enter password here>"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
创建通道侦听器并打开它。 通过在 SqlAdapterBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
通过在侦听器上调用 AcceptChannel 方法并打开它来获取 IInputChannel 通道。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
调用通道上的 Receive ,从适配器获取下一条 POLLINGSTMT 消息。
Message message = channel.Receive();
使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReader 或 XmlDictionaryWriter 来处理消息。
XmlReader reader = message.GetReaderAtBodyContents();
完成处理请求后关闭通道。
channel.Close()
重要
在处理完 POLLINGSTMT 操作后,必须关闭通道。 未能关闭通道可能会影响代码的行为。
在接收完数据更改消息后,关闭侦听器。
listener.Close()
重要
关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。
示例:
以下示例演示执行 Employee 表的轮询查询。 轮询语句执行以下任务:
从 Employee 表中选择所有记录。
执行MOVE_EMP_DATA存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。
执行ADD_EMP_DETAILS存储过程,将单个记录添加到 Employee 表。
轮询消息保存在
C:\PollingOutput.xml
.
using System;
using Microsoft.Adapters.Sql;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Channels;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Sample started. This sample will poll 5 times and will perform the following tasks:");
Console.WriteLine("Press any key to start polling...");
Console.ReadLine();
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
try
{
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
SqlAdapterBinding binding = new SqlAdapterBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "<Enter user name here>";
credentials.UserName.Password = "<Enter password here>";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
listener = binding.BuildChannelListener<IInputChannel>(ConnectionUri, bindingParams);
listener.Open();
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel and Listener opened...");
Console.WriteLine("\nWaiting for polled data...");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
XmlDocument doc = new XmlDocument();
doc.Load(reader);
using (XmlWriter writer = XmlWriter.Create("C:\\PollingOutput.xml"))
{
doc.WriteTo(writer);
Console.WriteLine("The polling response is saved at 'C:\\PollingOutput.xml'");
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}