Apache.Iggy 0.5.0

There is a newer prerelease version of this package available.
See the version list below for details.
dotnet add package Apache.Iggy --version 0.5.0
                    
NuGet\Install-Package Apache.Iggy -Version 0.5.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Apache.Iggy" Version="0.5.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Apache.Iggy" Version="0.5.0" />
                    
Directory.Packages.props
<PackageReference Include="Apache.Iggy" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Apache.Iggy --version 0.5.0
                    
#r "nuget: Apache.Iggy, 0.5.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Apache.Iggy@0.5.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Apache.Iggy&version=0.5.0
                    
Install as a Cake Addin
#tool nuget:?package=Apache.Iggy&version=0.5.0
                    
Install as a Cake Tool

C# SDK for Iggy

<div align="center">

Nuget (with prereleases)

</div>

Getting Started

Currently supported transfer protocols

  • TCP
  • HTTP

The whole SDK revolves around IIggyClient interface to create an instance of it, use following code

var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Iggy_SDK.MessageStream.Implementations;", LogLevel.Trace)
        .AddConsole();
});
var bus = MessageStreamFactory.CreateMessageStream(options =>
{
    options.BaseAdress = "127.0.0.1:8090";
    options.Protocol = Protocol.Tcp;
    options.TlsSettings = x =>
    {
        x.Enabled = false;
        x.Hostname = "iggy";
        x.Authenticate = false;
    };
}, loggerFactory);

Iggy necessitates the use of ILoggerFactory to generate logs from locations that are inaccessible to the user.

In addition to the basic configuration settings, Iggy provides support for batching send/poll messages at intervals, which effectively decreases the frequency of network calls, this option is enabled by default.

//---Snip---
var bus = MessageStreamFactory.CreateMessageStream(options =>
{
    options.BaseAdress = "127.0.0.1:8090";
    options.Protocol = protocol;
    options.TlsSettings = x =>
    {
        x.Enabled = false;
        x.Hostname = "iggy";
        x.Authenticate = false;
    };

    options.IntervalBatchingConfig = x =>
    {
        x.Enabled = true;
        x.Interval = TimeSpan.FromMilliseconds(100);
        x.MaxMessagesPerBatch = 1000;
        x.MaxRequests = 4096;
    };
    options.MessagePollingSettings = x =>
    {
        x.Interval = TimeSpan.FromMilliseconds(100);
        x.StoreOffsetStrategy = StoreOffset.AfterProcessingEachMessage;
    };
}, loggerFactory);

Creating and logging in a user

To begin, utilize the root account (note that the root account cannot be removed or updated).

var response = await bus.LoginUser(new LoginUserRequest
{
    Username = "iggy",
    Password = "iggy",
});

Furthermore, after logging in, you have the option to create an account with customizable Permissions.

//---Snip---
await bus.CreateUser(new CreateUserRequest
{
    Username = "test_user",
    Password = "pa55w0rD!@",
    Status = UserStatus.Active,
    Permissions = new Permissions
    {
        Global = new GlobalPermissions
        {
            ManageServers = true,
            ManageUsers = true,
            ManageStreams = true,
            ManageTopics = true,
            PollMessages = true,
            ReadServers = true,
            ReadStreams = true,
            ReadTopics = true,
            ReadUsers = true,
            SendMessages = true
        },
        Streams = new Dictionary<int, StreamPermissions>
        {
            {
                streamId, new StreamPermissions
                {
                    ManageStream = true,
                    ReadStream = true,
                    SendMessages = true,
                    PollMessages = true,
                    ManageTopics = true,
                    ReadTopics = true,
                    Topics = new Dictionary<int, TopicPermissions>
                    {
                        {
                            topicId, new TopicPermissions
                            {
                                ManageTopic = true,
                                ReadTopic = true,
                                PollMessages = true,
                                SendMessages = true
                            }
                        }
                    }
                }
            }
        }
    }
});

var response = await bus.LoginUser(new LoginUserRequest
{
    Username = "test_user",
    Password = "pa55w0rD!@",
});

Alternatively, once you've logged in, you can create a Personal Access Token that can be reused for further logins.

var response = await bus.LoginUser(new LoginUserRequest
{
    Username = "your_username",
    Password = "your_password",
});

