次の方法で共有


WCF チャネル モデルを使用して Oracle データベースでポーリング ベースのデータ変更メッセージを受信する

Microsoft BizTalk Adapter for Oracle Database を構成して、Oracle データベース テーブルまたはビューでデータの変更をポーリングできます。 このようなポーリング操作を実行するために、アダプターは Oracle テーブルまたはビューに対して SQL クエリを定期的に実行し、その後にオプションの PL/SQL コード ブロックを実行します。 SQL クエリの結果は、Oracle Database アダプターによって、受信 POLLINGSTMT 操作で厳密に型指定された結果セットとしてコードに返されます。 Oracle データベース アダプターを使用した Oracle データベースのポーリングの構成と実行に使用されるメカニズムの詳細については、「Oracle Database アダプター でのポーリング ベースのデータ変更メッセージの受信」を参照してください。 先に進む前に、このトピックを読んでおくことを強くお勧めします。

Oracle データベース アダプターを構成して、Oracle データベース テーブルまたはビューをポーリングするには、 OracleDBBinding のインスタンスにバインド プロパティを設定します。 WCF チャネル モデルでは、このバインディングを使用してチャネル リスナーを構築します。そこから 、IInputChannel チャネルを取得してアダプターから POLLINGSTMT 操作を受信できます。

WCF で IInputChannel を使用して操作を受信する方法の概要については、「 サービス Channel-Level プログラミング」を参照してください。

このトピックのセクションでは、WCF チャネル モデルを使用して Oracle データベース テーブルとビューのポーリングを実行するのに役立つ情報を提供します。

POLLINGSTMT 要求メッセージの処理

アダプターは、コードに対して POLLINGSTMT 操作を呼び出して Oracle データベースをポーリングします。 つまり、アダプターは、 IInputChannel チャネル図形を介して受信した POLLINGSTMT 要求メッセージを送信します。 POLLINGSTMT 要求メッセージには、 PollingStatement バインディング プロパティで指定されたクエリの結果セットが含まれています。 POLLINGSTMT メッセージは、次の 2 つの方法のいずれかで使用できます。

  • ノード値ストリーミングを使用してメッセージを使用するには、応答メッセージで WriteBodyContents メソッドを呼び出し、ノード値ストリーミングを実装する XmlDictionaryWriter を渡す必要があります。

  • ノード ストリーミングを使用してメッセージを使用するには、応答メッセージで GetReaderAtBodyContents を呼び出して XmlReader を取得します。

    通常、ノード値ストリーミングを使用して、Oracle LOB データ列を含む結果セットを使用します。

    POLLINGSTMT 操作のメッセージ構造の詳細については、「ポーリング操作の メッセージ スキーマ」を参照してください。

    Oracle Database アダプターが LOB データのストリーミングをサポートする方法の詳細については、「 Oracle Database アダプターでのラージ オブジェクト データ型のストリーミング」を参照してください。

    LOB データのエンド ツー エンド ストリーミングをサポートするためにコードにノード値ストリーミングを実装する方法の詳細については、「 WCF チャネル モデルを使用した Oracle データベース LOB データ型のストリーミング」を参照してください。

このトピックで使用する例について

このトピックの例では、SCOTT.ACCOUNTACTIVITY テーブルと SCOTT.ACCOUNT_PKG.PROCESS_ACTIVITY 関数を使用します。 これらの成果物を生成するスクリプトは、サンプルと共に提供されます。 この例では、次の操作を実行します。

  • ポーリング ステートメントの一部として、ACCOUNTACTIVITY テーブルからすべてのレコードを選択し、コンソールに表示します。

  • この例では、post poll ステートメントの一部として、ACCOUNTACTIVITY テーブルから ACTIVITYHISTORY テーブルにすべてのレコードを移動するPROCESS_ACTIVITY関数を呼び出します。

  • その後のACCOUNTACTIVITYテーブルへのポーリングでは、レコードは返されません。 ただし、この例でポーリング操作の一部としてさらに多くのレコードを返す場合は、ACCOUNTACTIVITY テーブルにいくつかのレコードを挿入する必要があります。 これを行うには、サンプルで提供されているmore_activity_data.sql スクリプトを実行します。

    サンプルの詳細については、「 アダプターのサンプル」を参照してください。

IInputChannel を使用して Oracle データベースをポーリングする方法

WCF チャネル モデルを使用してデータ変更メッセージを受信するように Oracle データベース テーブルまたはビューをポーリングするには、次の手順を実行します。

