<?xml version="1.0" encoding="utf-8"?><rss version="2.0"><channel><title>Scott Weinstein on .Net</title><link>https://weblogs.asp.net:443/sweinstein/</link><description>Scott Weinstein on .Net, Linq, PowerShell, WPF, and WCF</description><item><title>First impressions of Scala</title><link>https://weblogs.asp.net:443/sweinstein/first-impressions-of-scala</link><description>&lt;p&gt;I have an idea that it may be possible to predict build success/failure based on commit data. Why Scala? It’s a JVM language, has lots of powerful type features, and it has a linear algebra library which I’ll need later.&lt;/p&gt;  &lt;h2&gt;Project definition and build&lt;/h2&gt;  &lt;p&gt;Neither maven or the &lt;a href="https://github.com/harrah/xsbt/wiki" target="_blank"&gt;scala build tool (sbt)&lt;/a&gt; are completely satisfactory. &lt;/p&gt;  &lt;p&gt;This maven **archetype** (what .Net folks would call a VS project template)&lt;/p&gt;  &lt;pre class="brush: bash; gutter: false; toolbar: false;"&gt;mvn archetype:generate `-DarchetypeGroupId=org.scala-tools.archetypes `-DarchetypeArtifactId=scala-archetype-simple  `-DremoteRepositories=http://scala-tools.org/repo-releases `-DgroupId=org.SW -DartifactId=BuildBreakPredictor&lt;/pre&gt;

&lt;p&gt;gets you started right away with “hello world” code, unit tests demonstrating a number of different testing approaches, and even a ready made `.gitignore` file - nice! But the Scala version is behind at v2.8, and more seriously, compiling and testing was painfully slow. So much that a rapid edit – test – edit cycle was not practical. So &lt;a href="http://lab49.com/" target="_blank"&gt;Lab49&lt;/a&gt; colleague &lt;a href="http://blue64.net/" target="_blank"&gt;Steve Levine&lt;/a&gt; tells me that I can either adjust my pom to use fsc – the fast scala compiler, or use sbt. &lt;/p&gt;

&lt;p&gt;Sbt has some nice features&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;It’s fast – it uses fsc by default &lt;/li&gt;

  &lt;li&gt;It has a continuous mode, so&amp;#160; `&amp;gt; ~test` will compile and run your unit test each time you save a file &lt;/li&gt;

  &lt;li&gt;It’s can consume (and produce) Maven 2 dependencies &lt;/li&gt;

  &lt;li&gt;the build definition file can be much shorter than the equivalent pom (about 1/5 the size, as repos and dependencies can be declared on a single line) &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And some real limitations&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;Limited support for 3rd party integration – for instance out of the box, TeamCity doesn’t speak sbt, nor does IntelliJ IDEA &lt;/li&gt;

  &lt;li&gt;Steeper learning curve for build steps outside the default &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Side note: If a language has a fast compiler, why keep the &lt;em&gt;slow&lt;/em&gt; compiler around? Even worse, why make it the default?&lt;/p&gt;

&lt;p&gt;I choose sbt, for the faster development speed it offers.&lt;/p&gt;

&lt;h2&gt;Syntax&lt;/h2&gt;

&lt;p&gt;Scala APIs really like to use punctuation – sometimes this works well, as in the following&lt;/p&gt;

&lt;pre class="brush: scala; gutter: false; toolbar: false;"&gt; map1 |+| map2 &lt;/pre&gt;

&lt;p&gt;The `|+|` defines a merge operator which does addition on the `values` of the maps.&lt;/p&gt;

&lt;p&gt;It’s less useful here:&lt;/p&gt;

