Rx.Net kombinacja operatorów

0

cześć,
załóżmy że mamy IObservable<int> który generuje w różnych odstępach czasu dane.
Chcę osiągnąć rezultat podobny do operatora Buffer, z tą różnicą, że jeżeli przez ustawiony Interval nie dostaniemy żadnych danych, to gdy następne dane "dojdą" zostanie od razu wysłane info do obserwatorów(IObserver), a Interval buffera się zresetuje.

czyli:
Buffer(5000) - emituje dane ZAWSZE co 5 sekund
operatorKtóregoPotrzebuję(5000) - emituje dane co 5 sekund lub natychmiast jeśli pojawiły się jakieś "dane", a przez poprzednie 5 sekund żadne dane się nie pojawiły.

z jakiego operator(a/ów) RX'a można skorzystać żeby otrzymać taki rezultat?

0

Specyficzna logika.
Buffor z pojedyńczych eventów zrobi Tobie zawsze kolekcje. W twoim przypadku gdu nie pojawi się event to i tak po "5000" zostanie wyemitowany event z pusta kolekcja.
Zawsze możesz utworzć własny operator z wyżej wymienion logika.
Pytanie czy teraz po tych "5000" ma emitowac pojedyczny event czy event z kolekcja jedno elementowa? Czy ostatni element ma wchodzić w skład następnego buffera?

0

Pytanie czy teraz po tych "5000" ma emitowac pojedyczny event czy event z kolekcja jedno elementowa? Czy ostatni element ma wchodzić w skład następnego buffera?

Tak, buffor zrobi kolekcję i taki jest mój cel, chcę mieć listę wszystkich elementów, które przez ten czas zostały "wyemitowane".
Odpowiadając na Twoje pytanie: ma emitować event z kolekcją i ostatni element nie musi wchodzić w skład następnego buffera.

Zawsze możesz utworzć własny operator z wyżej wymienion logika.

No właśnie to jest temat tego posta, jak zrobić taki operator?

2

Coś takiego: https://dotnetfiddle.net/eiGzS4

using System;
using System.Threading.Tasks;
using System.Threading;
using System.Reactive;
using System.Reactive.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
					
public class Program
{
	public static void Main()
	{
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " beginning");
		var subject = new BehaviorSubject<long>(0);
		subject.MagicBuffer(TimeSpan.FromMilliseconds(200)).Subscribe(x => Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " Received " + string.Join(",", x))); // Should emit [0] immediately
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 1,2,3");
		subject.OnNext(1); // Should cache
		subject.OnNext(2); // Should cache
		subject.OnNext(3); // Should cache
		Thread.Sleep(250); // Should emit [1, 2, 3]
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " should emit empty in a bit");
		Thread.Sleep(250); // Should emit []
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 4");
		subject.OnNext(4); // Should emit immediately
		subject.OnNext(5); // Should cache
		Thread.Sleep(200);
	}
}

public static class Ext{
	public static IObservable<IList<T>> MagicBuffer<T>(this IObservable<T> source, TimeSpan delay)
	{
		return Observable.Create<IList<T>>(o => {
			DateTime lastEmitTime = DateTime.MinValue;
			DateTime lastCallTime = DateTime.MinValue;
			IList<T> values = new List<T>();
			Task continuation = Task.CompletedTask;
			Action<Task> callback = _ => {
				lock(values){
					if(DateTime.Now - lastCallTime >= delay){
						lastCallTime = DateTime.Now;
						o.OnNext(values);
						values.Clear();
					}
				}
			};
			Action<Task> recur = null;
			recur = _ => Task.Delay(delay).ContinueWith(ignored => {
				continuation = continuation.ContinueWith(callback).ContinueWith(recur);
			});
			recur(null);

			return source.Subscribe(
				v => {
					lock(values){
						if(DateTime.Now - lastEmitTime >= delay){
							lastEmitTime = lastCallTime = DateTime.Now;
							o.OnNext(new List<T>{v});
							recur(null);
						}else{
							values.Add(v);
						}
					}
				},
				o.OnError,
				o.OnCompleted
			);
		});
	}
}

Deadlocki, synchronizacje i resztę zostawiam czytelnikowi. Zapewne da się prościej, pewnie jest jakiś magiczny operator od tego.

1 użytkowników online, w tym zalogowanych: 0, gości: 1