// ---------------------------------------------------------------------
// Copyright (c) 2015-2016 Microsoft
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// ---------------------------------------------------------------------
namespace Microsoft.IO
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Threading;
///
/// Manages pools of RecyclableMemoryStream objects.
///
///
/// There are two pools managed in here. The small pool contains same-sized buffers that are handed to streams
/// as they write more data.
///
/// For scenarios that need to call GetBuffer(), the large pool contains buffers of various sizes, all
/// multiples of LargeBufferMultiple (1 MB by default). They are split by size to avoid overly-wasteful buffer
/// usage. There should be far fewer 8 MB buffers than 1 MB buffers, for example.
///
public partial class RecyclableMemoryStreamManager
{
///
/// Generic delegate for handling events without any arguments.
///
public delegate void EventHandler();
///
/// Delegate for handling large buffer discard reports.
///
/// Reason the buffer was discarded.
public delegate void LargeBufferDiscardedEventHandler(Events.MemoryStreamDiscardReason reason);
///
/// Delegate for handling reports of stream size when streams are allocated
///
/// Bytes allocated.
public delegate void StreamLengthReportHandler(long bytes);
///
/// Delegate for handling periodic reporting of memory use statistics.
///
/// Bytes currently in use in the small pool.
/// Bytes currently free in the small pool.
/// Bytes currently in use in the large pool.
/// Bytes currently free in the large pool.
public delegate void UsageReportEventHandler(
long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes);
public const int DefaultBlockSize = 128 * 1024;
public const int DefaultLargeBufferMultiple = 1024 * 1024;
public const int DefaultMaximumBufferSize = 128 * 1024 * 1024;
private readonly int blockSize;
private readonly long[] largeBufferFreeSize;
private readonly long[] largeBufferInUseSize;
private readonly int largeBufferMultiple;
///
/// pools[0] = 1x largeBufferMultiple buffers
/// pools[1] = 2x largeBufferMultiple buffers
/// etc., up to maximumBufferSize
///
private readonly ConcurrentStack[] largePools;
private readonly int maximumBufferSize;
private readonly ConcurrentStack smallPool;
private long smallPoolFreeSize;
private long smallPoolInUseSize;
///
/// Initializes the memory manager with the default block/buffer specifications.
///
public RecyclableMemoryStreamManager()
: this(DefaultBlockSize, DefaultLargeBufferMultiple, DefaultMaximumBufferSize) { }
///
/// Initializes the memory manager with the given block requiredSize.
///
/// Size of each block that is pooled. Must be > 0.
/// Each large buffer will be a multiple of this value.
/// Buffers larger than this are not pooled
/// blockSize is not a positive number, or largeBufferMultiple is not a positive number, or maximumBufferSize is less than blockSize.
/// maximumBufferSize is not a multiple of largeBufferMultiple
public RecyclableMemoryStreamManager(int blockSize, int largeBufferMultiple, int maximumBufferSize)
{
if (blockSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(blockSize), blockSize, "blockSize must be a positive number");
}
if (largeBufferMultiple <= 0)
{
throw new ArgumentOutOfRangeException(nameof(largeBufferMultiple),
"largeBufferMultiple must be a positive number");
}
if (maximumBufferSize < blockSize)
{
throw new ArgumentOutOfRangeException(nameof(maximumBufferSize),
"maximumBufferSize must be at least blockSize");
}
this.blockSize = blockSize;
this.largeBufferMultiple = largeBufferMultiple;
this.maximumBufferSize = maximumBufferSize;
if (!this.IsLargeBufferMultiple(maximumBufferSize))
{
throw new ArgumentException("maximumBufferSize is not a multiple of largeBufferMultiple",
nameof(maximumBufferSize));
}
this.smallPool = new ConcurrentStack();
var numLargePools = maximumBufferSize / largeBufferMultiple;
// +1 to store size of bytes in use that are too large to be pooled
this.largeBufferInUseSize = new long[numLargePools + 1];
this.largeBufferFreeSize = new long[numLargePools];
this.largePools = new ConcurrentStack[numLargePools];
for (var i = 0; i < this.largePools.Length; ++i)
{
this.largePools[i] = new ConcurrentStack();
}
Events.Writer.MemoryStreamManagerInitialized(blockSize, largeBufferMultiple, maximumBufferSize);
}
///
/// The size of each block. It must be set at creation and cannot be changed.
///
public int BlockSize => this.blockSize;
///
/// All buffers are multiples of this number. It must be set at creation and cannot be changed.
///
public int LargeBufferMultiple => this.largeBufferMultiple;
///
/// Gets or sets the maximum buffer size.
///
/// Any buffer that is returned to the pool that is larger than this will be
/// discarded and garbage collected.
public int MaximumBufferSize => this.maximumBufferSize;
///
/// Number of bytes in small pool not currently in use
///
public long SmallPoolFreeSize => this.smallPoolFreeSize;
///
/// Number of bytes currently in use by stream from the small pool
///
public long SmallPoolInUseSize => this.smallPoolInUseSize;
///
/// Number of bytes in large pool not currently in use
///
public long LargePoolFreeSize => this.largeBufferFreeSize.Sum();
///
/// Number of bytes currently in use by streams from the large pool
///
public long LargePoolInUseSize => this.largeBufferInUseSize.Sum();
///
/// How many blocks are in the small pool
///
public long SmallBlocksFree => this.smallPool.Count;
///
/// How many buffers are in the large pool
///
public long LargeBuffersFree
{
get
{
long free = 0;
foreach (var pool in this.largePools)
{
free += pool.Count;
}
return free;
}
}
///
/// How many bytes of small free blocks to allow before we start dropping
/// those returned to us.
///
public long MaximumFreeSmallPoolBytes { get; set; }
///
/// How many bytes of large free buffers to allow before we start dropping
/// those returned to us.
///
public long MaximumFreeLargePoolBytes { get; set; }
///
/// Maximum stream capacity in bytes. Attempts to set a larger capacity will
/// result in an exception.
///
/// A value of 0 indicates no limit.
public long MaximumStreamCapacity { get; set; }
///
/// Whether to save callstacks for stream allocations. This can help in debugging.
/// It should NEVER be turned on generally in production.
///
public bool GenerateCallStacks { get; set; }
///
/// Whether dirty buffers can be immediately returned to the buffer pool. E.g. when GetBuffer() is called on
/// a stream and creates a single large buffer, if this setting is enabled, the other blocks will be returned
/// to the buffer pool immediately.
/// Note when enabling this setting that the user is responsible for ensuring that any buffer previously
/// retrieved from a stream which is subsequently modified is not used after modification (as it may no longer
/// be valid).
///
public bool AggressiveBufferReturn { get; set; }
///
/// Removes and returns a single block from the pool.
///
/// A byte[] array
internal byte[] GetBlock()
{
byte[] block;
if (!this.smallPool.TryPop(out block))
{
// We'll add this back to the pool when the stream is disposed
// (unless our free pool is too large)
block = new byte[this.BlockSize];
Events.Writer.MemoryStreamNewBlockCreated(this.smallPoolInUseSize);
ReportBlockCreated();
}
else
{
Interlocked.Add(ref this.smallPoolFreeSize, -this.BlockSize);
}
Interlocked.Add(ref this.smallPoolInUseSize, this.BlockSize);
return block;
}
///
/// Returns a buffer of arbitrary size from the large buffer pool. This buffer
/// will be at least the requiredSize and always be a multiple of largeBufferMultiple.
///
/// The minimum length of the buffer
/// The tag of the stream returning this buffer, for logging if necessary.
/// A buffer of at least the required size.
internal byte[] GetLargeBuffer(int requiredSize, string tag)
{
requiredSize = this.RoundToLargeBufferMultiple(requiredSize);
var poolIndex = requiredSize / this.largeBufferMultiple - 1;
byte[] buffer;
if (poolIndex < this.largePools.Length)
{
if (!this.largePools[poolIndex].TryPop(out buffer))
{
buffer = new byte[requiredSize];
Events.Writer.MemoryStreamNewLargeBufferCreated(requiredSize, this.LargePoolInUseSize);
ReportLargeBufferCreated();
}
else
{
Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], -buffer.Length);
}
}
else
{
// Buffer is too large to pool. They get a new buffer.
// We still want to track the size, though, and we've reserved a slot
// in the end of the inuse array for nonpooled bytes in use.
poolIndex = this.largeBufferInUseSize.Length - 1;
// We still want to round up to reduce heap fragmentation.
buffer = new byte[requiredSize];
string callStack = null;
if (this.GenerateCallStacks)
{
// Grab the stack -- we want to know who requires such large buffers
callStack = Environment.StackTrace;
}
Events.Writer.MemoryStreamNonPooledLargeBufferCreated(requiredSize, tag, callStack);
ReportLargeBufferCreated();
}
Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], buffer.Length);
return buffer;
}
private int RoundToLargeBufferMultiple(int requiredSize)
{
return ((requiredSize + this.LargeBufferMultiple - 1) / this.LargeBufferMultiple) * this.LargeBufferMultiple;
}
private bool IsLargeBufferMultiple(int value)
{
return (value != 0) && (value % this.LargeBufferMultiple) == 0;
}
///
/// Returns the buffer to the large pool
///
/// The buffer to return.
/// The tag of the stream returning this buffer, for logging if necessary.
/// buffer is null
/// buffer.Length is not a multiple of LargeBufferMultiple (it did not originate from this pool)
internal void ReturnLargeBuffer(byte[] buffer, string tag)
{
if (buffer == null)
{
throw new ArgumentNullException(nameof(buffer));
}
if (!this.IsLargeBufferMultiple(buffer.Length))
{
throw new ArgumentException(
"buffer did not originate from this memory manager. The size is not a multiple of " +
this.LargeBufferMultiple);
}
var poolIndex = buffer.Length / this.largeBufferMultiple - 1;
if (poolIndex < this.largePools.Length)
{
if ((this.largePools[poolIndex].Count + 1) * buffer.Length <= this.MaximumFreeLargePoolBytes ||
this.MaximumFreeLargePoolBytes == 0)
{
this.largePools[poolIndex].Push(buffer);
Interlocked.Add(ref this.largeBufferFreeSize[poolIndex], buffer.Length);
}
else
{
Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
Events.MemoryStreamDiscardReason.EnoughFree);
ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.EnoughFree);
}
}
else
{
// This is a non-poolable buffer, but we still want to track its size for inuse
// analysis. We have space in the inuse array for this.
poolIndex = this.largeBufferInUseSize.Length - 1;
Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Large, tag,
Events.MemoryStreamDiscardReason.TooLarge);
ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason.TooLarge);
}
Interlocked.Add(ref this.largeBufferInUseSize[poolIndex], -buffer.Length);
ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
this.LargePoolFreeSize);
}
///
/// Returns the blocks to the pool
///
/// Collection of blocks to return to the pool
/// The tag of the stream returning these blocks, for logging if necessary.
/// blocks is null
/// blocks contains buffers that are the wrong size (or null) for this memory manager
internal void ReturnBlocks(ICollection blocks, string tag)
{
if (blocks == null)
{
throw new ArgumentNullException(nameof(blocks));
}
var bytesToReturn = blocks.Count * this.BlockSize;
Interlocked.Add(ref this.smallPoolInUseSize, -bytesToReturn);
foreach (var block in blocks)
{
if (block == null || block.Length != this.BlockSize)
{
throw new ArgumentException("blocks contains buffers that are not BlockSize in length");
}
}
foreach (var block in blocks)
{
if (this.MaximumFreeSmallPoolBytes == 0 || this.SmallPoolFreeSize < this.MaximumFreeSmallPoolBytes)
{
Interlocked.Add(ref this.smallPoolFreeSize, this.BlockSize);
this.smallPool.Push(block);
}
else
{
Events.Writer.MemoryStreamDiscardBuffer(Events.MemoryStreamBufferType.Small, tag,
Events.MemoryStreamDiscardReason.EnoughFree);
ReportBlockDiscarded();
break;
}
}
ReportUsageReport(this.smallPoolInUseSize, this.smallPoolFreeSize, this.LargePoolInUseSize,
this.LargePoolFreeSize);
}
internal void ReportBlockCreated()
{
this.BlockCreated?.Invoke();
}
internal void ReportBlockDiscarded()
{
this.BlockDiscarded?.Invoke();
}
internal void ReportLargeBufferCreated()
{
this.LargeBufferCreated?.Invoke();
}
internal void ReportLargeBufferDiscarded(Events.MemoryStreamDiscardReason reason)
{
this.LargeBufferDiscarded?.Invoke(reason);
}
internal void ReportStreamCreated()
{
this.StreamCreated?.Invoke();
}
internal void ReportStreamDisposed()
{
this.StreamDisposed?.Invoke();
}
internal void ReportStreamFinalized()
{
this.StreamFinalized?.Invoke();
}
internal void ReportStreamLength(long bytes)
{
this.StreamLength?.Invoke(bytes);
}
internal void ReportStreamToArray()
{
this.StreamConvertedToArray?.Invoke();
}
internal void ReportUsageReport(
long smallPoolInUseBytes, long smallPoolFreeBytes, long largePoolInUseBytes, long largePoolFreeBytes)
{
this.UsageReport?.Invoke(smallPoolInUseBytes, smallPoolFreeBytes, largePoolInUseBytes, largePoolFreeBytes);
}
///
/// Retrieve a new MemoryStream object with no tag and a default initial capacity.
///
/// A MemoryStream.
public MemoryStream GetStream()
{
return new RecyclableMemoryStream(this);
}
///
/// Retrieve a new MemoryStream object with the given tag and a default initial capacity.
///
/// A tag which can be used to track the source of the stream.
/// A MemoryStream.
public MemoryStream GetStream(string tag)
{
return new RecyclableMemoryStream(this, tag);
}
///
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity.
///
/// A tag which can be used to track the source of the stream.
/// The minimum desired capacity for the stream.
/// A MemoryStream.
public MemoryStream GetStream(string tag, int requiredSize)
{
return new RecyclableMemoryStream(this, tag, requiredSize);
}
///
/// Retrieve a new MemoryStream object with the given tag and at least the given capacity, possibly using
/// a single continugous underlying buffer.
///
/// Retrieving a MemoryStream which provides a single contiguous buffer can be useful in situations
/// where the initial size is known and it is desirable to avoid copying data between the smaller underlying
/// buffers to a single large one. This is most helpful when you know that you will always call GetBuffer
/// on the underlying stream.
/// A tag which can be used to track the source of the stream.
/// The minimum desired capacity for the stream.
/// Whether to attempt to use a single contiguous buffer.
/// A MemoryStream.
public MemoryStream GetStream(string tag, int requiredSize, bool asContiguousBuffer)
{
if (!asContiguousBuffer || requiredSize <= this.BlockSize)
{
return this.GetStream(tag, requiredSize);
}
return new RecyclableMemoryStream(this, tag, requiredSize, this.GetLargeBuffer(requiredSize, tag));
}
///
/// Retrieve a new MemoryStream object with the given tag and with contents copied from the provided
/// buffer. The provided buffer is not wrapped or used after construction.
///
/// The new stream's position is set to the beginning of the stream when returned.
/// A tag which can be used to track the source of the stream.
/// The byte buffer to copy data from.
/// The offset from the start of the buffer to copy from.
/// The number of bytes to copy from the buffer.
/// A MemoryStream.
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
public MemoryStream GetStream(string tag, byte[] buffer, int offset, int count)
{
var stream = new RecyclableMemoryStream(this, tag, count);
stream.Write(buffer, offset, count);
stream.Position = 0;
return stream;
}
///
/// Triggered when a new block is created.
///
public event EventHandler BlockCreated;
///
/// Triggered when a new block is created.
///
public event EventHandler BlockDiscarded;
///
/// Triggered when a new large buffer is created.
///
public event EventHandler LargeBufferCreated;
///
/// Triggered when a new stream is created.
///
public event EventHandler StreamCreated;
///
/// Triggered when a stream is disposed.
///
public event EventHandler StreamDisposed;
///
/// Triggered when a stream is finalized.
///
public event EventHandler StreamFinalized;
///
/// Triggered when a stream is finalized.
///
public event StreamLengthReportHandler StreamLength;
///
/// Triggered when a user converts a stream to array.
///
public event EventHandler StreamConvertedToArray;
///
/// Triggered when a large buffer is discarded, along with the reason for the discard.
///
public event LargeBufferDiscardedEventHandler LargeBufferDiscarded;
///
/// Periodically triggered to report usage statistics.
///
public event UsageReportEventHandler UsageReport;
}
}