var patResponse = await bus.CreatePersonalAccessTokenAsync(new CreatePersonalAccessTokenRequest
{
    Name = "first-pat",
    Expiry = 60, // seconds from creation time
});
await bus.LoginWithPersonalAccessToken(new LoginWithPersonalAccessToken
{
    Token = patResponse.Token
});

Creating first stream and topic

In order to create stream use CreateStreamAsync method.

await bus.CreateStreamAsync(new StreamRequest
{
    StreamId = 1,
    Name = "first-stream",
});

Every stream has a topic to which you can broadcast messages, for the purpose of create one use CreateTopicAsync method.

var streamId = Identifier.Numeric(1);
await bus.CreateTopicAsync(streamId, new TopicRequest
{
    Name = "first-topic",
    PartitionsCount = 3,
    TopicId = 1
});

Notice that both Stream aswell as Topic use - instead of space in its name, Iggy will replace any spaces in name with - instead, so keep that in mind.

Sending messages

To send messages you can use SendMessagesAsync method.

Func<byte[], byte[]> encryptor = static payload =>
{
    string aes_key = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
    string aes_iv = "bsxnWolsAyO7kCfWuyrnqg==";

    var key = Convert.FromBase64String(aes_key);
    var iv = Convert.FromBase64String(aes_iv);

    using Aes aes = Aes.Create();
    ICryptoTransform encryptor = aes.CreateEncryptor(key, iv);

    using MemoryStream memoryStream = new MemoryStream();
    using CryptoStream cryptoStream = new CryptoStream(memoryStream, encryptor, CryptoStreamMode.Write);
    using BinaryWriter streamWriter = new BinaryWriter(cryptoStream);
    streamWriter.Write(payload);

    return memoryStream.ToArray();
};

var messages = new List<Message>(); // your messages
var streamId = Identifier.Numeric(1);
var topicId = Identifier.Numeric(1);
await bus.SendMessagesAsync(new MessageSendRequest
{
    Messages = new List<Message>(),
    Partitioning = Partitioning.PartitionId(1),
    StreamId = streamId,
    TopicId = topicId,
}, encryptor); //encryptor is optional

The Message struct has two fields Id and Payload.

struct Message
{
    public required MessageHeader Header { get; init; }
    public required byte[] Payload { get; init; }
    public Dictionary<HeaderKey, HeaderValue>? UserHeaders { get; init; }
}

public readonly struct MessageHeader
{
    public ulong Checksum { get; init; }
    public UInt128 Id { get; init; }
    public ulong Offset { get; init; }
    public DateTimeOffset Timestamp { get; init; }
    public ulong OriginTimestamp { get; init; }
    public int UserHeadersLength { get; init; }
    public int PayloadLength { get; init; }
}

Furthermore, there's a generic overload for this method that takes binary serializer as argument.

//---Snip---
Func<Envelope, byte[]> serializer = static envelope =>
{
    Span<byte> buffer = stackalloc byte[envelope.MessageType.Length + 4 + envelope.Payload.Length];

    BinaryPrimitives.WriteInt32LittleEndian(buffer[..4], envelope.MessageType.Length);
    Encoding.UTF8.GetBytes(envelope.MessageType).CopyTo(buffer[4..(envelope.MessageType.Length + 4)]);
    Encoding.UTF8.GetBytes(envelope.Payload).CopyTo(buffer[(envelope.MessageType.Length + 4)..]);

    return buffer.ToArray();
};

var messages = new List<Envelope>(); // your messages
await bus.SendMessagesAsync(new MessageSendRequest<Envelope>
{
    StreamId = streamId,
    TopicId = topicId,
    Partitioning = Partitioning.PartitionId(1),
    Messages = messages
},
serializer,
encryptor);

Both generic and non generic method accept optional Headers dictionary.

