Skip to content

Add streaming HTTP response support to gremlin-driver#3419

Open
Cole-Greer wants to merge 11 commits into
masterfrom
java-streaming
Open

Add streaming HTTP response support to gremlin-driver#3419
Cole-Greer wants to merge 11 commits into
masterfrom
java-streaming

Conversation

@Cole-Greer
Copy link
Copy Markdown
Contributor

Summary

Adds streaming HTTP response deserialization to gremlin-driver for GraphBinary. Instead of buffering the entire HTTP response body before processing, the driver now deserializes results incrementally as data arrives from the server.

Design

Four new classes in gremlin-driver:

  • ByteBufQueueInputStream — Bridges the Netty event loop and a reader thread via a BlockingQueue of ByteBufs. The event loop offers HTTP content chunks; the reader thread consumes them through standard InputStream reads.
  • InputStreamBuffer — Read-only Buffer implementation backed by an InputStream. Allows existing TypeSerializer implementations to work unchanged over a streaming response body.
  • GraphBinaryStreamResponseReader — Reads the GraphBinary v4 wire format one item at a time, pushing each result to the ResultSet as it is deserialized.
  • HttpStreamingResponseHandler — Netty handler that replaces HttpObjectAggregator + HttpGremlinResponseDecoder in the pipeline for GraphBinary responses.

Non-GraphBinary serializers (GraphSON, custom MessageSerializer implementations) fall back to the previous aggregating pipeline.

A new cached thread pool (gremlin-driver-stream-reader) is added to Cluster for the blocking reader threads, bounded by connection pool size.

@Cole-Greer Cole-Greer force-pushed the java-streaming branch 3 times, most recently from c76f2bc to c284dc8 Compare May 11, 2026 16:54
Cole-Greer and others added 9 commits May 11, 2026 16:03
Adds incremental GraphBinary response deserialization to the Java driver.
HTTP response chunks are fed to a BlockingQueue-backed InputStream, and a
dedicated reader thread deserializes items one at a time using the existing
TypeSerializer infrastructure via an InputStream-backed Buffer adapter.

Results are pushed to the ResultSet as they are deserialized, enabling
consumers to process results before the full response is received.

Non-GraphBinary serializers (GraphSON, custom MessageSerializer) fall back
to the previous aggregating pipeline (HttpObjectAggregator +
HttpGremlinResponseDecoder).
…peline

Server: Guard sendLastHttpContent with state check so it is skipped when
writeError() in makeChunk() already terminated the response. Without this,
a serialization error mid-stream sends two LastHttpContent messages,
corrupting HTTP framing on keep-alive connections.

Client: Null out queueInputStream after signalEndOfStream on LastHttpContent
so spurious content between responses is dropped rather than offered to the
closed stream.
Prevent writeError() from sending data after the response is already in
ERROR or FINISHED state. Without this guard, any code path that calls
writeError() after the response is terminated sends a second
LastHttpContent, corrupting HTTP framing on keep-alive connections and
causing subsequent requests on the same connection to hang.

Assisted-by: Kiro:claude-sonnet-4-20250514
…ing pipeline

1. HttpGremlinEndpointHandler.makeChunk(): Defer setRequestState(FINISHED) until
   after serialization succeeds. Previously, serialization failure left the state
   as FINISHED causing writeError() to bail out, resulting in empty responses and
   client-side EOFExceptions.

2. Connection.write(): Change whenCompleteAsync to whenComplete so returnToPool()
   runs synchronously when readCompleted fires. This prevents a race where the
   caller submits the next request before the connection is returned to the pool,
   causing connection starvation and idle timeout hangs.

Assisted-by: Kiro:claude-sonnet-4-20250514
When a channel error occurs (e.g. TooLongFrameException), GremlinResponseHandler
calls ctx.close() which initiates an async channel close. The whenComplete callback
fires synchronously and checks isDead() (channel.isActive()), but isActive() may
still return true because the close hasn't completed deregistration yet.

Adding a channel.isOpen() check resolves this TOCTOU race because isOpen() returns
false immediately when close() is called, before deregistration completes.

Assisted-by: Kiro:claude-sonnet-4-20250514
…g pipeline

The streaming reader thread calls markComplete() when all results are
deserialized, but this could fire before the HTTP response's LastHttpContent
is processed by the codec. Reusing the connection before HTTP framing is
complete caused the codec to silently drop subsequent responses.

Changes:
- Move connection return (returnToPool) from the whenComplete callback to
  GremlinResponseHandler, triggered by LAST_CONTENT_READ_RESPONSE. This
  ensures the connection is only reused after HTTP framing is fully complete.
- Change HttpStreamingResponseHandler type parameter from DefaultHttpObject to
  HttpObject. Netty's LastHttpContent.EMPTY_LAST_CONTENT does not extend
  DefaultHttpObject, causing it to bypass decode() and preventing
  LAST_CONTENT_READ_RESPONSE from being emitted.
- Add streaming flag to GremlinResponseHandler to skip markComplete() for
  streaming responses (the reader thread owns ResultSet completion).
- Remove returnToPool() from Connection.write() whenComplete success path;
  it is now handled by the pipeline on LastHttpContent for both streaming
  and non-streaming paths.

Assisted-by: Kiro:claude-sonnet-4-20250514
- Make Connection.returnToPool() idempotent via AtomicBoolean guard to
  prevent double-return corrupting the connection pool in error paths
- Change streamingReaderPool max to scale with cluster topology
  (maxConnectionPoolSize * contactPoints * 4) instead of per-host limit
- Catch Throwable (not just Exception) in GraphBinaryStreamResponseReader
  to prevent consumer hangs on Error subclasses (OOM, StackOverflow)
- Add 30s poll timeout to ByteBufQueueInputStream to prevent reader
  threads from blocking indefinitely if end-of-stream is never signaled
- Set BYTES_READ attribute on first content (not headers) to preserve
  idle timeout error messaging when server sends headers before execution
- Clear pendingResultSet in non-GraphBinary error path to prevent stale
  state blocking graceful shutdown
- Rename isGraphBinaryResponse() to shouldStreamResponse() for clarity
- Expand upgrade documentation with user-visible behavior details
- Add HttpStreamingResponseHandlerTest covering happy path, double
  LastHttpContent, max content length, channelInactive, and error cases
- Add error-then-reuse integration test for connection recovery
@Cole-Greer
Copy link
Copy Markdown
Contributor Author

VOTE +1

Copy link
Copy Markdown
Contributor

@GumpacG GumpacG left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VOTE +1

Comment on lines +260 to +270
if (useStreaming) {
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
} else {
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
if (useStreaming) {
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
} else {
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder);
}
if (!useStreaming) {
pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0
? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE));
}
pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder);
pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler);
pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder);

@kenhuuu
Copy link
Copy Markdown
Contributor

kenhuuu commented May 15, 2026

VOTE +1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants