次の方法で共有


IConnectableObservable<T>.Connect メソッド

監視可能な を接続します。

Namespace:System.Reactive.Subjects
アセンブリ: System.Reactive (System.Reactive.dll)

構文

'Declaration
Function Connect As IDisposable
'Usage
Dim instance As IConnectableObservable
Dim returnValue As IDisposable

returnValue = instance.Connect()
IDisposable Connect()
IDisposable^ Connect()
abstract Connect : unit -> IDisposable 
function Connect() : IDisposable

戻り値

種類: System.IDisposable
監視可能オブジェクトを切断するために使用される IDisposable オブジェクト。

次の例では、Publish 演算子を使用してコールド監視可能なシーケンス ソースをホット ソースに変換します。これは、ホットという 名前の IConnectableObservable<T> インスタンスを返します。 Publish オペレーターは、1 つのサブスクリプションを複数のサブスクライバーにブロードキャストすることで、サブスクリプションを共有するメカニズムを提供します。 ホットはプロキシとして機能し、ソースをサブスクライブし、ソースから値を受け取ると、それらを独自のサブスクライバーにプッシュします。 バッキング ソースへのサブスクリプションを確立し、値の受信を開始するには、IConnectableObservable.Connect() メソッドを使用します。 IConnectableObservable は IObservable を継承するため、Subscribe を使用して、実行を開始する前でもこのホット シーケンスをサブスクライブできます。 この例では、subscription1 がサブスクライブするときにホット シーケンスが開始されていないことに注意してください。 したがって、値はサブスクライバーにプッシュされません。 Connect を呼び出すと、値は subscription1 にプッシュされます。 3 秒の遅延の後、subscription2 はホットにサブスクライブし、現在の位置 (この場合は 3) から最後まですぐに値の受信を開始します。 出力は次のようになります。

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));   //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(     // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();

参照

リファレンス

IConnectableObservable<T> インターフェイス

System.Reactive.Subjects 名前空間