Fmacias.TplQueue.Observers
0.1.0-preview.1
Observer support package for TplQueue job lifecycle events.
See also:
- TplQueue.Adapter root README
- TplQueue.Core observers section
- TplQueue.Usage QueueObserverConsole sample
- TplQueue.Usage QueueObserverSignalRDashboard sample
- Fmacias.TplQueue README
Repository-wide packaging and strong-name signing rules are documented in the TplQueue.Adapter root README.
Table of contents
- Summary
- Install
- Why observers matter
- Module purpose
- Factory-first usage
- Built-in observers
- Queue integration
- Custom observers in consumer applications
- Dispatcher integration for UI applications
- Dashboard and SignalR bridge example
- Reactive Extensions
- Logging helper
- Design justification
Summary
Fmacias.TplQueue.Observers owns the observer implementations and observer factory used by the adapter facade. Consumers work with the public contracts from Fmacias.TplQueue.Contracts, especially:
- IObserverFactory
- IConsoleObserver
- ILoggingObserver
- IFileLoggingObserver
- IProfilingObserver
- IObserverDispatcher
- IObserver<IJobEvent>
The built-in observer classes are internal implementation details. Use ObserverFactory.Create() or api.ObserverFactory() instead of constructing observer classes directly.
Install
dotnet add package Fmacias.TplQueue.Observers --version 0.1.0-preview.1
Why observers matter
Observer integration is a key part of the TplQueue architecture. A queue emits IJobEvent values while it executes IJobRoot and IDataJobRoot graphs. Those events can feed logs, metrics, audit streams, operational dashboards, and UI status panels without coupling the queue engine to a specific UI framework or transport.
This is especially useful for existing applications. A legacy WPF, WinForms, or ASP.NET application can keep its current UI and workflow while a TplQueue observer forwards job lifecycle events to a modern dashboard, a SignalR hub, a metrics pipeline, or a logging system. The application does not need to replace its legacy UI first; it can add visibility around background execution in parallel.
Module purpose
This package owns:
- the public ObserverFactory
- internal built-in observers for console, logging, file-style structured logging, and profiling scenarios
- the internal DirectObserverDispatcher implementation returned by the factory for inline dispatch
- logging subscription helpers for queue event streams
Core still owns event publication through IQ : IObservable<IJobEvent>. This package owns the reusable observer implementations and the consumer-side construction entry point.
Factory-first usage
Use the factory from this package directly:
using Fmacias.TplQueue.Contracts;
using Fmacias.TplQueue.Observers;
using Microsoft.Extensions.Logging;

// loggerFactory is an ILoggerFactory supplied by the host application.
IObserverFactory observers = ObserverFactory.Create();
IConsoleObserver consoleObserver = observers.CreateConsoleObserver();
ILoggingObserver loggingObserver = observers.CreateLoggingObserver(
    loggerFactory.CreateLogger<ILoggingObserver>());
IProfilingObserver profilingObserver = observers.CreateProfilingObserver(
    loggerFactory.CreateLogger<IProfilingObserver>());
IFileLoggingObserver fileObserver = observers.CreateFileLoggingObserver(
    loggerFactory.CreateLogger("TplQueue.Main"),
    queueName: "main");
IObserverDispatcher dispatcher = observers.CreateObserverDispatcher();
Or use the top-level adapter facade:
IObserverFactory observers = api.ObserverFactory();
IConsoleObserver consoleObserver = observers.CreateConsoleObserver();
using IDisposable subscription = queue.Subscribe(consoleObserver);
Built-in observers
The factory can create these built-in observer contracts:
- IConsoleObserver: writes basic event and error information to the console
- ILoggingObserver: writes job lifecycle events through ILogger<ILoggingObserver>
- IFileLoggingObserver: writes structured queue event lines through an application-provided ILogger
- IProfilingObserver: writes memory and GC-oriented profiling information through ILogger<IProfilingObserver>
- IObserverDispatcher: dispatches observer callbacks through the default inline dispatcher
The concrete classes are internal so the package can evolve their implementation without forcing consumers to depend on constructor details.
Queue integration
Every queue implements IObservable<IJobEvent> through IQ. Subscribe observers directly to the queue:
IObserverFactory observers = api.ObserverFactory();
ILoggingObserver loggingObserver = observers.CreateLoggingObserver(
    loggerFactory.CreateLogger<ILoggingObserver>());
using IDisposable subscription = queue.Subscribe(loggingObserver);
Useful event fields for monitoring include:
- value.Status
- value.JobInfo.Id
- value.JobInfo.Name
- value.JobInfo.CrossQueueId
- value.Timestamp
- value.RetryCount
- value.Exception
Observer callbacks should be treated as operational telemetry. They are useful for dashboards, logs, and diagnostics, but they should not become the transactional control path for the job itself.
Custom observers in consumer applications
Applications can implement their own observers by implementing IObserver<IJobEvent>.
using System;
using Fmacias.TplQueue.Contracts;

// IJobDashboardSink and JobDashboardEvent are application-defined types.
public sealed class DashboardObserver : IObserver<IJobEvent>
{
    private readonly IJobDashboardSink _dashboard;

    public DashboardObserver(IJobDashboardSink dashboard)
    {
        _dashboard = dashboard ?? throw new ArgumentNullException(nameof(dashboard));
    }

    // Maps each queue event to the dashboard payload.
    public void OnNext(IJobEvent value)
    {
        if (value == null) throw new ArgumentNullException(nameof(value));
        _dashboard.Publish(new JobDashboardEvent
        {
            JobId = value.JobInfo.Id,
            JobName = value.JobInfo.Name,
            Status = value.Status.ToString(),
            Timestamp = value.Timestamp,
            RetryCount = value.RetryCount
        });
    }

    public void OnError(Exception error)
    {
        _dashboard.PublishError(error);
    }

    public void OnCompleted()
    {
        _dashboard.Complete();
    }
}
Recommended rules:
- keep OnNext fast
- avoid blocking I/O directly in observer callbacks
- isolate observer failures from application workflow logic
- marshal to the required scheduler or UI thread outside Core
- treat observer delivery as monitoring and integration data, not as a replacement for durable business transactions
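The failure-isolation rule above can be sketched as a small decorator around any observer. This is a minimal illustration that uses only base class library types; SafeObserver<T> and the onObserverFault callback are hypothetical names and are not part of Fmacias.TplQueue.Observers.

```csharp
using System;

// Hypothetical wrapper (not part of the package): forwards notifications to an
// inner observer and hands any exception the inner observer throws to a
// callback instead of letting it propagate to the caller.
public sealed class SafeObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly Action<Exception> _onObserverFault;

    public SafeObserver(IObserver<T> inner, Action<Exception> onObserverFault)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _onObserverFault = onObserverFault ?? throw new ArgumentNullException(nameof(onObserverFault));
    }

    public void OnNext(T value) => Guard(() => _inner.OnNext(value));

    public void OnError(Exception error) => Guard(() => _inner.OnError(error));

    public void OnCompleted() => Guard(_inner.OnCompleted);

    // Runs one callback and reports, rather than rethrows, any failure.
    private void Guard(Action callback)
    {
        try
        {
            callback();
        }
        catch (Exception ex)
        {
            _onObserverFault(ex);
        }
    }
}
```

Assuming the DashboardObserver shown earlier, an application could subscribe new SafeObserver<IJobEvent>(dashboardObserver, ex => logger.LogWarning(ex, "Observer failed")) so that a dashboard outage is logged without affecting the code that publishes the event.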
Dispatcher integration for UI applications
IObserverDispatcher is the small adapter service used when observer callbacks need to update UI-bound state. The observer package provides a direct inline dispatcher through CreateObserverDispatcher(). UI applications can implement their own dispatcher without adding UI framework dependencies to Core or to the observer contracts.
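One way an application observer might route its callbacks through a dispatcher is a decorator that accepts the dispatch step as a delegate. The sketch below is an assumption about consumer-side wiring, not a description of how the built-in observers use IObserverDispatcher; DispatchingObserver<T> is a hypothetical name, and the delegate is expected to be something like dispatcher.Invoke.

```csharp
using System;

// Hypothetical decorator (not part of the package): every notification for the
// inner observer is passed to a dispatch delegate, which decides where and
// when the callback actually runs.
public sealed class DispatchingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly Action<Action> _dispatch;

    public DispatchingObserver(IObserver<T> inner, Action<Action> dispatch)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _dispatch = dispatch ?? throw new ArgumentNullException(nameof(dispatch));
    }

    public void OnNext(T value) => _dispatch(() => _inner.OnNext(value));

    public void OnError(Exception error) => _dispatch(() => _inner.OnError(error));

    public void OnCompleted() => _dispatch(_inner.OnCompleted);
}
```

With any IObserverDispatcher implementation, the wiring could look like new DispatchingObserver<IJobEvent>(viewModelObserver, dispatcher.Invoke), where viewModelObserver is an application-defined observer that updates UI-bound state.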
WPF
using System;
using System.Windows.Threading;
using Fmacias.TplQueue.Contracts;

public sealed class WpfObserverDispatcher : IObserverDispatcher
{
    private readonly Dispatcher _dispatcher;

    public WpfObserverDispatcher(Dispatcher dispatcher)
    {
        _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
    }

    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        // Dispatcher.Invoke blocks the calling thread until the UI thread has run
        // the action. Use _dispatcher.BeginInvoke(action) if the caller must not wait.
        _dispatcher.Invoke(action);
    }
}
WinForms
using System;
using System.Windows.Forms;
using Fmacias.TplQueue.Contracts;

public sealed class WinFormsObserverDispatcher : IObserverDispatcher
{
    private readonly Control _control;

    public WinFormsObserverDispatcher(Control control)
    {
        _control = control ?? throw new ArgumentNullException(nameof(control));
    }

    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        if (_control.InvokeRequired)
        {
            _control.BeginInvoke(action);
            return;
        }
        action();
    }
}
UWP
using System;
using Windows.UI.Core;
using Fmacias.TplQueue.Contracts;

public sealed class UwpObserverDispatcher : IObserverDispatcher
{
    private readonly CoreDispatcher _dispatcher;

    public UwpObserverDispatcher(CoreDispatcher dispatcher)
    {
        _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
    }

    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _ = _dispatcher.RunAsync(CoreDispatcherPriority.Normal, () => action());
    }
}
MAUI
using System;
using Fmacias.TplQueue.Contracts;

public sealed class MauiObserverDispatcher : IObserverDispatcher
{
    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        Microsoft.Maui.ApplicationModel.MainThread.BeginInvokeOnMainThread(action);
    }
}
WinUI
using System;
using Microsoft.UI.Dispatching;
using Fmacias.TplQueue.Contracts;

public sealed class WinUiObserverDispatcher : IObserverDispatcher
{
    private readonly DispatcherQueue _dispatcherQueue;

    public WinUiObserverDispatcher(DispatcherQueue dispatcherQueue)
    {
        _dispatcherQueue = dispatcherQueue ?? throw new ArgumentNullException(nameof(dispatcherQueue));
    }

    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _dispatcherQueue.TryEnqueue(() => action());
    }
}
Blazor or generic synchronization-context hosts
using System;
using System.Threading;
using Fmacias.TplQueue.Contracts;

public sealed class SynchronizationContextObserverDispatcher : IObserverDispatcher
{
    private readonly SynchronizationContext _syncContext;

    public SynchronizationContextObserverDispatcher(SynchronizationContext syncContext)
    {
        _syncContext = syncContext ?? throw new ArgumentNullException(nameof(syncContext));
    }

    public void Invoke(Action action)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        _syncContext.Post(_ => action(), null);
    }
}
ASP.NET or background-hosted dashboard bridge
Server-side applications usually should not marshal to a UI thread. They can implement an observer that forwards events to an application service, channel, message bus, or SignalR hub.
using System;
using Fmacias.TplQueue.Contracts;

// IDashboardEventPublisher is an application-defined service.
public sealed class DashboardForwardingObserver : IObserver<IJobEvent>
{
    private readonly IDashboardEventPublisher _publisher;

    public DashboardForwardingObserver(IDashboardEventPublisher publisher)
    {
        _publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
    }

    public void OnNext(IJobEvent value)
    {
        _publisher.PublishJobEvent(value);
    }

    public void OnError(Exception error)
    {
        _publisher.PublishObserverError(error);
    }

    public void OnCompleted()
    {
        _publisher.PublishObserverCompleted();
    }
}
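When the publisher performs network I/O, one option is to put a bounded channel between the observer and the publisher so that OnNext only enqueues. The following is a sketch under stated assumptions: ChannelForwardingObserver<T> and PumpAsync are hypothetical names, dropping the oldest event when the buffer is full is one possible policy among several, and System.Threading.Channels ships with .NET Core 3.0 and later but is a separate NuGet package for .NET Standard 2.0 or .NET Framework hosts.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical observer (not part of the package): OnNext only enqueues, and a
// separate pump loop performs the slow publishing work.
public sealed class ChannelForwardingObserver<T> : IObserver<T>
{
    private readonly Channel<T> _channel;

    public ChannelForwardingObserver(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            // Assumed policy: when the buffer is full, discard the oldest event.
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true
        });
    }

    // Never blocks: TryWrite returns immediately.
    public void OnNext(T value) => _channel.Writer.TryWrite(value);

    public void OnError(Exception error) => _channel.Writer.TryComplete(error);

    public void OnCompleted() => _channel.Writer.TryComplete();

    // Reads buffered events and hands each one to the publish delegate until
    // the channel is completed or the token is cancelled.
    public async Task PumpAsync(Func<T, Task> publish, CancellationToken cancellationToken = default)
    {
        if (publish == null) throw new ArgumentNullException(nameof(publish));
        while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out T item))
            {
                await publish(item).ConfigureAwait(false);
            }
        }
    }
}
```

A host would typically start PumpAsync once, for example from a hosted background service, and subscribe the observer to the queue. The trade-off is that a slow dashboard loses old events instead of slowing down the queue.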
Dashboard and SignalR bridge example
The observer contract is suitable for real-time browser updates. Keep transport-specific code outside Core and outside the queue implementation.
using System;
using Fmacias.TplQueue.Contracts;
using Microsoft.AspNetCore.SignalR;

// JobEventsHub is the application's SignalR hub class.
public sealed class JobEventSignalRObserver : IObserver<IJobEvent>
{
    private readonly IHubContext<JobEventsHub> _hubContext;

    public JobEventSignalRObserver(IHubContext<JobEventsHub> hubContext)
    {
        _hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext));
    }

    public void OnNext(IJobEvent value)
    {
        // Fire and forget: the send task is discarded, so OnNext does not wait
        // for delivery and send failures are not observed here.
        _ = _hubContext.Clients.All.SendAsync("JobUpdated", new
        {
            value.JobInfo.Id,
            value.JobInfo.Name,
            Status = value.Status.ToString(),
            value.Timestamp,
            value.RetryCount
        });
    }

    public void OnError(Exception error)
    {
        // Intentionally empty in this example.
    }

    public void OnCompleted()
    {
        // Intentionally empty in this example.
    }
}
The separation is deliberate:
- the queue emits IJobEvent
- the observer maps those events to the dashboard or transport payload
- the application owns the transport, UI framework, and delivery policy
Reactive Extensions
If your application already uses System.Reactive, the queue can be consumed directly because IQ already implements IObservable<IJobEvent>.
That means operators such as filtering, buffering, throttling, and scheduler switching can be applied without a Core-specific adapter layer. Keep those Rx choices in the consumer application because they usually depend on the hosting model, scheduler, and dashboard or UI requirements.
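As an illustration of that composition, the sketch below filters a stream, groups the remaining events into time windows, and delivers non-empty batches on a chosen scheduler. It assumes the System.Reactive package is referenced; SubscribeBatched is a hypothetical helper name, and the method is generic so it applies to any IObservable<T>, including a queue's IObservable<IJobEvent>.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class JobEventStreamExtensions
{
    // Hypothetical helper (not part of the package): keeps the events that match
    // the filter, collects them per time window, skips empty windows, and
    // invokes onBatch on the given scheduler.
    public static IDisposable SubscribeBatched<T>(
        this IObservable<T> source,
        Func<T, bool> filter,
        TimeSpan window,
        IScheduler deliverOn,
        Action<IList<T>> onBatch)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));

        return source
            .Where(filter)
            .Buffer(window)
            .Where(batch => batch.Count > 0)
            .ObserveOn(deliverOn)
            .Subscribe(onBatch);
    }
}
```

For example, queue.SubscribeBatched(e => e.Exception != null, TimeSpan.FromSeconds(1), TaskPoolScheduler.Default, batch => dashboard.ShowFailures(batch)) would push at most one failure update per second, where dashboard.ShowFailures stands in for an application-defined method.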
Logging helper
SubscribeFileLogger is a convenience extension for queue event streams:
using Fmacias.TplQueue.Observers;
using IDisposable subscription = queue.SubscribeFileLogger(
    loggerFactory,
    queueName: "main");
Use the factory when you need to keep ownership of the observer instance:
IFileLoggingObserver observer = api.ObserverFactory().CreateFileLoggingObserver(
    loggerFactory.CreateLogger("TplQueue.Main"),
    queueName: "main");
using IDisposable subscription = queue.Subscribe(observer);
Design justification
Concrete observers do not belong in the thin top-level adapter facade. The observer package owns them because observer construction, default dispatching, and logging-oriented subscriptions are all part of the observer module.
Keeping the concrete observers internal gives consumers a stable contract-based surface:
- Fmacias.TplQueue can expose api.ObserverFactory() without owning observer implementations
- consumers can create the built-in observers without depending on concrete constructor details
- applications can add WPF, WinForms, ASP.NET, SignalR, or dashboard-specific observers without changing Core
- Core stays focused on job graph execution, queue scheduling, retry integration, and event publication
| Product | Compatible and computed target framework versions |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Fmacias.TplQueue.Abstractions (>= 0.1.0-preview.1)
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.8)
| Version | Downloads | Last Updated |
|---|---|---|
| 0.1.0-preview.1 | 54 | 5/21/2026 |
Preview release of the TplQueue adapter package line for .NET Standard 2.0. This release focuses on public package metadata, consumer-facing README documentation, and strong-name-ready official build support ahead of the first stable 1.0.0 release.