在 ASP.NET Core 中使用流式处理 SignalR

作者: Brennan Conroy

ASP.NET Core SignalR 支持从客户端传输到服务器以及从服务器传输到客户端。 这适用于数据片段随着时间的推移而发生的情况。 流式传输时,每个片段一旦变为可用,就会发送到客户端或服务器,而不是等待所有数据都可用。

ASP.NET Core SignalR 支持服务器方法的流返回值。 这适用于数据片段随着时间的推移而发生的情况。 将返回值流式传输到客户端时,每个片段会在其可用时立即发送到客户端,而不是等待所有数据都可用。

查看或下载示例代码如何下载

设置用于流式传输的集线器

当集线器方法返回 IAsyncEnumerable<T>ChannelReader<T>Task<IAsyncEnumerable<T>>Task<ChannelReader<T>>时,它会自动变为流式处理中心方法。

当集线器方法返回 ChannelReader<T>Task<ChannelReader<T>>时,该方法会自动变为流式处理中心方法。

服务器到客户端流式处理

除了 ChannelReader<T>之外,流式处理中心方法还可以返回 IAsyncEnumerable<T> 返回 IAsyncEnumerable<T> 的最简单方法是将集线器方法设为异步迭代器方法,如下例所示。 中心异步迭代器方法可以接受当客户端从流中取消订阅时触发的 CancellationToken 参数。 异步迭代器方法避免了与通道常见的问题,例如,不能尽早返回 ChannelReader 或退出方法,无需完成 ChannelWriter<T>

备注

下面的示例需要C#8.0 或更高版本。

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

下面的示例演示了使用通道将数据流式传输到客户端的基础知识。 每当将对象写入 ChannelWriter<T>时,都会立即将对象发送到客户端。 最后,ChannelWriter 完成,告诉客户端流已关闭。

备注

在后台线程上写入 ChannelWriter<T>,并尽快返回 ChannelReader 其他中心调用会被阻止,直到返回 ChannelReader

try ... catch中的环绕逻辑。 完成 catchcatch 之外的 Channel,确保中心方法调用正确完成。

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }

    writer.Complete(localException);
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay,
        CancellationToken cancellationToken)
    {
        try
        {
            for (var i = 0; i < count; i++)
            {
                // Check the cancellation token regularly so that the server will stop
                // producing items if the client disconnects.
                cancellationToken.ThrowIfCancellationRequested();
                await writer.WriteAsync(i);

                // Use the cancellationToken in other APIs that accept cancellation
                // tokens so the cancellation can flow down to them.
                await Task.Delay(delay, cancellationToken);
            }
        }
        catch (Exception ex)
        {
            writer.TryComplete(ex);
        }

        writer.TryComplete();
    }
}
public class StreamHub : Hub
{
    public ChannelReader<int> Counter(int count, int delay)
    {
        var channel = Channel.CreateUnbounded<int>();

        // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
        // for all the items to be written before returning the channel back to
        // the client.
        _ = WriteItemsAsync(channel.Writer, count, delay);

        return channel.Reader;
    }

    private async Task WriteItemsAsync(
        ChannelWriter<int> writer,
        int count,
        int delay)
    {
        try
        {
            for (var i = 0; i < count; i++)
            {
                await writer.WriteAsync(i);
                await Task.Delay(delay);
            }
        }
        catch (Exception ex)
        {
            writer.TryComplete(ex);
        }

        writer.TryComplete();
    }
}

服务器到客户端流式处理中心方法可以接受当客户端从流中取消订阅时触发的 CancellationToken 参数。 如果客户端在流末尾之前断开连接,请使用此标记停止服务器操作并释放任何资源。

客户端到服务器的流式处理

当某个集线器方法接受 ChannelReader<T>IAsyncEnumerable<T>类型的一个或多个对象时,它会自动成为客户端到服务器的流式处理中心方法。 下面的示例演示了读取从客户端发送的流式处理数据的基础知识。 每当客户端写入 ChannelWriter<T>时,数据都会写入中心方法读取的服务器上的 ChannelReader 中。

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

下面是方法的 IAsyncEnumerable<T> 版本。

备注

