Redis Streams persistent stream provider for Microsoft Orleans 10.x
Uses Redis Streams (XADD / XREADGROUP / XACK) as a durable, cross-silo message transport for Orleans streaming.
Orleans ships with persistent stream providers for Azure Queue, Azure Event Hubs, and Amazon SQS — but not Redis. If you already use Redis for Orleans grain persistence and clustering, adding another infrastructure dependency just for streaming is unnecessary.
This package fills the gap: a lightweight Redis Streams adapter that works with any Redis 5.0+ instance.
- Cross-silo delivery — events published on Silo A are delivered to subscribers on Silo B
- Consumer groups — automatic offset tracking per silo via Redis consumer groups
- Partitioned queues — configurable number of stream partitions for parallelism
- Bounded retention — `MAXLEN` on `XADD` prevents unbounded stream growth
- Multi-target — supports `net8.0`, `net9.0`, `net10.0`
- JSON payload mode — optional human-readable JSON encoding for interop with non-Orleans consumers
- Minimal dependencies — only `Microsoft.Orleans.Streaming` + `StackExchange.Redis`
Install the package:

```bash
dotnet add package Orleans.Streaming.Redis
```

Configure the silo:

```csharp
using Orleans.Streaming.Redis.Configuration;

builder.Host.UseOrleans(silo =>
{
    silo.AddRedisStreams("StreamProvider", options =>
    {
        options.ConnectionString = "localhost:6379";
        options.QueueCount = 8;            // number of partitions
        options.MaxStreamLength = 10_000;  // MAXLEN per stream key
        options.KeyPrefix = "orleans:stream";
        options.ConsumerGroup = "orleans";
    });
});
```

Publish events:

```csharp
var streamProvider = clusterClient.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("my-namespace", "my-key");
var stream = streamProvider.GetStream<MyEvent>(streamId);
await stream.OnNextAsync(new MyEvent { ... });
```

Consume events in a grain:

```csharp
[ImplicitStreamSubscription("my-namespace")]
public class MyGrain : Grain, IAsyncObserver<MyEvent>
{
    public override async Task OnActivateAsync(CancellationToken ct)
    {
        var provider = this.GetStreamProvider("StreamProvider");
        var streamId = StreamId.Create("my-namespace", this.GetPrimaryKeyString());
        var stream = provider.GetStream<MyEvent>(streamId);
        await stream.SubscribeAsync(this);
    }

    public Task OnNextAsync(MyEvent item, StreamSequenceToken? token) { ... }
    public Task OnCompletedAsync() => Task.CompletedTask;
    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}
```

| Option | Default | Description |
|---|---|---|
| `ConnectionString` | `""` | Redis connection string (StackExchange.Redis format). Required. |
| `QueueCount` | `8` | Number of stream partitions. More = better parallelism. |
| `KeyPrefix` | `"orleans:stream"` | Redis key prefix. Stream keys: `{prefix}:{index}` |
| `ConsumerGroup` | `"orleans"` | Redis consumer group name. Each silo auto-joins. |
| `MaxBatchSize` | `100` | Max messages per `XREADGROUP` poll. |
| `MaxStreamLength` | `10000` | `MAXLEN` for `XADD`. Caps stream size. `0` = unlimited. |
| `Database` | `-1` | Redis database index. `-1` = default from connection string. |
| `CacheSize` | `4096` | In-memory cache capacity (batch containers) per queue partition. |
| `PayloadMode` | `Binary` | `Binary` (Orleans binary, default) or `Json` (human-readable, for interop with non-Orleans consumers). |
| `JsonSerializerOptions` | `null` | Custom `System.Text.Json.JsonSerializerOptions` for JSON mode. `null` = camelCase, no indentation. |
| `DeadLetterPrefix` | `null` | Redis key prefix for dead-letter stream. When set, undeserializable messages are forwarded here and `XACK`'d. `null` = disabled. |

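As an illustration of how several of these options combine, a silo that polls in smaller batches, uses a specific Redis database, and routes undeserializable messages to a dead-letter stream might be configured roughly as follows. The option names come from the table above; the values are purely illustrative (not recommendations), and the sketch assumes `DeadLetterPrefix` is a string and the numeric options are integers.

```csharp
// Fragment for the same UseOrleans(...) callback shown in the quick start.
silo.AddRedisStreams("StreamProvider", options =>
{
    options.ConnectionString = "localhost:6379";
    options.Database = 2;                    // illustrative: use DB 2 instead of the connection-string default
    options.MaxBatchSize = 50;               // illustrative: fewer messages per XREADGROUP poll
    options.CacheSize = 1024;                // illustrative: smaller in-memory cache per partition
    options.DeadLetterPrefix = "orleans:dead-letter"; // hypothetical prefix; null disables dead-lettering
});
```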
```
Producer Grain
    ↓ stream.OnNextAsync(event)
RedisStreamAdapter.QueueMessageBatchAsync()
    ↓ XADD orleans:stream:{partition} * data <serialized>
Redis Stream (durable)
    ↓ XREADGROUP GROUP orleans silo-{guid} COUNT 100 STREAMS orleans:stream:{partition} >
RedisStreamReceiver.GetQueueMessagesAsync()
    ↓ IBatchContainer
Orleans PullingAgent → IQueueCache → Subscriber Grain.OnNextAsync()
```
Each silo runs a pulling agent per queue partition. Messages are distributed across partitions via consistent hashing on StreamId.
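To make the idea of mapping a `StreamId` to a partition concrete, here is a minimal sketch. It is not this package's or Orleans' implementation: Orleans selects the queue through its own queue mapper, while this sketch swaps in plain hash-modulo with FNV-1a, which is simpler than consistent hashing. The class and method names are hypothetical, and the key layout `{prefix}:{index}` is taken from the options table.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration only; not part of Orleans or this package.
public static class PartitionSketch
{
    // FNV-1a (32-bit). Stable across processes, unlike string.GetHashCode(),
    // so every silo would compute the same partition for the same stream.
    public static uint Fnv1a(string text)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(text))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Simple hash-modulo mapping (an assumption, not consistent hashing).
    public static int PartitionFor(string streamNamespace, string streamKey, int queueCount)
    {
        if (queueCount <= 0) throw new ArgumentOutOfRangeException(nameof(queueCount));
        return (int)(Fnv1a(streamNamespace + "/" + streamKey) % (uint)queueCount);
    }

    // Redis key for a partition, following the {prefix}:{index} layout.
    public static string RedisKeyFor(string keyPrefix, int partition) => $"{keyPrefix}:{partition}";
}
```

For example, `PartitionSketch.RedisKeyFor("orleans:stream", PartitionSketch.PartitionFor("orders", "ORD-42", 8))` always yields the same key for that stream. The point is that all events of one stream land in one partition, which preserves their relative order.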
By default, events are serialized using Orleans binary format (compact but opaque). Enable JSON mode to write human-readable entries that non-Orleans consumers can read directly:
```csharp
silo.AddRedisStreams("StreamProvider", options =>
{
    options.ConnectionString = "localhost:6379";
    options.PayloadMode = RedisStreamPayloadMode.Json;
});
```

JSON-mode entries in Redis look like this:

```
> XRANGE orleans:stream:0 - + COUNT 1
1) 1) "1679000000000-0"
   2) 1) "_payload_mode"    2) "json"
      3) "stream_namespace" 4) "orders"
      5) "stream_key"       6) "ORD-42"
      7) "payload"          8) "[{\"orderId\":\"ORD-42\",\"amount\":99.95}]"
```
Reading from Node.js, Python, or Go is straightforward — just XREADGROUP and parse the payload field as JSON.
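The same parsing step can be sketched in C# for a consumer that does not use Orleans. This assumes the `payload` field always holds a JSON array of events, as in the sample entry above. The class name is hypothetical, and fetching the entry from Redis is left out so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration only; not part of this package.
public static class JsonPayloadSketch
{
    // Parses the "payload" field of a JSON-mode entry into one JsonElement per event.
    public static List<JsonElement> ParseEvents(string payload)
    {
        using JsonDocument doc = JsonDocument.Parse(payload);
        if (doc.RootElement.ValueKind != JsonValueKind.Array)
        {
            // Assumption: payloads are arrays, as in the sample entry.
            throw new FormatException("Expected the payload to be a JSON array of events.");
        }

        var events = new List<JsonElement>();
        foreach (JsonElement item in doc.RootElement.EnumerateArray())
        {
            // Clone so each element stays valid after the document is disposed.
            events.Add(item.Clone());
        }
        return events;
    }
}
```

With the sample payload, `ParseEvents(...)` returns a single element whose `orderId` property can be read with `GetProperty("orderId").GetString()`.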
The read path auto-detects the format (Binary vs JSON) via a discriminator field, so mixed entries are handled transparently during rolling deployments from Binary to JSON mode.
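A discriminator check of this kind might look like the sketch below. The field name `_payload_mode` and the value `json` come from the sample entry; treating entries without that field (or with any other value) as binary is an assumption, and the type and method names are hypothetical rather than the package's own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; not the package's own API.
public enum DetectedPayloadMode { Binary, Json }

public static class PayloadModeSketch
{
    public const string DiscriminatorField = "_payload_mode";

    // Decides how to decode one stream entry from its field/value pairs.
    public static DetectedPayloadMode Detect(IReadOnlyDictionary<string, string> fields)
    {
        // Assumption: only an explicit "json" discriminator marks a JSON entry;
        // everything else is treated as Orleans binary.
        if (fields.TryGetValue(DiscriminatorField, out var mode) &&
            string.Equals(mode, "json", StringComparison.OrdinalIgnoreCase))
        {
            return DetectedPayloadMode.Json;
        }
        return DetectedPayloadMode.Binary;
    }
}
```

Because the decision is made per entry, a stream that holds older binary entries followed by newer JSON entries can be drained without coordinating the switch across silos.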
- Redis 5.0+ (Streams support)
- Redis Cluster is supported (stream keys are partitioned by prefix)
- Recommended: enable `maxmemory-policy allkeys-lru` or set `MaxStreamLength` to prevent OOM
v1.1.0 — Core functionality, cross-silo delivery, consumer groups, crash recovery, dead-letter support, JSON payload mode.
- Cross-silo delivery via Redis Streams
- Consumer groups with automatic offset tracking
- Crash recovery via XPENDING + XCLAIM
- Dead-letter stream support
- Optional JSON payload mode for non-Orleans consumers (v1.1.0)
- Getting Started — install, configure, produce, consume in 5 minutes
- Configuration Reference — all options, pulling agent tuning, client setup, multiple providers
- Architecture — data flow, components, serialization, crash recovery, metrics
- Production Deployment — Redis sizing, HA, monitoring, scaling, Docker Compose
- Troubleshooting — common issues, Redis CLI debugging, logging
Contributions are welcome! Check out the Contributing Guide to get started.
- Browse `good first issue` for beginner-friendly tasks
- Read the Code of Conduct
- Review the Security Policy for reporting vulnerabilities
- GitHub Discussions — questions, ideas, show & tell
- GitHub Issues — bug reports and feature requests
If this project is useful to you, please consider giving it a ⭐ — it helps others discover it!
MIT — free for commercial and open-source use.