次の方法で共有


WCF チャネル モデルを使用して SQL Server からポーリング ベースのデータ変更メッセージを受信する

SQL Server テーブルまたはビューの定期的なデータ変更メッセージを受信するように SQL アダプターを構成できます。 データベースをポーリングするためにアダプターが実行するポーリング ステートメントを指定できます。 ポーリング ステートメントには、結果セットを返す SELECT ステートメントまたはストアド プロシージャを指定できます。

アダプターがポーリングをサポートする方法の詳細については、「 ポーリングを使用した受信呼び出しのサポート」を参照してください。

Von Bedeutung

1 つのアプリケーションで複数のポーリング操作を行う場合は、接続 URI の一部として InboundID 接続プロパティを指定して一意にする必要があります。 指定した受信 ID が操作名前空間に追加され、一意になります。

このトピックがポーリングを説明する方法

このトピックでは、SQL アダプターがデータ変更メッセージの受信をサポートする方法を示すために、 ポーリング 操作用の .NET アプリケーションを作成します。 このトピックでは、 PolledDataAvailableStatement を次のように指定します。

SELECT COUNT(*) FROM Employee  

PolledDataAvailableStatement は、正の値を含む最初のセルで結果セットを返す必要があります。 最初のセルに正の値が含まれていない場合、アダプターはポーリング ステートメントを実行しません。

ポーリング ステートメントの一部として、次の操作を実行します。

  1. Employee テーブルからすべての行を選択します。

  2. ストアド プロシージャ (MOVE_EMP_DATA) を実行して、すべてのレコードを Employee テーブルから EmployeeHistory テーブルに移動します。

  3. ストアド プロシージャ (ADD_EMP_DETAILS) を実行して、Employee テーブルに新しいレコードを追加します。 この手順では、従業員の名前、指定、給与をパラメーターとして受け取ります。

    これらの操作を実行するには、 PollingStatement バインディング プロパティに次を指定する必要があります。

SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000   

ポーリング ステートメントが実行されると、Employee テーブルのすべてのレコードが選択され、SQL Server からのメッセージが受信されます。 MOVE_EMP_DATA ストアド プロシージャがアダプターによって実行されると、すべてのレコードが EmployeeHistory テーブルに移動されます。 次に、ADD_EMP_DETAILS ストアド プロシージャが実行され、Employee テーブルに新しいレコードが追加されます。 次のポーリング実行では、1 つのレコードのみが返されます。 このサイクルは、チャネル リスナーを閉じるまで続行されます。

SQL アダプターのバインド プロパティを使用したポーリング クエリの構成

次の表は、データ変更メッセージを受信するようにアダプターを構成するために使用する SQL アダプター バインド プロパティをまとめたものです。 これらのバインド プロパティは、ポーリング用の .NET アプリケーションの一部として指定する必要があります。

バインディングプロパティ 説明
InboundOperationType ポーリングTypedPolling、または通知の受信操作を実行するかどうかを指定します。 既定値は ポーリングです
ポーリングデータの利用可能声明 ポーリングに使用できるデータがあるかどうかを判断するためにアダプターが実行する SQL ステートメントを指定します。 SQL ステートメントは、行と列で構成される結果セットを返す必要があります。 行が使用可能な場合にのみ、 PollingStatement バインディング プロパティに指定された SQL ステートメントが実行されます。
PollingIntervalInSeconds SQL アダプターが PolledDataAvailableStatement バインディング プロパティに指定されたステートメントを実行する間隔を秒単位で指定します。 既定値は 30 秒です。 ポーリング間隔は、連続するポーリング間の時間間隔を決定します。 指定した期間内にステートメントが実行された場合、アダプターはその間隔の残りの時間を待機します。
PollingStatement SQL Server データベース テーブルをポーリングする SQL ステートメントを指定します。 ポーリング ステートメントには、単純な SELECT ステートメントまたはストアド プロシージャを指定できます。 既定値は null です。 ポーリングを有効にするには、 PollingStatement の値を指定する必要があります。 ポーリング ステートメントは、 PolledDataAvailableStatement バインディング プロパティによって決定されるポーリングに使用できるデータがある場合にのみ実行されます。 任意の数の SQL ステートメントをセミコロンで区切って指定できます。
PollWhileDataFound ポーリング対象のテーブルでデータが使用可能な場合に、SQL アダプターがポーリング間隔を無視し、 PolledDataAvailableStatement バインディング プロパティに指定された SQL ステートメントを継続的に実行するかどうかを指定します。 テーブルに使用可能なデータがない場合、アダプターは指定されたポーリング間隔で SQL ステートメントを実行するように戻ります。 既定値は falseです。

