// CodeWalker/CodeWalker.Core/Utils/StreamingExtensions.cs
// (repository listing: 168 lines, 5.7 KiB, C#; last changed 2023-11-12 01:59:17 +08:00)
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CodeWalker.Core.Utils
{
/// <summary>
/// Extension helpers for <see cref="Stream"/> and <see cref="BinaryReader"/>: pooled-buffer
/// stream copies plus <c>Span</c>/<c>Memory</c> based read and write overloads that are
/// implemented on top of the classic <c>byte[]</c> stream APIs.
/// </summary>
/// <remarks>
/// Every temporary buffer is rented from <see cref="ArrayPool{T}.Shared"/> and handed back in a
/// <c>finally</c>/<c>catch</c> path, so no helper keeps a rented array after it completes.
/// NOTE(review): the Span/Memory overloads look like polyfills for target frameworks whose
/// <see cref="Stream"/> lacks them; on frameworks that have instance methods with the same
/// signature, instance-call syntax will bind to those instead - confirm against the project's
/// target frameworks.
/// </remarks>
public static class StreamingExtensions
{
    /// <summary>Default buffer size, in bytes, used by <c>CopyToFast</c>.</summary>
    private const int DefaultCopyBufferSize = 81920;

    /// <summary>Default buffer size, in bytes, used by <c>CopyToFastAsync</c>.</summary>
    private const int DefaultAsyncCopyBufferSize = 131072;

    /// <summary>
    /// Asynchronously reads from the reader's underlying <see cref="BinaryReader.BaseStream"/>
    /// directly into <paramref name="buffer"/>.
    /// </summary>
    /// <param name="br">Reader whose base stream is read.</param>
    /// <param name="buffer">Destination array.</param>
    /// <param name="index">Offset in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying stream read.</param>
    /// <returns>A task producing the number of bytes read, as reported by the base stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="br"/> is <c>null</c>.</exception>
    public static Task<int> ReadAsync(this BinaryReader br, byte[] buffer, int index, int count, CancellationToken cancellationToken = default)
    {
        if (br is null)
        {
            throw new ArgumentNullException(nameof(br));
        }

        return br.BaseStream.ReadAsync(buffer, index, count, cancellationToken);
    }

    /// <summary>
    /// Synchronously copies everything remaining in <paramref name="stream"/> to
    /// <paramref name="destination"/> using a buffer rented from the shared array pool.
    /// </summary>
    /// <param name="stream">Source stream; read until it reports zero bytes.</param>
    /// <param name="destination">Stream that receives the data.</param>
    /// <param name="bufferSize">Minimum size of the rented buffer; must be positive.</param>
    /// <exception cref="ArgumentNullException">A stream argument is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is not positive.</exception>
    public static void CopyToFast(this Stream stream, Stream destination, int bufferSize = DefaultCopyBufferSize)
    {
        ValidateCopyArguments(stream, destination, bufferSize);

        byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
        try
        {
            // The pool may hand back a larger array than requested; use all of it.
            int read;
            while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
            {
                destination.Write(buffer, 0, read);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }

    /// <summary>
    /// Asynchronously copies everything remaining in <paramref name="stream"/> to
    /// <paramref name="destination"/> using a buffer rented from the shared array pool.
    /// </summary>
    /// <param name="stream">Source stream; read until it reports zero bytes.</param>
    /// <param name="destination">Stream that receives the data.</param>
    /// <param name="bufferSize">Minimum size of the rented buffer; must be positive.</param>
    /// <param name="cancellationToken">Token passed to every read and write.</param>
    /// <returns>A task that completes when the copy has finished.</returns>
    /// <exception cref="ArgumentNullException">A stream argument is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="bufferSize"/> is not positive. A zero-length buffer would make the first
    /// read return 0 and the copy would silently transfer nothing.
    /// </exception>
    public static Task CopyToFastAsync(this Stream stream, Stream destination, int bufferSize = DefaultAsyncCopyBufferSize, CancellationToken cancellationToken = default)
    {
        // Validate eagerly so argument errors surface at the call site rather than inside the task.
        ValidateCopyArguments(stream, destination, bufferSize);
        return CopyToFastCoreAsync(stream, destination, bufferSize, cancellationToken);
    }

    /// <summary>Copy loop behind <c>CopyToFastAsync</c>; arguments are already validated.</summary>
    private static async Task CopyToFastCoreAsync(Stream stream, Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
        try
        {
            int bytesRead;
            while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
            {
                await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
            }
        }
        finally
        {
            // Exceptions (including cancellation) propagate to the caller untouched;
            // the buffer is returned on every path.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }

    /// <summary>Shared argument checks for the copy helpers.</summary>
    private static void ValidateCopyArguments(Stream stream, Stream destination, int bufferSize)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (destination is null)
        {
            throw new ArgumentNullException(nameof(destination));
        }

        if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferSize), bufferSize, "Buffer size must be positive.");
        }
    }

    /// <summary>Awaits a pending write and then returns its rented buffer to the pool.</summary>
    private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
    {
        try
        {
            await writeTask.ConfigureAwait(false);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(localBuffer);
        }
    }

    /// <summary>
    /// Asynchronously writes <paramref name="buffer"/> to <paramref name="stream"/>.
    /// </summary>
    /// <remarks>
    /// When the memory is backed by an array, that array segment is written directly.
    /// Otherwise the data is first copied into a pooled array, which is returned once the
    /// write completes.
    /// </remarks>
    /// <param name="stream">Stream to write to.</param>
    /// <param name="buffer">Data to write.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying write.</param>
    /// <returns>A <see cref="ValueTask"/> wrapping the underlying write.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public static ValueTask WriteAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
        {
            return new ValueTask(stream.WriteAsync(array.Array!, array.Offset, array.Count, cancellationToken));
        }

        byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
        Task writeTask;
        try
        {
            buffer.Span.CopyTo(sharedBuffer);
            writeTask = stream.WriteAsync(sharedBuffer, 0, buffer.Length, cancellationToken);
        }
        catch
        {
            // The stream call can throw synchronously (e.g. disposed or unsupported stream);
            // FinishWriteAsync never runs in that case, so hand the buffer back here.
            ArrayPool<byte>.Shared.Return(sharedBuffer);
            throw;
        }

        return new ValueTask(FinishWriteAsync(writeTask, sharedBuffer));
    }

    /// <summary>
    /// Writes <paramref name="buffer"/> to <paramref name="stream"/> by copying it into a pooled
    /// array and calling the array-based <c>Write</c>.
    /// </summary>
    /// <param name="stream">Stream to write to.</param>
    /// <param name="buffer">Data to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    public static void Write(this Stream stream, ReadOnlySpan<byte> buffer)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
        try
        {
            buffer.CopyTo(sharedBuffer);
            stream.Write(sharedBuffer, 0, buffer.Length);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(sharedBuffer);
        }
    }

    /// <summary>
    /// Reads up to <c>buffer.Length</c> bytes from <paramref name="stream"/> into
    /// <paramref name="buffer"/> via a pooled intermediate array.
    /// </summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination span.</param>
    /// <returns>The number of bytes read, as reported by the stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    /// <exception cref="IOException">
    /// The stream reported a byte count that is negative or larger than the requested length.
    /// </exception>
    public static int Read(this Stream stream, Span<byte> buffer)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
        try
        {
            int numRead = stream.Read(sharedBuffer, 0, buffer.Length);

            // The unsigned comparison rejects negative counts and counts above the request in one test.
            if ((uint)numRead > (uint)buffer.Length)
            {
                throw new IOException("Stream too long!");
            }

            new ReadOnlySpan<byte>(sharedBuffer, 0, numRead).CopyTo(buffer);
            return numRead;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(sharedBuffer);
        }
    }

    /// <summary>
    /// Reads up to <c>buffer.Length</c> bytes from <paramref name="stream"/> into
    /// <paramref name="buffer"/>.
    /// </summary>
    /// <remarks>
    /// Array-backed memory is read into directly; any other memory goes through the pooled
    /// <c>Span</c> overload.
    /// </remarks>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination memory.</param>
    /// <returns>The number of bytes read, as reported by the stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    /// <exception cref="IOException">
    /// Pooled path only: the stream reported a byte count that is negative or larger than requested.
    /// </exception>
    public static int Read(this Stream stream, Memory<byte> buffer)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
        {
            return stream.Read(array.Array!, array.Offset, array.Count);
        }

        // Same rent/read/copy/return sequence as the Span overload, so reuse it.
        return Read(stream, buffer.Span);
    }

    /// <summary>
    /// Asynchronously reads up to <c>buffer.Length</c> bytes from <paramref name="stream"/> into
    /// <paramref name="buffer"/>.
    /// </summary>
    /// <remarks>
    /// Array-backed memory is read into directly. Otherwise the read targets a pooled array whose
    /// contents are copied to <paramref name="buffer"/> once the read completes.
    /// </remarks>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination memory.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying read.</param>
    /// <returns>The number of bytes read, as reported by the stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is <c>null</c>.</exception>
    /// <exception cref="IOException">
    /// Pooled path only: the stream reported a byte count that is negative or larger than requested.
    /// </exception>
    public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> array))
        {
            return new ValueTask<int>(stream.ReadAsync(array.Array!, array.Offset, array.Count, cancellationToken));
        }

        byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
        Task<int> readTask;
        try
        {
            readTask = stream.ReadAsync(sharedBuffer, 0, buffer.Length, cancellationToken);
        }
        catch
        {
            // A synchronous throw means FinishReadAsync never runs; return the buffer here.
            ArrayPool<byte>.Shared.Return(sharedBuffer);
            throw;
        }

        return FinishReadAsync(readTask, sharedBuffer, buffer);

        static async ValueTask<int> FinishReadAsync(Task<int> pendingRead, byte[] localBuffer, Memory<byte> localDestination)
        {
            try
            {
                int result = await pendingRead.ConfigureAwait(false);

                // Mirror the synchronous overloads: reject impossible byte counts explicitly.
                if ((uint)result > (uint)localDestination.Length)
                {
                    throw new IOException("Stream too long!");
                }

                new ReadOnlySpan<byte>(localBuffer, 0, result).CopyTo(localDestination.Span);
                return result;
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(localBuffer);
            }
        }
    }
}
}