使用 WCF 服务模型在 Oracle 数据库中接收基于轮询的数据更改消息

可以将适用于 Oracle 数据库的 Microsoft BizTalk 适配器配置为针对 Oracle 表或视图接收基于轮询的数据更改消息。 若要接收数据更改的消息,适配器会定期对 Oracle 表或视图执行 SQL 查询,然后执行可选的 PL/SQL 代码块。 然后,Oracle 数据库适配器将 SQL 查询的结果作为入站 POLLINGSTMT 作中的强类型结果集返回给应用程序。 有关用于使用 Oracle 数据库适配器配置和执行 Oracle 数据库轮询的机制的详细信息,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息。 强烈建议在继续之前阅读本主题。

若要在使用 WCF 服务模型时接收 POLLINGSTMT 操作,必须:

  • 从适配器公开的元数据为 POLLINGSTMT 生成一个 WCF 服务协定(接口)。 为此,请使用“添加适配器服务引用 Visual Studio 插件”或“ServiceModel 元数据实用工具”(svcutil.exe)。

  • 从此接口实现 WCF 服务。

  • 使用服务主机(System.ServiceModel.ServiceHost)托管此 WCF 服务。

    本节中的主题提供了信息和过程,可帮助你对 WCF 服务模型中的 Oracle 数据库表和视图执行轮询。

关于本主题中使用的示例

本主题中的示例使用 /SCOTT/ACCOUNTACTIVITY 表和 /SCOTT/Package/ACCOUNT_PKG/PROCESS_ACTIVITY 函数。 用于生成这些工件的脚本随附于 BizTalk 适配器包示例。 有关示例的详细信息,请参阅 适配器示例

在 WCF 服务模型中设置轮询

通过设置绑定属性和可选的连接属性(参数),将 Oracle 数据库适配器配置为对 Oracle 数据库表和视图执行轮询。 其中一些属性是必需的,而有些属性为了生效必须在设计时和运行时同时设置。

  • 在设计时,在连接到 Oracle 数据库以生成 WCF 服务协定时设置连接参数和绑定属性。

  • 在运行时,在用于创建服务主机的 OracleDBBinding 对象上设置绑定属性。 将服务侦听器添加到服务主机时,可以设置连接参数。

    以下列表简要概述了用于配置轮询的绑定属性和连接参数:

  • PollingStatement 绑定属性。 必须在设计时和运行时设置此绑定属性。

  • 可选绑定属性。 这些设置只需在运行时进行配置。

  • AcceptCredentialsInUri 绑定属性。 如果要在连接 URI 中启用凭据,则必须在运行时将此绑定属性设置为 true 。 将服务终结点添加到服务主机时,用户名和密码必须存在于连接 URI 中。

    谨慎

    此示例或指南引用敏感信息,例如连接字符串或用户名和密码。 切勿在代码中硬编码这些值,并确保使用最安全的身份验证来保护机密数据。 有关详细信息,请参阅以下文档:

  • 连接 URI 中的 PollingId 查询字符串参数。 如果您想更改 POLLINGSTMT 操作的命名空间,则必须在设计时和运行时都设置这个连接属性。

    有关用于配置轮询的绑定属性和连接参数的完整说明,请参阅 在 Oracle 数据库适配器中接收基于轮询的数据更改消息

WCF 服务协定和类

可以使用“添加适配器服务引用 Visual Studio 插件”或 ServiceModel 元数据实用工具(svcutil.exe)来创建 POLLINGSTMT 操作的 WCF 服务契约(接口)和支持类。

使用这些工具连接到 Oracle 数据库时,为 POLLINGSTMT 操作生成服务协议:

  • 必须指定 PollingStatement 绑定属性。 适配器使用绑定属性中的 SELECT 语句为 POLLINGSTMT 操作返回的强类型结果集生成正确的元数据。

  • 可以选择在连接 URI 中指定 PollingId 参数。 适配器使用此参数为 POLLINGSTMT作生成命名空间。

    在以下示例中:

  • PollingStatement 设置为“SELECT * FROM ACCOUNTACTIVITY FOR UPDATE”。

  • PollingId 设置为“AcctActivity”。

WCF 服务协定(接口)

以下代码显示了为 POLLINGSTMT 操作生成的 WCF 服务协定(接口)。