IInputChannel を使用してデータ変更メッセージを受信するには

  1. Visual Studio で Visual C# プロジェクトを作成します。 このトピックでは、コンソール アプリケーションを作成します。

  2. ソリューション エクスプローラーで、 Microsoft.Adapters.OracleDBMicrosoft.ServiceModel.ChannelsSystem.ServiceModel、および System.Runtime.Serializationへの参照を追加します。

  3. Program.cs ファイルを開き、次の名前空間を追加します。

    • Microsoft.Adapters.OracleDB

    • Microsoft.ServiceModel.Channels

    • System.ServiceModel

    • System.ServiceModel.Description

    • System.ServiceModel.Channels

    • System.Xml

    • System.Runtime.Serialization

    • System.IO

    • Microsoft.ServiceModel.Channels.Common

  4. OracleDBBinding のインスタンスを作成し、ポーリングを構成するために必要なバインディング プロパティを設定します。 少なくとも、 InboundOperationTypePollingStatement、および PollingInterval バインディング プロパティを設定する必要があります。 この例では、 PostPollStatement バインド プロパティも設定します。 ポーリングの構成に使用されるバインディング プロパティの詳細については、「 Oracle Database アダプターでのポーリング ベースのデータ変更メッセージの受信」を参照してください。

    OracleDBBinding binding = new OracleDBBinding();  
    binding.InboundOperationType = InboundOperation.Polling;  
    binding.PollingInterval = 30;  
    binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
    binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;"  
    
  5. バインド パラメーター コレクションを作成し、資格情報を設定します。

    ClientCredentials credentials = new ClientCredentials();  
    credentials.UserName.UserName = "SCOTT";  
    credentials.UserName.Password = "TIGER";  
    
    BindingParameterCollection bindingParams = new BindingParameterCollection();  
    bindingParams.Add(credentials);  
    
  6. チャネル リスナーを作成して開きます。 OracleDBBindingBuildChannelListener<IInputChannel> メソッドを呼び出してリスナーを作成します。 接続 URI で PollingId プロパティを設定することで、POLLINGSTMT 操作のターゲット名前空間を変更できます。 アダプター接続 URI の詳細については、「 Oracle データベース接続 URI の作成」を参照してください。

    IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
    listener.Open();  
    
  7. リスナーで AcceptChannel メソッドを呼び出して IInputChannel チャネルを取得し、開きます。

    IInputChannel channel = listener.AcceptChannel();  
    channel.Open();  
    
  8. チャネルで Receive を呼び出して、アダプターから次の POLLINGSTMT メッセージを取得します。

    Message message = channel.Receive();  
    
  9. POLLINGSTMT 操作によって返される結果セットを使用します。 XmlReader または XmlDictionaryWriter を使用してメッセージを使用できます。

    XmlReader reader = message.GetReaderAtBodyContents();  
    
  10. 要求の処理が完了したら、チャネルを閉じます。

    channel.Close()  
    

    Von Bedeutung

    POLLINGSTMT 操作の処理が完了したら、チャネルを閉じる必要があります。 チャネルを閉じできないと、コードの動作に影響する可能性があります。

  11. データ変更メッセージの受信が完了したら、リスナーを閉じます。

    listener.Close()  
    

    Von Bedeutung

    リスナーを閉じても、リスナーを使用して作成されたチャネルは閉じません。 リスナーを使用して作成された各チャネルを明示的に閉じる必要があります。

次の例は、Oracle データベース のテーブルとビューをポーリングし、WCF チャネル モデルを使用して POLLLINGSTMT 操作を受信するように Oracle データベース アダプターを構成する方法を示しています。 POLLINGSTMT 操作で返される結果セットは、 XmlReader を使用してコンソールに書き込まれます。

using System;  
using System.Collections.Generic;  
using System.Text;  
  
// Add WCF, WCF LOB Adapter SDK, and Oracle Database adapter namepaces  
using System.ServiceModel;  
using System.ServiceModel.Description;  
using Microsoft.ServiceModel.Channels;  
using Microsoft.Adapters.OracleDB;  
  
// Add this namespace for channel model  
using System.ServiceModel.Channels;  
  
using System.Xml;  
using System.Runtime.Serialization;  
using System.IO;  
  
// Include this namespace for the WCF LOB Adapter SDK and Oracle exceptions  
using Microsoft.ServiceModel.Channels.Common;  
  