&lt;pre class="brush: scala; gutter: false; toolbar: false;"&gt;http(baseUrl / url &amp;gt;- parseJson[BuildStatus]&lt;/pre&gt;

&lt;pre class="brush: scala; gutter: false; toolbar: false;"&gt;sure you can probably guess what `&amp;gt;-` does from the context, but how about `&amp;gt;~` or `&amp;gt;+`?&lt;/pre&gt;

&lt;h2&gt;Language features&lt;/h2&gt;

&lt;p&gt;I’m still learning, so not much to say just yet. However case classes are quite usefull, implicits scare me, and type constructors have lots of power.&lt;/p&gt;

&lt;h2&gt;Community&lt;/h2&gt;

&lt;p&gt;A number of projects, such as &lt;a title="https://github.com/scalala" href="https://github.com/scalala"&gt;https://github.com/scalala&lt;/a&gt; and &lt;a href="https://github.com/scalaz/scalaz"&gt;https://github.com/scalaz/scalaz&lt;/a&gt; are split between github and google code – github for the src, and google code for the docs. Not sure I understand the motivation here.&lt;/p&gt;</description><pubDate>Mon, 28 Nov 2011 05:05:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/first-impressions-of-scala</guid><category>General Software Development</category><category>JVM</category><category>Scala</category></item><item><title>Intro to RX</title><link>https://weblogs.asp.net:443/sweinstein/intro-to-rx</link><description>&lt;p&gt;&lt;a href="http://lab49.com/" target="_blank"&gt;Lab49&lt;/a&gt; colleague &lt;a href="http://leecampbell.blogspot.com/" target="_blank"&gt;Lee Campbell&lt;/a&gt; has &lt;a href="http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html" target="_blank"&gt;a nice 7 part write-up on the Reactive Extensions&lt;/a&gt;&lt;/p&gt;  &lt;p&gt;He says:&lt;/p&gt;  &lt;blockquote&gt;   &lt;p&gt;it is big in all sorts of ways: &lt;/p&gt;    &lt;ol&gt;     &lt;li&gt;In the way that it tackles the Observer pattern is bold &lt;/li&gt;      &lt;li&gt;In the way it tackles concurrency is quite a shift from how I have done it before. &lt;/li&gt;      &lt;li&gt;The number of (extension) methods is huge. &lt;/li&gt;      &lt;li&gt;The way in which it integrates with LINQ to leverage LINQ's compensability &amp;amp; declarative style &lt;/li&gt;      &lt;li&gt;The fact that any .NET dev should care UI, backend algorithm coder or Integrator. It helps all of us. &lt;/li&gt;      &lt;li&gt;The future plans are even more grand, but that is a different series all together :-)&lt;/li&gt;   &lt;/ol&gt; &lt;/blockquote&gt;  &lt;p&gt;The series covers&lt;/p&gt;  &lt;ul&gt;   &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/05/intro-to-rx.html"&gt;Part 1 - Introduction to Rx&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/05/rx-part-2-static-and-extension-methods.html"&gt;Part 2 - Static and extension methods&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/05/rx-part-3-lifetime-management.html"&gt;Part 3 - Lifetime management – Completing and Unsubscribing&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/05/rx-part-4-flow-control.html"&gt;Part 4 - Flow control&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/06/rx-part-5-combining-multiple.html"&gt;Part 5 - Combining multiple IObservable streams&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html"&gt;Part 6 - Scheduling and threading&lt;/a&gt;&lt;/li&gt;    &lt;li&gt;&lt;a href="http://leecampbell.blogspot.com/2010/08/rx-part-7-hot-and-cold-observables.html"&gt;Part 7 - Hot and Cold observables&lt;/a&gt;&lt;/li&gt; &lt;/ul&gt;</description><pubDate>Fri, 27 Aug 2010 02:08:09 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/intro-to-rx</guid><category>.NET</category><category>ReactiveExtentions</category><category>RxNet</category></item><item><title>Bending Time with the Reactive Extensions</title><link>https://weblogs.asp.net:443/sweinstein/bending-time-with-the-reactive-extensions</link><description>&lt;p&gt;The latest releases of the Reactive Extensions for .Net include an abstract VirtualScheduler and a concrete implementation called TestScheduler.&lt;/p&gt;  &lt;p&gt;So now it’s possible test time dependent code without relying on the passage of time (or tide).&lt;/p&gt;  &lt;p&gt;Here’s a sample of code that would take 3 days to complete in the real&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;[Fact(Timeout = 1000)]
public void TestScheduler()
{
    List&amp;lt;long&amp;gt; actual = new List&amp;lt;long&amp;gt;();
    Observable.Interval(TimeSpan.FromDays(1), _testSched)
                                .Take(3)
                                .Subscribe(actual.Add);
    _testSched.Run();
    Assert.Equal(new[] { 0L, 1, 2 }, actual.ToArray());
}&lt;/pre&gt;

&lt;p&gt;Notice that I didn’t use a blocking call, such as &lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;.Take(3).ToEnumerable().ToArray()&lt;/pre&gt;

&lt;p&gt;to obtain a the values from the interval. The TestScheduler runs on the current thread, and as a result blocking calls never complete.&lt;/p&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;Here’s another example where we run for a specific duration. Usefull when testing Observables that never end&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;[Fact]
public void TestOneElementSlidingWindow()
{
    List&amp;lt;SlidingWindow&amp;lt;Timestamped&amp;lt;int&amp;gt;&amp;gt;&amp;gt; actual = new List&amp;lt;SlidingWindow&amp;lt;Timestamped&amp;lt;int&amp;gt;&amp;gt;&amp;gt;();
    var oneBeat = Observable.Return&amp;lt;int&amp;gt;(1, _testSched).Timestamp(_testSched);
    var sWindow = oneBeat.ToSlidingWindow(_oneSecond, _oneSecond, _testSched);
    sWindow.Subscribe(slw =&amp;gt; actual.Add(slw));

    _testSched.RunTo(_testSched.FromTimeSpan(TimeSpan.FromSeconds(3)));

    Assert.Equal(2, actual.Count);

    Assert.Equal(1, actual[0].Added.Count());
    Assert.Equal(1, actual[0].Current.Count());
    Assert.Equal(0, actual[0].Removed.Count());

    Assert.Equal(0, actual[1].Added.Count());
    Assert.Equal(0, actual[1].Current.Count());
    Assert.Equal(1, actual[1].Removed.Count());
}&lt;/pre&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;Code samples updated at &lt;a href="http://code.msdn.microsoft.com/RxDemos"&gt;http://code.msdn.microsoft.com/RxDemos&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Also - &lt;a href="http://twitter.com/jvgogh" target="_blank"&gt;Jeffrey van Gogh&lt;/a&gt; promises more to come on #c9&lt;/p&gt;</description><pubDate>Sun, 22 Aug 2010 19:23:27 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/bending-time-with-the-reactive-extensions</guid><category>.NET</category><category>ReactiveExtentions</category><category>RX</category></item><item><title>PowerShell and a bit of the Task Parallel Library as a replacement for SSIS</title><link>https://weblogs.asp.net:443/sweinstein/powershell-and-a-bit-of-the-task-parallel-library-as-a-replacement-for-ssis</link><description>&lt;P&gt;I gave a presentation at today’s SQL Saturday in NY on replacing SSIS with PowerShell. &lt;/P&gt;
&lt;P&gt;You can &lt;A href="http://www.slideshare.net/ScottWeinstein/using-powershell-to-simplify-your-etl" target=_blank&gt;view the presentation&lt;/A&gt;, or see below for the two second summary:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;SSIS is a terrible development tool&lt;/LI&gt;
&lt;LI&gt;Many SSIS features can be built with out much effort with PowerShell, C#, and the TPL&lt;/LI&gt;
&lt;LI&gt;See the demos&lt;/LI&gt;&lt;/UL&gt;
&lt;P&gt;The code is hosted at &lt;A title=http://psis.codeplex.com/ href="http://psis.codeplex.com/" mce_href="http://psis.codeplex.com/"&gt;http://psis.codeplex.com/&lt;/A&gt; Currently it has the following capabilities&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Concurrent bulk data transfer&lt;/LI&gt;
&lt;LI&gt;Single pass star-schema populator&lt;/LI&gt;&lt;/UL&gt;
&lt;P&gt;&lt;A href="http://weblogs.asp.net/sweinstein/contact.aspx" target=_blank mce_href="http://weblogs.asp.net/sweinstein/contact.aspx"&gt;Contact me&lt;/A&gt; if you’d like to contribute or collaborate on this.&lt;/P&gt;</description><pubDate>Sat, 24 Apr 2010 18:19:00 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/powershell-and-a-bit-of-the-task-parallel-library-as-a-replacement-for-ssis</guid><category>PowerShell</category><category>SqlServer</category><category>SSIS</category><category>TPL</category></item><item><title>Tracking My Internet Provider Speeds</title><link>https://weblogs.asp.net:443/sweinstein/tracking-my-internet-provider-speeds</link><description>&lt;p&gt;Of late, our broadband internet has been feeling sluggish. A call to the company took way more hold-time than I wanted to spend, and it only fixed the problem for a short while. Thus a perfect opportunity to play with some new tech to solve a problem, in this case, documenting a systemic issue from a service provider.&lt;/p&gt;  &lt;p&gt;The goal – a log a internet speeds, taken say every 15 min. Recording ping time, upload speed, download speed, and local LAN usage.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;The solution&lt;/p&gt;  &lt;ul&gt;   &lt;li&gt;A WCF service to measure speeds&lt;/li&gt;    &lt;ul&gt;     &lt;li&gt;Internet speed was measured via &lt;a href="http://speedtest.net/" target="_blank"&gt;speedtest.net&lt;/a&gt;&lt;/li&gt;      &lt;li&gt;LAN usage was measured by querying my router for packets received and sent&lt;/li&gt;   &lt;/ul&gt;    &lt;li&gt;A SQL express instance to persist the data&lt;/li&gt;    &lt;li&gt;A PowerShell script to invoke the WCF service – launched by Windows’ Task Scheduler&lt;/li&gt;    &lt;li&gt;An OData WCF Data Service to allow me to read the data&lt;/li&gt;    &lt;li&gt;&lt;strike&gt;&lt;a href="http://www.powerpivot.com/" target="_blank"&gt;MS PowerPivot&lt;/a&gt; to show a nice viz&lt;/strike&gt; (scratch that, the beta expired)&lt;/li&gt;    &lt;li&gt;&lt;a href="www.linqpad.net/" target="_blank"&gt;LinqPad&lt;/a&gt; to get the data, export it to excel&lt;/li&gt;    &lt;li&gt;&lt;a href="http://www.tableausoftware.com/public/" target="_blank"&gt;Tableau Public&lt;/a&gt; to show the viz&lt;/li&gt; &lt;/ul&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt; &lt;script type="text/javascript" src="http://public.tableausoftware.com/javascripts/api/viz_v1.js"&gt;&lt;/script&gt;&lt;object class="tableauViz" width="654" height="469" style="display:none;"&gt;&lt;param name="name" value="FiosSpeeds2/Dashboard1" /&gt;&lt;param name="toolbar" value="yes" /&gt;&lt;/object&gt;&lt;noscript&gt;&lt;/noscript&gt;  &lt;div style="padding-bottom: 0px; margin-top: -6px; padding-left: 0px; width: 654px; padding-right: 10px; font: 8pt verdana,helvetica,arial,sans-serif; height: 22px; color: black; padding-top: 0px"&gt;   &lt;div style="padding-left: 538px"&gt;&lt;a href="http://www.tableausoftware.com/public?ref=http://public.tableausoftware.com/views/FiosSpeeds2/Dashboard1" target="_blank"&gt;Powered by Tableau&lt;/a&gt;&lt;/div&gt; &lt;/div&gt;</description><pubDate>Sat, 10 Apr 2010 18:52:53 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/tracking-my-internet-provider-speeds</guid><category>.NET</category><category>General Software Development</category><category>OData</category><category>WCF</category></item><item><title>Samples and Slides from Alt.Net Meet up on the Reactive Extensions</title><link>https://weblogs.asp.net:443/sweinstein/samples-and-slides-from-alt-net-meet-up-on-the-reactive-extensions</link><description>&lt;p&gt;The code samples and PowerPoint deck from my presentation on the RX to the New York ALT.NET group are available (and updated) on MSDN Code samples:&lt;/p&gt;  &lt;p&gt;&lt;a title="http://code.msdn.microsoft.com/RxDemos" href="http://code.msdn.microsoft.com/RxDemos"&gt;http://code.msdn.microsoft.com/RxDemos&lt;/a&gt;&lt;/p&gt;  &lt;p&gt;And the &lt;a href="http://www.slideshare.net/ScottWeinstein/cfakepathintro-to-rx" target="_blank"&gt;slide deck&lt;/a&gt;&lt;/p&gt;</description><pubDate>Thu, 28 Jan 2010 03:29:52 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/samples-and-slides-from-alt-net-meet-up-on-the-reactive-extensions</guid><category>.NET</category><category>CEP</category><category>General Software Development</category><category>ReactiveExtentions</category><category>RX</category></item><item><title>Converting a polling based API into a streaming API with the Reactive Extensions</title><link>https://weblogs.asp.net:443/sweinstein/converting-a-polling-based-api-into-a-streaming-api-with-the-reactive-extensions</link><description>&lt;p&gt;Recently my building has been having issues with its boilers, and the heat has been going out for longer than is comfortable. The superintendent that makes a habit of periodically checking on the status of each of the boilers. A workable approach certainly, but figured this would be ideal for a technology assist.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;&lt;a href="http://www.dealextreme.com/details.dx/sku.7003" target="_blank"&gt;For $9, I purchased a USB thermometer&lt;/a&gt;, word on the web is that the software comes with its fairly miserable (and crashed&amp;#160; immediately on machine), but with some Google and reflector, was able to come up with a polling based API to read the temperature&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public interface IUsbTEMPer
{
    double Temperature { get; }
}&lt;/pre&gt;

&lt;p&gt;With the RX, converting the API into a stream of data is just one line:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObservable&amp;lt;double&amp;gt; ts = Observable.Generate(
Scheduler.Later, 
() =&amp;gt; new Notification&amp;lt;double&amp;gt;.OnNext(usbTempReader.Temperature)
).Publish();&lt;/pre&gt;

&lt;p&gt;And getting some simple alerts is easy too:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;ts.Buffer(new TimeSpan(1, 5, 0))
    .Select(fiveminOfTemp =&amp;gt; fiveminOfTemp.Average())
    .Where(avgtemp =&amp;gt; avgtemp &amp;lt; 65)
    .Subscribe(cold =&amp;gt; ToTwiter(&amp;quot;buildingstatus account...&amp;quot;));&lt;/pre&gt;</description><pubDate>Fri, 22 Jan 2010 03:47:56 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/converting-a-polling-based-api-into-a-streaming-api-with-the-reactive-extensions</guid><category>.NET</category><category>CEP</category><category>LINQ</category><category>ReactiveExtentions</category></item><item><title>The Anonymous Implementation pattern (as seen in the Reactive Extensions)</title><link>https://weblogs.asp.net:443/sweinstein/the-anonymous-implementation-pattern-as-seen-in-the-reactive-extensions</link><description>&lt;p&gt;There’s a pattern used in the Reactive Extensions that I’m calling the Anonymous Implementation.&lt;/p&gt;  &lt;p&gt;You can see it in use on IObservable’s one method&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IDisposable Subscribe(IObserver&amp;lt;T&amp;gt; observer);&lt;/pre&gt;

&lt;p&gt;Given an Observable which you want to subscribe to, the brute force, naive (or pedantic) approach would be to create a new class implementing IObserver&amp;lt;T&amp;gt; and then subscribe it.&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public class BasicConsoleWriteLineObserver&amp;lt;T&amp;gt;: IObserver&amp;lt;T&amp;gt;
{
    public void OnNext(T value)
    {
        Console.WriteLine(value);
    }
    
    public void OnError(Exception ex) 
    {
        throw ex;
    }
    public void OnCompleted() { }
}

IObservable&amp;lt;int&amp;gt; stream = ...;
stream.Subscribe(new BasicConsoleWriteLineObserver&amp;lt;int&amp;gt;());&lt;/pre&gt;

&lt;p&gt;But a simpler method, one that dispenses of all the ceremony of creating a new Type is to use one of the Observer factory methods, like so:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObserver&amp;lt;int&amp;gt; basicCWLobserver = Observer.Create((int value) =&amp;gt; Console.WriteLine(value));
stream.Subscribe(basicCWLobserver);&lt;/pre&gt;

&lt;p&gt;Observer.Create (and it’s 4 overloads) will create an anonymous implementation of the interface needed. Of course you can skip a step or two and use the extension methods on IObservable to get even shorter versions of the same:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;stream.Subscribe(Console.WriteLine);&lt;/pre&gt;

&lt;p&gt;This approach is great, for all but the most complex implementations, there’s no need to ever implement IObserver&amp;lt;T&amp;gt;.&amp;#160; And as they say, less code is better code.&lt;/p&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;There are a number of other interfaces that I wish used this approach. IValueConverter, and IComparer&amp;lt;T&amp;gt; come to mind first.&lt;/p&gt;</description><pubDate>Mon, 11 Jan 2010 04:40:14 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/the-anonymous-implementation-pattern-as-seen-in-the-reactive-extensions</guid><category>.NET</category><category>General Software Development</category><category>LINQ</category><category>ReactiveExtentions</category></item><item><title>16 Ways To Create IObservables without implementing IObservable</title><link>https://weblogs.asp.net:443/sweinstein/16-ways-to-create-iobservables-without-implementing-iobservable</link><description>&lt;p&gt;The &lt;a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" target="_blank"&gt;Reactive Extensions for .Net&lt;/a&gt; offers plenty of ways to create IObservables&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;&lt;strong&gt;Some primitives&lt;/strong&gt;&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObservable&amp;lt;int&amp;gt; obs = Observable.Empty&amp;lt;int&amp;gt;();
IObservable&amp;lt;int&amp;gt; obs = Observable.Return(0);
IObservable&amp;lt;int&amp;gt; obs = Observable.Throw&amp;lt;int&amp;gt;(new Exception());&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;Simple streams&lt;/strong&gt;&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObservable&amp;lt;long&amp;gt; obs = Observable.Interval(new TimeSpan(0, 0, 1));
IObservable&amp;lt;long&amp;gt; obs = Observable.Timer(DateTimeOffset.Now.AddHours(1)); // Plus 7 more overloads
IObservable&amp;lt;int&amp;gt; obs = Observable.Repeat(1); // Plus 7 more overloads
IObservable&amp;lt;int&amp;gt; obs = Observable.Range(0, 1);&lt;/pre&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;From async data&lt;/strong&gt;&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;//From an Action or Func
Observable.Start(() =&amp;gt; 1);

//From Task
Task.Factory.StartNew(...).ToObservable();

//From AsyncPattern
// typical use case is IO or Web service calls
Func&amp;lt;int,int,double&amp;gt; sampleFunc = (a,b) =&amp;gt; 1d;
Func&amp;lt;int, int, IObservable&amp;lt;double&amp;gt;&amp;gt; funcObs = 
                  Observable.FromAsyncPattern&amp;lt;int, int, double&amp;gt;(sampleFunc.BeginInvoke, 
                                                                      sampleFunc.EndInvoke);
IObservable&amp;lt;double&amp;gt; obs = funcObs(1, 0);&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;From Events&lt;/strong&gt;&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public event EventHandler&amp;lt;EventArgs&amp;gt; AnEvent;
IObservable&amp;lt;IEvent&amp;lt;EventArgs&amp;gt;&amp;gt; fromEventObs = 
Observable.FromEvent&amp;lt;EventArgs&amp;gt;(h =&amp;gt; this.AnEvent += h, 
                                      h =&amp;gt; this.AnEvent -= h);&lt;/pre&gt;
&lt;strong&gt;From Existing Collections&lt;/strong&gt; 

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IEnumerable&amp;lt;int&amp;gt; ie = new int[] {};
observable = ie.ToObservable();&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;By Generate()&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;There are 20 overloads to generate. See some prior examples &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/11/25/sliding-windows-via-the-reactive-framework.aspx" target="_blank"&gt;here&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;By Create()&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This creates a &lt;strong&gt;cold&lt;/strong&gt; stream&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObservable&amp;lt;int&amp;gt; observable = Observable.Create&amp;lt;int&amp;gt;(o =&amp;gt;
                                {
                                    o.OnNext(1);
                                    o.OnNext(2);
                                    o.OnCompleted();
                                    return () =&amp;gt; { };
                                });&lt;/pre&gt;