これらのプロパティの詳細については、BizTalk Adapter for SQL Server アダプターのバインド プロパティに関する情報をご覧ください。 SQL アダプターを使用して SQL Server をポーリングする方法の詳細については、このトピックの残りの部分を参照してください。

ポーリング要求メッセージの処理

アダプターは、SQL Server データベースをポーリングするために、コードに対して ポーリング 操作を呼び出します。 つまり、アダプターは、IInputChannel チャネル図形を介して受信したポーリング要求メッセージを送信します。 ポーリング要求メッセージには、PollingStatement バインディング プロパティで指定されたクエリの結果セットが含まれています。 ポーリング メッセージは、次の 2 つの方法のいずれかで使用できます。

  • ノード値ストリーミングを使用してメッセージを使用するには、応答メッセージで WriteBodyContents メソッドを呼び出し、ノード値ストリーミングを実装する XmlDictionaryWriter を渡す必要があります。

  • ノード ストリーミングを使用してメッセージを使用するには、応答メッセージで GetReaderAtBodyContents を呼び出して XmlReader を取得します。

このトピックで使用する例について

このトピックの例では、Employee テーブルを定期的に照会します。 この例では、MOVE_EMP_DATAストアド プロシージャとADD_EMP_DETAILS ストアド プロシージャも使用します。 これらの成果物を生成するスクリプトは、サンプルと共に提供されます。 サンプルの詳細については、 SQL アダプターのサンプルを参照してください。 このトピックに基づく サンプル Polling_ChannelModelも、SQL アダプターのサンプルと共に提供されています。

WCF チャネル モデルを使用したポーリング操作用の受信メッセージの受信

このセクションでは、SQL アダプターを使用して受信ポーリング メッセージを受信する .NET アプリケーション (チャネル モデル) を記述する方法について説明します。

SQL アダプターからポーリング メッセージを受信するには

  1. Visual Studio で Microsoft Visual C# プロジェクトを作成します。 このトピックでは、コンソール アプリケーションを作成します。

  2. ソリューション エクスプローラーで、 Microsoft.Adapters.SqlMicrosoft.ServiceModel.ChannelsSystem.ServiceModel、および System.Runtime.Serializationへの参照を追加します。

  3. Program.cs ファイルを開き、次の名前空間を追加します。

    • Microsoft.Adapters.Sql

    • System.ServiceModel

    • System.ServiceModel.Description

    • System.ServiceModel.Channels

    • System.Xml

  4. 接続 URI を指定します。 アダプター接続 URI の詳細については、「 SQL Server 接続 URI の作成」を参照してください。

    Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");  
    
  5. SqlAdapterBinding のインスタンスを作成し、ポーリングを構成するために必要なバインディング プロパティを設定します。 少なくとも、 InboundOperationTypePolledDataAvailableStatement、および PollingStatement バインディング プロパティを設定する必要があります。 ポーリングの構成に使用されるバインディング プロパティの詳細については、「ポーリングを 使用した受信呼び出しのサポート」を参照してください。

    SqlAdapterBinding binding = new SqlAdapterBinding();  
    binding.InboundOperationType = InboundOperation.Polling;  
    binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";  
    binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";  
    
  6. バインド パラメーター コレクションを作成し、資格情報を設定します。

    ClientCredentials credentials = new ClientCredentials();  
    credentials.UserName.UserName = "<Enter user name here>";  
    credentials.UserName.Password = "<Enter password here>";  
    
    BindingParameterCollection bindingParams = new BindingParameterCollection();  
    bindingParams.Add(credentials);  
    
  7. チャネル リスナーを作成して開きます。 リスナーを作成する場合は、SqlAdapterBindingBuildChannelListener<IInputChannel> メソッドを呼び出します。

    IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
    listener.Open();  
    
  8. リスナーで AcceptChannel メソッドを呼び出して IInputChannel チャネルを取得し、開きます。

    IInputChannel channel = listener.AcceptChannel();  
    channel.Open();  
    
  9. チャネルで Receive を呼び出して、アダプターから次の POLLINGSTMT メッセージを取得します。

    Message message = channel.Receive();  
    
  10. POLLINGSTMT 操作によって返される結果セットを使用します。 XmlReader または XmlDictionaryWriter を使用してメッセージを使用できます。

    XmlReader reader = message.GetReaderAtBodyContents();  
    
  11. 要求の処理が完了したら、チャネルを閉じます。

    channel.Close()  
    

    Von Bedeutung

    POLLINGSTMT 操作の処理が完了したら、チャネルを閉じる必要があります。 チャネルを閉じできないと、コードの動作に影響する可能性があります。

  12. データ変更メッセージの受信が完了したら、リスナーを閉じます。

    listener.Close()  
    

    Von Bedeutung

    リスナーを閉じても、リスナーを使用して作成されたチャネルは閉じません。 リスナーを使用して作成された各チャネルを明示的に閉じる必要があります。