namespace OraclePollingCM  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            Uri connectionUri = new Uri("oracleDB://ADAPTER/");  
  
            IChannelListener<IInputChannel> listener = null;  
            IInputChannel channel = null;  
  
            // set timeout to receive POLLINGSTMT message  
            TimeSpan messageTimeout = new TimeSpan(0, 0, 30);  
  
            Console.WriteLine("Sample Started");  
  
            try  
            {  
                // Create a binding: specify the InboundOperationType, PollingInterval (in seconds), the           
                // PollingStatement,and the PostPollStatement.  
                OracleDBBinding binding = new OracleDBBinding();  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PollingInterval = 30;  
                binding.PollingStatement = "SELECT * FROM ACCOUNTACTIVITY FOR UPDATE";  
                binding.PostPollStatement = "BEGIN ACCOUNT_PKG.PROCESS_ACTIVITY(); END;";  
  
                // Create a binding parameter collection and set the credentials  
                ClientCredentials credentials = new ClientCredentials();  
                credentials.UserName.UserName = "SCOTT";  
                credentials.UserName.Password = "TIGER";  
  
                BindingParameterCollection bindingParams = new BindingParameterCollection();  
                bindingParams.Add(credentials);  
  
                Console.WriteLine("Opening listener");  
                // get a listener  from the binding  
                listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams);  
                listener.Open();  
  
                Console.WriteLine("Opening channel");  
                // get a channel from the listener  
                channel = listener.AcceptChannel();  
                channel.Open();  
  
                Console.WriteLine("Channel opened -- waiting for polled data");  
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);  
  
                // Poll five times with the specified message timeout   
                // If a timeout occurs polling will be aborted  
                for (int i = 0; i < 5; i++)  
                {  
                    Console.WriteLine("Polling: " + i);  
                    Message message = null;  
                    XmlReader reader = null;  
                    try  
                    {  
                        //Message is received so process the results  
                        message = channel.Receive(messageTimeout);  
                    }  
                    catch (System.TimeoutException toEx)  
                    {  
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);  
                        continue;  
                    }  
  
                    // Get the query results using an XML reader  
                    try  
                    {  
                        reader = message.GetReaderAtBodyContents();  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine("Exception :" + ex);  
                        throw;  
                    }  
  
                    // Write the TID, ACCOUNT, AMOUNT, and TRANSDATE for each record to the Console  
                    Console.WriteLine("\nPolling data received for request number {0}", i+1);  
                    Console.WriteLine("Tx ID\tACCOUNT\tAMOUNT\tTx DATE");  
  
                    while (reader.Read())  
                    {  
                        if (reader.IsStartElement())  
                        {  
                            switch (reader.Name)  
                            {  
                                case "POLLINGSTMTRECORD":  
                                    Console.Write("\n");  
                                    break;  
  
                                case "TID":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                case "ACCOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
                                case "AMOUNT":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                case "TRANSDATE":  
                                    reader.Read();  
                                    Console.Write(reader.ReadString() + "\t");  
                                    break;  
  
                                default:  
                                    break;  
                            }  
                        }  
                    }  
  
                    // return the cursor  
                    Console.WriteLine();  
  
                    // close the reader  
                    reader.Close();  
  
                    //            To save the polling data to a file you can REPLACE the code above with the following  
                    //  
                    //            XmlDocument doc = new XmlDocument();  
                    //            doc.Load(reader);  
                    //            using (XmlWriter writer = XmlWriter.Create("PollingOutput.xml"))  
                    //            {  
                    //                doc.WriteTo(writer);  
                    //            }  
                    message.Close();  
                }  
  
                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");  
                Console.ReadLine();  
            }  
            catch (TargetSystemException tex)  
            {  
                Console.WriteLine("Exception occurred on the Oracle Database");  
                Console.WriteLine(tex.InnerException.Message);  
            }  
            catch (ConnectionException cex)  
            {  
                Console.WriteLine("Exception occurred connecting to the Oracle Database");  
                Console.WriteLine(cex.InnerException.Message);  
            }  
            catch (Exception ex)  
            {  
                Console.WriteLine("Exception is: " + ex.Message);  
                if (ex.InnerException != null)  
                {  
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);  
                }  
            }  
            finally  
            {  
                // IMPORTANT: close the channel and listener to stop polling  
                if (channel != null)  
                {  
                    if (channel.State == CommunicationState.Opened)  
                        channel.Close();  
                    else  
                        channel.Abort();  
                }  
  
                if (listener != null)  
                {  
                    if (listener.State == CommunicationState.Opened)  
                        listener.Close();  
                    else  
                        listener.Abort();  
                }  
            }  
        }  
    }  
}  

こちらもご覧ください

WCF チャネル モデルを使用して Oracle データベース アプリケーションを開発する