下面的示例需要C#8.0 或更高版本。

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET 客户端

服务器到客户端流式处理

HubConnection 上的 StreamAsyncStreamAsChannelAsync 方法用于调用服务器到客户端的流式处理方法。 将中心方法中定义的集线器方法名称和参数传递到 StreamAsyncStreamAsChannelAsync StreamAsync<T>StreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。 IAsyncEnumerable<T>ChannelReader<T> 类型的对象从流调用返回,并表示客户端上的流。

返回 IAsyncEnumerable<int>StreamAsync 示例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = await hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

一个返回 ChannelReader<int>的相应 StreamAsChannelAsync 示例:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

HubConnection 上的 StreamAsChannelAsync 方法用于调用服务器到客户端流式处理方法。 将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsync StreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。 从流调用返回 ChannelReader<T>,并表示客户端上的流。

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

HubConnection 上的 StreamAsChannelAsync 方法用于调用服务器到客户端流式处理方法。 将中心方法中定义的集线器方法名称和参数传递到 StreamAsChannelAsync StreamAsChannelAsync<T> 上的泛型参数指定流方法返回的对象的类型。 从流调用返回 ChannelReader<T>,并表示客户端上的流。

var channel = await hubConnection
    .StreamAsChannelAsync<int>("Counter", 10, 500, CancellationToken.None);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

客户端到服务器的流式处理

可以通过两种方法从 .NET 客户端调用客户端到服务器的流式处理中心方法。 可以将 IAsyncEnumerable<T>ChannelReader 作为参数传入 SendAsyncInvokeAsyncStreamAsChannelAsync,具体取决于所调用的集线器方法。

只要将数据写入 IAsyncEnumerableChannelWriter 对象,服务器上的集线器方法就会收到来自客户端的数据的新项。

如果使用 IAsyncEnumerable 对象,则流在返回流项的方法退出后结束。

备注

下面的示例需要C#8.0 或更高版本。

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

或者,如果使用的是 ChannelWriter,则使用 channel.Writer.Complete()完成通道:

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript 客户端

服务器到客户端流式处理

JavaScript 客户端通过 connection.stream调用集线器上的服务器到客户端流式处理方法。 stream 方法接受两个参数:

  • 集线器方法的名称。 在下面的示例中,中心方法名称是 Counter
  • 在 hub 方法中定义的参数。 在下面的示例中,参数是要接收的流项数的计数以及流项之间的延迟。

connection.stream 返回 IStreamResult,它包含 subscribe 方法。 IStreamSubscriber 传递到 subscribe,并设置 nexterrorcomplete 回调,以便从 stream 调用接收通知。

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

若要从客户端结束流,请对从 subscribe 方法返回的 ISubscription 调用 dispose 方法。 调用此方法会导致取消集线器方法的 CancellationToken 参数(如果提供了一个参数)。

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

若要从客户端结束流,请对从 subscribe 方法返回的 ISubscription 调用 dispose 方法。

客户端到服务器的流式处理

JavaScript 客户端通过将 Subject 作为参数传入到 sendinvokestream(具体取决于所调用的集线器方法),在集线器上调用客户端到服务器的流式处理方法。 Subject 是一种类似于 Subject的类。 例如,在 RxJS 中,可以使用该库中的Subject类。

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

使用项调用 subject.next(item) 会将项写入流,集线器方法接收服务器上的项。

若要结束流,请调用 subject.complete()

Java 客户端

服务器到客户端流式处理

SignalR Java 客户端使用 stream 方法来调用流式处理方法。 stream 接受三个或更多参数:

  • 流项的预期类型。
  • 集线器方法的名称。
  • 在 hub 方法中定义的参数。
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

HubConnection 上的 stream 方法返回流项类型的可观察对象。 可观察类型的 subscribe 方法是定义 onNextonErroronCompleted 处理程序的位置。

其他资源

上一篇:在 SignalR 中使用 MessagePack 集线器协议 ASP.NET Core

下一篇:ASP.NET SignalR 和 ASP.NET Core 之间的差异 SignalR

关注微信小程序
程序员编程王-随时随地学编程

扫描二维码
程序员编程王

扫一扫关注最新编程教程