Subscribes an observer to the subject.
Namespace: System.Reactive.Subjects
Assembly: System.Reactive (in System.Reactive.dll)
Syntax
'Declaration
Public Function Subscribe ( _
observer As IObserver(Of T) _
) As IDisposable
'Usage
Dim instance As ReplaySubject(Of T)
Dim observer As IObserver(Of T)
Dim returnValue As IDisposable
returnValue = instance.Subscribe(observer)
public IDisposable Subscribe(
IObserver<T> observer
)
public:
virtual IDisposable^ Subscribe(
IObserver<T>^ observer
) sealed
abstract Subscribe :
observer:IObserver<'T> -> IDisposable
override Subscribe :
observer:IObserver<'T> -> IDisposable
public final function Subscribe(
observer : IObserver<T>
) : IDisposable
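Parameters
- observer
Type: System.IObserver<T>
The observer to subscribe to the subject.
Return Value
Type: System.IDisposable
An IDisposable object that can be used to unsubscribe the observer from the subject.
Implements
IObservable<T>.Subscribe(IObserver<T>)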
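Remarks
Calling Subscribe creates a subscription to the ReplaySubject's IObservable interface. The ReplaySubject publishes, through its IObservable interface, the items it receives from its own subscription(s).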
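The behavior described above can be illustrated with a minimal sketch. It assumes the System.Reactive assembly is referenced and that the default ReplaySubject replays its buffer synchronously inside Subscribe. The SubscribeSketch class and its Run method are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class SubscribeSketch
{
    // Returns the items seen by an observer that subscribes late and then unsubscribes.
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new ReplaySubject<int>();

        // Items published before any observer subscribes are buffered by the subject.
        subject.OnNext(1);
        subject.OnNext(2);

        // Subscribe replays the buffered items (1 and 2) to the new observer
        // and returns the IDisposable that represents the subscription.
        IDisposable subscription = subject.Subscribe(x => received.Add(x));

        subject.OnNext(3);        // delivered live to the observer
        subscription.Dispose();   // unsubscribes the observer from the subject
        subject.OnNext(4);        // no longer delivered

        subject.Dispose();
        return received;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // expected: 1, 2, 3
    }
}
```

Disposing the returned IDisposable only detaches that one observer; the subject keeps buffering and publishing to any other subscribers until it is disposed itself.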
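Examples
In this example we create a NewsHeadlineFeed class, which is just a mock news feed in the form of an observable sequence of strings. It uses the Generate operator to continuously produce random news headlines, each within three seconds of the previous one.
A ReplaySubject is created to subscribe to two news feeds of the NewsHeadlineFeed class. Before the subject is subscribed to the feeds, the Timestamp operator is used to timestamp each headline. So the sequence the ReplaySubject actually subscribes to is of type IObservable<Timestamped<string>>. With the headline sequence timestamped, subscribers can subscribe to the subject's observable interface to observe the data stream(s), or a subset of the stream(s), based on the timestamp.
A ReplaySubject buffers the items it receives. So a subscription created at a later time can access items from the sequence that have already been buffered and published. A subscription is created to the ReplaySubject that receives only local news headlines that occurred in the 10 seconds before the local news subscription was created. So we basically have the ReplaySubject "replay" what happened during the previous 10 seconds.
A local news headline is simply one that contains the newsLocation substring ("in your area.").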
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Threading;
namespace Example
{
class Program
{
static void Main()
{
//*****************************************************************************************************//
//*** A subject acts similar to a proxy in that it acts as both a subscriber and a publisher ***//
//*** Its IObserver interface can be used to subscribe to multiple streams or sequences of data. ***//
//*** The data is then published through its IObservable interface. ***//
//*** ***//
//*** In this example a simple ReplaySubject is used to subscribe to multiple news feeds ***//
//*** that provide random news headlines. Before the subject is subscribed to the feeds, we use ***//
//*** Timestamp operator to timestamp each headline. Subscribers can then subscribe to the subject ***//
//*** observable interface to observe the data stream(s) or a subset of the stream(s) based on ***//
//*** time. ***//
//*** ***//
//*** A ReplaySubject buffers items it receives. So a subscription created at a later time can ***//
//*** access items from the sequence which have already been published. ***//
//*** ***//
//*** A subscription is created to the ReplaySubject that receives only local news headlines which ***//
//*** occurred in the 10 seconds before the local news subscription was created. So we basically ***//
//*** have the ReplaySubject "replay" what happened during the previous 10 seconds. ***//
//*** ***//
//*** A local news headline just contains the newsLocation substring ("in your area."). ***//
//*** ***//
//*****************************************************************************************************//
ReplaySubject<Timestamped<string>> myReplaySubject = new ReplaySubject<Timestamped<String>>();
//*****************************************************************//
//*** Create news feed #1 and subscribe the ReplaySubject to it ***//
//*****************************************************************//
NewsHeadlineFeed NewsFeed1 = new NewsHeadlineFeed("Headline News Feed #1");
NewsFeed1.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);
//*****************************************************************//
//*** Create news feed #2 and subscribe the ReplaySubject to it ***//
//*****************************************************************//
NewsHeadlineFeed NewsFeed2 = new NewsHeadlineFeed("Headline News Feed #2");
NewsFeed2.HeadlineFeed.Timestamp().Subscribe(myReplaySubject);
//*****************************************************************************************************//
//*** Create a subscription to the subject's observable sequence. This subscription will filter for ***//
//*** only local headlines that occurred 10 seconds before the subscription was created. ***//
//*** ***//
//*** Since we are using a ReplaySubject with timestamped headlines, we can subscribe to the ***//
//*** headlines already past. The ReplaySubject will "replay" them for the localNewsSubscription ***//
//*** from its buffered sequence of headlines. ***//
//*****************************************************************************************************//
Console.WriteLine("Waiting for 10 seconds before subscribing to local news headline feed.\n");
Thread.Sleep(10000);
Console.WriteLine("\n*** Creating local news headline subscription at {0} ***\n", DateTime.Now.ToString());
Console.WriteLine("This subscription asks the ReplaySubject for the buffered headlines that\n" +
"occurred within the last 10 seconds.\n\nPress ENTER to exit.");
DateTime latestHeadlineTime = DateTime.Now;
DateTime earliestHeadlineTime = latestHeadlineTime - TimeSpan.FromSeconds(10);
IDisposable localNewsSubscription = myReplaySubject.Where(x => x.Value.Contains("in your area.") &&
(x.Timestamp >= earliestHeadlineTime) &&
(x.Timestamp < latestHeadlineTime)).Subscribe(x =>
{
Console.WriteLine("\n************************************\n" +
"***[ Local news headline report ]***\n" +
"************************************\n" +
"Time : {0}\n{1}\n\n", x.Timestamp.ToString(), x.Value);
});
Console.ReadLine();
//*******************************//
//*** Cancel the subscription ***//
//*******************************//
localNewsSubscription.Dispose();
//*************************************************************************//
//*** Unsubscribe all the ReplaySubject's observers and free resources. ***//
//*************************************************************************//
myReplaySubject.Dispose();
}
}
//*********************************************************************************//
//*** ***//
//*** The NewsHeadlineFeed class is just a mock news feed in the form of an ***//
//*** observable sequence in Reactive Extensions. ***//
//*** ***//
//*********************************************************************************//
class NewsHeadlineFeed
{
private string feedName; // Feedname used to label the stream
private IObservable<string> headlineFeed; // The actual data stream
private readonly Random rand = new Random(); // Used to stream random headlines.
//*** A list of predefined news events to combine with a simple location string ***//
static readonly string[] newsEvents = { "A tornado occurred ",
"Weather watch for snow storm issued ",
"A robbery occurred ",
"We have a lottery winner ",
"An earthquake occurred ",
"Severe automobile accident "};
//*** A list of predefined location strings to combine with a news event. ***//
static readonly string[] newsLocations = { "in your area.",
"in Dallas, Texas.",
"somewhere in Iraq.",
"in Lincolnton, North Carolina.",
"in Redmond, Washington."};
public IObservable<string> HeadlineFeed
{
get { return headlineFeed; }
}
public NewsHeadlineFeed(string name)
{
feedName = name;
//*****************************************************************************************//
//*** Using the Generate operator to generate a continuous stream of headlines that occur ***//
//*** randomly within 3 seconds of each other. ***//
//*****************************************************************************************//
headlineFeed = Observable.Generate(RandNewsEvent(),
evt => true,
evt => RandNewsEvent(),
evt => { Thread.Sleep(rand.Next(3000)); return evt; },
Scheduler.ThreadPool);
}
//****************************************************************//
//*** Some very simple formatting of the headline event string ***//
//****************************************************************//
private string RandNewsEvent()
{
return "Feedname : " + feedName + "\nHeadline : " + newsEvents[rand.Next(newsEvents.Length)] +
newsLocations[rand.Next(newsLocations.Length)];
}
}
}
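The following output was generated with the example code. The news feeds are random, so you may need to run it more than once to see a local news headline.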
Waiting for 10 seconds before subscribing to local news headline feed.
*** Creating local news headline subscription at 5/9/2011 4:07:48 AM ***
This subscription asks the ReplaySubject for the buffered headlines that
occurred within the last 10 seconds.
Press ENTER to exit.
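************************************
***[ Local news headline report ]***
************************************
Time : 5/9/2011 4:07:42 AM -04:00
Feedname : Headline News Feed #2
Headline : We have a lottery winner in your area.
************************************
***[ Local news headline report ]***
************************************
Time : 5/9/2011 4:07:47 AM -04:00
Feedname : Headline News Feed #1
Headline : Weather watch for snow storm issued in your area.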
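The example above lets the ReplaySubject buffer every headline and selects the last 10 seconds with a Where filter on the timestamps. As an alternative sketch, the subject can bound its own buffer through the ReplaySubject constructors that take a buffer size or a time window. The BoundedReplaySketch class and its method names are hypothetical and exist only for this illustration; the window variant subscribes immediately, so every item is assumed to still fall inside the window.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class BoundedReplaySketch
{
    // Publishes 1..5, then subscribes; returns what the late observer was replayed.
    public static List<int> ReplayLast(int bufferSize)
    {
        var replayed = new List<int>();
        using (var subject = new ReplaySubject<int>(bufferSize))
        {
            for (int i = 1; i <= 5; i++) subject.OnNext(i);

            // Only the most recent bufferSize items are replayed on Subscribe.
            using (subject.Subscribe(x => replayed.Add(x))) { }
        }
        return replayed;
    }

    // Publishes two headlines, then subscribes; only items younger than
    // the given window are replayed to the late observer.
    public static List<string> ReplayWindow(TimeSpan window)
    {
        var replayed = new List<string>();
        using (var subject = new ReplaySubject<string>(window))
        {
            subject.OnNext("A tornado occurred in your area.");
            subject.OnNext("A robbery occurred in your area.");
            using (subject.Subscribe(x => replayed.Add(x))) { }
        }
        return replayed;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ReplayLast(2)));   // expected: 4, 5
        foreach (string headline in ReplayWindow(TimeSpan.FromSeconds(10)))
            Console.WriteLine(headline);
    }
}
```

With a window-bounded subject the timestamp comparison in Where is no longer needed to limit how far back the replay goes, although the Timestamp operator is still useful when the subscriber wants to display when each headline occurred.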