[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.ServiceContractAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03", ConfigurationName="POLLINGSTMT_OperationGroup")]  
public interface POLLINGSTMT_OperationGroup {  
  
    // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)  
    // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
    [System.ServiceModel.OperationContractAttribute(IsOneWay=true, Action="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMT")]  
    void POLLINGSTMT(POLLINGSTMT request);  
}  

消息合同

消息协定命名空间由连接 URI 中的 PollingId 参数修改。 请求消息返回一组强类型记录。

[System.Diagnostics.DebuggerStepThroughAttribute()]  
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "3.0.0.0")]  
[System.ServiceModel.MessageContractAttribute(WrapperName="POLLINGSTMT", WrapperNamespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", IsWrapped=true)]  
public partial class POLLINGSTMT {  
  
    [System.ServiceModel.MessageBodyMemberAttribute(Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity", Order=0)]  
    public microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD;  
  
    public POLLINGSTMT() {  
    }  
  
    public POLLINGSTMT(microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity.POLLINGSTMTRECORD[] POLLINGSTMTRECORD) {  
        this.POLLINGSTMTRECORD = POLLINGSTMTRECORD;  
    }  
}  

数据协定命名空间

“数据协定”是在服务与客户端之间达成的正式协议,用于以抽象方式描述要交换的数据。 也就是说,为了进行通信,客户端和服务不必共享相同的类型,只有相同的数据协定。

对于数据更改消息,数据协定命名空间也由连接 URI 中的 PollingId 参数(如果指定)修改。 数据契约由一个表示查询结果集中强类型记录的类组成。 此示例中省略了类定义的详细信息。 类包含表示结果集中的列的属性。

在以下示例中,使用 PollingId“AcctActivity”。

namespace microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity {  
    using System.Runtime.Serialization;  
  
    [System.Diagnostics.DebuggerStepThroughAttribute()]  
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.Runtime.Serialization", "3.0.0.0")]  
    [System.Runtime.Serialization.DataContractAttribute(Name="POLLINGSTMTRECORD", Namespace="http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity")]  
    public partial class POLLINGSTMTRECORD : object, System.Runtime.Serialization.IExtensibleDataObject {…}  
     }  
}  

WCF 服务类

添加适配器服务引用插件还会生成一个文件,该文件包含从服务协定(接口)实现的 WCF 服务类的存根。 文件的名称OracleDBBindingService.cs。 可以将逻辑直接插入到此类中以处理 POLLINGSTMT操作。 如果使用 svcutil.exe 生成服务合同接口,则必须自行实现此类。 以下代码显示了由“添加适配器服务引用插件”生成的 WCF 服务类。

namespace OracleDBBindingNamespace {  
  
    public class OracleDBBindingService : POLLINGSTMT_OperationGroup {  
  
        // CODEGEN: Generating message contract since the wrapper namespace (http://Microsoft.LobServices.OracleDB/2007/03/POLLINGSTMTAcctActivity)   
        // of message POLLINGSTMT does not match the default value (http://Microsoft.LobServices.OracleDB/2007/03)  
        public virtual void POLLINGSTMT(POLLINGSTMT request) {  
            throw new System.NotImplementedException("The method or operation is not implemented.");  
        }  
    }  
}  

接收 POLLINGSTMT操作

从 Oracle 数据库适配器接收轮询数据

  1. 使用“添加适配器服务引用插件”或 svcutil.exe 为 POLLINGSTMT 操作生成 WCF 服务协定(接口)和帮助类。 有关详细信息,请参阅 为 Oracle 数据库解决方案项目生成 WCF 客户端或 WCF 服务协定。 连接到适配器时,必须至少设置 PollingStatement 绑定属性。 可以选择在连接 URI 中指定 PollingId 参数。 如果使用“添加适配器服务引用插件”,则应设置配置所需的所有绑定参数。 这可以保证在生成的配置文件中正确设置它们。

  2. 从步骤 1 中生成的接口和帮助程序类实现 WCF 服务。 如果在处理从 POLLINGSTMT 操作接收的数据时遇到错误,此类的 POLLINGSTMT 方法可能会引发异常来中止轮询事务,否则该方法不返回任何内容。 必须按如下所示将 WCF 服务类属性化:

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
    
    1. 如果使用“添加适配器服务引用插件”生成接口,可以直接在生成的 OracleDBBindingService 类的 POLLINGSTMT 方法中实现逻辑。 可以在OracleDBBindingService.cs中找到此类。 此示例中的此代码是 OracleDBBindingService 类的子类。

      [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
      
      public class PollingStmtService : OracleDBBindingService  
      {  
          public override void POLLINGSTMT(POLLINGSTMT request)  
          {  
              Console.WriteLine("\nNew Polling Records Received");  
              Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
              for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
              {  
                  Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                      request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                      request.POLLINGSTMTRECORD[i].AMOUNT,  
                                      request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                      request.POLLINGSTMTRECORD[i].DESCRIPTION);  
              }  
          }  
      }  
      
    2. 如果使用 svcutil.exe 生成界面,则必须创建一个 WCF 服务来实现该接口,并在该类的 POLLINGSTMT 方法中实现您的逻辑。

  3. 创建在步骤 2 中创建的 WCF 服务的实例。

    // create service instance  
    PollingStmtService pollingInstance = new PollingStmtService();  
    
  4. 使用 WCF 服务和基本连接 URI 创建 System.ServiceModel.ServiceHost 的实例。 基本连接 URI 不能包含 userinfoparams 或 query_string。

    // Enable service host  
    Uri[] baseUri = new Uri[] { new Uri("oracledb://Adapter") };  
    ServiceHost srvHost = new ServiceHost(pollingInstance, baseUri);  
    
  5. 创建 OracleDBBinding 并通过设置其绑定属性来配置轮询操作。 可以在代码中显式执行此作,也可以在配置中以声明方式执行此作。 至少,你必须指定轮询语句和轮询的时间间隔。 在此示例中,将凭据指定为 URI 的一部分,因此还必须将 AcceptCredentialsInUri 设置为 true

    // Create and configure a binding for the service endpoint. NOTE: binding  
    // parameters are set here for clarity, but these are already set in the  
    // the generated configuration file  
    OracleDBBinding binding = new OracleDBBinding();  
    
    // The credentials are included in the connection URI, so set this property to true  
    binding.AcceptCredentialsInUri = true;  
    
    // Same as statement specified in Configure Adapter dialog box  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
    
    // Be sure to set the interval long enough to complete processing before  
    // the next poll  
    binding.PollingInterval = 15;  
    // Polling is transactional; be sure to set an adequate isolation level   
    // for your environment  
    binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
    
  6. 将服务终结点添加到服务主机。 为此,请按以下步骤操作:

    • 使用在步骤 5 中创建的绑定。

    • 指定包含凭据的连接 URI,并根据需要指定 PollingId。

    • 将协定指定为“POLLINGSTMT_OperationGroup”。

    // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
    Uri serviceUri = new Uri("oracledb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
    srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
    
  7. 若要接收轮询数据,请打开服务主机。 每当查询返回结果集时,适配器都会返回数据。

    // Open the service host to begin polling  
    srvHost.Open();  
    
  8. 若要终止轮询,请关闭服务主机。

    重要

    适配器将继续轮询,直到服务主机关闭。

    srvHost.Close();  
    

示例:

以下示例演示针对 /SCOTT/ACCOUNTACTIVITY 表执行的轮询查询。 轮询后语句调用一个 Oracle 函数,该函数将处理过的记录移到另一个表 /SCOTT/ACCOUNTHISTORY。 修改 POLLINGSTMT 操作的命名空间,可以通过在连接 URI 中将 PollingId 参数设置为“AccountActivity”实现。 在此示例中,POLLLINGSTMT 操作的 WCF 服务是通过对生成的OracleDBBindingService类进行子类化来创建的,但是,也可以直接在生成的类中实现您的逻辑。

using System;  
using System.Collections.Generic;  
using System.Text;  
  
// Add these three references to use the Oracle adapter  
using System.ServiceModel;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  
  
using microsoft.lobservices.oracledb._2007._03.POLLINGSTMTAcctActivity;  
using OracleDBBindingNamespace;  
  
namespace OraclePollingSM  
{  
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]  
  
    public class PollingStmtService : OracleDBBindingService  
    {  
        public override void POLLINGSTMT(POLLINGSTMT request)  
        {  
            Console.WriteLine("\nNew Polling Records Received");  
            Console.WriteLine("Tx Id\tAccount\tAmount\tDate\t\t\tDescription");  
            for (int i = 0; i < request.POLLINGSTMTRECORD.Length; i++)  
            {  
                Console.WriteLine("{0}\t{1}\t{2}\t{3}\t{4}", request.POLLINGSTMTRECORD[i].TID,  
                                    request.POLLINGSTMTRECORD[i].ACCOUNT,  
                                    request.POLLINGSTMTRECORD[i].AMOUNT,  
                                    request.POLLINGSTMTRECORD[i].TRANSDATE,  
                                    request.POLLINGSTMTRECORD[i].DESCRIPTION);  
  
            }  
            Console.WriteLine("\nHit <RETURN> to stop polling");  
         }  
    }  
  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            ServiceHost srvHost = null;  
  
            // This URI is used to specify the address for the ServiceEndpoint  
            // It must contain credentials and the PollingId (if any) that was used to generate  
            // the WCF service callback interface  
            Uri serviceUri = new Uri("OracleDb://User=SCOTT;Password=TIGER@Adapter?PollingId=AcctActivity");  
  
            // This URI is used to initialize the ServiceHost. It cannot contain  
            // userinfoparms (credentials) or a query_string (PollingId); otherwise,  
            // an exception is thrown when the ServiceHost is initialized.  
            Uri[] baseUri = new Uri[] { new Uri("OracleDb://Adapter") };  
  
            Console.WriteLine("Sample started, initializing service host -- please wait");  
  
            // create an instanc of the WCF service callback class  
            PollingStmtService pollingInstance = new PollingStmtService();  
  
            try  
            {  
                // Create a ServiceHost with the service callback instance and a base URI (address)  
                srvHost = new ServiceHost(pollingInstance, baseUri);  
  
                // Create and configure a binding for the service endpoint. Note: binding  
                // parameters are set here for clarity but these are already set in the  
                // generated configuration file  
                //  
                // The following properties are set  
                //    AcceptCredentialsInUri (true) to enable credentials in the connection URI for AddServiceEndpoint  
                //    PollingStatement  
                //    PostPollStatement calls PROCESS_ACTIVITY on Oracle. This procedure moves the queried records to  
                //                      the ACCOUNTHISTORY table  
                //    PollingInterval (15 seconds)  
                //    TransactionIsolationLevel   
  
                OracleDBBinding binding = new OracleDBBinding();  
  
                // The Credentials are included in the Connection Uri so set this property true  
                binding.AcceptCredentialsInUri = true;  
  
                // Same as statement specified in Configure Adapter dialog box  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
  
                // Be sure to set the interval long enough to complete processing before  
                // the next poll  
                binding.PollingInterval = 15;  
  
                // Polling is transactional, be sure to set an adequate isolation level   
                // for your environment  
                binding.TransactionIsolationLevel = TransactionIsolationLevel.ReadCommitted;  
  
                // Add service endpoint: be sure to specify POLLINGSTMT_OperationGroup as the contract  
                srvHost.AddServiceEndpoint("POLLINGSTMT_OperationGroup", binding, serviceUri);  
  
                Console.WriteLine("Opening the service host");  
                // Open the service host to begin polling  
                srvHost.Open();  
  
                // Wait to receive request  
                Console.WriteLine("\nPolling started. Returned records will be written to the console.");  
                Console.WriteLine("Hit <RETURN> to stop polling");  
                Console.ReadLine();  
            }  
            catch (Exception e)  
            {  
                Console.WriteLine("Exception :" + e.Message);  
                Console.ReadLine();  
  
                /* If there is an Oracle Error it will be specified in the inner exception */  
                if (e.InnerException != null)  
                {  
                    Console.WriteLine("InnerException: " + e.InnerException.Message);  
                    Console.ReadLine();  
                }  
            }  
            finally  
            {  
                // IMPORTANT: you must close the ServiceHost to stop polling  
                if (srvHost.State == CommunicationState.Opened)  
                    srvHost.Close();  
                else  
                    srvHost.Abort();  
            }  
  
        }  
    }  
}  

另请参阅

使用 WCF 服务模型开发 Oracle 数据库应用程序