次の例は、Employee テーブルを実行するポーリング クエリを示しています。 ポーリング ステートメントは、次のタスクを実行します。

  • Employee テーブルからすべてのレコードを選択します。

  • MOVE_EMP_DATA ストアド プロシージャを実行して、すべてのレコードを Employee テーブルから EmployeeHistory テーブルに移動します。

  • ADD_EMP_DETAILS ストアド プロシージャを実行して、1 つのレコードを Employee テーブルに追加します。

    ポーリング メッセージは、 C:\PollingOutput.xmlに保存されます。

using System;  
using Microsoft.Adapters.Sql;  
using System.ServiceModel;  
using System.ServiceModel.Description;  
using System.ServiceModel.Channels;  

using System.Xml;  

namespace ConsoleApplication1  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            Console.WriteLine("Sample started. This sample will poll 5 times and will perform the following tasks:");  
            Console.WriteLine("Press any key to start polling...");  
            Console.ReadLine();  
            IChannelListener<IInputChannel> listener = null;  

            IInputChannel channel = null;  

            try  
            {  
                TimeSpan messageTimeout = new TimeSpan(0, 0, 30);  

                SqlAdapterBinding binding = new SqlAdapterBinding();  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";  
                binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";  

                Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");  

                ClientCredentials credentials = new ClientCredentials();  
                credentials.UserName.UserName = "<Enter user name here>";  
                credentials.UserName.Password = "<Enter password here>";  

                BindingParameterCollection bindingParams = new BindingParameterCollection();  
                bindingParams.Add(credentials);  

                listener = binding.BuildChannelListener<IInputChannel>(ConnectionUri, bindingParams);  
                listener.Open();  

                channel = listener.AcceptChannel();  
                channel.Open();  

                Console.WriteLine("Channel and Listener opened...");  
                Console.WriteLine("\nWaiting for polled data...");  
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);  

                // Poll five times with the specified message timeout   
                // If a timeout occurs polling will be aborted  
                for (int i = 0; i < 5; i++)  
                {  
                    Console.WriteLine("Polling: " + i);  
                    Message message = null;  
                    XmlReader reader = null;  
                    try  
                    {  
                        //Message is received so process the results  
                        message = channel.Receive(messageTimeout);  
                    }  
                    catch (System.TimeoutException toEx)  
                    {  
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);  
                        continue;  
                    }  

                    // Get the query results using an XML reader  
                    try  
                    {  
                        reader = message.GetReaderAtBodyContents();  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine("Exception :" + ex);  
                        throw;  
                    }  

                    XmlDocument doc = new XmlDocument();  
                    doc.Load(reader);  
                    using (XmlWriter writer = XmlWriter.Create("C:\\PollingOutput.xml"))  
                    {  
                        doc.WriteTo(writer);  
                        Console.WriteLine("The polling response is saved at 'C:\\PollingOutput.xml'");  
                    }  

                    // return the cursor  
                    Console.WriteLine();  

                    // close the reader  
                    reader.Close();  

                    message.Close();  
                }  
                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");  
                Console.ReadLine();  
            }  
            catch (Exception ex)  
            {  
                Console.WriteLine("Exception is: " + ex.Message);  
                if (ex.InnerException != null)  
                {  
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);  
                }  
            }  
            finally  
            {  
                // IMPORTANT: close the channel and listener to stop polling  
                if (channel != null)  
                {  
                    if (channel.State == CommunicationState.Opened)  
                        channel.Close();  
                    else  
                        channel.Abort();  
                }  

                if (listener != null)  
                {  
                    if (listener.State == CommunicationState.Opened)  
                        listener.Close();  
                    else  
                        listener.Abort();  
                }  
            }  
        }  
    }  
}  

こちらもご覧ください

WCF チャネル モデルを使用して SQL アプリケーションを開発する