Neftedollar/Orleans.Streaming.Redis

Orleans.Streaming.Redis

Redis Streams persistent stream provider for Microsoft Orleans 10.x

Uses Redis Streams (XADD / XREADGROUP / XACK) as a durable, cross-silo message transport for Orleans streaming.

Why?

Orleans ships with persistent stream providers for Azure Queue, Azure Event Hubs, and Amazon SQS — but not Redis. If you already use Redis for Orleans grain persistence and clustering, adding another infrastructure dependency just for streaming is unnecessary.

This package fills the gap: a lightweight Redis Streams adapter that works with any Redis 5.0+ instance.

Features

  • Cross-silo delivery — events published on Silo A are delivered to subscribers on Silo B
  • Consumer groups — automatic offset tracking per silo via Redis consumer groups
  • Partitioned queues — configurable number of stream partitions for parallelism
  • Bounded retention — MAXLEN on XADD prevents unbounded stream growth
  • Multi-target — supports net8.0, net9.0, net10.0
  • JSON payload mode — optional human-readable JSON encoding for interop with non-Orleans consumers
  • Minimal dependencies — only Microsoft.Orleans.Streaming + StackExchange.Redis

Quick Start

Install

dotnet add package Orleans.Streaming.Redis

Configure

using Orleans.Streaming.Redis.Configuration;

builder.Host.UseOrleans(silo =>
{
    silo.AddRedisStreams("StreamProvider", options =>
    {
        options.ConnectionString = "localhost:6379";
        options.QueueCount = 8;           // number of partitions
        options.MaxStreamLength = 10_000; // MAXLEN per stream key
        options.KeyPrefix = "orleans:stream";
        options.ConsumerGroup = "orleans";
    });
});

Produce

var streamProvider = clusterClient.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("my-namespace", "my-key");
var stream = streamProvider.GetStream<MyEvent>(streamId);
await stream.OnNextAsync(new MyEvent { ... });
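
The snippets in this section use a MyEvent type without defining it. A minimal sketch of what such a type could look like, assuming Orleans source-generated serialization ([GenerateSerializer] and [Id]). The property names are hypothetical, chosen to match the JSON sample in the JSON Payload Mode section:

```csharp
using Orleans;

// Hypothetical event type for the Produce/Consume snippets.
// [GenerateSerializer] and [Id] opt the type into Orleans' source-generated
// serializer; the stable [Id] numbers identify each member on the wire.
[GenerateSerializer]
public sealed class MyEvent
{
    [Id(0)] public string OrderId { get; set; } = string.Empty;
    [Id(1)] public decimal Amount { get; set; }
}
```

With a type like this, the producer call would read: await stream.OnNextAsync(new MyEvent { OrderId = "ORD-42", Amount = 99.95m });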

Consume

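using Orleans.Runtime;  // StreamId
using Orleans.Streams;  // IAsyncObserver<T>, StreamSequenceToken

[ImplicitStreamSubscription("my-namespace")]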
public class MyGrain : Grain, IAsyncObserver<MyEvent>
{
    public override async Task OnActivateAsync(CancellationToken ct)
    {
        var provider = this.GetStreamProvider("StreamProvider");
        var streamId = StreamId.Create("my-namespace", this.GetPrimaryKeyString());
        var stream = provider.GetStream<MyEvent>(streamId);
        await stream.SubscribeAsync(this);
    }

    public Task OnNextAsync(MyEvent item, StreamSequenceToken? token) { ... }
    public Task OnCompletedAsync() => Task.CompletedTask;
    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}

Configuration Reference

| Option | Default | Description |
|---|---|---|
| ConnectionString | "" | Redis connection string (StackExchange.Redis format). Required. |
| QueueCount | 8 | Number of stream partitions. More = better parallelism. |
| KeyPrefix | "orleans:stream" | Redis key prefix. Stream keys: {prefix}:{index} |
| ConsumerGroup | "orleans" | Redis consumer group name. Each silo auto-joins. |
| MaxBatchSize | 100 | Max messages per XREADGROUP poll. |
| MaxStreamLength | 10000 | MAXLEN for XADD. Caps stream size. 0 = unlimited. |
| Database | -1 | Redis database index. -1 = default from connection string. |
| CacheSize | 4096 | In-memory cache capacity (batch containers) per queue partition. |
| PayloadMode | Binary | Binary (Orleans binary, default) or Json (human-readable, for interop with non-Orleans consumers). |
| JsonSerializerOptions | null | Custom System.Text.Json.JsonSerializerOptions for JSON mode. null = camelCase, no indentation. |
| DeadLetterPrefix | null | Redis key prefix for dead-letter stream. When set, undeserializable messages are forwarded here and XACK'd. null = disabled. |
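
The Quick Start only sets a few of these options. As a sketch, the remaining ones could be set the same way. This assumes the same builder as in Quick Start and assumes the property types implied by the defaults listed here (int for the numeric options, string for DeadLetterPrefix). The dead-letter prefix value is a made-up example:

```csharp
using Orleans.Streaming.Redis.Configuration;

builder.Host.UseOrleans(silo =>
{
    silo.AddRedisStreams("StreamProvider", options =>
    {
        options.ConnectionString = "localhost:6379";
        options.Database = 2;          // use DB 2 instead of the connection-string default
        options.MaxBatchSize = 50;     // fewer messages per XREADGROUP poll
        options.CacheSize = 8192;      // larger in-memory cache per queue partition
        options.MaxStreamLength = 0;   // 0 = unlimited, so rely on Redis memory limits instead

        // Hypothetical prefix: undeserializable messages are forwarded to a
        // dead-letter stream under this prefix instead of being retried.
        options.DeadLetterPrefix = "orleans:stream:dead";
    });
});
```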

Architecture

Producer Grain
    ↓ stream.OnNextAsync(event)
RedisStreamAdapter.QueueMessageBatchAsync()
    ↓ XADD orleans:stream:{partition} * data <serialized>
Redis Stream (durable)
    ↓ XREADGROUP GROUP orleans silo-{guid} COUNT 100 STREAMS orleans:stream:{partition} >
RedisStreamReceiver.GetQueueMessagesAsync()
    ↓ IBatchContainer
Orleans PullingAgent → IQueueCache → Subscriber Grain.OnNextAsync()

Each silo runs a pulling agent per queue partition. Messages are distributed across partitions via consistent hashing on StreamId.
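
To illustrate how a stream ends up on a particular Redis key, here is a simplified sketch. It uses plain hash-modulo with FNV-1a rather than a consistent-hash ring, so it is not the mapping this package or Orleans actually uses. PartitionSketch and its methods are hypothetical names; only the {prefix}:{index} key convention comes from the Configuration Reference:

```csharp
using System;
using System.Text;

public static class PartitionSketch
{
    // FNV-1a (32-bit): a simple hash that is stable across processes,
    // unlike string.GetHashCode(), which is randomized per process in .NET.
    public static uint Fnv1a(string text)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(text))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a (namespace, key) pair to a partition index in [0, queueCount).
    public static int PartitionFor(string streamNamespace, string streamKey, int queueCount)
    {
        if (queueCount <= 0) throw new ArgumentOutOfRangeException(nameof(queueCount));
        return (int)(Fnv1a(streamNamespace + "/" + streamKey) % (uint)queueCount);
    }

    // Builds the Redis stream key using the {prefix}:{index} convention.
    public static string RedisKeyFor(string prefix, int partition) => $"{prefix}:{partition}";
}
```

With QueueCount = 8, every stream lands on one of orleans:stream:0 through orleans:stream:7, and the same StreamId always maps to the same key. That stability is the property that matters here: all events for one stream go through one partition.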

JSON Payload Mode

By default, events are serialized using Orleans binary format (compact but opaque). Enable JSON mode to write human-readable entries that non-Orleans consumers can read directly:

silo.AddRedisStreams("StreamProvider", options =>
{
    options.ConnectionString = "localhost:6379";
    options.PayloadMode = RedisStreamPayloadMode.Json;
});

JSON-mode entries in Redis look like this:

> XRANGE orleans:stream:0 - + COUNT 1
1) 1) "1679000000000-0"
   2) 1) "_payload_mode"  2) "json"
      3) "stream_namespace"  4) "orders"
      5) "stream_key"  6) "ORD-42"
      7) "payload"  8) "[{\"orderId\":\"ORD-42\",\"amount\":99.95}]"

Reading from Node.js, Python, or Go is straightforward — just XREADGROUP and parse the payload field as JSON.
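
The same applies to a plain .NET process without Orleans. The sketch below uses StackExchange.Redis and System.Text.Json and relies on the field names from the XRANGE sample (_payload_mode, stream_namespace, stream_key, payload). JsonStreamReader, the group name "reporting", and the consumer name "reporter-1" are hypothetical. It reads through its own consumer group, on the assumption that sharing the silos' group would take messages away from them:

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class JsonStreamReader
{
    // Splits the "payload" field (a JSON array of events) into one raw JSON string per event.
    public static List<string> SplitPayload(string payloadJson)
    {
        using JsonDocument doc = JsonDocument.Parse(payloadJson);
        var events = new List<string>();
        foreach (JsonElement element in doc.RootElement.EnumerateArray())
        {
            events.Add(element.GetRawText());
        }
        return events;
    }

    // Reads one batch of up to 10 entries, prints the JSON ones, and acknowledges all of them.
    public static async Task ReadOnceAsync(IDatabase db, string streamKey)
    {
        const string group = "reporting";     // hypothetical group, separate from the silos' group
        const string consumer = "reporter-1"; // hypothetical consumer name

        try
        {
            // XGROUP CREATE <key> <group> $ MKSTREAM
            await db.StreamCreateConsumerGroupAsync(streamKey, group, StreamPosition.NewMessages, createStream: true);
        }
        catch (RedisServerException ex) when (ex.Message.StartsWith("BUSYGROUP", StringComparison.Ordinal))
        {
            // The group already exists; nothing to do.
        }

        // XREADGROUP GROUP <group> <consumer> COUNT 10 STREAMS <key> >
        StreamEntry[] entries = await db.StreamReadGroupAsync(
            streamKey, group, consumer, StreamPosition.NewMessages, count: 10);

        foreach (StreamEntry entry in entries)
        {
            string? mode = entry["_payload_mode"];
            string? payload = entry["payload"];
            if (mode == "json" && payload is not null)
            {
                foreach (string evt in SplitPayload(payload))
                {
                    Console.WriteLine($"{entry["stream_namespace"]}/{entry["stream_key"]}: {evt}");
                }
            }

            // XACK <key> <group> <id>: acknowledge even skipped entries so they do not stay pending.
            await db.StreamAcknowledgeAsync(streamKey, group, entry.Id);
        }
    }
}
```

A caller would obtain db from ConnectionMultiplexer.ConnectAsync("localhost:6379") followed by GetDatabase(), then call ReadOnceAsync(db, "orleans:stream:0") for each partition it cares about.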

The read path auto-detects the format (Binary vs JSON) via a discriminator field, so mixed entries are handled transparently during rolling deployments from Binary to JSON mode.
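
Discriminator-based detection of this kind can be sketched as follows. This is an illustration, not the package's actual read path: it assumes that JSON entries carry _payload_mode = "json" (as in the XRANGE sample) and that anything else should be treated as Binary. DetectedFormat and PayloadFormatSketch are hypothetical names:

```csharp
using System.Collections.Generic;

public enum DetectedFormat { Binary, Json }

public static class PayloadFormatSketch
{
    // Decides per entry, so Binary and JSON entries can coexist in one stream.
    public static DetectedFormat Detect(IReadOnlyDictionary<string, string> fields)
    {
        // Assumption: a missing or different discriminator value means Binary.
        return fields.TryGetValue("_payload_mode", out string? mode) && mode == "json"
            ? DetectedFormat.Json
            : DetectedFormat.Binary;
    }
}
```

Because the decision is made per entry rather than per stream, entries written before and after the switch to JSON mode can be read in the same pass.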

Redis Requirements

  • Redis 5.0+ (Streams support)
  • Redis Cluster is supported (stream keys are partitioned by prefix)
  • Recommended: enable maxmemory-policy allkeys-lru or set MaxStreamLength to prevent OOM

Status

v1.1.0 — Core functionality, cross-silo delivery, consumer groups, crash recovery, dead-letter support, JSON payload mode.

Changelog

  • Cross-silo delivery via Redis Streams
  • Consumer groups with automatic offset tracking
  • Crash recovery via XPENDING + XCLAIM
  • Dead-letter stream support
  • Optional JSON payload mode for non-Orleans consumers (v1.1.0)

Contributing

Contributions are welcome! Check out the Contributing Guide to get started.

Community

If this project is useful to you, please consider giving it a ⭐ — it helps others discover it!

License

MIT — free for commercial and open-source use.
