DiskQueue 1.2.0

A thread-safe, multi-process(ish) persistent queue, based very heavily on http://ayende.com/blog/3479/rhino-queues-storage-disk .
It stores serialised data on the local file system, to be read either later or concurrently.

Install

  • Package Manager: Install-Package DiskQueue -Version 1.2.0
  • .NET CLI: dotnet add package DiskQueue --version 1.2.0
  • PackageReference: <PackageReference Include="DiskQueue" Version="1.2.0" />
    For projects that support PackageReference, copy this XML node into the project file to reference the package.
  • Paket CLI: paket add DiskQueue --version 1.2.0

Requirements and Environment

Works on .NET 4+ and Mono 2.10.8+ (3.0.6+ recommended).

Requires access to filesystem storage.

Basic Usage

  • PersistentQueue.WaitFor(...) is the main entry point. This will attempt to gain an exclusive lock
    on the given storage location. On first use, a directory will be created with the required files
    inside it.
  • This queue object can be shared among threads. Each thread should call OpenSession() to get its
    own session object.
  • Both IPersistentQueues and IPersistentQueueSessions should be wrapped in using() clauses, or otherwise
    disposed of properly.
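
The three points above can be put together in a minimal sketch. It uses only the calls named in this README (PersistentQueue.WaitFor, OpenSession, Enqueue, Dequeue, Flush). The `DiskQueue` namespace, the 5 second timeout, the "./demo_queue" path and the `BasicUsageSketch` / `RoundTrip` names are assumptions made for illustration.

```csharp
using System;
using System.Text;
using DiskQueue; // assumed namespace of the package

public static class BasicUsageSketch
{
    // Hypothetical helper: enqueues one item, then reads one item back in a second session.
    public static byte[] RoundTrip(string storagePath, byte[] payload)
    {
        // Wait (here: up to 5 seconds) for the exclusive lock on the storage location.
        using (var queue = PersistentQueue.WaitFor(storagePath, TimeSpan.FromSeconds(5)))
        {
            using (var session = queue.OpenSession())
            {
                session.Enqueue(payload);
                session.Flush(); // commit; without this the enqueue is rolled back on dispose
            }

            using (var session = queue.OpenSession())
            {
                var data = session.Dequeue(); // null when the queue is empty
                session.Flush();              // commit the dequeue
                return data;
            }
        }
    }

    public static void Main()
    {
        // If the storage location already holds older items, the oldest one is returned instead.
        var result = RoundTrip("./demo_queue", Encoding.UTF8.GetBytes("hello"));
        Console.WriteLine(Encoding.UTF8.GetString(result));
    }
}
```

Both the queue and each session are disposed by their using blocks, so the storage lock is released as soon as RoundTrip returns.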

Example

Queue on one thread, consume on another; retry some exceptions.

Note this is one queue being shared between two sessions. You should not open two queue instances for one storage location at once.

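IPersistentQueue queue = new PersistentQueue("queue_a");

// Producer: each item is enqueued and committed in its own session.
var t1 = new Thread(() =>
{
	while (HaveWork())
	{
		using (var session = queue.OpenSession())
		{
			session.Enqueue(NextWorkItem());
			session.Flush();
		}
	}
});

// Consumer: polls the queue and commits each dequeue once the item has been handled.
var t2 = new Thread(() =>
{
	while (true)
	{
		using (var session = queue.OpenSession())
		{
			var data = session.Dequeue();
			if (data == null) { Thread.Sleep(100); continue; } // queue is empty, poll again

			try
			{
				MaybeDoWork(data);
				session.Flush();
			}
			catch (RetryException)
			{
				// No Flush: the dequeue is rolled back on dispose, so the item is retried.
				continue;
			}
			catch
			{
				// Any other failure: commit the dequeue so the item is not retried.
				session.Flush();
			}
		}
	}
});

t1.Start();
t2.Start();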

Example

Batch up a load of work and have another thread work through it.

IPersistentQueue queue = new PersistentQueue("batchQueue");

// Worker: drains the queue, committing each dequeue after the item is processed.
var worker = new Thread(() =>
{
	using (var session = queue.OpenSession())
	{
		byte[] data;
		while ((data = session.Dequeue()) != null)
		{
			MaybeDoWork(data);
			session.Flush();
		}
	}
});

// Enqueue the whole batch in one session and commit it with a single Flush.
using (var session = queue.OpenSession())
{
	foreach (var item in LoadsOfStuff())
	{
		session.Enqueue(item);
	}
	session.Flush();
}

worker.IsBackground = true; // anything not complete when we close will be left on the queue for next time.
worker.Start();

Transactions

Each session is a transaction. Any enqueues or dequeues will be rolled back when the session is disposed unless
you call session.Flush(). Data will only be visible between threads once it has been flushed.
Each flush incurs a performance penalty. By default, each flush is persisted to disk before continuing. You
can trade some safety for more speed by setting queue.ParanoidFlushing = false.
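
The rollback rule can be illustrated with a short sketch: an item dequeued in a session that is disposed without Flush() should be handed out again by the next session. This relies only on the behaviour described above. The `DiskQueue` namespace, the timeout and the `TransactionSketch` / `DequeueIsRolledBack` names are assumptions for illustration, and the storage path is expected to be a fresh, empty location.

```csharp
using System;
using System.Text;
using DiskQueue; // assumed namespace of the package

public static class TransactionSketch
{
    // Hypothetical check: returns true when an unflushed dequeue is seen again by a later session.
    public static bool DequeueIsRolledBack(string storagePath)
    {
        using (var queue = PersistentQueue.WaitFor(storagePath, TimeSpan.FromSeconds(5)))
        {
            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes("job-1"));
                session.Flush(); // committed: visible to later sessions
            }

            byte[] first;
            using (var session = queue.OpenSession())
            {
                first = session.Dequeue();
                // No Flush here: the dequeue is rolled back when the session is disposed.
            }

            using (var session = queue.OpenSession())
            {
                var second = session.Dequeue();
                session.Flush(); // committed: the item is now removed for good
                return first != null && second != null
                    && Encoding.UTF8.GetString(first) == Encoding.UTF8.GetString(second);
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(DequeueIsRolledBack("./transaction_demo_queue"));
    }
}
```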

Data loss and transaction truncation

By default, DiskQueue will silently discard transaction blocks that have been truncated, and it will throw an InvalidOperationException
when transaction block markers are overwritten. Overwritten markers occur when more than one process is using the queue by mistake, and can also be caused by some kinds of disk corruption.
If you construct your queue with throwOnConflict: false, all recoverable transaction errors will be silently truncated. This should only be used when
uptime is more important than data consistency.

using (var queue = new PersistentQueue(path, Constants._32Megabytes, throwOnConflict: false)) {
    . . .
}

Multi-Process Usage

Each IPersistentQueue gives exclusive access to the storage until it is disposed.
There is a static helper method PersistentQueue.WaitFor("path", TimeSpan...) which will wait to gain access until
other processes release the lock or the timeout expires.
If each process holds the lock for a short time and waits long enough, they can share a storage location.

E.g.

...
void AddToQueue(byte[] data) {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		session.Enqueue(data);
		session.Flush();
	}
}

byte[] ReadQueue() {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		var data = session.Dequeue();
		session.Flush();
		return data;
	}
}
...

If you need the transaction semantics of sessions across multiple processes, try a more robust solution like https://github.com/i-e-b/SevenDigital.Messaging

Release Notes

Corrects ordering of reinstated queue items after a dispose.

Dependencies

This package has no dependencies.

Version History

Version  Downloads  Last updated
1.2.0    2,881      8/6/2018
1.1.0    556        2/22/2018
1.0.0    2,931      9/3/2015