EventStreamProcessing.Kafka 1.0.0

Event stream processing micro-framework for Apache Kafka.

Install via the NuGet Package Manager:

Install-Package EventStreamProcessing.Kafka -Version 1.0.0

Install via the .NET CLI:

dotnet add package EventStreamProcessing.Kafka --version 1.0.0

For projects that support PackageReference, copy this XML node into the project file to reference the package:

<PackageReference Include="EventStreamProcessing.Kafka" Version="1.0.0" />

Install via Paket:

paket add EventStreamProcessing.Kafka --version 1.0.0

Event Stream Processing Micro-Framework

Single event stream processing micro-framework for Apache Kafka using .NET Core

Introduction

This framework provides a set of interfaces and abstract base classes for building an event stream processing pipeline. These are contained in the EventStreamProcessing.Abstractions package, are generic in nature, and are not tied to any one streaming platform, such as Apache Kafka. To use these abstractions, create a class that extends EventProcessor and supply the required consumers, producers and message handlers.
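A subclass might look like the following. This is a minimal sketch, not taken from the package source: the generic parameters mirror the KafkaEventProcessor<int, string, int, string> usage shown in the sample below, and the exact base-class and interface signatures are assumptions for illustration.

```csharp
using EventStreamProcessing.Abstractions;

// Hypothetical sketch: parameter types and the base constructor signature
// are assumed; consult the Abstractions package for the actual contracts.
public class MyEventProcessor : EventProcessor<int, string, int, string>
{
    public MyEventProcessor(
        IEventConsumer consumer,
        IEventProducer producer,
        params MessageHandler[] handlers)
        : base(consumer, producer, handlers)
    {
    }
}
```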

While the abstractions are not coupled to any streaming platform, the EventStreamProcessing.Kafka package provides an implementation that uses the Confluent.Kafka package to read and write event streams using Apache Kafka.

Sample Description

The best way to become familiar with this framework is to examine the EventStreamProcessing.Sample.Worker project in the samples folder. You can use Docker to run a local instance of the Kafka broker, then run the sample worker, consumer and producer apps.

The following steps show how an event stream is processed by the Sample Worker service to validate, enrich and filter messages before writing them back to Kafka.

  1. The Sample Producer console app lets the user write a stream of events to the Kafka broker using the "raw-events" topic. The numeral represents the event key, and the text "Hello World" represents the event value.
  2. The Sample Worker service injects an IEventProcessor into the KafkaWorker class constructor. The ExecuteAsync method then calls eventProcessor.Process in a while loop until the operation is cancelled.
  3. The Program.CreateHostBuilder method registers an IEventProcessor for dependency injection with a KafkaEventProcessor that uses KafkaEventConsumer, KafkaEventProducer and an array of MessageHandler with ValidationHandler, EnrichmentHandler and FilterHandler.
// Add event processor
services.AddSingleton<IEventProcessor>(sp =>
{
    // Create logger, options, consumer and producers
    var logger = sp.GetRequiredService<ILogger>();
    // ConsumerOptions is assumed to be registered alongside ProducerOptions
    var consumerOptions = sp.GetRequiredService<ConsumerOptions>();
    var producerOptions = sp.GetRequiredService<ProducerOptions>();
    var kafkaConsumer = KafkaUtils.CreateConsumer(
        consumerOptions.Brokers, consumerOptions.TopicsList, logger);
    var kafkaErrorProducer = KafkaUtils.CreateProducer(
        producerOptions.Brokers, producerOptions.ValidationTopic, logger);
    var kafkaFinalProducer = KafkaUtils.CreateProducer(
        producerOptions.Brokers, producerOptions.FinalTopic, logger);

    // Create handlers
    var handlers = new List<MessageHandler>
    {
        new ValidationHandler(
            sp.GetRequiredService<IDictionary<int, string>>(),
            new KafkaEventProducer<int, string>(kafkaErrorProducer, producerOptions.ValidationTopic, logger),
            logger),
        new EnrichmentHandler(
            sp.GetRequiredService<IDictionary<int, string>>(), logger),
        new FilterHandler(
            m => !m.Value.Contains("Hello"), logger) // Filter out English greetings
    };

    // Create event processor
    return new KafkaEventProcessor<int, string, int, string>(
        new KafkaEventConsumer<int, string>(kafkaConsumer, logger),
        new KafkaEventProducer<int, string>(kafkaFinalProducer, producerOptions.FinalTopic, logger),
        handlers.ToArray());
});
  1. The KafkaEventConsumer in Sample Worker subscribes to the "raw-events" topic of the Kafka broker running on localhost:9092. The message handlers validate, enrich and filter the events one at a time. Validation errors are written back to Kafka on the "validation-errors" topic; this happens when the message key has no matching entry in the language store. The EnrichmentHandler looks up a translation for "Hello" in the language store and transforms the message with the selected translation. The FilterHandler accepts a lambda expression for filtering messages; in this case the English phrase "Hello" is filtered out. Lastly, the KafkaEventProducer writes processed events back to Kafka using the "final-events" topic.
  2. The Sample Consumer console app reads the "validation-errors" and "final-events" topics, displaying them in the console.
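The KafkaWorker loop described in step 2 can be sketched as follows. This is a hedged sketch based on the standard BackgroundService hosting pattern; the actual KafkaWorker in the sample, and the exact Process signature, may differ in detail.

```csharp
using Microsoft.Extensions.Hosting;

// Hypothetical sketch of the worker described in step 2; the real
// implementation lives in EventStreamProcessing.Sample.Worker.
public class KafkaWorker : BackgroundService
{
    private readonly IEventProcessor eventProcessor;

    public KafkaWorker(IEventProcessor eventProcessor)
    {
        this.eventProcessor = eventProcessor;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Process one event per iteration until cancellation is requested
        while (!stoppingToken.IsCancellationRequested)
        {
            await eventProcessor.Process(stoppingToken);
        }
    }
}
```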



Version History

Version  Downloads  Last updated
1.0.0    90         6/25/2020