//---Snip---
var headers = new Dictionary<HeaderKey, HeaderValue>
{
    { new HeaderKey { Value = "key_1".ToLower() }, HeaderValue.FromString("test-value-1") },
    { new HeaderKey { Value = "key_2".ToLower() }, HeaderValue.FromInt32(69) },
    { new HeaderKey { Value = "key_3".ToLower() }, HeaderValue.FromFloat(420.69f) },
    { new HeaderKey { Value = "key_4".ToLower() }, HeaderValue.FromBool(true) },
    { new HeaderKey { Value = "key_5".ToLower() }, HeaderValue.FromBytes(byteArray) },
    { new HeaderKey { Value = "key_6".ToLower() }, HeaderValue.FromInt128(new Int128(6969696969, 420420420)) },
    { new HeaderKey { Value = "key7".ToLower() }, HeaderValue.FromGuid(Guid.NewGuid()) }
};

await bus.SendMessagesAsync<Envelope>(new MessageSendRequest<Envelope>
{
    StreamId = streamId,
    TopicId = topicId,
    Partitioning = Partitioning.PartitionId(1),
    Messages = messages
},
serializer,
encryptor,
headers);

Fetching Messages

Fetching messages is done with FetchMessagesAsync.

Func<byte[], byte[]> decryptor = static payload =>
{
    string aes_key = "AXe8YwuIn1zxt3FPWTZFlAa14EHdPAdN9FaZ9RQWihc=";
    string aes_iv = "bsxnWolsAyO7kCfWuyrnqg==";

    var key = Convert.FromBase64String(aes_key);
    var iv = Convert.FromBase64String(aes_iv);

    using Aes aes = Aes.Create();
    ICryptoTransform decryptor = aes.CreateDecryptor(key, iv);

    using MemoryStream memoryStream = new MemoryStream(payload);
    using CryptoStream cryptoStream = new CryptoStream(memoryStream, decryptor, CryptoStreamMode.Read);
    using BinaryReader binaryReader = new BinaryReader(cryptoStream);

    return binaryReader.ReadBytes(payload.Length);
};

var messages = await bus.FetchMessagesAsync(new MessageFetchRequest
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(1),
    Count = 1,
    PartitionId = 1,
    PollingStrategy = PollingStrategy.Next(),
    AutoCommit = true
},
decryptor);

Similarly, as with SendMessagesAsync, there's a generic overload that accepts a binary deserializer.

//---Snip---
Func<byte[], Envelope> deserializer = serializedData =>
{
    Envelope envelope = new Envelope();
    int messageTypeLength = BitConverter.ToInt32(serializedData, 0);
    envelope.MessageType = Encoding.UTF8.GetString(serializedData, 4, messageTypeLength);
    envelope.Payload = Encoding.UTF8.GetString(serializedData, 4 + messageTypeLength, serializedData.Length - (4 + messageTypeLength));
    return envelope;
};

var messages = await bus.FetchMessagesAsync<Envelope>(new MessageFetchRequest
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(1),
    Count = 1,
    PartitionId = 1,
    PollingStrategy = PollingStrategy.Next(),
    AutoCommit = true
}, deserializer, decryptor);

Beyond the FetchMessagesAsync functionality, there's also a PollMessagesAsync method that spawns new thread which polls messages in background.

//---Snip---
await foreach (var messageResponse in bus.PollMessagesAsync<Envelope>(new PollMessagesRequest
{
    Consumer = Consumer.New(consumerId),
    Count = 1,
    TopicId = topicId,
    StreamId = streamId,
    PartitionId = 1,
    PollingStrategy = PollingStrategy.Next(),
}, deserializer, decryptor))
{
    //handle the message response
}

It is worth noting that every method (except PollMessagesAsync) will throw an InvalidResponseException when encountering an error.

If you register IIggyClient in a dependency injection container, you will have access to interfaces that encapsulate smaller parts of the system IIggyStream IIggyTopic IIggyPublisher IIggyConsumer IIggyConsumerGroup IIggyOffset IIggyPartition IIggyUsers IIggyUtils

For more information about how Iggy works check its documentation

Producer / Consumer Sample

To run the samples, first get Iggy, Run the server with cargo run --bin iggy-server, then get the SDK, cd into Iggy_SDK and run following commands: dotnet run -c Release --project Iggy_Sample_Producer for producer, dotnet run -c Release --project Iggy_Sample_Consumer for consumer.

TODO

  • Add support for ASP.NET Core Dependency Injection
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
0.6.0-edge.1 12 8/15/2025
0.5.0 46 8/10/2025
0.5.0-edge.1 64 7/27/2025