ReplaySubject<T> 类

表示既是可观察序列又是观察者的对象。

继承层次结构

System.Object
  System.Reactive.Subjects.ReplaySubject<T>

Namespace:System.Reactive.Subjects
装配: System.Reactive.dll) 中的 System.Reactive (

语法

'Declaration
Public NotInheritable Class ReplaySubject(Of T) _
    Implements ISubject(Of T), ISubject(Of T, T),  _
    IObserver(Of T), IObservable(Of T), IDisposable
'Usage
Dim instance As ReplaySubject(Of T)
public sealed class ReplaySubject<T> : ISubject<T>, 
    ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable
generic<typename T>
public ref class ReplaySubject sealed : ISubject<T>, 
    ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable
[<SealedAttribute>]
type ReplaySubject<'T> =  
    class
        interface ISubject<'T>
        interface ISubject<'T, 'T>
        interface IObserver<'T>
        interface IObservable<'T>
        interface IDisposable
    end
JScript does not support generic types and methods.

类型参数

  • T
    类型。

ReplaySubject<T> 类型公开以下成员。

构造函数

  名称 说明
Public method Public method ReplaySubject<T> () 创建可重播的主题。
Public method Public method ReplaySubject<T> (Int32) 使用指定的缓冲区大小初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (TimeSpan) 使用指定的窗口初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (IScheduler) 使用指定的计划程序初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (Int32、IScheduler) 使用指定的缓冲区大小和计划程序初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (Int32、TimeSpan) 使用指定的缓冲区大小和窗口初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (TimeSpan、 IScheduler) 使用指定的窗口和计划程序初始化 ReplaySubject<T> 类的新实例。
Public method Public method ReplaySubject<T> (Int32、TimeSpan、IScheduler) 使用指定的缓冲区大小、窗口和计划程序初始化 ReplaySubject<T> 类的新实例。

顶部

方法

  名称 说明
Public method Public method 释放 释放 ReplaySubject<T> 类的当前实例使用的所有资源,并取消订阅所有观察程序。
Public method Public method 等于 (继承自 Object.)
受保护的方法 完成 (继承自 Object.)
Public method Public method GetHashCode (继承自 Object.)
Public method Public method GetType (继承自 Object.)
受保护的方法 MemberwiseClone (继承自 Object.)
Public method Public method OnCompleted 通知所有订阅的观察者序列的末尾。
Public method Public method OnError 通知所有订阅的观察者例外。
Public method Public method OnNext 使用 值通知所有订阅的观察者。
Public method Public method 订阅 将观察者订阅主题。
Public method Public method ToString (继承自 Object.)

顶部

扩展方法

  名称 说明
公共扩展方法 聚合<T> (Func<T、T、T>) 已重载。 对可观测序列应用累加器函数。 由 Observable.) 定义的 (
公共扩展方法 聚合<T、TAccumulate> (TAccumulate、Func<TAccumulate、T、TAccumulate>) 已重载。 对具有指定种子值的可观测序列应用累加器函数。 由 Observable.) 定义的 (
公共扩展方法 所有<T> 确定可观测序列的所有元素是否都满足某个条件。 由 Observable.) 定义的 (
公共扩展方法 Amb<T> 传播可观测序列,该序列首先使用指定的第一个和第二个序列做出反应。 由 Observable.) 定义的 (
公共扩展方法 和<T、TRight> 当两个可观测序列都有可用值时匹配。 由 Observable.) 定义的 (
公共扩展方法 Any<T> () 已重载。 确定可观测序列是否包含任何元素。 由 Observable.) 定义的 (
公共扩展方法 Any<T> (Func<T、Boolean>) 已重载。 确定可观测序列的所有元素是否都满足某个条件。 由 Observable.) 定义的 (
公共扩展方法 AsObservable<T> 隐藏可观测序列的标识。 由 Observable.) 定义的 (
公共扩展方法 AsObserver<T> 隐藏观察者的标识。 由 Observer.) 定义的 (
公共扩展方法 AsQbservable<T> 将可观测序列转换为可查询的可观测序列。 由 Qbservable.) 定义的 (
公共扩展方法 AssertEqual<T> Extensions.) 定义的 (
公共扩展方法 Buffer<T> (Int32) 已重载。 指示可观测序列的每个元素进入基于元素计数信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan) 已重载。 指示可观测序列的每个元素进入基于计时信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (Int32、Int32) 已重载。 指示可观测序列的每个元素成零个或多个缓冲区,这些缓冲区基于元素计数信息生成。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan、IScheduler) 已重载。 指示可观测序列的每个元素进入基于计时信息生成的连续非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan、TimeSpan) 已重载。 将可观测序列的每个元素指示为基于计时信息生成的零个或多个缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan、Int32) 已重载。 指示可观测序列的每个元素在已满或给定时间过后发送到缓冲区中。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan、TimeSpan、IScheduler) 已重载。 将可观测序列的每个元素指示为基于计时信息生成的零个或多个缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T> (TimeSpan、Int32、IScheduler) 已重载。 指示可观测序列的每个元素在已满或给定时间过后发送到缓冲区中。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T、TBufferClosing> (Func<IObservable<TBufferClosing>>) 已重载。 指示可观测序列的每个元素进入连续的非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 Buffer<T、TBufferOpening、TBufferClosing> (IObservable<TBufferOpening>、Func<TBufferOpening、IObservable<TBufferClosing>>) 已重载。 指示可查询可观测序列的每个元素进入连续的非重叠缓冲区。 由 Observable.) 定义的 (
公共扩展方法 捕获<T> (IObservable<T>) 已重载。 继续一个可观测序列,该序列被异常终止,并带有下一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Catch<T、TException> (Func<TException、IObservable<T>>) 已重载。 使用处理程序生成的可观测序列继续由指定类型的异常终止的可观察序列。 由 Observable.) 定义的 (
公共扩展方法 CombineLatest<T、TSecond、TResult> 每当其中一个可观测序列生成元素时,使用选择器函数将两个可观测序列合并为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Concat<T> 连接两个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 包含<T> (T) 已重载。 使用默认相等比较器确定可观测序列是否包含指定的元素。 由 Observable.) 定义的 (
公共扩展方法 包含<T> (T、IEqualityComparer<T>) 已重载。 使用指定的 System.Collections.Generic.IEqualityComparer&lt 确定可观测序列是否包含指定的元素;T&gt;。 由 Observable.) 定义的 (
公共扩展方法 计数<T> 返回一个 Int32 ,它表示可观测序列中的元素总数。 由 Observable.) 定义的 (
公共扩展方法 DefaultIfEmpty<T> () 已重载。 如果序列为空,则返回指定序列的元素或类型参数在单一实例序列中的默认值。 由 Observable.) 定义的 (
公共扩展方法 DefaultIfEmpty<T> (T) 已重载。 如果序列为空,则返回指定序列的元素或类型参数在单一实例序列中的默认值。 由 Observable.) 定义的 (
公共扩展方法 延迟<T> (TimeSpan) 已重载。 使用指定的源和 dueTime 按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<T> (DateTimeOffset) 已重载。 使用指定的源和 dueTime 按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<T> (TimeSpan、IScheduler) 已重载。 使用指定的源、dueTime 和计划程序按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 延迟<T> (DateTimeOffset、IScheduler) 已重载。 使用指定的源、dueTime 和计划程序按到期时间指示可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Distinct<T> () 已重载。 返回一个可观测序列,该序列仅包含具有指定源的不同元素。 由 Observable.) 定义的 (
公共扩展方法 Distinct<T> (IEqualityComparer<T>) 已重载。 根据比较器返回仅包含非重复元素的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Distinct<T、TKey> (Func<T、TKey>) 已重载。 返回一个可观测序列,该序列仅包含根据 keySelector 的不同元素。 由 Observable.) 定义的 (
公共扩展方法 Distinct<T、TKey> (Func<T、TKey>、IEqualityComparer<TKey>) 已重载。 返回一个可观测序列,该序列仅包含根据 keySelector 的不同元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<T> () 已重载。 返回一个可观测序列,该序列仅包含具有指定源的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<T> (IEqualityComparer<T>) 已重载。 返回一个可观测序列,该序列仅包含根据比较器的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<T、TKey> (Func<T、TKey>) 已重载。 返回一个可观测序列,该序列仅包含根据 keySelector 的不同连续元素。 由 Observable.) 定义的 (
公共扩展方法 DistinctUntilChanged<T、TKey> (Func<T、TKey>、IEqualityComparer<TKey>) 已重载。 根据 keySelector 和比较器返回一个可观测序列,该序列仅包含不同的连续元素。 由 Observable.) 定义的 (
公共扩展方法 T<> (操作<T>) 已重载。 为可观测序列中的每个元素调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 T<> (IObserver<T>) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观测序列异常终止时调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 Do<T> (Action<T>、Action) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观测序列正常终止时调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 执行<T> (操作<T>,操作<异常>) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观测序列异常终止时调用一个操作。 由 Observable.) 定义的 (
公共扩展方法 执行<T> (操作<T>、操作<异常>、操作) 已重载。 为可观测序列中的每个元素调用一个操作,并在可观测序列正常或异常终止时调用操作。 由 Observable.) 定义的 (
公共扩展方法 ElementAt<T> 返回序列中指定索引处的元素。 由 Observable.) 定义的 (
公共扩展方法 ElementAtOrDefault<T> 返回序列中指定索引处的元素;如果索引超出范围,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 最后<T> 在源可观测序列正常终止或由异常终止后调用指定的操作。 由 Observable.) 定义的 (
公共扩展方法 First<T> () 已重载。 返回具有指定源的可观测序列的第一个元素。 由 Observable.) 定义的 (
公共扩展方法 First<T> (Func<T,Boolean>) 已重载。 返回与谓词匹配的可观测序列的第一个元素。 由 Observable.) 定义的 (
公共扩展方法 FirstOrDefault<T> () 已重载。 返回可观测序列的第一个元素,如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 FirstOrDefault<T> (Func<T,boolean>) 已重载。 返回与谓词匹配的可观测序列的第一个元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 Foo<T、R> MyExt.) 定义的 (
公共扩展方法 ForEach<T> 为可观测序列中的每个元素调用一个操作,并在序列终止之前阻止。 由 Observable.) 定义的 (
公共扩展方法 GetEnumerator<T> 返回枚举器,该枚举器枚举可观测序列的所有值。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<T、TKey> (Func<T、TKey>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<T、TKey> (Func<T、TKey>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>) 已重载。 对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupBy<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<T、TKey、TDuration> (Func<T、TKey>、Func<IGroupedObservable<TKey、T>、IObservable<TDuration>>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<T、TKey、TDuration> (Func<T、TKey>、Func<IGroupedObservable<TKey、T>、IObservable<TDuration>>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<T、TKey、TElement、TDuration> (Func<T、TKey>、Func<T、TElement>、Func<IGroupedObservable<TKey、TElement>、IObservable<TDuration>>) 已重载。 根据指定的键选择器函数对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupByUntil<T、TKey、TElement、TDuration> (Func<T、TKey>、Func<T、TElement>、Func<IGroupedObservable<TKey、TElement>、IObservable<TDuration>>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器对可观测序列的元素进行分组,并使用指定的函数选择生成的元素。 由 Observable.) 定义的 (
公共扩展方法 GroupJoin<T、TRight、TLeftDuration、TRightDuration、TResult> 根据重叠持续时间关联两个序列的元素,并将结果分组。 由 Observable.) 定义的 (
公共扩展方法 IgnoreElements<T> 忽略可观测序列中的所有值,只保留终止消息。 由 Observable.) 定义的 (
公共扩展方法 Join<T、TRight、TLeftDuration、TRightDuration、TResult> 根据重叠持续时间关联两个序列的元素。 由 Observable.) 定义的 (
公共扩展方法 Last<T> () 已重载。 返回具有指定源的可观测序列的最后一个元素。 由 Observable.) 定义的 (
公共扩展方法 Last<T> (Func<T,布尔>) 已重载。 返回与谓词匹配的可观测序列的最后一个元素。 由 Observable.) 定义的 (
公共扩展方法 LastOrDefault<T> () 已重载。 返回可观测序列中的最后一个元素,如果未找到值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 LastOrDefault<T> (Func<T,布尔>) 已重载。 返回与谓词匹配的可观察序列的最后一个元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 最新<T> 对可观测序列中的最新值进行采样。 由 Observable.) 定义的 (
公共扩展方法 LongCount<T> 返回一个 Int64 ,表示可观测序列中的元素总数。 由 Observable.) 定义的 (
公共扩展方法 具体化<T> 将可观测序列的隐式通知具体化为显式通知值。 由 Observable.) 定义的 (
公共扩展方法 Max<T> () 已重载。 返回可观测序列中的最大元素。 由 Observable.) 定义的 (
公共扩展方法 最大<T> (IComparer<T>) 已重载。 根据指定的比较器返回可观测序列中的最大值。 由 Observable.) 定义的 (
公共扩展方法 MaxBy<T、TKey> (Func<T、TKey>) 已重载。 返回具有最大键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MaxBy<T、TKey> (Func<T、TKey>、IComparer<TKey>) 已重载。 返回具有最大键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 合并<T> (IObservable<T>) 已重载。 将可观测序列的可观测序列合并到可观测序列中。 由 Observable.) 定义的 (
公共扩展方法 合并<T> (IObservable<T>、IScheduler) 已重载。 将两个可观测序列合并为单个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 最小值<T> () 已重载。 返回可观测序列中的最小元素。 由 Observable.) 定义的 (
公共扩展方法 Min<T> (IComparer<T>) 已重载。 根据指定的比较器返回可观测序列中的最小值。 由 Observable.) 定义的 (
公共扩展方法 MinBy<T、TKey> (Func<T、TKey>) 已重载。 返回具有最小键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MinBy<T、TKey> (Func<T、TKey>、IComparer<TKey>) 已重载。 根据指定的比较器,返回具有最小键值的可观测序列中的元素。 由 Observable.) 定义的 (
公共扩展方法 MostRecent<T> 对可观测序列中的最新值进行采样。 由 Observable.) 定义的 (
公共扩展方法 多播<T、TResult> (ISubject<T、TResult>) 已重载。 返回一个可连接的可观察序列,该序列在连接时使源序列将结果推送到指定主题中。 由 Observable.) 定义的 (
公共扩展方法 多播<T、TIntermediate、TResult> (Func<ISubject<T、TIntermediate>>、Func<IObservable<TIntermediate>、IObservable<TResult>>) 已重载。 返回一个可观察序列,其中包含在选择器函数中多播源序列所生成的序列的元素。 由 Observable.) 定义的 (
公共扩展方法 Next<T> 采样下一个值 (阻塞,而不缓冲可观测序列中的) 。 由 Observable.) 定义的 (
公共扩展方法 观察 T<> (SynchronizationContext) 已重载。 在指定的同步上下文中异步通知观察者。 由 Observable.) 定义的 (
公共扩展方法 ObserveOn<T> (Control) 已重载。 由 ControlObservable.) 定义的 (
公共扩展方法 观察 T<> (调度程序) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 观察 T<> (DispatcherScheduler) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 观察 T<> (IScheduler) 已重载。 异步通知指定计划程序上的观察程序。 由 Observable.) 定义的 (
公共扩展方法 ObserveOnDispatcher<T> DispatcherObservable.) 定义的 (
公共扩展方法 OnErrorResumeNext<T> 继续正常终止的可观测序列,或者与下一个可观测序列发生异常。 由 Observable.) 定义的 (
公共扩展方法 发布<T> () 已重载。 返回与基础序列共享单个订阅的可连接可观察序列。 由 Observable.) 定义的 (
公共扩展方法 发布<T> (T) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,并且以 initialValue 开头。 由 Observable.) 定义的 (
公共扩展方法 发布<T、TResult> (Func<IObservable<T>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是在共享基础序列的单个订阅的可连接可观测序列上调用选择器的结果。 由 Observable.) 定义的 (
公共扩展方法 发布<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、T) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,并且以 initialValue 开头。 由 Observable.) 定义的 (
公共扩展方法 PublishLast<T> () 已重载。 返回一个可连接的可观察序列,该序列与仅包含最后一个通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 PublishLast<T、TResult> (Func<IObservable<T>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享仅包含最后一个通知的基础序列的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重复<T> () 已重载。 无限期地重复可观测序列。 由 Observable.) 定义的 (
公共扩展方法 重复<T> (Int32) 已重载。 无限期地重复可观测序列。 由 Observable.) 定义的 (
公共扩展方法 重播<T> () 已重载。 返回一个可连接的可观察序列,该序列与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (TimeSpan) 已重载。 返回一个可连接的可观察序列,该序列与在窗口中重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (Int32) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (IScheduler) 已重载。 返回一个可连接的可观察序列,该序列与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (TimeSpan、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列与在窗口中重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (Int32、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (Int32、TimeSpan) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,重播窗口中的 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<T> (Int32、TimeSpan、IScheduler) 已重载。 返回一个可连接的可观察序列,该序列共享与基础序列的单个订阅,重播窗口中的 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅并以初始值开头。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、IScheduler) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享与重播所有通知的基础序列共享单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、TimeSpan) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享一个订阅到在窗口中重播所有通知的基础序列。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、Int32) 已重载。 返回一个可观察序列,该序列是调用可连接可观察序列上的选择器的结果,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、TimeSpan、IScheduler) 已重载。 返回一个可观察序列,该序列是在可连接可观测序列上调用选择器的结果,该序列共享一个订阅到在窗口中重播所有通知的基础序列。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、Int32、IScheduler) 已重载。 返回一个可观察序列,该序列是调用可连接可观察序列上的选择器的结果,该序列共享与基础序列重播 bufferSize 通知的单个订阅。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、Int32、TimeSpan) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅在窗口中重播 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重播<T、TResult> (Func<IObservable<T>、IObservable<TResult>>、Int32、TimeSpan、IScheduler) 已重载。 返回一个可观测序列,该序列是调用可连接可观测序列上的选择器的结果,该序列共享与基础序列的单个订阅在窗口中重播 bufferSize 通知。 由 Observable.) 定义的 (
公共扩展方法 重试<T> () 已重载。 重复源可观测序列,直到成功终止。 由 Observable.) 定义的 (
公共扩展方法 重试<T> (Int32) 已重载。 重复源可观测序列,直到成功终止。 由 Observable.) 定义的 (
公共扩展方法 示例<T> (TimeSpan) 已重载。 按每个间隔对可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 示例<T> (TimeSpan、IScheduler) 已重载。 使用指定的源、间隔和计划程序对每个间隔的可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 示例<T、TSample> (IObservable<TSample>) 已重载。 使用指定的源和采样器对采样周期处的可观测序列进行采样。 由 Observable.) 定义的 (
公共扩展方法 扫描<T> (Func<T、T、T>) 已重载。 对可观测序列应用累加器函数,并使用指定的源和累加器返回每个中间结果。 由 Observable.) 定义的 (
公共扩展方法 扫描<T、TAccumulate> (TAccumulate、Func<TAccumulate、T、TAccumulate>) 已重载。 对可观测序列应用累加器函数,并使用指定的源、种子和累加器返回每个中间结果。 由 Observable.) 定义的 (
公共扩展方法 选择<T、TResult> (Func<T、TResult>) 已重载。 将可观测序列的每个元素投影到具有指定源和选择器的新窗体中。 由 Observable.) 定义的 (
公共扩展方法 依次选择<T、TResult> (Func<T、Int32、TResult>) 已重载。 通过将元素的索引与指定的源和选择器合并,将可观测序列的每个元素投影到一个新窗体中。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TOther> (IObservable<TOther>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TResult> (Func<T、IObservable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TResult> (Func<T、IEnumerable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TResult> (Func<T、IObservable<TResult>>、Func<Exception、IObservable<TResult>>、Func<IObservable<TResult>>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TCollection、TResult> (Func<T、IEnumerable<TCollection>>、Func<T、TCollection、TResult>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SelectMany<T、TCollection、TResult> (Func<T、IObservable<TCollection>>、Func<T、TCollection、TResult>) 已重载。 将可观测序列的每个元素投影到可观测序列中,并将生成的可观测序列平展为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 SequenceEqual<T> (IObservable<T>) 已重载。 通过成对比较元素来确定两个序列是否相等。 由 Observable.) 定义的 (
公共扩展方法 SequenceEqual<T> (IObservable<T>,IEqualityComparer<T>) 已重载。 通过使用指定的相等比较器以成对方式比较元素来确定两个序列是否相等。 由 Observable.) 定义的 (
公共扩展方法 单个<T> () 已重载。 返回可观测序列的唯一元素,如果可观测序列中没有恰好有一个元素,则引发异常。 由 Observable.) 定义的 (
公共扩展方法 Single<T> (Func<T,boolean>) 已重载。 返回与谓词匹配的可观测序列中唯一的元素,如果可观测序列中没有恰好一个元素,则引发异常。 由 Observable.) 定义的 (
公共扩展方法 SingleOrDefault<T> () 已重载。 返回可观测序列的唯一元素,如果可观测序列为空,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 SingleOrDefault<T> (Func<T,boolean>) 已重载。 返回与谓词匹配的可观测序列中唯一的元素;如果未找到任何值,则返回默认值。 由 Observable.) 定义的 (
公共扩展方法 跳过<T> 在可观测序列中绕过指定数量的值,然后返回剩余值。 由 Observable.) 定义的 (
公共扩展方法 SkipLast<T> 在可观测序列末尾绕过指定数量的元素。 由 Observable.) 定义的 (
公共扩展方法 SkipUntil<T、TOther> 仅当其他可观测序列生成值后,才从源可观测序列返回值。 由 Observable.) 定义的 (
公共扩展方法 SkipWhile<T> (Func<T,boolean>) 已重载。 只要指定的条件为 true,则绕过可观测序列中的值,然后返回剩余值。 由 Observable.) 定义的 (
公共扩展方法 SkipWhile<T> (Func<T、Int32、布尔>) 已重载。 只要指定的条件为 true,则绕过可观测序列中的值,然后返回剩余值。 由 Observable.) 定义的 (
公共扩展方法 StartWith<T>T[]) 已重载。 将值序列追加到具有指定源和值的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 StartWith<T> (IScheduler, T[]) 已重载。 将值序列追加到具有指定源、计划程序和值的可观测序列。 由 Observable.) 定义的 (
公共扩展方法 订阅<T> () 已重载。 使用指定源计算可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<T> (操作<T>) 已重载。 将元素处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<T> (操作<T>,操作<异常>) 已重载。 将元素处理程序和异常处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<T> (操作<T>,操作) 已重载。 将元素处理程序和完成处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 订阅<T> (操作<T>、操作<异常>、操作) 已重载。 将元素处理程序、异常处理程序和完成处理程序订阅到可观测序列。 由 ObservableExtensions.) 定义的 (
公共扩展方法 SubscribeOn<T> (SynchronizationContext) 已重载。 在指定的同步上下文上异步订阅和取消订阅观察程序。 由 Observable.) 定义的 (
公共扩展方法 SubscribeOn<T> (Control) 已重载。 由 ControlObservable.) 定义的 (
公共扩展方法 SubscribeOn<T> (Dispatcher) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 SubscribeOn<T> (DispatcherScheduler) 已重载。 由 DispatcherObservable.) 定义的 (
公共扩展方法 SubscribeOn<T> (IScheduler) 已重载。 异步订阅和取消订阅指定计划程序上的观察程序。 由 Observable.) 定义的 (
公共扩展方法 SubscribeOnDispatcher<T> DispatcherObservable.) 定义的 (
公共扩展方法 同步<T> () 已重载。 同步可观测序列。 由 Observable.) 定义的 (
公共扩展方法 同步<T> (对象) 已重载。 同步可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Take<T> 从可观测序列的开头返回指定数量的连续值。 由 Observable.) 定义的 (
公共扩展方法 TakeLast<T> 从可观测序列的末尾返回指定数量的连续元素。 由 Observable.) 定义的 (
公共扩展方法 TakeUntil<T、TOther> 返回源可观测序列中的值,直到另一个可观测序列生成值。 由 Observable.) 定义的 (
公共扩展方法 TakeWhile<T> (Func<T,布尔>) 已重载。 返回可观测序列中的值,只要指定的条件为 true,然后跳过剩余的值。 由 Observable.) 定义的 (
公共扩展方法 TakeWhile<T> (Func<T、Int32、布尔>) 已重载。 返回可观测序列中的值,只要指定的条件为 true,然后跳过剩余的值。 由 Observable.) 定义的 (
公共扩展方法 然后<T、TResult> 当可观测序列具有可用值并投影值时匹配。 由 Observable.) 定义的 (
公共扩展方法 <限制 T> (TimeSpan) 已重载。 忽略可观测序列中的值,这些值后跟在具有指定源和 dueTime 的截止日期之前的另一个值。 由 Observable.) 定义的 (
公共扩展方法 Throttle<T> (TimeSpan、IScheduler) 已重载。 忽略可观测序列中的值,这些值后跟指定源、dueTime 和计划程序到期前的另一个值。 由 Observable.) 定义的 (
公共扩展方法 TimeInterval<T> () 已重载。 记录具有指定源的可观测序列中连续值之间的时间间隔。 由 Observable.) 定义的 (
公共扩展方法 TimeInterval<T> (IScheduler) 已重载。 使用指定的源和计划程序记录可观测序列中连续值之间的时间间隔。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (TimeSpan) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (DateTimeOffset) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 Timeout<T> (TimeSpan、IObservable<T>) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (DateTimeOffset、IObservable<T>) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (TimeSpan、IScheduler) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (DateTimeOffset、IScheduler) 已重载。 如果 dueTime 已过,则返回可观测序列或 TimeoutException。 由 Observable.) 定义的 (
公共扩展方法 Timeout<T> (TimeSpan、IObservable<T>、IScheduler) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 超时<T> (DateTimeOffset、IObservable<T>、IScheduler) 已重载。 如果 dueTime 已过,则返回源可观测序列或其他可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Timestamp<T> () 已重载。 记录具有指定源的可观测序列中每个值的时间戳。 由 Observable.) 定义的 (
公共扩展方法 Timestamp<T> (IScheduler) 已重载。 使用指定的源和计划程序记录可观测序列中每个值的时间戳。 由 Observable.) 定义的 (
公共扩展方法 ToArray<T> 从可观测序列创建数组。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<T、TKey> (Func<T、TKey>) 已重载。 根据指定的键选择器函数从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<T, TKey> (Func<T, TKey>, IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>) 已重载。 根据指定的键选择器函数和元素选择器函数,从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToDictionary<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数、比较器和元素选择器函数,从可观测序列创建字典。 由 Observable.) 定义的 (
公共扩展方法 ToEnumerable<T> 将可观测序列转换为可枚举序列。 由 Observable.) 定义的 (
公共扩展方法 ToEvent<T> 将可观测序列公开为具有具有指定源的 .NET 事件的对象。 由 Observable.) 定义的 (
公共扩展方法 ToList<T> 从可观测序列创建列表。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<T、TKey> (Func<T、TKey>) 已重载。 根据指定的键选择器函数从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<T、TKey> (Func<T、TKey>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数和比较器,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>) 已重载。 根据指定的键选择器函数和元素选择器函数,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToLookup<T、TKey、TElement> (Func<T、TKey>、Func<T、TElement>、IEqualityComparer<TKey>) 已重载。 根据指定的键选择器函数、比较器和元素选择器函数,从可观测序列创建查找。 由 Observable.) 定义的 (
公共扩展方法 ToNotifier<T> 从观察程序创建通知回调。 由 Observer.) 定义的 (
公共扩展方法 ToTask<T> () 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<T> (对象) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<T> (CancellationToken) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 ToTask<T> (CancellationToken、Object) 已重载。 返回一个任务,其中包含可观测序列的最后一个值。 由 TaskObservableExtensions.) 定义的 (
公共扩展方法 其中<T> (Func<T、布尔>) 已重载。 根据谓词筛选可观测序列的元素。 由 Observable.) 定义的 (
公共扩展方法 其中<T> (Func<T、Int32、布尔>) 已重载。 通过合并元素的索引,基于谓词筛选可观测序列的元素。 由 Observable.) 定义的 (
公共扩展方法 窗口<T> (Int32) 已重载。 将可观测序列的每个元素投影到基于元素计数信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<T> (TimeSpan) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 窗口<T> (Int32、Int32) 已重载。 将可观测序列的每个元素投影到零个或多个窗口,这些窗口基于元素计数信息生成。 由 Observable.) 定义的 (
公共扩展方法 Window<T> (TimeSpan、IScheduler) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的连续非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<T> (TimeSpan、TimeSpan) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的零个或多个窗口。 由 Observable.) 定义的 (
公共扩展方法 窗口<T> (TimeSpan,Int32) 已重载。 将可观测序列的每个元素投影到一个窗口,该窗口在已满或经过给定时间时完成。 由 Observable.) 定义的 (
公共扩展方法 Window<T> (TimeSpan、TimeSpan、IScheduler) 已重载。 将可观测序列的每个元素投影到基于计时信息生成的零个或多个窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<T> (TimeSpan、Int32、IScheduler) 已重载。 将可观测序列的每个元素投影到一个窗口,该窗口在已满或经过给定时间时完成。 由 Observable.) 定义的 (
公共扩展方法 Window<T、TWindowClosing> (Func<IObservable<TWindowClosing>>) 已重载。 将可观测序列的每个元素投影到连续的非重叠窗口。 由 Observable.) 定义的 (
公共扩展方法 Window<T、TWindowOpening、TWindowClosing> (IObservable<TWindowOpening>、Func<TWindowOpening、IObservable<TWindowClosing>>) 已重载。 将可观测序列的每个元素投影到零个或多个窗口中。 由 Observable.) 定义的 (
公共扩展方法 Zip<T、TSecond、TResult> (IObservable<TSecond>、Func<T、TSecond、TResult>) 已重载。 通过将两个可观测序列以成对方式组合其元素,将两个可观测序列合并为一个可观测序列。 由 Observable.) 定义的 (
公共扩展方法 Zip<T、TSecond、TResult> (IEnumerable<TSecond>、Func<T、TSecond、TResult>) 已重载。 使用选择器函数将可观测序列和可枚举序列合并到一个可观测序列中。 由 Observable.) 定义的 (

顶部

备注

使用者的行为类似于代理,因为它既充当订阅服务器又充当发布者。 其 IObserver 接口可用于订阅多个可观测数据序列。 然后,通过使用者的 IObservable 接口发布数据。

ReplaySubject 缓冲它收到的项。 因此,以后创建的订阅可以访问缓冲序列中的项,即使它们已经发布。 ReplaySubject 将缓冲多少项取决于传递给构造函数的参数。

常规使用者使用计划程序同步对子加密观察器的传出调用。

示例

在此示例中,我们创建了一个 NewsHeadlineFeed 类,该类只是一个模拟新闻源,形式为可观测的字符串序列。 它使用 Generate 运算符在三秒内连续生成随机新闻标题。

创建 ReplaySubject 以订阅 NewsHeadlineFeed 类的两个新闻源。 在主题订阅源之前,时间戳运算符用于为每个标题添加时间戳。 因此,ReplaySubject 实际订阅的序列的类型为 IObservable<Timestamped<字符串>>。 在带有标题序列时间戳的情况下,订阅者可以订阅主题的可观测接口,以观察数据流 () 或流子集 (基于时间戳) 。

ReplaySubject 缓冲它收到的项。 因此,以后创建的订阅可以访问已缓冲并发布的序列中的项。 为 ReplaySubject 创建订阅,该订阅仅接收创建本地新闻订阅前 10 秒发生的本地新闻标题。 因此,我们基本上有了 ReplaySubject“重播”10 秒前发生的事情。

本地新闻标题仅包含“你的区域中的 newsLocation”子字符串 (。) 。

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Threading;

namespace Example
{
  class Program
  {
    static void Main()
    {
      //*****************************************************************************************************//
      //*** A subject acts similar to a proxy in that it acts as both a subscriber and a publisher        ***//
      //*** It's IObserver interface can be used to subscribe to multiple streams or sequences of data.   ***//
      //*** The data is then published through it's IObservable interface.                                ***//
      //***                                                                                               ***//
      //*** In this example a simple ReplaySubject is used to subscribe to multiple news feeds            ***//
      //*** that provide random news headlines. Before the subject is subscribed to the feeds, we use     ***//
      //*** Timestamp operator to timestamp each headline. Subscribers can then subscribe to the subject  ***//
      //*** observable interface to observe the data stream(s) or a subset of the stream(s) based on      ***//
      //*** time.                                                                                         ***//
      //***                                                                                               ***//
      //*** A ReplaySubject buffers items it receives. So a subscription created at a later time can      ***//
      //*** access items from the sequence which have already been published.                             ***//
      //***                                                                                               ***//
      //*** A subscriptions is created to the ReplaySubject that receives only local news headlines which ***//
      //*** occurred 10 seconds before the local news subscription was created. So we basically have the  ***//
      //*** ReplaySubject "replay" what happened 10 seconds earlier.                                      ***//
      //***                                                                                               ***//
      //*** A local news headline just contains the newsLocation substring ("in your area.").             ***//
      //***                                                                                               ***//
      //*****************************************************************************************************//

      ReplaySubject<Timestamped<string>> myReplaySubject = new ReplaySubject<Timestamped<String>>();


      //*****************************************************************//
      //*** Create news feed #1 and subscribe the ReplaySubject to it ***//
      //*****************************************************************//

      NewsHeadlineFeed NewsFeed1 = new NewsHeadlineFeed("Headline News Feed #1");
      NewsFeed1.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);


      //*****************************************************************//
      //*** Create news feed #2 and subscribe the ReplaySubject to it ***//
      //*****************************************************************//

      NewsHeadlineFeed NewsFeed2 = new NewsHeadlineFeed("Headline News Feed #2");
      NewsFeed2.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);


      //*****************************************************************************************************//
      //*** Create a subscription to the subject's observable sequence. This subscription will filter for ***//
      //*** only local headlines that occurred 10 seconds before the subscription was created.            ***//
      //***                                                                                               ***//
      //*** Since we are using a ReplaySubject with timestamped headlines, we can subscribe to the        ***//
      //*** headlines already past. The ReplaySubject will "replay" them for the localNewSubscription     ***//
      //*** from its buffered sequence of headlines.                                                      ***//
      //*****************************************************************************************************//

      Console.WriteLine("Waiting for 10 seconds before subscribing to local news headline feed.\n");
      Thread.Sleep(10000);

      Console.WriteLine("\n*** Creating local news headline subscription at {0} ***\n", DateTime.Now.ToString());
      Console.WriteLine("This subscription asks the ReplaySubject for the buffered headlines that\n" +
                        "occurred within the last 10 seconds.\n\nPress ENTER to exit.", DateTime.Now.ToString());

      DateTime lastestHeadlineTime = DateTime.Now;
      DateTime earliestHeadlineTime = lastestHeadlineTime - TimeSpan.FromSeconds(10);     
      
      IDisposable localNewsSubscription = myReplaySubject.Where(x => x.Value.Contains("in your area.") &&
                                                               (x.Timestamp >= earliestHeadlineTime) &&
                                                               (x.Timestamp < lastestHeadlineTime)).Subscribe(x =>
      {
        Console.WriteLine("\n************************************\n" +
                          "***[ Local news headline report ]***\n" +
                          "************************************\n" + 
                          "Time         : {0}\n{1}\n\n", x.Timestamp.ToString(), x.Value);
      });

      Console.ReadLine();


      //*******************************//
      //*** Cancel the subscription ***//
      //*******************************//

      localNewsSubscription.Dispose();
      

      //*************************************************************************//
      //*** Unsubscribe all the ReplaySubject's observers and free resources. ***//
      //*************************************************************************//

      myReplaySubject.Dispose();    
    }
  }



  //*********************************************************************************//
  //***                                                                           ***//
  //*** The NewsHeadlineFeed class is just a mock news feed in the form of an     ***//
  //*** observable sequence in Reactive Extensions.                               ***//
  //***                                                                           ***//
  //*********************************************************************************//
  class NewsHeadlineFeed
  {
    private string feedName;                     // Feedname used to label the stream
    private IObservable<string> headlineFeed;    // The actual data stream
    private readonly Random rand = new Random(); // Used to stream random headlines.


    //*** A list of predefined news events to combine with a simple ___location string ***//
    static readonly string[] newsEvents = { "A tornado occurred ",
                                            "Weather watch for snow storm issued ",
                                            "A robbery occurred ",
                                            "We have a lottery winner ",
                                            "An earthquake occurred ",
                                            "Severe automobile accident "};

    //*** A list of predefined ___location strings to combine with a news event. ***//
    static readonly string[] newsLocations = { "in your area.",
                                               "in Dallas, Texas.",
                                               "somewhere in Iraq.",
                                               "Lincolnton, North Carolina",
                                               "Redmond, Washington"};

    public IObservable<string> HeadlineFeed
    {
      get { return headlineFeed; }
    }

    public NewsHeadlineFeed(string name)
    {
      feedName = name;

      //*****************************************************************************************//
      //*** Using the Generate operator to generate a continous stream of headline that occur ***//
      //*** randomly within 5 seconds.                                                        ***//
      //*****************************************************************************************//
      headlineFeed = Observable.Generate(RandNewsEvent(),
                                         evt => true,
                                         evt => RandNewsEvent(),
                                         evt => { Thread.Sleep(rand.Next(3000)); return evt; },
                                         Scheduler.ThreadPool);
    }


    //****************************************************************//
    //*** Some very simple formatting of the headline event string ***//
    //****************************************************************//
    private string RandNewsEvent()
    {
      return "Feedname     : " + feedName + "\nHeadline     : " + newsEvents[rand.Next(newsEvents.Length)] +
             newsLocations[rand.Next(newsLocations.Length)];
    }
  }
}

以下输出是使用示例代码生成的。 新源是随机的,因此可能需要多次运行它才能看到本地新闻标题。

Waiting for 10 seconds before subscribing to local news headline feed.

** 在 2011/5/9 4:07:48 创建本地新闻头条订阅 **

This subscription asks the ReplaySubject for the buffered headlines that
occurred within the last 10 seconds.

Press ENTER to exit.

********************************** [ 当地新闻头条报道 ]********************************** 时间:2011/5/9 4:07:42 AM -04:00 Feedname:头条新闻源 #2 标题:我们在你的地区有一个彩票赢家。

********************************** [ 当地新闻头条报道 ]**********************************时间:2011/5/9 4:07:47 AM -04:00 Feedname:头条新闻源 #1 标题:针对你地区发布的暴风雪天气watch。

线程安全性

此类型的所有公共静态(Visual Basic 中共享的)成员都是线程安全的。 但不保证所有实例成员都是线程安全的。

另请参阅

参考

System.Reactive.Subjects 命名空间