&lt;p&gt;To make a &lt;strong&gt;hot&lt;/strong&gt; stream via Create()&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;List&amp;lt;IObserver&amp;lt;int&amp;gt;&amp;gt; _subscribed = new List&amp;lt;IObserver&amp;lt;int&amp;gt;&amp;gt;();
private CreateHot()
{
    observable = Observable.Create&amp;lt;int&amp;gt;(o =&amp;gt;
    {
        _subscribed.Add(o);
        return () =&amp;gt; _subscribed.Remove(o);
    });

}
private void onNext(int val)
{
    foreach (var o in _subscribed)
    {
        o.OnNext(val);
    }
}&lt;/pre&gt;

&lt;p&gt;But rather than using create, a subject provides a cleaner (thread safe and tested) way of doing the above&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;var subj = new Subject&amp;lt;int&amp;gt;();
observable = subj.Hide();
subj.OnNext(1);&lt;/pre&gt;

&lt;p&gt;&lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/11/25/sliding-windows-via-the-reactive-framework.aspx" target="_blank"&gt;&lt;/a&gt;&lt;/p&gt;</description><pubDate>Mon, 11 Jan 2010 02:20:45 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/16-ways-to-create-iobservables-without-implementing-iobservable</guid><category>.NET</category><category>LINQ</category><category>ReactiveExtentions</category></item><item><title>Streaming OLAP with the Reactive Extensions (RX) for .Net</title><link>https://weblogs.asp.net:443/sweinstein/streaming-olap-with-the-reactive-extensions-rx-for-net-part-1</link><description>&lt;p&gt;Streaming OLAP is something that comes up over and over again in the “CEP space” – using the &lt;a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" target="_blank"&gt;Reactive Extensions for .Net&lt;/a&gt; this demo shows the basics; filtering, grouping, aggregates, and concurrent queries.&lt;/p&gt;  &lt;p&gt;To set the context, the idea here is focus on the “query” side, so the UI is aggressively simple – no nifty visualizations here, just enough to show working code. I chose a non-financial domain, filesystem changes in this case, simply because the simplified equities example is a bit overused. &lt;/p&gt;  &lt;p&gt;Here’s a screenshot and download link &lt;a title="http://code.msdn.microsoft.com/FSOlapRxDemo" href="http://code.msdn.microsoft.com/FSOlapRxDemo"&gt;http://code.msdn.microsoft.com/FSOlapRxDemo&lt;/a&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_6F2A4D9C.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="app screenshot" border="0" alt="app screenshot" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_560E31BD.png" width="644" height="174" /&gt;&lt;/a&gt; &lt;/p&gt;  &lt;p&gt;Let’s walk through the code:&lt;/p&gt;  &lt;p&gt;&lt;strong&gt;Getting the data&lt;/strong&gt;&lt;/p&gt;  &lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_1A4BC7FF.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="raw data" border="0" alt="raw data" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_66A35BA0.png" width="432" height="22" /&gt;&lt;/a&gt; &lt;/p&gt;  &lt;p&gt;The .Net class FileSystemWatcher will report changes to the filesystem as events, so to get a stream of filesystem changes we only need do the following:&lt;/p&gt;  &lt;ul&gt;   &lt;ul&gt;     &lt;li&gt;get all the fixed drives &lt;/li&gt;      &lt;li&gt;create a new FileSystemWatcher for each &lt;/li&gt;      &lt;li&gt;convert the Changed, Deleted, and Created events to IObservables using &lt;strong&gt;FromEvent()&lt;/strong&gt; &lt;/li&gt;      &lt;li&gt;&lt;strong&gt;Merge()&lt;/strong&gt; those into a single IObservable &lt;/li&gt;      &lt;li&gt;map the EventArgs to to a more query friendly class &lt;/li&gt;   &lt;/ul&gt; &lt;/ul&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;FileChangeFact&amp;gt; GetFileSystemStream()
{
    var fsEventTypes = new string[] { &amp;quot;Changed&amp;quot;, &amp;quot;Deleted&amp;quot;, &amp;quot;Created&amp;quot; };

    IEnumerable&amp;lt;IObservable&amp;lt;IEvent&amp;lt;FileSystemEventArgs&amp;gt;&amp;gt;&amp;gt; fsEventsAsObservables =
        DriveInfo.GetDrives()
                .Where(di =&amp;gt; di.DriveType == DriveType.Fixed)
                .Select(drive =&amp;gt; new FileSystemWatcher(drive.RootDirectory.FullName) { … })
                .SelectMany(fsw =&amp;gt; fsEventTypes.Select(eventType =&amp;gt; Observable.FromEvent…;

    return  Observable.Merge(fsEventsAsObservables)
        .Select(fsea =&amp;gt;
        {
            var fi = new FileInfo(fsea.EventArgs.FullPath);
            return new FileChangeFact
            {
                ChangeType = fsea.EventArgs.ChangeType,
                Path = fsea.EventArgs.FullPath,
                IsContainer = !fi.Exists,
                Length = fi.Exists ? fi.Length : 0,
                Extension = String.IsNullOrEmpty(fi.Extension) ? &amp;quot;(none)&amp;quot; : fi.Extension
            };
        });
}&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;Calculating Aggregates, take 1&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_663728AB.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="total aggregates" border="0" alt="total aggregates" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_45AFE8F9.png" width="179" height="9" /&gt;&lt;/a&gt; &lt;/p&gt;

&lt;p&gt;The Scan() operator is ideal for computing aggregates in a streaming olap scenario. Unlike traditional queries where vectors are aggregated into a single value, we want to computing running values. &lt;/p&gt;

&lt;p&gt;So to compute a few of the most common query aggregates, &lt;em&gt;Count&lt;/em&gt;, &lt;em&gt;Sum&lt;/em&gt;, &lt;em&gt;Mean&lt;/em&gt;, and &lt;em&gt;StdDev&lt;/em&gt; we can do the following:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;double&amp;gt; Mean(this IObservable&amp;lt;double&amp;gt; source)
{
    var temp = new { N = 0, Mean = 0d };
    return source.Scan(temp, (cur, next) =&amp;gt;
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
        };
    }).Select(it =&amp;gt; it.Mean);
}
public static IObservable&amp;lt;double&amp;gt; StdDev(this IObservable&amp;lt;double&amp;gt; source)
{
    var temp = new { N = 0, Mean = 0d, M2 = 0d };
    return source.Scan(temp, (cur, next) =&amp;gt;
    {
        var n = cur.N + 1;
        var delta = next - cur.Mean;
        var meanp = cur.Mean + delta / n;
        return new
        {
            N = n,
            Mean = meanp,
            M2 = cur.M2 + delta * (next - meanp)
        };
    }).Select(it =&amp;gt; Math.Sqrt(it.M2 / (it.N)));
}

