可以将 SQL 适配器配置为接收 SQL Server 表或视图的定期数据更改消息。 可以指定让适配器执行的轮询语句,以对数据库进行轮询。 轮询语句可以是 SELECT 语句或返回结果集的存储过程。
有关适配器如何支持轮询的详细信息,请参阅 使用 SQL 适配器在 SQL Server 中轮询。
注释
本主题演示如何使用入站轮询操作来处理轮询消息。 轮询操作的信息不是强类型化的。 如果要获取强类型轮询消息,则必须使用TypedPolling操作。 你还必须使用TypedPolling操作,以便在单个应用程序中进行多次轮询操作。 有关如何执行 TypedPolling 操作的说明,请参阅 使用 WCF 服务模型从 SQL Server 接收基于强类型轮询的数据变更消息。
重要
如果要在单个应用程序中执行多个轮询作,则必须将 InboundID 连接属性指定为连接 URI 的一部分,使其唯一。 指定的入站 ID 将添加到操作命名空间,使其唯一。
本主题如何演示轮询
在本主题中,为演示 SQL 适配器如何支持接收数据更改消息,请创建一个 .NET 应用程序,并为 轮询 操作生成 WCF 服务协定。 如果要在生成 WCF 服务协定时指定轮询相关绑定属性,请将 PolledDataAvailableStatement 指定为:
SELECT COUNT(*) FROM Employee
PolledDataAvailableStatement 必须返回包含正值的第一个单元格的结果集。 如果第一个单元格不包含正值,适配器不会执行轮询语句。
作为轮询语句的一部分,执行以下操作:
从 Employee 表中选择所有行。
执行存储过程(MOVE_EMP_DATA),将所有记录从 Employee 表移动到 EmployeeHistory 表。
执行存储过程(ADD_EMP_DETAILS),将新记录添加到 Employee 表。 此过程采用员工名称、指定和工资作为参数。
若要执行这些作,必须为 PollingStatement 绑定属性指定以下内容:
SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
执行查询语句后,将选择 Employee 表中的所有记录,并接收来自 SQL Server 的消息。 适配器执行MOVE_EMP_DATA存储过程后,所有记录将移动到 EmployeeHistory 表。 然后,执行ADD_EMP_DETAILS存储过程以向 Employee 表添加新记录。 下一次轮询执行将仅返回单个记录。 此周期将继续,直到关闭服务主机。
使用 SQL 适配器的绑定属性来配置轮询查询
下表汇总了用于配置适配器以接收数据更改消息的 SQL 适配器绑定属性。 必须将这些绑定属性指定为用于轮询的 .NET 应用程序的一部分。
Binding 属性 | DESCRIPTION |
---|---|
InboundOperationType | 指定是否要执行 轮询、类型轮询 还是 通知 入站作业。 默认值为 轮询。 |
数据可用性声明 | 指定适配器执行的 SQL 语句,以确定是否有任何数据可用于轮询。 SQL 语句必须返回由行和列组成的结果集。 仅当某行可用时,才会执行 为 PollingStatement 绑定属性指定的 SQL 语句。 |
PollingIntervalInSeconds | 指定 SQL 适配器执行 为 PolledDataAvailableStatement 绑定属性指定的语句的间隔(以秒为单位)。 默认值为 30 秒。 轮询间隔定义了连续轮询之间的时间间隔。 如果语句在指定间隔内执行,适配器将等待间隔中的剩余时间。 |
PollingStatement | 指定要轮询 SQL Server 数据库表的 SQL 语句。 可以为轮询语句指定简单的 SELECT 语句或存储过程。 默认值为 null。 必须为 PollingStatement 指定一个值才能启用轮询。 仅当有数据可用于轮询时,才会执行轮询语句,该数据由 PolledDataAvailableStatement 绑定属性确定。 可以指定用分号分隔的任意数量的 SQL 语句。 |
PollWhileDataFound | 指定 SQL 适配器是否忽略轮询间隔并连续执行 为 PolledDataAvailableStatement 绑定属性指定的 SQL 语句(如果正在轮询的表中的数据可用)。 如果表中没有可用数据,适配器将还原为按指定的轮询间隔执行 SQL 语句。 默认值为 false。 |
有关这些属性的更完整说明,请参阅 有关适用于 SQL Server 适配器绑定属性的 BizTalk 适配器的信息。 有关如何使用 SQL 适配器轮询 SQL Server 的完整说明,请阅读进一步。
在 WCF 服务模型中设置轮询
若要在使用 WCF 服务模型时接收 轮询 操作,必须:
从适配器公开的元数据中为轮询操作生成 WCF 服务协定(接口)。 为此,可以使用 Visual Studio 插件“添加适配器服务引用”。
从此接口实现 WCF 服务。
使用服务主机(System.ServiceModel.ServiceHost)托管此 WCF 服务。
关于本主题中使用的示例
本主题中的示例查询 Employee 表。 该示例还使用MOVE_EMP_DATA和ADD_EMP_DETAILS存储过程。 提供的示例中附带了一个脚本,用于生成这些构件。 有关示例的详细信息,请参阅 SQL 适配器的示例。 提供 SQL 适配器示例时,还附带一个名为 Polling_ServiceModel 的示例,这个示例是基于本主题的。
WCF 服务协定和类
可以使用“添加适配器服务引用插件”来创建用于轮询操作的 WCF 服务契约(接口)和支持类。 有关生成 WCF 服务协定的详细信息,请参阅 为 SQL Server 项目生成 WCF 客户端或 WCF 服务协定。
WCF 服务协定(接口)
以下代码显示了为 轮询 生成的 WCF 服务协定(接口)。
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.ServiceContractAttribute(Namespace="http://schemas.microsoft.com/Sql/2008/05/", ConfigurationName="PollingOperation")]
public interface PollingOperation {
// CODEGEN: Generating message contract since the wrapper namespace (https://schemas.microsoft.com/Sql/2008/05/Polling/) of message Polling
// does not match the default value (https://schemas.microsoft.com/Sql/2008/05/)
[System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="Polling")]
[System.ServiceModel.XmlSerializerFormatAttribute()]
void Polling(Polling request);
}
消息合同
消息协定命名空间由连接 URI 中的 InboundID 参数修改(如果指定)。 在此示例中,未在连接 URI 中指定入站 ID。 请求消息返回数据集。
[System.Diagnostics.DebuggerStepThroughAttribute()]
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]
[System.ServiceModel.MessageContractAttribute(WrapperName="Polling", WrapperNamespace="http://schemas.microsoft.com/Sql/2008/05/Polling/", IsWrapped=true)]
public partial class Polling {
[System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://schemas.microsoft.com/Sql/2008/05/Polling/", Order=0)]
[System.Xml.Serialization.XmlArrayAttribute(IsNullable=true)]
[System.Xml.Serialization.XmlArrayItemAttribute("DataSet", Namespace="http://schemas.datacontract.org/2004/07/System.Data", IsNullable=false)]
public System.Data.DataSet[] PolledData;
public Polling() {
}
public Polling(System.Data.DataSet[] PolledData) {
this.PolledData = PolledData;
}
}
WCF 服务类
添加适配器服务引用插件还会生成一个文件,该文件包含从服务协定(接口)实现的 WCF 服务类的存根。 文件的名称SqlAdapterBindingService.cs。 可以将逻辑直接插入到这个类中以处理 轮询 操作。 以下代码显示了由“添加适配器服务引用插件”生成的 WCF 服务类。
namespace SqlAdapterBindingNamespace {
public class SqlAdapterBindingService : PollingOperation {
// CODEGEN: Generating message contract since the wrapper namespace (https://schemas.microsoft.com/Sql/2008/05/Polling/) of message Polling
// does not match the default value (https://schemas.microsoft.com/Sql/2008/05/)
public virtual void Polling(Polling request) {
throw new System.NotImplementedException("The method or operation is not implemented.");
}
}
}
接收用于轮询操作的入站消息
本部分介绍如何编写 .NET 应用程序以使用 SQL 适配器接收入站轮询消息。
从 SQL 适配器接收查询消息
使用“添加适配器服务引用插件” 为 轮询 操作生成 WCF 服务协定(接口)和辅助类。 有关详细信息,请参阅 为 SQL Server 项目生成 WCF 客户端或 WCF 服务协定。 可以选择在生成服务协定和帮助程序类时指定绑定属性。 这可以保证在生成的配置文件中正确设置它们。
从步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果在处理从轮询操作接收到的数据时遇到错误,此类的轮询方法可能会引发异常以中止轮询事务;否则,该方法不返回任何内容。 必须按如下所示将 WCF 服务类属性化:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
在 轮询 方法中,可以直接实现应用程序逻辑。 可以在 SqlAdapterBindingService.cs 文件中找到该类。 此示例中的此代码子类为 SqlAdapterBindingService 类。 在此代码中,接收到作为数据集的轮询信息后会将其写入控制台。
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] public class PollingService : SqlAdapterBindingNamespace.SqlAdapterBindingService { public override void Polling(Polling request) { Console.WriteLine("\nNew Polling Records Received"); Console.WriteLine("*************************************************"); DataSet[] dataArray = request.PolledData; foreach (DataTable tab in dataArray[0].Tables) { foreach (DataRow row in tab.Rows) { for (int i = 0; i < tab.Columns.Count; i++) { Console.WriteLine(row[i]); } } } Console.WriteLine("*************************************************"); Console.WriteLine("\nHit <RETURN> to stop polling"); } }
由于 SQL 适配器不接受凭据作为连接 URI 的一部分,因此必须实现以下类来传递 SQL Server 数据库的凭据。 在应用程序的后期阶段,您将实例化这个类以提供 SQL Server 凭据。
class PollingCredentials : ClientCredentials, IServiceBehavior { public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters) { bindingParameters.Add(this); } public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase) { } public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase) { } protected override ClientCredentials CloneCore() { ClientCredentials clone = new PollingCredentials(); clone.UserName.UserName = this.UserName.UserName; clone.UserName.Password = this.UserName.Password; return clone; } }
创建 SqlAdapterBinding,然后通过指定绑定属性来配置轮询操作。 可以在代码中显式执行此作,也可以在配置中以声明方式执行此作。 至少必须指定 InboundOperationType、 PolledDataAvailableStatement 和 PollingStatement。
SqlAdapterBinding binding = new SqlAdapterBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE"; binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
通过实例化在步骤 3 中创建的 PollingCredentials 类来指定 SQL Server 数据库凭据。
PollingCredentials credentials = new PollingCredentials(); credentials.UserName.UserName = "<Enter user name here>"; credentials.UserName.Password = "<Enter password here>";
创建在步骤 2 中创建的 WCF 服务的实例。
// create service instance PollingService service = new PollingService();
使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基连接 URI 不能包含入站 ID(如果指定)。 还应在此处指定凭据。
// Enable service host Uri[] baseUri = new Uri[] { new Uri("mssql://mysqlserver//mydatabase") }; ServiceHost serviceHost = new ServiceHost(service, baseUri); serviceHost.Description.Behaviors.Add(credentials);
将服务终结点添加到服务主机。 为此,请按以下步骤操作:
使用在步骤 4 中创建的绑定。
指定包含凭据的连接 URI,并根据需要指定入站 ID。
将协定指定为 “PollingOperation”。
// Add service endpoint: be sure to specify PollingOperation as the contract Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?"); serviceHost.AddServiceEndpoint("PollingOperation", binding, ConnectionUri);
若要接收轮询数据,请打开服务主机。 每当查询返回结果集时,适配器都会返回数据。
// Open the service host to begin polling serviceHost.Open();
若要终止轮询,请关闭服务主机。
重要
适配器将继续轮询,直到服务主机关闭。
serviceHost.Close();
示例:
以下示例演示执行 Employee 表的轮询查询。 轮询语句执行以下任务:
从 Employee 表中选择所有记录。
执行MOVE_EMP_DATA存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。
执行ADD_EMP_DETAILS存储过程,将单个记录添加到 Employee 表。
第一条轮询消息将包含 "Employee" 表中的所有记录。 后续轮询消息将仅包含ADD_EMP_DETAILS存储过程插入的最后一条记录。 适配器将继续轮询,直到按下
<RETURN>
关闭服务主机。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Adapters.Sql;
using Microsoft.ServiceModel.Channels;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Channels;
using System.Collections.ObjectModel;
using System.Data;
namespace Polling_ServiceModel
{
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class PollingService : SqlAdapterBindingNamespace.SqlAdapterBindingService
{
public override void Polling(Polling request)
{
Console.WriteLine("\nNew Polling Records Received");
Console.WriteLine("*************************************************");
DataSet[] dataArray = request.PolledData;
foreach (DataTable tab in dataArray[0].Tables)
{
foreach (DataRow row in tab.Rows)
{
for (int i = 0; i < tab.Columns.Count; i++)
{
Console.WriteLine(row[i]);
}
}
}
Console.WriteLine("*************************************************");
Console.WriteLine("\nHit <RETURN> to stop polling");
}
}
class PollingCredentials : ClientCredentials, IServiceBehavior
{
public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters)
{
bindingParameters.Add(this);
}
public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
{ }
public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
{ }
protected override ClientCredentials CloneCore()
{
ClientCredentials clone = new PollingCredentials();
clone.UserName.UserName = this.UserName.UserName;
clone.UserName.Password = this.UserName.Password;
return clone;
}
}
class Program
{
static void Main(string[] args)
{
ServiceHost serviceHost = null;
try
{
Console.WriteLine("Sample started...");
Console.WriteLine("Press any key to start polling...");
Console.ReadLine();
SqlAdapterBinding binding = new SqlAdapterBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
Console.WriteLine("Binding properties assigned...");
// This URI is used to specify the address for the ServiceEndpoint
// It must contain the InboundId (if any) that was used to generate
// the WCF service callback interface
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
// This URI is used to initialize the ServiceHost. It cannot contain
// a query_string (InboundID); otherwise,an exception is thrown when
// the ServiceHost is initialized.
Uri[] baseUri = new Uri[] { new Uri("mssql://mysqlserver//mydatabase") };
PollingCredentials credentials = new PollingCredentials();
credentials.UserName.UserName = "<Enter user name here>";
credentials.UserName.Password = "<Enter password here>";
Console.WriteLine("Opening service host...");
PollingService service = new PollingService();
serviceHost = new ServiceHost(service, baseUri);
serviceHost.Description.Behaviors.Add(credentials);
serviceHost.AddServiceEndpoint("PollingOperation", binding, ConnectionUri);
serviceHost.Open();
Console.WriteLine("Service host opened...");
Console.WriteLine("Polling started...");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine("Exception :" + e.Message);
Console.ReadLine();
/* If there is an error it will be specified in the inner exception */
if (e.InnerException != null)
{
Console.WriteLine("InnerException: " + e.InnerException.Message);
Console.ReadLine();
}
}
finally
{
// IMPORTANT: you must close the ServiceHost to stop polling
if (serviceHost.State == CommunicationState.Opened)
serviceHost.Close();
else
serviceHost.Abort();
}
}
}
}