Add streaming HTTP response support to gremlin-driver#3419
Open
Cole-Greer wants to merge 11 commits into
Open
Conversation
c76f2bc to
c284dc8
Compare
Adds incremental GraphBinary response deserialization to the Java driver. HTTP response chunks are fed to a BlockingQueue-backed InputStream, and a dedicated reader thread deserializes items one at a time using the existing TypeSerializer infrastructure via an InputStream-backed Buffer adapter. Results are pushed to the ResultSet as they are deserialized, enabling consumers to process results before the full response is received. Non-GraphBinary serializers (GraphSON, custom MessageSerializer) fall back to the previous aggregating pipeline (HttpObjectAggregator + HttpGremlinResponseDecoder).
…peline Server: Guard sendLastHttpContent with state check so it is skipped when writeError() in makeChunk() already terminated the response. Without this, a serialization error mid-stream sends two LastHttpContent messages, corrupting HTTP framing on keep-alive connections. Client: Null out queueInputStream after signalEndOfStream on LastHttpContent so spurious content between responses is dropped rather than offered to the closed stream.
Prevent writeError() from sending data after the response is already in ERROR or FINISHED state. Without this guard, any code path that calls writeError() after the response is terminated sends a second LastHttpContent, corrupting HTTP framing on keep-alive connections and causing subsequent requests on the same connection to hang. Assisted-by: Kiro:claude-sonnet-4-20250514
…ing pipeline 1. HttpGremlinEndpointHandler.makeChunk(): Defer setRequestState(FINISHED) until after serialization succeeds. Previously, serialization failure left the state as FINISHED causing writeError() to bail out, resulting in empty responses and client-side EOFExceptions. 2. Connection.write(): Change whenCompleteAsync to whenComplete so returnToPool() runs synchronously when readCompleted fires. This prevents a race where the caller submits the next request before the connection is returned to the pool, causing connection starvation and idle timeout hangs. Assisted-by: Kiro:claude-sonnet-4-20250514
When a channel error occurs (e.g. TooLongFrameException), GremlinResponseHandler calls ctx.close() which initiates an async channel close. The whenComplete callback fires synchronously and checks isDead() (channel.isActive()), but isActive() may still return true because the close hasn't completed deregistration yet. Adding a channel.isOpen() check resolves this TOCTOU race because isOpen() returns false immediately when close() is called, before deregistration completes. Assisted-by: Kiro:claude-sonnet-4-20250514
…g pipeline The streaming reader thread calls markComplete() when all results are deserialized, but this could fire before the HTTP response's LastHttpContent is processed by the codec. Reusing the connection before HTTP framing is complete caused the codec to silently drop subsequent responses. Changes: - Move connection return (returnToPool) from the whenComplete callback to GremlinResponseHandler, triggered by LAST_CONTENT_READ_RESPONSE. This ensures the connection is only reused after HTTP framing is fully complete. - Change HttpStreamingResponseHandler type parameter from DefaultHttpObject to HttpObject. Netty's LastHttpContent.EMPTY_LAST_CONTENT does not extend DefaultHttpObject, causing it to bypass decode() and preventing LAST_CONTENT_READ_RESPONSE from being emitted. - Add streaming flag to GremlinResponseHandler to skip markComplete() for streaming responses (the reader thread owns ResultSet completion). - Remove returnToPool() from Connection.write() whenComplete success path; it is now handled by the pipeline on LastHttpContent for both streaming and non-streaming paths. Assisted-by: Kiro:claude-sonnet-4-20250514
- Make Connection.returnToPool() idempotent via AtomicBoolean guard to prevent double-return corrupting the connection pool in error paths - Change streamingReaderPool max to scale with cluster topology (maxConnectionPoolSize * contactPoints * 4) instead of per-host limit - Catch Throwable (not just Exception) in GraphBinaryStreamResponseReader to prevent consumer hangs on Error subclasses (OOM, StackOverflow) - Add 30s poll timeout to ByteBufQueueInputStream to prevent reader threads from blocking indefinitely if end-of-stream is never signaled - Set BYTES_READ attribute on first content (not headers) to preserve idle timeout error messaging when server sends headers before execution - Clear pendingResultSet in non-GraphBinary error path to prevent stale state blocking graceful shutdown - Rename isGraphBinaryResponse() to shouldStreamResponse() for clarity - Expand upgrade documentation with user-visible behavior details - Add HttpStreamingResponseHandlerTest covering happy path, double LastHttpContent, max content length, channelInactive, and error cases - Add error-then-reuse integration test for connection recovery
b624acf to
1296ed4
Compare
Contributor
Author
|
VOTE +1 |
GumpacG
reviewed
May 15, 2026
Comment on lines
+260
to
+270
| if (useStreaming) { | ||
| pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); | ||
| pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); | ||
| pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); | ||
| } else { | ||
| pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 | ||
| ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); | ||
| pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); | ||
| pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); | ||
| pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); | ||
| } |
Contributor
There was a problem hiding this comment.
nit:
Suggested change
| if (useStreaming) { | |
| pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); | |
| pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); | |
| pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); | |
| } else { | |
| pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 | |
| ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); | |
| pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); | |
| pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); | |
| pipeline.addLast(PIPELINE_HTTP_DECODER, gremlinResponseDecoder); | |
| } | |
| if (!useStreaming) { | |
| pipeline.addLast(PIPELINE_HTTP_AGGREGATOR, new HttpObjectAggregator(cluster.getMaxResponseContentLength() > 0 | |
| ? (int) cluster.getMaxResponseContentLength() : Integer.MAX_VALUE)); | |
| } | |
| pipeline.addLast(PIPELINE_HTTP_DECOMPRESSION_HANDLER, httpCompressionDecoder); | |
| pipeline.addLast(PIPELINE_HTTP_DECODER, streamingResponseHandler); | |
| pipeline.addLast(PIPELINE_HTTP_ENCODER, gremlinRequestEncoder); |
Contributor
|
VOTE +1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds streaming HTTP response deserialization to gremlin-driver for GraphBinary. Instead of buffering the entire HTTP response body before processing, the driver now deserializes results incrementally as data arrives from the server.
Design
Four new classes in
gremlin-driver:Non-GraphBinary serializers (GraphSON, custom MessageSerializer implementations) fall back to the previous aggregating pipeline.
A new cached thread pool (
gremlin-driver-stream-reader) is added to Cluster for the blocking reader threads, bounded by connection pool size.