[BUG] Async ObservableChangeSet.Create<int, int>() does not publish changes until factory function completes.

Describe the bug

Consider the following code using Observable.Create from Reactive Extensions:

var random = new Random(1234);
var observable = Observable.Create<int>(
    async (observer, token) =>
    {
        Console.WriteLine("Started");
        while (!token.IsCancellationRequested)
        {
            var next = random.Next(0, 4);
            Console.WriteLine($"Pushing {next}");
            observer.OnNext(next);

            await Task.Delay(1000, token);
        }

        Console.WriteLine("Stopped");
    });
    
Console.WriteLine("Here");
using (var sub1 = observable.Subscribe(o => $"Sub1 received {o}".Dump()))
{
    Console.WriteLine("Here 2");
    await Task.Delay(5000);
}
Console.WriteLine("Finished");

This is a simplification of a common use case, allowing for an on-demand triggering of an asynchronous method that produces changes over time (for example listening to a USB device, or a network port). It is particularly powerful when combined with RefCount(), or similar, to allow multiple subscribers to share a single connection, which is disposed (by token being cancelled) when everyone stops listening.

The above code produces:

Here
Started
Pushing 0
Sub1 received 0
Here 2
Pushing 2
Sub1 received 2
Pushing 0
Sub1 received 0
Pushing 0
Sub1 received 0
Pushing 3
Sub1 received 3
Stopped
Finished

The equivalent in Dynamic Data would be:

var random = new Random(1234);
var observable = ObservableChangeSet.Create<int, int>(
    async (changeSet, token) =>
    {
        Console.WriteLine("Started");
        while (!token.IsCancellationRequested)
        {
            var next = random.Next(0, 4);
            Console.WriteLine($"Pushing {next}");
            changeSet.AddOrUpdate(next);

            await Task.Delay(1000, token);
        }

        Console.WriteLine("Stopped");
    },
    i => i)
    .Select(cs => cs.Select(c => c.Current).ToList());

Console.WriteLine("Here");
using (var sub1 = observable.Subscribe(o => $"Sub1 received {o}".Dump()))
{
    Console.WriteLine("Here 2");
    await Task.Delay(5000);
}
Console.WriteLine("Finished");

Which should have identical output to above, but instead produces:

Here
Started
Pushing 1
Here 2
Pushing 3
Pushing 1
Pushing 3
Pushing 1
Finished

As you can see, the subscriber is not receiving a notification, nor is token being cancelled directly (it appears that the factory function does stop, however, possible due to the task being terminated). The latter is less an issue as token cancellation is not 'guaranteed' by the Reactive Extensions specs anyway (but is a nice to have).