var fss = GetFileSystemStream();
fss.Select(fcf =&amp;gt; (double)fcf.Length)
        .Scan(0,(c, _) =&amp;gt; c + 1) // Count
        .Zip(lenxs.Scan(0d,(c, n) =&amp;gt; c + n), (cnt, sum) =&amp;gt; new FileChangeAggregate(){Sum=sum,Count...
        .Zip(lenxs.Mean(), (fca, mean) =&amp;gt; { fca.Mean = mean; return fca; })
        .Zip(lenxs.StdDev(), (fca, stddev) =&amp;gt; { fca.StdDev = stddev; return fca; })
        //... subscribe()...&lt;/pre&gt;

&lt;p&gt;The first two methods are just wrappers around Scan(), and to compute multiple aggregates on a single stream, the &lt;strong&gt;Zip()&lt;/strong&gt; operator comes in handy, letting us essentially stitch together multiple computations.&lt;/p&gt;

&lt;p&gt;One nice aspect of this approach is that the code for each of the aggregates cohesive, loosely coupled, and composable – all nice attributes, but it’s verbose and redundant (in that the Mean is calculated twice) and I suspect not as performant. So that takes us to &lt;/p&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Calculating Aggregates, take 2&lt;/strong&gt;&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;StatInfoItem&amp;lt;T&amp;gt;&amp;gt; ToCommonAggregates&amp;lt;T, TSrc&amp;gt;(
                   this IObservable&amp;lt;TSrc&amp;gt; source,
                   Func&amp;lt;TSrc, double&amp;gt; dataSelector,
                   Func&amp;lt;TSrc, T&amp;gt; itemSelector)
{
    return source.Scan(new StatInfoItem&amp;lt;T&amp;gt;(), (cur, next) =&amp;gt;
    {
        double data = dataSelector(next);
        T itemp = itemSelector(next);
        var n = cur.Count + 1;
        var delta = data - cur.Mean;
        var meanp = cur.Mean + delta / n;
        var m2 = cur.M2 + delta * (data - meanp);
        var stdDevp = Math.Sqrt(m2 / n);
        return new StatInfoItem&amp;lt;T&amp;gt;()
        {
            Item = itemp,
            Sum = data + cur.Sum,
            Count = n,
            Mean = meanp,
            M2 = m2,
            StdDev = stdDevp,
            Min = Math.Min(data, cur.Min),
            Max = Math.Max(data, cur.Max),
        };
    })
    .Skip(1); // need a seed, but don't want to include seed value in the output
}&lt;/pre&gt;

&lt;p&gt;Which also add Min() &amp;amp; Max(). Perhaps not as elegant, but using the above is quite easy:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IOvservable&amp;lt;StatInfoItem&amp;lt;string&amp;gt;&amp;gt; aggstream = 
fss.ToCommonAggregates(fcf =&amp;gt; fcf.Length, _ =&amp;gt; &amp;quot;Label&amp;quot;)&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;Filtering and Multiple Queries&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_09841D51.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_2932F719.png" width="150" height="23" /&gt;&lt;/a&gt; &lt;/p&gt;

&lt;p&gt;For the demo, each query will be a “drill down” into the filesystem, implemented as a filter on the full path of the file that changed. This is done with the same &lt;strong&gt;Where()&lt;/strong&gt; operator used in normal Linq. Adding multiple queries isn’t as trivial. Unlike IEnumerables, which contain their data, Observables only have a promise of future data. So if we want to say find out how much has changed in c:\windows, we need to query a store of past data as well as include future data. Here’s one approach using the &lt;strong&gt;StartWith()&lt;/strong&gt; operator and a ConcurrentQueue :&lt;/p&gt;

&lt;p&gt;&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;private IObservable&amp;lt;FileChangeFact&amp;gt; _fss;
private ConcurrentQueue&amp;lt;FileChangeFact&amp;gt; _store = new …
private void NewQuery()
{
    _fss = GetFileSystemStream();
    _fss.Subscribe(fsi =&amp;gt; _store.Enqueue(fsi));
    var newQuery = _fss.StartWith(_store.ToArray()).Where(fsi =&amp;gt; {...} );
} &lt;/pre&gt;

&lt;p&gt;A cleaner approach is to take advantage of the RX’s subjects, in particular the ReplaySubject which store history for us.&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;private ReplaySubject&amp;lt;FileChangeFact&amp;gt; _storeSubject;
_storeSubject = GetFileSystemStream().Replay();
var drillDownQry = _storeSubject.Where(fltr);&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;Grouping&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_48E1D0E1.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="grouping" border="0" alt="grouping" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_03C903AB.png" width="110" height="23" /&gt;&lt;/a&gt; &lt;/p&gt;

&lt;p&gt;Thanks to the &lt;strong&gt;GroupBy()&lt;/strong&gt; and &lt;strong&gt;SelectMany()&lt;/strong&gt; operators grouping turns out to be quite easy. First we group by the file extention, selecting the Length property. And for each group, compute the aggregates.&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;IObservable&amp;lt;StatInfoItem&amp;lt;string&amp;gt;&amp;gt; grouped = 
                newQuerystream.GroupBy(fsi =&amp;gt; fsi.Extension, fsi =&amp;gt; (double)fsi.Length)
.SelectMany(grp =&amp;gt; grp.ToCommonAggregates(x =&amp;gt; x, _ =&amp;gt; grp.Key));&lt;/pre&gt;

&lt;p&gt;&lt;strong&gt;Updating the UI&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Merging a stream of data into an Observable&lt;strong&gt;Collection&amp;lt;T&amp;gt;&lt;/strong&gt; is such a common operation. I’ve found myself wishing that MS offered a KeyedObservableCollection&amp;lt;T&amp;gt; and I’ve considered writting such a thing, but instead I used a smaller extension method &lt;strong&gt;MergeInsert()&lt;/strong&gt; which does the job without the all the additional ceremony of a new class&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IDisposable MergeInsert&amp;lt;T, TKey&amp;gt;(
                 this ObservableCollection&amp;lt;T&amp;gt; col, 
                 IObservable&amp;lt;T&amp;gt; stream, 
                 Func&amp;lt;T, TKey&amp;gt; keySelector)
{
    col.Clear();
    Dictionary&amp;lt;TKey, int&amp;gt; lookupTable = new Dictionary&amp;lt;TKey, int&amp;gt;();
    return stream.Subscribe(item =&amp;gt;
    {
        var key = keySelector(item);
        if (!lookupTable.ContainsKey(key))
        {
            lookupTable[key] = col.Count;
            col.Add(item);
        }
        else
        {
            col[lookupTable[key]] = item;
        }
    });
}&lt;/pre&gt;

&lt;p&gt;Use it like so:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;GroupedByExtentionCollection = new ObservableCollection&amp;lt;StatInfoItem&amp;lt;string&amp;gt;&amp;gt;();
IDisposable _disp = GroupedByExtentionCollection.MergeInsert(grouped.ObserveOnDispatcher(), 
                                                             sii =&amp;gt; sii.Item);&lt;/pre&gt;</description><pubDate>Sun, 03 Jan 2010 01:28:06 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/streaming-olap-with-the-reactive-extensions-rx-for-net-part-1</guid><category>.NET</category><category>CEP</category><category>LINQ</category><category>ReactiveExtentions</category><category>RX</category></item><item><title>CEP Style Sliding windows in the RX – Take 2</title><link>https://weblogs.asp.net:443/sweinstein/cep-style-sliding-windows-in-the-rx-take-2</link><description>&lt;p&gt;The bug I mentioned in my &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/11/25/sliding-windows-via-the-reactive-framework.aspx" target="_blank"&gt;first attempt at a sliding window&lt;/a&gt; was the minor issue that the aggegates never went down to 0, even if the window had emptied out.&lt;/p&gt;  &lt;p&gt;The problem line of code was cur.GroupBy(tsst =&amp;gt; tsst.Value.Symbol) – if the window is empty, there is nothing to group – and as a result the aggregates don’t get computed.&lt;/p&gt;  &lt;p&gt;Here’s the fix:&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public IObservable&amp;lt;VWAPItem&amp;gt; GetVWAPWS(IObservable&amp;lt;Timestamped&amp;lt;StockTick&amp;gt;&amp;gt; oticks)
{
    var existingWindows = new ConcurrentDictionary&amp;lt;string,int&amp;gt;();
    return oticks
              .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
              .Select(sl =&amp;gt; sl.Current)
              .SelectMany(cur =&amp;gt;
                  {
                      IEnumerable&amp;lt;VWAPItem&amp;gt; grouped = cur.GroupBy(tsst =&amp;gt; tsst.Value.Symbol)
                          .Select(grp =&amp;gt;
                              {
                                  IEnumerable&amp;lt;StockTick&amp;gt; ticks = grp.Select(tsst2 =&amp;gt; tsst2.Value);
                                  var totalAmount = ticks.Sum(tk =&amp;gt; tk.Size * tk.Price);
                                  var totalVolume = ticks.Sum(tk =&amp;gt; tk.Size);
                                  return new VWAPItem(grp.Key, totalAmount, 
								totalVolume, 
								totalAmount / totalVolume);
                              });
                      foreach (var grpd in grouped)
                      {
                          existingWindows[grpd.Symbol] = 1;
                      } 
                     IEnumerable&amp;lt; IEnumerable&amp;lt;VWAPItem&amp;gt;&amp;gt; outerJoin = existingWindows
                                                        .GroupJoin(grouped, 
                                                            key =&amp;gt; key.Key, 
                                                            grped =&amp;gt; grped.Symbol,
                                                            (key, item) =&amp;gt; 
								item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));
                     return outerJoin.SelectMany(x =&amp;gt; x);
                  });
 
}&lt;/pre&gt;</description><pubDate>Mon, 30 Nov 2009 04:01:14 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/cep-style-sliding-windows-in-the-rx-take-2</guid><category>.NET</category><category>CEP</category><category>LINQ</category><category>ReactiveFramework</category><category>RX</category></item><item><title>Sliding Windows via the Reactive Framework</title><link>https://weblogs.asp.net:443/sweinstein/sliding-windows-via-the-reactive-framework</link><description>&lt;p&gt;A few months ago, playing with CTP 2 of StreamInsight, I created a small &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/08/25/from-the-reactiveframework-to-streaminsight-and-back.aspx" target="_blank"&gt;VWAP demo on a sliding window&lt;/a&gt;. Now that a proper &lt;a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" target="_blank"&gt;CTP of the RX is available&lt;/a&gt;, I wanted to see how much effort the same demo would be without the CEP infrastructure of StreamInsight. I’ll admit that this was a little bit harder to write then I expected – and there’s still at least one bug remaining (&lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/11/29/cep-style-sliding-windows-in-the-rx-take-2.aspx" target="_blank"&gt;updated&lt;/a&gt;) , but the code for actually computing the VWAPS feels much cleaner in the RX version then it did in the StreamInsight version. The debugability (which is really about transparency) of RX is a welcome difference to most CEP systems.&lt;/p&gt;  &lt;p&gt;So here’s the code: &lt;/p&gt;  &lt;p&gt;The generation of stock ticks remained nearly identical – however instead of timestamping by hand, I used to Timestamp() extension method. And to allow multiple observers to the same IObservable, the ticks are routed to a &lt;strong&gt;Subject&lt;/strong&gt;.&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public IObservable&amp;lt;Timestamped&amp;lt;StockTick&amp;gt;&amp;gt; GetTicks()
{
    var subj = new Subject&amp;lt;Timestamped&amp;lt;StockTick&amp;gt;&amp;gt;();
    var gen = Observable.Generate(
                    0
                   , ii =&amp;gt; ii &amp;lt; 1000  // produce 1000 ticks
                   , ii =&amp;gt; new StockTick() // next value
...
                  )
                  .Timestamp();

    gen.Subscribe(tsst =&amp;gt; subj.OnNext(tsst));
    return subj;
}&lt;/pre&gt;

&lt;p&gt;Compute VWAP on a 10 second sliding window&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public IObservable&amp;lt;VWAPItem&amp;gt; GetVWAPWS(IObservable&amp;lt;Timestamped&amp;lt;StockTick&amp;gt;&amp;gt; oticks)
{
    return oticks
              .ToSlidingWindow(new TimeSpan(0, 0, 0,10), new TimeSpan(0, 0, 0, 0, 500))
              .Select(sl =&amp;gt; sl.Current)
              .SelectMany(cur =&amp;gt;
                      cur.GroupBy(tsst =&amp;gt; tsst.Value.Symbol)
                          .Select(grp =&amp;gt;
                              {
                                  IEnumerable&amp;lt;StockTick&amp;gt; ticks = grp.Select(tsst2 =&amp;gt; tsst2.Value);
                                  var totalAmount = ticks.Sum(tk =&amp;gt; tk.Size * tk.Price);
                                  var totalVolume = ticks.Sum(tk =&amp;gt; tk.Size);
                                  return new VWAPItem(grp.Key, totalAmount, totalVolume, 
                                     totalAmount / totalVolume);
                              }));

}&lt;/pre&gt;

&lt;p&gt;And the code for ToSlidingWindow()&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;SlidingWindow&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt;&amp;gt; ToSlidingWindow&amp;lt;T&amp;gt;(
                                     this IObservable&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt; source, 
              TimeSpan size, TimeSpan resolution)
{
    Func&amp;lt;SlidingWindow&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt;, TimeoutJoinItem&amp;lt;T&amp;gt;, SlidingWindow&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt;&amp;gt; 
     windowing = (window, item) =&amp;gt;
    {
        Func&amp;lt;Timestamped&amp;lt;T&amp;gt;, bool&amp;gt; checkTimestamp = 
                 cwi =&amp;gt; cwi.Timestamp.Add(size) &amp;lt;= item.ComparisonTimestamp;

        var newCurrent = window.Current.SkipWhile(checkTimestamp);
        var removed = window.Current.TakeWhile(checkTimestamp);

        var added = Enumerable.Repeat(item.TSItem, (item.IsTimeout) ? 0 : 1);
        return 
             new SlidingWindow&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt;(newCurrent.Concat(added), added, removed);
    };

    DateTime priorleft = DateTime.MinValue;
    return source.CombineLatest(Observable.Timer(resolution, resolution).Timestamp(), 
           (left, right) =&amp;gt;
           {
               bool isTimeout = left.Timestamp == priorleft;
               priorleft = left.Timestamp;
               return new TimeoutJoinItem&amp;lt;T&amp;gt;(left,
                         (isTimeout)? right.Timestamp: left.Timestamp,  
                          isTimeout);
           }).Scan(new SlidingWindow&amp;lt;Timestamped&amp;lt;T&amp;gt;&amp;gt;(), windowing)
             .Where(sl =&amp;gt; sl.Added.Count() &amp;gt; 0 || sl.Removed.Count() &amp;gt; 0);
}&lt;/pre&gt;

&lt;p&gt;The key elements in the above are&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;Observable.Timer – this is our heartbeat which allows us to detect passage of time without new events &lt;/li&gt;

  &lt;li&gt;CombineLatest – Join two IObservables – the data stream and the time stream &lt;/li&gt;

  &lt;li&gt;Scan – this is Accumulate() for Observables – the windowing function takes the current Window and computes the new windows based on which elements have expired and been added &lt;/li&gt;

  &lt;li&gt;And finally reduce noise by removing SlidingWindows which have not changed &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The code to wire it up to a windows is standard stuff, just &lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;var ticks = _model.GetTicks();
ticks
    .ObserveOnDispatcher()
    .Subscribe(tst =&amp;gt; TickCollection.Add(tst));

var vwapDict= new Dictionary&amp;lt;string,VWAPItem&amp;gt;();
_model.GetVWAPWS(ticks)
    .ObserveOnDispatcher()
    .Subscribe(vwap =&amp;gt;
        {
            if (vwapDict.ContainsKey(vwap.Symbol))
                VWAPCollection.Remove(vwapDict[vwap.Symbol]);
            vwapDict[vwap.Symbol] = vwap;
            VWAPCollection.Add(vwap);
        });&lt;/pre&gt;

&lt;p&gt;And of course the required screenshot&lt;/p&gt;

&lt;p&gt;&lt;img style="border-right-width: 0px; margin: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_0C15590A.png" width="240" height="204" /&gt;&lt;/p&gt;
&lt;p&gt;
&lt;a href="http://www.dotnetkicks.com/kick/?url=http%3a%2f%2fweblogs.asp.net%2fsweinstein%2farchive%2f2009%2f11%2f25%2fsliding-windows-via-the-reactive-framework.aspx"&gt;&lt;img src="http://www.dotnetkicks.com/Services/Images/KickItImageGenerator.ashx?url=http%3a%2f%2fweblogs.asp.net%2fsweinstein%2farchive%2f2009%2f11%2f25%2fsliding-windows-via-the-reactive-framework.aspx" border="0" alt="kick it on DotNetKicks.com" /&gt;&lt;/a&gt;</description><pubDate>Wed, 25 Nov 2009 06:13:48 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/sliding-windows-via-the-reactive-framework</guid></item><item><title>Reactive Framework available from DevLabs</title><link>https://weblogs.asp.net:443/sweinstein/reactive-framework-available-from-devlabs</link><description>&lt;p&gt;Downloads of the Reactive Framework (RX) can now be found at &lt;a href="http://msdn.microsoft.com/en-us/devlabs/ee794896.aspx" target="_blank"&gt;MS DevLabs&lt;/a&gt;. Versions for 3.5 SP1, 4.0 Beta, and Silverlight 3 are available. Interestingly, the API size appears to be substantially larger than the preview which was leaked as part of the Silverlight 3 Toolkit. That DLL was all of 84KB, the current release is weighs in at 283KB.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;In regards to CEP, the comparisons between StreamInsight and RX are interesting&lt;/p&gt;  &lt;table border="1" cellspacing="0" cellpadding="2" width="532"&gt;&lt;tbody&gt;     &lt;tr&gt;       &lt;td valign="top" width="168"&gt;RX&lt;/td&gt;        &lt;td valign="top" width="193"&gt;&amp;#160;&lt;/td&gt;        &lt;td valign="top" width="169"&gt;StreamInsight&lt;/td&gt;     &lt;/tr&gt;      &lt;tr&gt;       &lt;td valign="top" width="168"&gt;Low – it’s managed code all the way down&lt;/td&gt;        &lt;td valign="top" width="193"&gt;Leaking abstraction&lt;/td&gt;        &lt;td valign="top" width="169"&gt;High; Linq2SI is like Linq2SQL, except the underling SI implementation isn’t well understood&lt;/td&gt;     &lt;/tr&gt;      &lt;tr&gt;       &lt;td valign="top" width="168"&gt;Limited by CLR and GC&lt;/td&gt;        &lt;td valign="top" width="193"&gt;Performance &lt;/td&gt;        &lt;td valign="top" width="169"&gt;High b/c of native code&lt;/td&gt;     &lt;/tr&gt;      &lt;tr&gt;       &lt;td valign="top" width="168"&gt;None out of box&lt;/td&gt;        &lt;td valign="top" width="193"&gt;Windowing support&lt;/td&gt;        &lt;td valign="top" width="169"&gt;Explicit&lt;/td&gt;     &lt;/tr&gt;      &lt;tr&gt;       &lt;td valign="top" width="168"&gt;Easy&lt;/td&gt;        &lt;td valign="top" width="193"&gt;Adaptor support&lt;/td&gt;        &lt;td valign="top" width="169"&gt;Not hard, but not trivial&lt;/td&gt;     &lt;/tr&gt;   &lt;/tbody&gt;&lt;/table&gt;</description><pubDate>Wed, 18 Nov 2009 04:23:08 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/reactive-framework-available-from-devlabs</guid><category>CEP</category><category>CLR</category><category>ReactiveFramework</category><category>RX</category><category>StreamInsight</category></item><item><title>From the ReactiveFramework to StreamInsight and Back</title><link>https://weblogs.asp.net:443/sweinstein/from-the-reactiveframework-to-streaminsight-and-back</link><description>&lt;p&gt;In my last post I showed how to &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/08/23/routing-streaminsight-output-streams-to-a-ui.aspx" target="_blank"&gt;send StreamInsight output streams to a UI via the ReactiveFramework&lt;/a&gt;. Here’s we’ll do the reverse, by sending an RX stream into a CEP stream. Instead of a partial example, I’ll use an end to end example showing simulated stock ticks, computing the 5 min rolling &lt;a href="http://en.wikipedia.org/wiki/VWAP" target="_blank"&gt;VWAP&lt;/a&gt;, and showing the results on a UI.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;First we’ll generate the ticks:&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;System.Collections.Generic.IObservable&amp;lt;StockTick&amp;gt; stockTicks =
    System.Linq.Observable.Generate(
                new Random() // inital state
                 , rnd =&amp;gt; true // continue
                 , rnd =&amp;gt; new StockTick() // next value
                         {
                             Price = rnd.NextDouble() * 1000,
                             Side = Sides[rnd.Next(Sides.Length - 1)],
                             Size = (long)(rnd.NextDouble() * 1000),
                             Symbol = Symbols[rnd.Next(Symbols.Length - 1)],
                             Timestamp = DateTime.Now
                         }
                         , rnd =&amp;gt; (int)(rnd.NextDouble() * 2000)  // waitInterval
                         , rnd =&amp;gt; rnd  // iterate
                         );&lt;/pre&gt;

&lt;p&gt;And now convert to a CEP stream:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;var cepInput = stockTicks.ToCEP()
                .ToCepStream(tick =&amp;gt; tick.Timestamp);&lt;/pre&gt;

&lt;p&gt;Where &lt;font face="Cons"&gt;ToCep()&lt;/font&gt; is just the inverse of &lt;font face="con"&gt;ToRx()&lt;/font&gt;, &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/08/23/routing-streaminsight-output-streams-to-a-ui.aspx" target="_blank"&gt;defined previously&lt;/a&gt;. &lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static S.IObservable&amp;lt;T&amp;gt; ToCEP&amp;lt;T&amp;gt;(this RX.IObservable&amp;lt;T&amp;gt; rxSource)
{
    return new CEPAnonymousObservable&amp;lt;T&amp;gt;(
        (S.IObserver&amp;lt;T&amp;gt; cepObserver) =&amp;gt; 
            RX.ObservableExtensions.Subscribe(rxSource, nextVal=&amp;gt; cepObserver.OnNext(nextVal)));
}&lt;/pre&gt;

&lt;p&gt;Computing the rolling 5 min VWAP, (grouped by symbol) takes some effort&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;var alteredDurationStream = cepInput
                            .ToPointEventStream()
                            .AlterEventDuration(tick =&amp;gt; TimeSpan.FromMinutes(5));

var fiveMinVWaps =  from fivemin in
                    alteredDurationStream
                    group fivemin by fivemin.Symbol into fGrp
                    from evwindow in fGrp.Snapshot()
                    select new
                    { 
                       Symbol = fGrp.Key, 
                       TotalAmount = evwindow.Sum(fmin =&amp;gt; fmin.Size * fmin.Price),
                       TotalVolume = evwindow.Sum(fmin =&amp;gt; fmin.Size), 
                    };

var fiveMinVWaps2 = from fivemin in fiveMinVWaps
                    select new VWAPItem()
                    {
                        Symbol = fivemin.Symbol,
                        VWAP = fivemin.TotalAmount / fivemin.TotalVolume,
                        Timestamp = DateTime.Now,
                    };&lt;/pre&gt;

&lt;p&gt;Although this nearly looks like conventional .Net Linq code, it isn’t. Think Linq2SQL. These are expressions, not pure CLR lambdas, so it’s not possible to place a breakpoint, nor are any arbitrary .Net computations allowed. The reason for the additional fiveMinVWaps2 projection is that it’s not possible to compute anything but the Sum or Avg in a SnapShot().&lt;/p&gt;

&lt;p&gt;Now that we have the data, we can convert to back to a RX stream:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;PropertyInfo tsProp = typeof(VWAPItem).GetProperty(&amp;quot;Timestamp&amp;quot;);
var vwaps = fiveMinVWaps2.ToObservable(tsProp).ToRX();&lt;/pre&gt;

&lt;p&gt;And update an ObservableCollection&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;vwaps.Post(_sc).Subscribe(item =&amp;gt;
{
    var exists = CEPOS1.Where(vw =&amp;gt; vw.Symbol == item.Symbol).FirstOrDefault();
    if (exists == null)
        CEPOS1.Add(item);
    else
        exists.CopyFrom(item);
});&lt;/pre&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;Which displays on a UI&lt;/p&gt;

&lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_3439C57F.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_686DEEC5.png" width="176" height="244" /&gt;&lt;/a&gt;&lt;/p&gt;</description><pubDate>Tue, 25 Aug 2009 05:01:05 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/from-the-reactiveframework-to-streaminsight-and-back</guid><category>.NET</category><category>CEP</category><category>LINQ</category><category>ReactiveFramework</category><category>RX</category><category>StreamInsight</category></item><item><title>Routing StreamInsight output streams to a UI</title><link>https://weblogs.asp.net:443/sweinstein/routing-streaminsight-output-streams-to-a-ui</link><description>&lt;p&gt;One compelling feature of StreamInsight is it’s in-process hosting model. In addition to reducing the complexity of server side installs, it’s now possible to have a&amp;#160; CEP engine in the client UI. &lt;/p&gt;  &lt;p&gt;The simplest way of getting CEP streams onto the UI would be the Reactive Framework methods. Something like&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;queryOutputStream
    .ToObservable(...)
    .Post(syncContext)
    .Subscribe(item=&amp;gt; collection.Add(item) );&lt;/pre&gt;

&lt;p&gt;But in the CTP that won’t work. As I discovered a few days ago The IObservable used in StreamInsight is defined in a different namespace and assembly than the IObservable in the System.Reactive. Furthermore the StreamInsight api lacks the base classes and extension methods defined in System.Reactive.&lt;/p&gt;

&lt;p&gt;I didn’t want to go the normal route of creating an implementation of IObserver, on say a ViewModel, route the data through the dispatcher on onto a collection, as while it would have the benefits of simplicity and it would work, it would mean giving up on all the goodness in System.Reactive.&lt;/p&gt;

&lt;p&gt;The first method I tried in an effort to convert a CEP IObservable into an RX IObservable didn’t work, but was instructive nonetheless. &lt;/p&gt;

&lt;p&gt;Using StreamInsight’s own I/O adapter API, I would create an “Eventing” Adapter which would raise an conventional .Net event on an object of my choosing, then using the ReactiveFramework, convert that event to an RX IObservable.&amp;#160; &lt;/p&gt;

&lt;p&gt;But it’s not easy (or possible) to do. Instances of output adapters are created by OutputAdapterFactories, which in turn are created by Factory methods. You can send in a configuration object, but it needs to be XML serializable, so there’s no sending in of Action&amp;lt;&amp;gt; delegates.&lt;/p&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;But it turns out that it’s not hard to convert from a CEP IObservable to an RX IObservable. &lt;/p&gt;

&lt;p&gt;First you need a CEP AnonymousObserver&amp;lt;T&amp;gt; &lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;using S = System;
internal class AnonymousObserver&amp;lt;T&amp;gt; : S.IObserver&amp;lt;T&amp;gt;
{
    private bool isStopped;
    private S.Action _onCompleted;
    private S.Action&amp;lt;S.Exception&amp;gt; _onError;
    private S.Action&amp;lt;T&amp;gt; _onNext;

    public AnonymousObserver(S.Action&amp;lt;T&amp;gt; onNext, S.Action&amp;lt;S.Exception&amp;gt; onError)
        : this(onNext, onError, () =&amp;gt; { })
    {
    }
    public AnonymousObserver(S.Action&amp;lt;T&amp;gt; onNext, S.Action&amp;lt;S.Exception&amp;gt; onError, S.Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }
    public void OnCompleted()
    {
        if (!isStopped)
        {
            isStopped = true;
            _onCompleted();
        }
    }
    public void OnError(S.Exception exception)
    {
        if (!isStopped)
        {
            isStopped = true;
            _onError(exception);
        }
    }
    public void OnNext(T value)
    {
        if (!isStopped)
            _onNext(value);
    }
}&lt;/pre&gt;

&lt;p&gt;Then an extension method taking a CEP IObservable, returning a RX AnonymousObservable&amp;lt;T&amp;gt;, subscribing to it via the CEP IObserver and on the OnNext, calling the returning RX IObservable’s on OnNext.&lt;/p&gt;

&lt;p&gt;Like so:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;using RX = System.Collections.Generic;
using S = System;

public static class CEPExtMethods
{
    public static RX.IObservable&amp;lt;T&amp;gt; ToRX&amp;lt;T&amp;gt;(this S.IObservable&amp;lt;T&amp;gt; source)
    {
        return new AnonymousObservable&amp;lt;T&amp;gt;(
            (RX.IObserver&amp;lt;T&amp;gt; rxObserver) =&amp;gt;
                            source.Subscribe(new AnonymousObserver&amp;lt;T&amp;gt;(
                                    nextVal =&amp;gt; rxObserver.OnNext(nextVal),
                                    rxObserver.OnError
                                )));
    }
}&lt;/pre&gt;

&lt;p&gt;To use it:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;var queryOutputStream = CreateQueryTemplate(input);
var queryOutput = queryOutputStream.ToObservable(typeof(EventTypeCount).GetField(&amp;quot;Time&amp;quot;));
queryOutput.ToRX().Send(_sc).Subscribe(v =&amp;gt; this.CEPOS1.Add(v));&lt;/pre&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;And results from the Observable sample on screen &lt;/p&gt;

&lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_5FC9CBDB.png"&gt;&lt;img style="border-bottom: 0px; border-left: 0px; display: inline; border-top: 0px; border-right: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_54340E9C.png" width="170" height="243" /&gt;&lt;/a&gt;&lt;/p&gt;</description><pubDate>Sun, 23 Aug 2009 21:01:48 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/routing-streaminsight-output-streams-to-a-ui</guid><category>.NET</category><category>CEP</category><category>ClickOnce</category><category>LINQ</category><category>ReactiveFramework</category><category>RX</category><category>StreamInsight</category></item><item><title>A first look at MS StreamInsight</title><link>https://weblogs.asp.net:443/sweinstein/a-first-look-at-ms-streaminsight</link><description>&lt;p&gt;This morning I was hoping to take a few minutes to modify one of the examples in the StreamInsight CTP and send an output stream to a UI, rather than the text files used in the examples. I thought this would be easy, as the readme states that there’s&lt;/p&gt;  &lt;blockquote&gt;   &lt;p&gt;“An alpha version of the StreamInsight libraries for development using the IObservable/IObserver programming paradigm.”&lt;/p&gt;    &lt;p&gt;&lt;/p&gt; &lt;/blockquote&gt;  &lt;p&gt;But it wasn’t. The IObservable used in StreamInsight is defined in a different namespace than the IObservable in the System.Reactive and the StreamInsight api lacks the base classes and extension methods defined in System.Reactive. At this time, the two APIs &lt;strong&gt;do not&lt;/strong&gt; play well with each other.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;Some thoughts on how to get around this temporary inconsistency:&lt;/p&gt;  &lt;ul&gt;   &lt;li&gt;Recompile System.Reactive to use StreamInsight’s IObservable/IObserver &lt;/li&gt;    &lt;li&gt;Create a type converter between the two IObservable/IObservers &lt;/li&gt;    &lt;li&gt;Create a StreamInsight output adapter which just raises a .Net event, then use the RX method of converting events to IObservables &lt;/li&gt; &lt;/ul&gt;  &lt;p&gt;Perhaps tonight.&lt;/p&gt;</description><pubDate>Thu, 20 Aug 2009 12:23:05 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/a-first-look-at-ms-streaminsight</guid></item><item><title>Exploring the Reactive Framework part II</title><link>https://weblogs.asp.net:443/sweinstein/exploring-the-reactive-framework-part-ii</link><description>&lt;p&gt;Talk around the water cooler is that it might be possible to use the Reactive Framework for some lightweight CEP.&lt;/p&gt;  &lt;p&gt;I’ll correct some of the (big) mistakes from &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/07/29/exploring-the-reactive-framework-rx.aspx" target="_blank"&gt;my last post&lt;/a&gt; and build up a “jumping” window extension method for &lt;font face="con"&gt;IObservable&lt;/font&gt;s.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;In my last post I build a simple grouping method, but in it I immediately turned the push style of processing into a pull style by using the &lt;font face="Consolas"&gt;GetEnumerator()&lt;/font&gt; method. This is a bad idea for two key reasons, a) it takes the inherit elegance of the RX reduces it to a for loop, and b) it commits a cardinal sin of multi-threading and reserves a thread for a primarily blocking operation.&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;Here’s an improved version&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;IEnumerable&amp;lt;TSource&amp;gt;&amp;gt; ToWindow&amp;lt;TSource&amp;gt;(
    this IObservable&amp;lt;TSource&amp;gt; source, 
    Func&amp;lt;TSource, IEnumerable&amp;lt;TSource&amp;gt;, bool&amp;gt; grouper)
{
    return RXGrouping.ToWindow(source, val =&amp;gt; val, grouper);
}


