表示既是可观察序列又是观察者的对象。
Namespace:System.Reactive.Subjects
装配: System.Reactive.dll) 中的 System.Reactive (
语法
'Declaration
Public Interface ISubject(Of In TSource, Out TResult) _
Inherits IObserver(Of TSource), IObservable(Of TResult)
'Usage
Dim instance As ISubject(Of In TSource, Out TResult)
public interface ISubject<in TSource, out TResult> : IObserver<TSource>,
IObservable<TResult>
generic<typename TSource, typename TResult>
public interface class ISubject : IObserver<TSource>,
IObservable<TResult>
type ISubject<'TSource, 'TResult> =
interface
interface IObserver<'TSource>
interface IObservable<'TResult>
end
JScript does not support generic types and methods.
类型参数
ISubject<TSource, TResult> 类型公开以下成员。
方法
名称 | 说明 | |
---|---|---|
![]() |
OnCompleted | (继承自 IObserver<TSource>.) |
![]() |
OnError | (继承自 IObserver<TSource>.) |
![]() |
OnNext | (继承自 IObserver<TSource>.) |
![]() |
订阅 | (继承自 IObservable<TResult>.) |
顶部
扩展方法
顶部
备注
此主题接口使使用者能够灵活地观察一种类型的可观测序列,同时发布另一种类型的可观测序列。
示例
此示例演示如何实现 ISubject<TSource,TResult> ,它观察一种类型的可观测序列,同时发布另一种类型的可观测序列。 此示例中的 AsciiConverterSubject 通过观察 char 类型的序列并发布可观测的 int 序列来演示实现。已发布的可观测序列 int 是它观察到的每个字符值的 ASCII 代码。
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
namespace Example
{
class Program
{
static void Main()
{
//****************************************************************************************//
//*** Create an observable sequence of char from console input until enter is pressed. ***//
//****************************************************************************************//
IObservable<char> keySequence = Observable.Create<char>(observer =>
{
bool bContinue = true;
while (bContinue)
{
ConsoleKeyInfo keyInfo = Console.ReadKey(true);
if (keyInfo.Key != ConsoleKey.Enter)
{
observer.OnNext(keyInfo.KeyChar);
}
else
{
observer.OnCompleted();
bContinue = false;
}
}
return (() => { });
});
//****************************************************************************************//
//*** Create an AsciiConverterSubject which takes a source type of char and returns an ***//
//*** observable sequence of int which is the ASCII code for source items of char. ***//
//****************************************************************************************//
AsciiConverterSubject myConverterSubject = new AsciiConverterSubject();
//****************************************************************************************//
//*** Subscribe to the keySequence on the .NET threadpool so the main thread can ***//
//*** create subscriptions to the AsciiConverterSubject ***//
//****************************************************************************************//
IDisposable subscription = keySequence.SubscribeOn(Scheduler.ThreadPool).Subscribe(myConverterSubject);
Console.WriteLine("\nEnter a sequence of keys to have the AsciiConverterSubject\nconvert the keys to their ASCII code values.\n");
Console.WriteLine("Press ENTER to terminate the observable sequence...\n");
//****************************************************************************************//
//*** Subscribe to the AsciiConverterSubject and write the ASCII code values to the ***//
//*** console window. ***//
//*** ***//
//*** The main thread will wait on the completion of the keySequence. It completes ***//
//*** when ENTER is pressed. ***//
//****************************************************************************************//
EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
myConverterSubject.Subscribe(c => Console.WriteLine("Ascii Char code {0} entered.",c), () => waitHandle.Set());
waitHandle.WaitOne();
//***************************************//
//*** Explicitly releasing resources. ***//
//***************************************//
subscription.Dispose();
myConverterSubject.Dispose();
}
}
//***********************************************************************************************//
//*** ***//
//*** The AsciiConverterSubject demonstrates an implementation of ISubject<TSource, TResult>. ***//
//*** It is used to subscribe to an observable sequence of char. It publishes an observable ***//
//*** sequence of int which should be the ASCII code for each char value it observes. ***//
//*** ***//
//***********************************************************************************************//
class AsciiConverterSubject : ISubject<char, int>, IDisposable
{
private List<IObserver<int>> observerList;
private bool isDisposed;
private bool isStopped;
object gate = new object();
Exception exception;
public AsciiConverterSubject()
{
observerList = new List<IObserver<int>>();
}
public void OnCompleted()
{
//****************************************************************************************//
//*** Make sure the OnCompleted operation is not preempted by another operation ***//
//*** which would break the expected behavior. For example, don't allow an error from ***//
//*** OnError preempt OnCompleted from anotther thread. Then OnCompleted would follow ***//
//*** an error. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
foreach (IObserver<int> observer in observerList)
{
observer.OnCompleted();
}
observerList.Clear();
isStopped = true;
}
}
}
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentException("Exception error should not be null.");
//****************************************************************************************//
//*** Make sure the OnError operation is not preempted by another operation which ***//
//*** would break the expected behavior. For example, don't allow unsubscribe or an ***//
//*** OnCompleted operation to preempt OnError from another thread. This would result ***//
//*** in an error following completion. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
exception = error;
foreach (IObserver<int> observer in observerList)
{
observer.OnError(error);
}
observerList.Clear();
isStopped = true;
}
}
}
public void OnNext(char value)
{
//****************************************************************************************//
//*** Make sure the OnNext operation is not preempted by another operation which ***//
//*** would break the expected behavior. For example, don't allow unsubscribe, errors ***//
//*** or an OnCompleted operation to preempt OnNext from another thread. This would ***//
//*** have the result of items in a sequence following completion, errors, or ***//
//*** unsubscribe. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
foreach (IObserver<int> observer in observerList)
{
observer.OnNext(Convert.ToInt32(value));
}
}
}
}
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer == null)
throw new ArgumentException("observer should not BehaviorSubject null.");
//****************************************************************************************//
//*** Make sure Subscribe occurs in sync with the other operations so we keep the ***//
//*** correct behavior depending on whether an error has occurred or the observable ***//
//*** sequence has completed. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
observerList.Add(observer);
return new Subscription(observer, this);
}
else if (exception != null)
{
observer.OnError(exception);
return Disposable.Empty;
}
else
{
observer.OnCompleted();
return Disposable.Empty;
}
}
}
private void Unsubscribe(IObserver<int> observer)
{
//****************************************************************************************//
//*** Make sure Unsubscribe occurs in sync with the other operations so we keep the ***//
//*** correct behavior. ***//
//****************************************************************************************//
lock (gate)
{
observerList.Remove(observer);
}
}
public void Dispose()
{
//****************************************************************************************//
//*** Make sure Dispose occurs in sync with the other operations so we keep the ***//
//*** correct behavior. For example, Dispose shouldn't preempt the other operations ***//
//*** changing state variables after they have been checked. ***//
//****************************************************************************************//
lock (gate)
{
observerList.Clear();
isStopped = true;
isDisposed = true;
}
}
private void CheckDisposed()
{
if (isDisposed)
throw new ObjectDisposedException("Subject has been disposed.");
}
//************************************************************************************//
//*** ***//
//*** The Subscription class wraps each observer that creates a subscription. This ***//
//*** is needed to expose an IDisposable interface through which a observer can ***//
//*** cancel the subscription. ***//
//*** ***//
//************************************************************************************//
class Subscription : IDisposable
{
private AsciiConverterSubject subject;
private IObserver<int> observer;
public Subscription(IObserver<int> obs, AsciiConverterSubject sub)
{
subject = sub;
observer = obs;
}
public void Dispose()
{
subject.Unsubscribe(observer);
}
}
}
}
示例代码生成了以下输出。
Enter a sequence of keys to have the AsciiConverterSubject
convert the keys to their ASCII code values.
Press ENTER to terminate the observable sequence...
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.
Ascii Char code 59 entered.
Ascii Char code 102 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.