Coś takiego: https://dotnetfiddle.net/eiGzS4
using System;
using System.Threading.Tasks;
using System.Threading;
using System.Reactive;
using System.Reactive.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
public class Program
{
public static void Main()
{
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " beginning");
var subject = new BehaviorSubject<long>(0);
subject.MagicBuffer(TimeSpan.FromMilliseconds(200)).Subscribe(x => Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " Received " + string.Join(",", x))); // Should emit [0] immediately
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 1,2,3");
subject.OnNext(1); // Should cache
subject.OnNext(2); // Should cache
subject.OnNext(3); // Should cache
Thread.Sleep(250); // Should emit [1, 2, 3]
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " should emit empty in a bit");
Thread.Sleep(250); // Should emit []
Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 4");
subject.OnNext(4); // Should emit immediately
subject.OnNext(5); // Should cache
Thread.Sleep(200);
}
}
public static class Ext{
public static IObservable<IList<T>> MagicBuffer<T>(this IObservable<T> source, TimeSpan delay)
{
return Observable.Create<IList<T>>(o => {
DateTime lastEmitTime = DateTime.MinValue;
DateTime lastCallTime = DateTime.MinValue;
IList<T> values = new List<T>();
Task continuation = Task.CompletedTask;
Action<Task> callback = _ => {
lock(values){
if(DateTime.Now - lastCallTime >= delay){
lastCallTime = DateTime.Now;
o.OnNext(values);
values.Clear();
}
}
};
Action<Task> recur = null;
recur = _ => Task.Delay(delay).ContinueWith(ignored => {
continuation = continuation.ContinueWith(callback).ContinueWith(recur);
});
recur(null);
return source.Subscribe(
v => {
lock(values){
if(DateTime.Now - lastEmitTime >= delay){
lastEmitTime = lastCallTime = DateTime.Now;
o.OnNext(new List<T>{v});
recur(null);
}else{
values.Add(v);
}
}
},
o.OnError,
o.OnCompleted
);
});
}
}
Deadlocki, synchronizacje i resztę zostawiam czytelnikowi. Zapewne da się prościej, pewnie jest jakiś magiczny operator od tego.