ISubject<TSource、TResult> 接口

表示既是可观察序列又是观察者的对象。

Namespace:System.Reactive.Subjects
装配: System.Reactive.dll) 中的 System.Reactive (

语法

'Declaration
Public Interface ISubject(Of In TSource, Out TResult) _
    Inherits IObserver(Of TSource), IObservable(Of TResult)
'Usage
Dim instance As ISubject(Of In TSource, Out TResult)
public interface ISubject<in TSource, out TResult> : IObserver<TSource>, 
    IObservable<TResult>
generic<typename TSource, typename TResult>
public interface class ISubject : IObserver<TSource>, 
    IObservable<TResult>
type ISubject<'TSource, 'TResult> =  
    interface
        interface IObserver<'TSource>
        interface IObservable<'TResult>
    end
JScript does not support generic types and methods.

类型参数

  • inTSource
    源的类型。

    这是逆变类型参数。 即,可以使用指定的类型,也可以使用派生程度较低的任何类型。 有关协变和逆变的详细信息,请参阅

  • outTResult
    结果的类型。

    这是协变类型参数。 即,可以使用指定的类型,也可以使用派生程度较高的任何类型。 有关协变和逆变的详细信息,请参阅

ISubject<TSource, TResult> 类型公开以下成员。

方法

  名称 说明
Public 方法 OnCompleted (继承自 IObserver<TSource>.)
Public 方法 OnError (继承自 IObserver<TSource>.)
Public 方法 OnNext (继承自 IObserver<TSource>.)
Public 方法 订阅 (继承自 IObservable<TResult>.)

顶部

扩展方法

  名称 说明
公共扩展方法 聚合<TResult> (Func<TResult、TResult、TResult>) 已重载。 对可观测序列应用累加器函数。 由 Observable.) 定义的 (
公共扩展方法 聚合<TResult、TAccumulate> (TAccumulate、Func<TAccumulate、TResult、TAccumulate>) 已重载。 对具有指定种子值的可观测序列应用累加器函数。 由 Observable.) 定义的 (
公共扩展方法 所有<TResult> 确定可观测序列的所有元素是否都满足条件。 由 Observable.) 定义的 (
公共扩展方法 Amb<TResult> 传播可观测序列,该序列首先使用指定的第一个和第二个序列做出反应。 由 Observable.) 定义的 (
公共扩展方法 和<TResult、TRight> 当两个可观测序列都具有可用值时匹配。 由 Observable.) 定义的 (
公共扩展方法 任何<TResult> () 已重载。 确定可观测序列是否包含任何元素。 由 Observable.) 定义的 (
公共扩展方法 任何<TResult> (Func<TResult、布尔>) 已重载。 确定可观测序列的所有元素是否都满足条件。 由 Observable.) 定义的 (
公共扩展方法 AsObservable<TResult> 隐藏可观测序列的标识。 由 Observable.) 定义的 (
公共扩展方法 AsObserver<TSource> 隐藏观察者的标识。 由 Observer.) 定义的 (
公共扩展方法 AsQbservable<TResult> 将可观测序列转换为可查询的可观测序列。 由 Qbservable.) 定义的 (
公共扩展方法 AssertEqual<TResult> Extensions.) 定义的 (
公共扩展方法 Buffer<TResult> (Int32) 已重载。 指示可观测序列的每个元素进入基于元素计数信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan) 已重载。 指示可观测序列的每个元素进入基于计时信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (Int32、Int32) 已重载。 将可观测序列的每个元素指示为零个或多个缓冲区,这些缓冲区基于元素计数信息生成。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan、IScheduler) 已重载。 指示可观测序列的每个元素进入基于计时信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan、TimeSpan) 已重载。 指示可观测序列的每个元素成零个或多个缓冲区,这些缓冲区基于计时信息生成。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan、Int32) 已重载。 指示可观测序列的每个元素进入缓冲区,该缓冲区在已满或经过给定时间量时发出。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan、TimeSpan、IScheduler) 已重载。 指示可观测序列的每个元素成零个或多个缓冲区,这些缓冲区基于计时信息生成。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult> (TimeSpan、Int32、IScheduler) 已重载。 指示可观测序列的每个元素进入缓冲区,该缓冲区在已满或经过给定时间量时发出。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult、TBufferClosing> (Func<IObservable<TBufferClosing>>) 已重载。 指示可观测序列的每个元素进入连续的非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<TResult、TBufferOpening、TBufferClosing> (IObservable<TBufferOpening>、Func<TBufferOpening、IObservable<TBufferClosing>>) 已重载。 指示可查询可观测序列的每个元素进入连续的非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 捕获<TResult> (IObservable<TResult>) 已重载。 继续由异常终止的可观测序列,该序列与下一个可观察序列一起终止。 由 Observable.) 定义的 (
公共扩展方法 Catch<TResult、TException> (Func<TException、IObservable<TResult>>) 已重载。 使用处理程序生成的可观察序列继续由指定类型的异常终止的可观察序列。 由 Observable.) 定义的 (
公共扩展方法 CombineLatest<TResult、TSecond、TResult> 每当其中一个可观测序列生成元素时,使用选择器函数将两个可观测序列合并为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Concat<TResult> 连接两个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 包含<TResult> (TResult) 已重载。 使用默认相等比较器确定可观测序列是否包含指定的元素。 由 Observable.) 定义的 (
公共扩展方法 包含<TResult> (TResult、IEqualityComparer<TResult>) 已重载。 通过使用指定的 System.Collections.Generic.IEqualityComparer&lt;确定可观测序列是否包含指定的元素;T&gt;. 由 Observable.) 定义的 (
公共扩展方法 计数<TResult> 返回一个 Int32 ,表示可观测序列中的元素总数。 由 Observable.) 定义的 (
公共扩展方法 DefaultIfEmpty<TResult> () 已重载。 如果序列为空,则返回指定序列的元素或类型参数在单一实例序列中的默认值。 由 Observable.) 定义的 (
公共扩展方法 DefaultIfEmpty<TResult> (TResult) 已重载。 如果序列为空,则返回指定序列的元素或类型参数在单一实例序列中的默认值。 由 Observable.) 定义的 (
公共扩展方法 延迟<TResult> (TimeSpan) 已重载。 使用指定的源和 dueTime 按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<TResult> (DateTimeOffset) 已重载。 使用指定的源和 dueTime 按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<TResult> (TimeSpan、IScheduler) 已重载。 使用指定的源、dueTime 和计划程序指示按到期时间的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<TResult> (DateTimeOffset、IScheduler) 已重载。 使用指定的源、dueTime 和计划程序指示按到期时间的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Distinct<TResult> () 已重载。 返回一个可观测序列,该序列仅包含具有指定源的不同元素。 由 Observable.) 定义的 (
公共扩展方法 Distinct<TResult> (IEqualityComparer<TResult>) 已重载。 根据比较器返回仅包含不同元素的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Distinct<TResult、TKey> (Func<TResult、TKey>) 已重载。 根据 keySelector 返回一个仅包含不同元素的可观察序列。 由 Observable.) 定义的 (
公共扩展方法 Distinct<TResult、TKey> (Func<TResult、TKey>、IEqualityComparer<TKey>) 已重载。 根据 keySelector 返回一个仅包含不同元素的可观察序列。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<TResult> () 已重载。 返回一个可观测序列,该序列仅包含具有指定源的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<TResult> (IEqualityComparer<TResult>) 已重载。 返回一个可观测序列,该序列仅包含根据比较器的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<TResult、TKey> (Func<TResult、TKey>) 已重载。 返回一个可观测序列,该序列仅包含根据 keySelector 的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<TResult、TKey> (Func<TResult、TKey>、IEqualityComparer<TKey>) 已重载。 根据 keySelector 和比较器返回一个仅包含不同连续元素的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Do<TResult> (Action<TResult>) 已重载。 为可观测序列中的每个元素调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 Do<TResult> (IObserver<TResult>) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观察序列异常终止时调用操作。 由 Observable.) 定义的 (
公共扩展方法 Do<TResult> (Action<TResult>、Action) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观测序列正常终止时调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 执行<TResult> (操作<TResult>,操作<异常>) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观察序列异常终止时调用操作。 由 Observable.) 定义的 (
公共扩展方法 执行<TResult> (操作<TResult>、操作<异常>、操作) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观察序列正常或异常终止时调用操作。 由 Observable.) 定义的 (
公共扩展方法 ElementAt<TResult> 返回序列中指定索引处的元素。 由 Observable.) 定义的 (
公共扩展方法 ElementAtOrDefault<TResult> 返回序列中指定索引处的元素;如果索引超出范围,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 Finally<TResult> 在源可观测序列正常终止或由异常终止后调用指定的操作。 由 Observable.) 定义的 (
公共扩展方法 First<TResult> () 已重载。 返回具有指定源的可观测序列的第一个元素。 由 Observable.) 定义的 (
公共扩展方法 First<TResult> (Func<TResult,boolean>) 已重载。 返回与谓词匹配的可观测序列的第一个元素。 由 Observable.) 定义的 (
公共扩展方法 FirstOrDefault<TResult> () 已重载。 返回可观测序列的第一个元素,如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 FirstOrDefault<TResult> (Func<TResult,布尔>) 已重载。 返回与谓词匹配的可观测序列的第一个元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 Foo<TResult,R> MyExt.) 定义的 (
公共扩展方法 ForEach<TResult> 对可观测序列中的每个元素调用操作,并在序列终止之前阻止。 由 Observable.) 定义的 (
公共扩展方法 GetEnumerator<TResult> 返回枚举可观测序列的所有值的枚举器。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<TResult、TKey> (Func<TResult、TKey>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<TResult、TKey> (Func<TResult、TKey>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>) 已重载。 对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<TResult、TKey、TDuration> (Func<TResult、TKey>、Func<IGroupedObservable<TKey、TResult>、IObservable<TDuration>>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<TResult, TKey, TDuration> (Func<TResult, TKey>, Func<IGroupedObservable<TKey, TResult>, IObservable<TDuration>>, IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<TResult, TKey, TElement, TDuration> (Func<TResult, TKey>, Func<TResult, TElement>, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<TResult, TKey, TElement, TDuration> (Func<TResult, TKey>, Func<TResult, TElement>, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>, IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupJoin<TResult、TRight、TLeftDuration、TRightDuration、TResult> 根据重叠持续时间关联两个序列的元素,并将结果分组。 由 Observable.) 定义的 (
公共扩展方法 IgnoreElements<TResult> 忽略可观测序列中的所有值,只保留终止消息。 由 Observable.) 定义的 (
公共扩展方法 Join<TResult、TRight、TLeftDuration、TRightDuration、TResult> 根据重叠持续时间关联两个序列的元素。 由 Observable.) 定义的 (
公共扩展方法 Last<TResult> () 已重载。 返回具有指定源的可观测序列的最后一个元素。 由 Observable.) 定义的 (
公共扩展方法 Last<TResult> (Func<TResult,布尔>) 已重载。 返回与谓词匹配的可观测序列的最后一个元素。 由 Observable.) 定义的 (
公共扩展方法 LastOrDefault<TResult> () 已重载。 返回可观测序列中的最后一个元素,如果未找到值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 LastOrDefault<TResult> (Func<TResult,布尔>) 已重载。 返回与谓词匹配的可观察序列的最后一个元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 最新<TResult> 对可观测序列中的最新值进行采样。 由 Observable.) 定义的 (
公共扩展方法 LongCount<TResult> 返回一个 Int64 ,表示可观测序列中的元素总数。 由 Observable.) 定义的 (
公共扩展方法 具体化<TResult> 将可观测序列的隐式通知具体化为显式通知值。 由 Observable.) 定义的 (
公共扩展方法 Max<TResult> () 已重载。 返回可观测序列中的最大元素。 由 Observable.) 定义的 (
公共扩展方法 最大<TResult> (IComparer<TResult>) 已重载。 根据指定的比较器返回可观测序列中的最大值。 由 Observable.) 定义的 (
公共扩展方法 MaxBy<TResult、TKey> (Func<TResult、TKey>) 已重载。 返回具有最大键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MaxBy<TResult、TKey> (Func<TResult、TKey>、IComparer<TKey>) 已重载。 返回具有最大键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 合并<TResult> (IObservable<TResult>) 已重载。 将可观测序列合并为可观测序列。 由 Observable.) 定义的 (
公共扩展方法 合并<TResult> (IObservable<TResult>、IScheduler) 已重载。 将两个可观测序列合并为单个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Min<TResult> () 已重载。 返回可观测序列中的最小元素。 由 Observable.) 定义的 (
公共扩展方法 Min<TResult> (IComparer<TResult>) 已重载。 根据指定的比较器返回可观测序列中的最小值。 由 Observable.) 定义的 (
公共扩展方法 MinBy<TResult、TKey> (Func<TResult、TKey>) 已重载。 返回具有最小键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MinBy<TResult、TKey> (Func<TResult、TKey>、IComparer<TKey>) 已重载。 根据指定的比较器返回具有最小键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MostRecent<TResult> 对可观测序列中的最新值进行采样。 由 Observable.) 定义的 (
公共扩展方法 多播<TResult、TResult> (ISubject<TResult、TResult>) 已重载。 返回一个可连接的可观察序列,该序列在连接时使源序列将结果推送到指定的主题。 由 Observable.) 定义的 (
公共扩展方法 多播<TResult、TIntermediate、TResult> (Func<ISubject<TResult、TIntermediate>>、Func<IObservable<TIntermediate>、IObservable<TResult>>) 已重载。 返回一个可观测序列,其中包含在选择器函数内多播源序列所生成的序列的元素。 由 Observable.) 定义的 (
公共扩展方法 Next<TResult> 对下一个值采样 (阻塞,而不缓冲) 在可观测序列中。 由 Observable.) 定义的 (
公共扩展方法 ObserveOn<TResult> (SynchronizationContext) 已重载。 在指定的同步上下文中异步通知观察者。 由 Observable.) 定义的 (
公共扩展方法 ObserveOn<TResult> (Control) 已重载。 由 ControlObservable.) 定义的 (
公共扩展方法 ObserveOn<TResult> (Dispatcher) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 ObserveOn<TResult> (DispatcherScheduler) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 ObserveOn<TResult> (IScheduler) 已重载。 异步通知指定计划程序上的观察程序。 由 Observable.) 定义的 (
公共扩展方法 ObserveOnDispatcher<TResult> DispatcherObservable.) 定义的 (
公共扩展方法 OnErrorResumeNext<TResult> 继续正常终止的可观测序列,或者与下一个可观测序列发生异常。 由 Observable.) 定义的 (
公共扩展方法 发布<TResult> () 已重载。 返回与基础序列共享单个订阅的可连接可观察序列。 由 Observable.) 定义的 (
公共扩展方法 发布<TResult> (TResult) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,并且以 initialValue 开头。 由 Observable.) 定义的 (
公共扩展方法 发布<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是在共享基础序列的单个订阅的可连接可观测序列上调用选择器的结果。 由 Observable.) 定义的 (
公共扩展方法 发布<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、TResult) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,并且以 initialValue 开头。 由 Observable.) 定义的 (
公共扩展方法 PublishLast<TResult> () 已重载。 返回一个可连接的可观察序列,该序列与仅包含最后一个通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 PublishLast<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享仅包含最后一个通知的基础序列的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重复<TResult> () 已重载。 无限期地重复可观测序列。 由 Observable.) 定义的 (
公共扩展方法 重复<TResult> (Int32) 已重载。 无限期地重复可观测序列。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> () 已重载。 返回一个可连接的可观察序列,该序列与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (TimeSpan) 已重载。 返回一个可连接的可观察序列,该序列与在窗口中重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (Int32) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (IScheduler) 已重载。 返回一个可连接的可观察序列,该序列与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (TimeSpan、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列与在窗口中重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (Int32、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (Int32、TimeSpan) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,重播窗口中的 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult> (Int32、TimeSpan、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,重播窗口中的 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅并以初始值开头。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、IScheduler) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、TimeSpan) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享一个订阅到在窗口中重播所有通知的基础序列。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、Int32) 已重载。 返回一个可观察序列,该序列是调用可连接可观察序列上的选择器的结果,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、TimeSpan、IScheduler) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享一个订阅到在窗口中重播所有通知的基础序列。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、Int32、IScheduler) 已重载。 返回一个可观察序列,该序列是调用可连接可观察序列上的选择器的结果,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、Int32、TimeSpan) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅在窗口中重播 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<TResult、TResult> (Func<IObservable<TResult>、IObservable<TResult>>、Int32、TimeSpan、IScheduler) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅在窗口中重播 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重试<TResult> () 已重载。 重复源可观测序列,直到成功终止。 由 Observable.) 定义的 (
公共扩展方法 重试<TResult> (Int32) 已重载。 重复源可观测序列,直到成功终止。 由 Observable.) 定义的 (
公共扩展方法 示例<TResult> (TimeSpan) 已重载。 按每个间隔对可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 示例<TResult> (TimeSpan、IScheduler) 已重载。 使用指定的源、间隔和计划程序对每个间隔的可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 示例<TResult、TSample> (IObservable<TSample>) 已重载。 使用指定的源和采样器对采样周期处的可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 扫描<TResult> (Func<TResult、TResult、TResult>) 已重载。 对可观测序列应用累加器函数,并使用指定的源和累加器返回每个中间结果。 由 Observable.) 定义的 (
公共扩展方法 扫描<TResult、TAccumulate> (TAccumulate、Func<TAccumulate、TResult、TAccumulate>) 已重载。 对可观测序列应用累加器函数,并使用指定的源、种子和累加器返回每个中间结果。 由 Observable.) 定义的 (
公共扩展方法 依次选择“<TResult”、“TResult> (Func<TResult”、“TResult>) 已重载。 将可观测序列的每个元素投影到具有指定源和选择器的新窗体中。 由 Observable.) 定义的 (
公共扩展方法 依次选择“<TResult”、“TResult> (Func<TResult”、“Int32”、“TResult>) 已重载。 通过将元素的索引与指定的源和选择器合并,将可观测序列的每个元素投影到一个新窗体中。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TOther> (IObservable<TOther>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TResult> (Func<TResult、IObservable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TResult> (Func<TResult、IEnumerable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TResult> (Func<TResult、IObservable<TResult>>、Func<Exception、IObservable<TResult>>、Func<IObservable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TCollection、TResult> (Func<TResult、IEnumerable<TCollection>>、Func<TResult、TCollection、TResult>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<TResult、TCollection、TResult> (Func<TResult、IObservable<TCollection>>、Func<TResult、TCollection、TResult>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SequenceEqual<TResult> (IObservable<TResult>) 已重载。 通过成对比较元素来确定两个序列是否相等。 由 Observable.) 定义的 (
公共扩展方法 SequenceEqual<TResult> (IObservable<TResult>、 IEqualityComparer<TResult>) 已重载。 通过使用指定的相等比较器以成对方式比较元素来确定两个序列是否相等。 由 Observable.) 定义的 (
公共扩展方法 Single<TResult> () 已重载。 返回可观测序列的唯一元素,如果可观测序列中没有恰好有一个元素,则引发异常。 由 Observable.) 定义的 (
公共扩展方法 Single<TResult> (Func<TResult,boolean>) 已重载。 返回与谓词匹配的可观测序列中唯一的元素,如果可观测序列中没有恰好一个元素,则引发异常。 由 Observable.) 定义的 (
公共扩展方法 SingleOrDefault<TResult> () 已重载。 返回可观测序列的唯一元素;如果可观察序列为空,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 SingleOrDefault<TResult> (Func<TResult,布尔>) 已重载。 返回可观测序列中与谓词匹配的唯一元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 跳过<TResult> 绕过可观察序列中的指定数目的值,然后返回剩余的值。 由 Observable.) 定义的 (
公共扩展方法 SkipLast<TResult> 在可观测序列末尾绕过指定数量的元素。 由 Observable.) 定义的 (
公共扩展方法 SkipUntil<TResult、TOther> 仅当另一个可观测序列生成值后,才返回源可观测序列中的值。 由 Observable.) 定义的 (
公共扩展方法 SkipWhile<TResult> (Func<TResult,布尔>) 已重载。 只要指定的条件为 true,就绕过可观察序列中的值,然后返回剩余的值。 由 Observable.) 定义的 (
公共扩展方法 SkipWhile<TResult> (Func<TResult、Int32、布尔>) 已重载。 只要指定的条件为 true,就绕过可观察序列中的值,然后返回剩余的值。 由 Observable.) 定义的 (
公共扩展方法 StartWith<TResult>TResult[]) 已重载。 将值序列附加到具有指定源和值的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 StartWith<TResult> (IScheduler,TResult[]) 已重载。 使用指定的源、计划程序以及值将值序列追加到可观测序列。 由 Observable.) 定义的 (
公共扩展方法 订阅<TResult> () 已重载。 计算具有指定源的可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<TResult> (操作<TResult>) 已重载。 将元素处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<TResult> (操作<TResult>,操作<异常>) 已重载。 将元素处理程序和异常处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<TResult> (操作<TResult>、操作) 已重载。 将元素处理程序和完成处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<TResult> (操作<TResult>、操作<异常>、操作) 已重载。 将元素处理程序、异常处理程序和完成处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 SubscribeOn<TResult> (SynchronizationContext) 已重载。 在指定的同步上下文上异步订阅和取消订阅观察程序。 由 Observable.) 定义的 (
公共扩展方法 SubscribeOn<TResult> (控制) 已重载。 由 ControlObservable.) 定义的 (
公共扩展方法 SubscribeOn<TResult> (Dispatcher) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 SubscribeOn<TResult> (DispatcherScheduler) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 SubscribeOn<TResult> (IScheduler) 已重载。 异步订阅和取消订阅指定计划程序上的观察程序。 由 Observable.) 定义的 (
公共扩展方法 SubscribeOnDispatcher<TResult> DispatcherObservable.) 定义的 (
公共扩展方法 同步<TResult> () 已重载。 同步可观测序列。 由 Observable.) 定义的 (
公共扩展方法 同步<TResult> (对象) 已重载。 同步可观测序列。 由 Observable.) 定义的 (
公共扩展方法 采用<TResult> 从可观测序列的开头返回指定数量的连续值。 由 Observable.) 定义的 (
公共扩展方法 TakeLast<TResult> 从可观测序列的末尾返回指定数量的连续元素。 由 Observable.) 定义的 (
公共扩展方法 TakeUntil<TResult、TOther> 返回源可观测序列中的值,直到另一个可观测序列生成值。 由 Observable.) 定义的 (
公共扩展方法 TakeWhile<TResult> (Func<TResult、布尔) > 已重载。 只要指定的条件为 true,就从可观测序列中返回值,然后跳过其余值。 由 Observable.) 定义的 (
公共扩展方法 TakeWhile<TResult> (Func<TResult、Int32、Boolean>) 已重载。 只要指定的条件为 true,就从可观测序列中返回值,然后跳过其余值。 由 Observable.) 定义的 (
公共扩展方法 然后<是 TResult、TResult> 当可观测序列具有可用值并投影值时匹配。 由 Observable.) 定义的 (
公共扩展方法 <限制 TResult> (TimeSpan) 已重载。 忽略可观测序列中的值,这些值后跟在指定源和 dueTime 的到期时间前的另一个值。 由 Observable.) 定义的 (
公共扩展方法 Throttle<TResult> (TimeSpan、IScheduler) 已重载。 使用指定的源、dueTime 和计划程序,忽略可观测序列中的值,这些序列后跟另一个值。 由 Observable.) 定义的 (
公共扩展方法 TimeInterval<TResult> () 已重载。 记录具有指定源的可观测序列中连续值之间的时间间隔。 由 Observable.) 定义的 (
公共扩展方法 TimeInterval<TResult> (IScheduler) 已重载。 使用指定的源和计划程序记录可观测序列中连续值之间的时间间隔。 由 Observable.) 定义的 (
公共扩展方法 超时<TResult> (TimeSpan) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<TResult> (DateTimeOffset) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<TResult> (TimeSpan、IObservable<TResult>) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Timeout<TResult> (DateTimeOffset、IObservable<TResult>) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<TResult> (TimeSpan、IScheduler) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<TResult> (DateTimeOffset、IScheduler) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 Timeout<TResult> (TimeSpan、IObservable<TResult>、IScheduler) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Timeout<TResult> (DateTimeOffset、IObservable<TResult>、IScheduler) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Timestamp<TResult> () 已重载。 使用指定的源记录可观测序列中每个值的时间戳。 由 Observable.) 定义的 (
公共扩展方法 时间戳<TResult> (IScheduler) 已重载。 使用指定的源和计划程序记录可观测序列中每个值的时间戳。 由 Observable.) 定义的 (
公共扩展方法 ToArray<TResult> 从可观测序列创建数组。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<TResult、TKey> (Func<TResult、TKey>) 已重载。 根据指定的键选择器函数从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<TResult、TKey> (Func<TResult、TKey>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>) 已重载。 根据指定的键选择器函数和元素选择器函数,从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数、比较器和元素选择器函数,从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToEnumerable<TResult> 将可观测序列转换为可枚举序列。 由 Observable.) 定义的 (
公共扩展方法 ToEvent<TResult> 将可观测序列公开为具有具有指定源的 .NET 事件的对象。 由 Observable.) 定义的 (
公共扩展方法 ToList<TResult> 从可观测序列创建列表。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<TResult、TKey> (Func<TResult、TKey>) 已重载。 根据指定的键选择器函数从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<TResult、TKey> (Func<TResult、TKey>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>) 已重载。 根据指定的键选择器函数和元素选择器函数,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<TResult、TKey、TElement> (Func<TResult、TKey>、Func<TResult、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数、比较器和元素选择器函数,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToNotifier<TSource> 从观察者创建通知回调。 由 Observer.) 定义的 (
公共扩展方法 ToTask<TResult> () 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<TResult> (对象) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<TResult> (CancellationToken) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<TResult> (CancellationToken、Object) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 其中<TResult> (Func<TResult、布尔>) 已重载。 根据谓词筛选可观测序列的元素。 由 Observable.) 定义的 (
公共扩展方法 其中<TResult> (Func<TResult、Int32、布尔>) 已重载。 通过合并元素的索引,基于谓词筛选可观测序列的元素。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (Int32) 已重载。 将可观测序列的每个元素投影到基于元素计数信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (Int32、Int32) 已重载。 将可观测序列的每个元素投影到零个或多个窗口,这些窗口基于元素计数信息生成。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan、IScheduler) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan、TimeSpan) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的零个或多个窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan、Int32) 已重载。 将可观测序列的每个元素投影到一个窗口,该窗口在已满或经过给定时间时完成。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan、TimeSpan、IScheduler) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的零个或多个窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult> (TimeSpan、Int32、IScheduler) 已重载。 将可观测序列的每个元素投影到一个窗口,该窗口在已满或经过给定时间时完成。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult、TWindowClosing> (Func<IObservable<TWindowClosing>>) 已重载。 将可观测序列的每个元素投影到连续的非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<TResult、TWindowOpening、TWindowClosing> (IObservable<TWindowOpening>、Func<TWindowOpening、IObservable<TWindowClosing>>) 已重载。 将可观测序列的每个元素投影到零个或多个窗口中。 由 Observable.) 定义的 (
公共扩展方法 Zip<TResult、TSecond、TResult> (IObservable<TSecond>、Func<TResult、TSecond、TResult>) 已重载。 通过将两个可观测序列以成对方式组合其元素,将两个可观测序列合并为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Zip<TResult、TSecond、TResult> (IEnumerable<TSecond>、Func<TResult、TSecond、TResult>) 已重载。 使用选择器函数将可观测序列和可枚举序列合并到一个可观测序列中。 由 Observable.) 定义的 (

顶部

备注

此主题接口使使用者能够灵活地观察一种类型的可观测序列,同时发布另一种类型的可观测序列。

示例

此示例演示如何实现 ISubject<TSource,TResult> ,它观察一种类型的可观测序列,同时发布另一种类型的可观测序列。 此示例中的 AsciiConverterSubject 通过观察 char 类型的序列并发布可观测的 int 序列来演示实现。已发布的可观测序列 int 是它观察到的每个字符值的 ASCII 代码。

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace Example
{
  class Program
  {
    static void Main()
    {
      //****************************************************************************************//
      //*** Create an observable sequence of char from console input until enter is pressed. ***//
      //****************************************************************************************//
      IObservable<char> keySequence = Observable.Create<char>(observer =>
      {
        bool bContinue = true;

        while (bContinue)
        {
          ConsoleKeyInfo keyInfo = Console.ReadKey(true);

          if (keyInfo.Key != ConsoleKey.Enter)
          {
            observer.OnNext(keyInfo.KeyChar);
          }
          else
          {
            observer.OnCompleted();
            bContinue = false;
          }
        }

        return (() => { });
      });


      //****************************************************************************************//
      //*** Create an AsciiConverterSubject which takes a source type of char and returns an ***//
      //*** observable sequence of int which is the ASCII code for source items of char.     ***//
      //****************************************************************************************//

      AsciiConverterSubject myConverterSubject = new AsciiConverterSubject();


      //****************************************************************************************//
      //*** Subscribe to the keySequence on the .NET threadpool so the main thread can       ***//
      //*** create subscriptions to the AsciiConverterSubject                                ***//
      //****************************************************************************************//

      IDisposable subscription = keySequence.SubscribeOn(Scheduler.ThreadPool).Subscribe(myConverterSubject);


      Console.WriteLine("\nEnter a sequence of keys to have the AsciiConverterSubject\nconvert the keys to their ASCII code values.\n"); 
      Console.WriteLine("Press ENTER to terminate the observable sequence...\n");


      //****************************************************************************************//
      //*** Subscribe to the AsciiConverterSubject and write the ASCII code values to the    ***//
      //*** console window.                                                                  ***//
      //***                                                                                  ***//
      //*** The main thread will wait on the completion of the keySequence. It completes     ***//
      //*** when ENTER is pressed.                                                           ***//
      //****************************************************************************************//

      EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
      myConverterSubject.Subscribe(c => Console.WriteLine("Ascii Char code {0} entered.",c), () => waitHandle.Set());
      waitHandle.WaitOne();


      //***************************************//
      //*** Explicitly releasing resources. ***//
      //***************************************//
      
      subscription.Dispose();
      myConverterSubject.Dispose();
    }
  }



  //***********************************************************************************************//
  //***                                                                                         ***//
  //*** The AsciiConverterSubject demonstrates an implementation of ISubject<TSource, TResult>. ***//
  //*** It is used to subscribe to an observable sequence of char. It publishes an observable   ***//
  //*** sequence of int which should be the ASCII code for each char value it observes.         ***//
  //***                                                                                         ***//
  //***********************************************************************************************//

  class AsciiConverterSubject : ISubject<char, int>, IDisposable
  {
    private List<IObserver<int>> observerList;
    private bool isDisposed;
    private bool isStopped;
    object gate = new object();
    Exception exception;

    public AsciiConverterSubject()
    {
      observerList = new List<IObserver<int>>();
    }

    public void OnCompleted()
    {
      //****************************************************************************************//
      //*** Make sure the OnCompleted operation is not preempted by another operation        ***//
      //*** which would break the expected behavior.  For example, don't allow an error from ***//
      //*** OnError preempt OnCompleted from anotther thread. Then OnCompleted would follow  ***//
      //*** an error.  That would be an incorrect behavior.                                  ***//
      //****************************************************************************************//

      lock (gate)
      {
        CheckDisposed();

        if (!isStopped)
        {
          foreach (IObserver<int> observer in observerList)
          {
            observer.OnCompleted();
          }

          observerList.Clear();
          isStopped = true;
        }
      }
    }

    public void OnError(Exception error)
    {
      if (error == null)
        throw new ArgumentException("Exception error should not be null.");

      //****************************************************************************************//
      //*** Make sure the OnError operation is not preempted by another operation which      ***//
      //*** would break the expected behavior.  For example, don't allow unsubscribe or an   ***//
      //*** OnCompleted operation to preempt OnError from another thread. This would result  ***//
      //*** in an error following completion.  That would be an incorrect behavior.          ***//
      //****************************************************************************************//

      lock (gate)
      {
        CheckDisposed();

        if (!isStopped)
        {
          exception = error;

          foreach (IObserver<int> observer in observerList)
          {
            observer.OnError(error);
          }

          observerList.Clear();
          isStopped = true;
        }
      }
    }

    public void OnNext(char value)
    {
      //****************************************************************************************//
      //*** Make sure the OnNext operation is not preempted by another operation which       ***//
      //*** would break the expected behavior.  For example, don't allow unsubscribe, errors ***//
      //*** or an OnCompleted operation to preempt OnNext from another thread. This would    ***//
      //*** have the result of items in a sequence following completion, errors, or          ***//
      //*** unsubscribe.  That would be an incorrect behavior.                               ***//
      //****************************************************************************************//

      lock (gate)
      {
        CheckDisposed();

        if (!isStopped)
        {
          foreach (IObserver<int> observer in observerList)
          {
            observer.OnNext(Convert.ToInt32(value));
          }
        }
      }
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
      if (observer == null)
        throw new ArgumentException("observer should not BehaviorSubject null.");

      //****************************************************************************************//
      //*** Make sure Subscribe occurs in sync with the other operations so we keep the      ***//
      //*** correct behavior depending on whether an error has occurred or the observable    ***//
      //*** sequence has completed.                                                          ***//
      //****************************************************************************************//

      lock (gate)
      {
        CheckDisposed();

        if (!isStopped)
        {
          observerList.Add(observer);
          return new Subscription(observer, this);
        }
        else if (exception != null)
        {
          observer.OnError(exception);
          return Disposable.Empty;
        }
        else
        {
          observer.OnCompleted();
          return Disposable.Empty;
        }
      }
    }

    private void Unsubscribe(IObserver<int> observer)
    {
      //****************************************************************************************//
      //*** Make sure Unsubscribe occurs in sync with the other operations so we keep the    ***//
      //*** correct behavior.                                                                ***//
      //****************************************************************************************//

      lock (gate)
      {
        observerList.Remove(observer);
      }
    }

    public void Dispose()
    {
      //****************************************************************************************//
      //*** Make sure Dispose occurs in sync with the other operations so we keep the        ***//
      //*** correct behavior. For example, Dispose shouldn't preempt the other operations    ***//
      //*** changing state variables after they have been checked.                           ***//
      //****************************************************************************************//

      lock (gate)
      {
        observerList.Clear();
        isStopped = true;
        isDisposed = true;
      }
    }

    private void CheckDisposed()
    {
      if (isDisposed)
        throw new ObjectDisposedException("Subject has been disposed.");
    }


    //************************************************************************************//
    //***                                                                              ***//
    //*** The Subscription class wraps each observer that creates a subscription. This ***//
    //*** is needed to expose an IDisposable interface through which a observer can    ***//
    //*** cancel the subscription.                                                     ***//
    //***                                                                              ***//
    //************************************************************************************//

    class Subscription : IDisposable
    {
      private AsciiConverterSubject subject;
      private IObserver<int> observer;

      public Subscription(IObserver<int> obs, AsciiConverterSubject sub)
      {
        subject = sub;
        observer = obs;
      }

      public void Dispose()
      {
        subject.Unsubscribe(observer);
      }
    }
  }
}

示例代码生成了以下输出。

Enter a sequence of keys to have the AsciiConverterSubject
convert the keys to their ASCII code values.

Press ENTER to terminate the observable sequence...

Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.
Ascii Char code 59 entered.
Ascii Char code 102 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.

另请参阅

参考

System.Reactive.Subjects 命名空间