public static IObservable&amp;lt;IEnumerable&amp;lt;TResult&amp;gt;&amp;gt; ToWindow&amp;lt;TSource, TResult&amp;gt;(
 this IObservable&amp;lt;TSource&amp;gt; source,
 Func&amp;lt;TSource, TResult&amp;gt; selector, 
 Func&amp;lt;TSource, IEnumerable&amp;lt;TResult&amp;gt;, bool&amp;gt; grouper)
{
    List&amp;lt;TResult&amp;gt; res = new List&amp;lt;TResult&amp;gt;();
    return new AnonymousObservable&amp;lt;IEnumerable&amp;lt;TResult&amp;gt;&amp;gt;(
        observer =&amp;gt;
            source.Subscribe(
            nextVal =&amp;gt;
            {
                try
                {
                    if (!grouper(nextVal, res))
                    {
                        observer.OnNext(res);
                        res = new List&amp;lt;TResult&amp;gt;();
                    }
                    res.Add(selector(nextVal));
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
            }
            ,observer.OnError
            ,observer.OnCompleted));
}&lt;/pre&gt;

&lt;p&gt;The mistake in the prior version stemmed in part from thinking that I needed to ask for the next value, but of course the RX will supply the next value when it’s available. &lt;/p&gt;

&lt;p&gt;To use it:&lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;TimeSpan windowDuration = new TimeSpan(0,0,10);
generatedNums
    // add a Timestamp to our raw data
    .Select(val =&amp;gt; new { Timestamp = DateTime.Now, Value = val })
    // create a 5 min &amp;quot;jumping&amp;quot; window
    .ToWindow((lastVal, seq) =&amp;gt; 
            (seq.Count() == 0) || 
            (lastVal.Timestamp - seq.First().Timestamp &amp;lt; windowDuration))
    // create item for display
    .Select(seq =&amp;gt; new { Timestamp = seq.First().Timestamp
                        , Values = seq.Select(a =&amp;gt; a.Value).ToArray()
                        , Average = seq.Average(a =&amp;gt; a.Value) })
    // marshal and add to list
    .Post(sc).Subscribe(wv =&amp;gt; WindowVals.Add(wv));&lt;/pre&gt;

&lt;p&gt;&amp;#160;&lt;/p&gt;

&lt;p&gt;And the results&lt;/p&gt;

&lt;p&gt;&amp;#160;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_56B9DED3.png"&gt;&lt;img style="border-right-width: 0px; display: inline; border-top-width: 0px; border-bottom-width: 0px; border-left-width: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_7FC47DCF.png" width="465" height="249" /&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Next we’ll look into creating a &lt;strong&gt;sliding&lt;/strong&gt; window.&lt;/p&gt;</description><pubDate>Wed, 12 Aug 2009 04:15:16 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/exploring-the-reactive-framework-part-ii</guid><category>.NET</category><category>CEP</category><category>CLR</category><category>LINQ</category><category>ReactiveFramework</category><category>WPF</category></item><item><title>Exploring the Reactive Framework (RX)</title><link>https://weblogs.asp.net:443/sweinstein/exploring-the-reactive-framework-rx</link><description>&lt;p&gt;&lt;a href="http://themechanicalbride.blogspot.com/2009/07/introducing-rx-linq-to-events.html" target="_blank"&gt;A few days ago&lt;/a&gt;, intentionally or not, a version of the Reactive Framework was &lt;a href="http://weblogs.asp.net/sweinstein/archive/2009/07/23/preview-of-the-reactive-framework-available-via-silverlight-toolkit.aspx" target="_blank"&gt;released into the wild&lt;/a&gt;. Let’s see how we can use the RX for computations on a stream of data. As an example we’ll take a stream of ints and produce the averages in groups of five.&lt;/p&gt;  &lt;p&gt;&lt;a href="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_2D20AD5F.png"&gt;&lt;img style="border-bottom: 0px; border-left: 0px; display: inline; border-top: 0px; border-right: 0px" title="image" border="0" alt="image" src="https://aspblogs.blob.core.windows.net/media/sweinstein/Media/image_thumb_5D4A88D3.png" width="244" height="228" /&gt;&lt;/a&gt; &lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;Here’s the primary stream of numbers, using the static &lt;font face="Consolas"&gt;Generate()&lt;/font&gt;&amp;#160; method&lt;/p&gt;  &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;Random rnd = new Random();
var generatedNums = Observable.Generate&amp;lt;int,int&amp;gt;(
                    0 // seed
                    , d =&amp;gt; true // condition to continue
                    , d =&amp;gt; d % 12 //generated value 
                    , d =&amp;gt; (int)(rnd.NextDouble() * 300) //delay
                    , d =&amp;gt; d + 1 // modify value for next iter
                    );&lt;/pre&gt;

&lt;p&gt;And to consume the stream by adding the values into an &lt;font face="Consolas"&gt;ObservableCollection&lt;/font&gt; &lt;/p&gt;

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;generatedNums
    .Post(sc) // move onto UI thread
    .Subscribe(num =&amp;gt; Numbers.Add(num) // add numbers to observable collection
    );&lt;/pre&gt;
&lt;font face="Trebuchet MS"&gt;Computing the average, in groups of 5 turns out to be harder, as the Reactive FX doesn’t seem to have a &lt;font face="Consolas"&gt;GroupBy()&lt;/font&gt; method at this time. Here’s what I came up with:&lt;/font&gt; 

&lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;generatedNums
    .Grouper(a =&amp;gt; a, (a,list) =&amp;gt; list.Count() &amp;lt; 5) // group into lists of 5, returning an IObservable&amp;lt;IEnumerable&amp;lt;int&amp;gt;&amp;gt;
    .Select(list =&amp;gt; list.Average()) // take the average of the list, so project IObservable&amp;lt;IEnumerable&amp;lt;int&amp;gt;&amp;gt; to IObservable&amp;lt;int&amp;gt; 
    .Post(sc).Subscribe(mean =&amp;gt; Averages.Add(mean) // move onto UI and add to observable collection
    );&lt;/pre&gt;
&lt;font face="Trebuchet MS"&gt;And the implementation for “&lt;font face="Consolas"&gt;Grouper()&lt;/font&gt;”&lt;/font&gt;&amp;#160; &lt;pre class="brush: csharp; gutter: false; toolbar: false;"&gt;public static IObservable&amp;lt;IEnumerable&amp;lt;TResult&amp;gt;&amp;gt; Grouper&amp;lt;TSource, TResult&amp;gt;(
    this IObservable&amp;lt;TSource&amp;gt; source,
    Func&amp;lt;TSource, TResult&amp;gt; selector
    , Func&amp;lt;TSource, IEnumerable&amp;lt;TResult&amp;gt;, bool&amp;gt; grouper) 
{
    return new AnonymousObservable&amp;lt;IEnumerable&amp;lt;TResult&amp;gt;&amp;gt;(
        observer =&amp;gt;
            source.Subscribe(x =&amp;gt;
        {
            try
            {
                using (var er = source.GetEnumerator())
                    while (er.MoveNext())
                    {
                        bool needsMove = false;
                        var res = new List&amp;lt;TResult&amp;gt;();
                        while (grouper(er.Current, res) &amp;amp;&amp;amp; ((needsMove) ? er.MoveNext() : true))
                        {
                            needsMove = true;
                            res.Add(selector(er.Current));
                        }
                        observer.OnNext(res);
                    }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
        },
        new Action&amp;lt;Exception&amp;gt;(observer.OnError),
        new Action(observer.OnCompleted)));
}&lt;/pre&gt;</description><pubDate>Wed, 29 Jul 2009 05:07:18 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/exploring-the-reactive-framework-rx</guid><category>.NET</category><category>LINQ</category><category>ReactiveFramework</category><category>RX</category><category>WPF</category></item><item><title>Preview of the Reactive Framework available via Silverlight Toolkit</title><link>https://weblogs.asp.net:443/sweinstein/preview-of-the-reactive-framework-available-via-silverlight-toolkit</link><description>&lt;p&gt;Via &lt;a href="http://themechanicalbride.blogspot.com/2009/07/introducing-rx-linq-to-events.html"&gt;Jafar Husain&lt;/a&gt; - it appears that there’s a early release of the &lt;a href="http://langnetsymposium.com/2009/talks/23-ErikMeijer-LiveLabsReactiveFramework.html" target="_blank"&gt;Live Labs Reactive Framework&lt;/a&gt; (&amp;amp; with &lt;a href="http://channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/" target="_blank"&gt;Brian Beckman and Erik Meijer&lt;/a&gt;) in the latest &lt;a href="http://www.codeplex.com/Silverlight" target="_blank"&gt;Silverlight Toolkit&lt;/a&gt;&lt;/p&gt;  &lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;In addition to some of the standard LINQ operators (&lt;font face="Consolas"&gt;Select, Where, Aggregate&lt;/font&gt;), some new operators look quite promising&amp;#160; - &lt;/p&gt;  &lt;ul&gt;   &lt;li&gt;&lt;font face="Consolas"&gt;ForkJoin&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;Merge&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;Delay&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;HoldUntilChanged&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;Latest&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;Merge&lt;/font&gt;&lt;/li&gt;    &lt;li&gt;&lt;font face="Consolas"&gt;Throttle&lt;/font&gt;&lt;/li&gt; &lt;/ul&gt;  &lt;p&gt;&lt;font face="Consolas"&gt;&lt;/font&gt;&lt;/p&gt;</description><pubDate>Thu, 23 Jul 2009 21:49:03 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/preview-of-the-reactive-framework-available-via-silverlight-toolkit</guid><category>CEP</category><category>CLR</category><category>LINQ</category><category>ReactiveFramework</category><category>RX</category></item><item><title>When Add(ing)-Type, choose your method signatures wisely</title><link>https://weblogs.asp.net:443/sweinstein/when-add-ing-type-choose-your-method-signatures-wisely</link><description>&lt;p&gt;&amp;#160;&lt;/p&gt;  &lt;p&gt;Powershell V2 has some great new features, in particular Add-Type and Remoting features are likely to be quite popular and work together without much issue. That said, there are edge cases which illustrate how the types returned from remoting calls. The following script demonstrates the issue&lt;/p&gt;  &lt;pre class="brush: csharp;"&gt;$csCode = @&amp;quot;
using System;
using System.Collections.Generic;
using System.Linq;
namespace Demo {
    public static class D
    {
        public static int Add(int a, int b)
        {
            return a + b;
        }
        public static int AddArray(int[] ints)
        {
            return ints.Sum();
        }
        public static int AddEnumerable(IEnumerable&amp;lt;object&amp;gt; ints)
        {
            return ints.Cast&amp;lt;int&amp;gt;().Sum();
        }
        public static int AddEnumerable(IEnumerable&amp;lt;int&amp;gt; ints)
        {
            return ints.Sum();
        }
    }
}
&amp;quot;@

Add-Type -TypeDefinition $csCode -Language CSharpVersion3
$oneRemote = Invoke-Command -ComputerName localhost  -ScriptBlock { return 1 }
$listRemote = Invoke-Command -ComputerName localhost  -ScriptBlock { return (1,2,3) }
$one = &amp;amp;{return 1}
if ($one -eq $oneRemote)
{
    Write-Host &amp;quot;1 == 1&amp;quot;
}

$v =  [Demo.D]::Add($one,$oneRemote)
Write-output &amp;quot;One + OneRemote = $v&amp;quot;  ; $v=$null
$v = [Demo.D]::AddArray(($one,$oneRemote))
Write-output &amp;quot;One + OneRemote via array =  $v&amp;quot; ; $v=$null

$v = [Demo.D]::AddArray($listRemote)
Write-output &amp;quot;One + OneRemote via remote array =  $v&amp;quot; ; $v=$null

$v = [Demo.D]::AddEnumerable((1,2,3,4))
Write-output &amp;quot;One + OneRemote via local IEnumerable =  $v&amp;quot; ; $v=$null

$v = [Demo.D]::AddEnumerable($listRemote)
Write-output &amp;quot;One + OneRemote via remote IEnumerable =  $v&amp;quot;; $v=$null



$oneRemote | Get-Member&lt;/pre&gt;

&lt;p&gt;The output from the above is&lt;/p&gt;

&lt;blockquote&gt;
  &lt;p&gt;&lt;font size="1" face="con"&gt;1 == 1 
      &lt;br /&gt;One + OneRemote = 2 

      &lt;br /&gt;One + OneRemote via array =&amp;#160; 2 

      &lt;br /&gt;One + OneRemote via remote array =&amp;#160; 6 

      &lt;br /&gt;One + OneRemote via local IEnumerable =&amp;#160; 10 # so far as expected&lt;/font&gt;&lt;/p&gt;

  &lt;p&gt;&lt;font size="1" face="con"&gt;&amp;#160; &lt;br /&gt;&lt;/font&gt;&lt;font color="#ff0000" size="1" face="con"&gt;Exception calling &amp;quot;AddEnumerable&amp;quot; with &amp;quot;1&amp;quot; argument(s): &amp;quot;Specified cast is not valid.&amp;quot; 
      &lt;br /&gt;At typeDemo.ps1:49 char:29 

      &lt;br /&gt;+ $v = [Demo.D]::AddEnumerable &amp;lt;&amp;lt;&amp;lt;&amp;lt; ($listRemote) 

      &lt;br /&gt;&lt;/font&gt;&lt;/p&gt;

  &lt;p&gt;&lt;font size="1" face="con"&gt;&amp;#160;&amp;#160; TypeName: System.Int32 &lt;/font&gt;&lt;/p&gt;

  &lt;p&gt;&lt;font size="1" face="con"&gt;Name&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; MemberType&amp;#160;&amp;#160; Definition&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;----&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; ----------&amp;#160;&amp;#160; ----------&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;CompareTo&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.Int32 CompareTo(Object value)

      &lt;br /&gt;Equals&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.Boolean Equals(Object obj), System.Boolean Equals(Int32 obj)&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;GetHashCode&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.Int32 GetHashCode()&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;GetType&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.Type GetType()&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;GetTypeCode&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.TypeCode GetTypeCode()&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;ToString&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; Method&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; System.String ToString()

      &lt;br /&gt;&lt;/font&gt;&lt;strong&gt;&lt;font size="1" face="con"&gt;PSComputerName&amp;#160;&amp;#160;&amp;#160;&amp;#160; NoteProperty System.String PSComputerName=localhost&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;PSShowComputerName NoteProperty System.Boolean PSShowComputerName=True&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;br /&gt;RunspaceId&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; NoteProperty System.Guid RunspaceId=e0dc5c05-c87d-41ad-afe0-16bc1b711f08&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160;&amp;#160; &lt;/font&gt;&lt;/strong&gt;&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;What’s happening under the covers is that the PowerShell reporting infrastructure is returning a PSObject. By inspecting the type via Get-Member you can see that it has some extra NoteProperties. To PowerShell and .Net methods that expect integers and arrays of integers, the object looks and behaves like it should. But if your Add-Types use a more LINQ style approach, which expects an IEnumerable&amp;lt;T&amp;gt;, the PowerShell type system doesn’t properly convert the adapted type to its native underlying type and runtime exceptions are the result.&lt;/p&gt;</description><pubDate>Fri, 08 May 2009 18:44:18 GMT</pubDate><guid isPermaLink="true">https://weblogs.asp.net:443/sweinstein/when-add-ing-type-choose-your-method-signatures-wisely</guid><category>.NET</category><category>CLR</category><category>LINQ</category><category>PowerShell</category></item></channel></rss>