diff --git a/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/CxxInspectorPackagerConnection.kt b/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/CxxInspectorPackagerConnection.kt index ef98c3df2ec2..c6b6b7839353 100644 --- a/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/CxxInspectorPackagerConnection.kt +++ b/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/CxxInspectorPackagerConnection.kt @@ -9,12 +9,18 @@ package com.facebook.react.devsupport import android.os.Handler import android.os.Looper +import com.facebook.common.logging.FLog import com.facebook.jni.HybridData import com.facebook.proguard.annotations.DoNotStrip import com.facebook.proguard.annotations.DoNotStripAny +import com.facebook.react.common.annotations.VisibleForTesting import com.facebook.react.devsupport.inspector.DevSupportHttpClient import com.facebook.soloader.SoLoader import java.io.Closeable +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.ArrayDeque +import java.util.Queue import okhttp3.Request import okhttp3.Response import okhttp3.WebSocket @@ -66,7 +72,7 @@ internal class CxxInspectorPackagerConnection( */ @DoNotStripAny private interface IWebSocket : Closeable { - fun send(message: String) + fun send(chunk: ByteBuffer) /** * Close the WebSocket connection. NOTE: There is no close() method in the C++ interface. @@ -75,6 +81,95 @@ internal class CxxInspectorPackagerConnection( override fun close() } + /** + * A simple WebSocket wrapper that prevents having more than 16MiB of messages queued + * simultaneously. This is done to stop OkHttp from closing the WebSocket connection. + * + * https://github.com/facebook/react-native/issues/39651. + * https://github.com/square/okhttp/blob/4e7dbec1ea6c9cf8d80422ac9d44b9b185c749a3/okhttp/src/commonJvmAndroid/kotlin/okhttp3/internal/ws/RealWebSocket.kt#L684. + */ + private class InspectorPackagerWebSocketImpl( + private val nativeWebSocket: WebSocket, + private val handler: Handler, + ) : IWebSocket { + private val messageQueue: Queue> = ArrayDeque() + private val queueLock = Any() + private val drainRunnable = + object : Runnable { + override fun run() { + FLog.d(TAG, "Attempting to drain the message queue after ${drainDelayMs}ms") + tryDrainQueue() + } + } + + /** + * We are providing a String to OkHttp's WebSocket, because there is no guarantee that all CDP + * clients will support binary data format. + */ + override fun send(chunk: ByteBuffer) { + synchronized(queueLock) { + val messageSize = chunk.capacity() + val message = StandardCharsets.UTF_8.decode(chunk).toString() + val currentQueueSize = nativeWebSocket.queueSize() + + if (currentQueueSize + messageSize > MAX_QUEUE_SIZE) { + FLog.d(TAG, "Reached queue size limit. Queueing the message.") + messageQueue.offer(Pair(message, messageSize)) + scheduleDrain() + } else { + if (messageQueue.isEmpty()) { + nativeWebSocket.send(message) + } else { + messageQueue.offer(Pair(message, messageSize)) + tryDrainQueue() + } + } + } + } + + override fun close() { + synchronized(queueLock) { + handler.removeCallbacks(drainRunnable) + messageQueue.clear() + nativeWebSocket.close(1000, "End of session") + } + } + + private fun tryDrainQueue() { + synchronized(queueLock) { + while (messageQueue.isNotEmpty()) { + val (nextMessage, nextMessageSize) = messageQueue.peek() ?: break + val currentQueueSize = nativeWebSocket.queueSize() + + if (currentQueueSize + nextMessageSize <= MAX_QUEUE_SIZE) { + messageQueue.poll() + if (!nativeWebSocket.send(nextMessage)) { + // The WebSocket is closing, closed, or cancelled. + handler.removeCallbacks(drainRunnable) + messageQueue.clear() + + break + } + } else { + scheduleDrain() + break + } + } + } + } + + private fun scheduleDrain() { + FLog.d(TAG, "Scheduled a task to drain messages queue.") + handler.removeCallbacks(drainRunnable) + handler.postDelayed(drainRunnable, drainDelayMs) + } + + companion object { + private val TAG: String = InspectorPackagerWebSocketImpl::class.java.simpleName + private const val drainDelayMs: Long = 100 + } + } + /** Java implementation of the C++ InspectorPackagerConnectionDelegate interface. */ private class DelegateImpl { private val httpClient = DevSupportHttpClient.websocketClient @@ -124,15 +219,8 @@ internal class CxxInspectorPackagerConnection( } }, ) - return object : IWebSocket { - override fun send(message: String) { - webSocket.send(message) - } - override fun close() { - webSocket.close(1000, "End of session") - } - } + return InspectorPackagerWebSocketImpl(webSocket, handler) } @DoNotStrip @@ -146,6 +234,8 @@ internal class CxxInspectorPackagerConnection( SoLoader.loadLibrary("react_devsupportjni") } + @VisibleForTesting internal const val MAX_QUEUE_SIZE = 16L * 1024 * 1024 // 16MiB + @JvmStatic private external fun initHybrid( url: String, diff --git a/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/inspector/InspectorNetworkHelper.kt b/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/inspector/InspectorNetworkHelper.kt index 1dab02ea9dc4..5b6198b5d8a0 100644 --- a/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/inspector/InspectorNetworkHelper.kt +++ b/packages/react-native/ReactAndroid/src/main/java/com/facebook/react/devsupport/inspector/InspectorNetworkHelper.kt @@ -55,7 +55,7 @@ internal object InspectorNetworkHelper { response.body().use { responseBody -> if (responseBody != null) { val inputStream = responseBody.byteStream() - val chunkSize = 1024 + val chunkSize = 8 * 1024 // 8Kb val buffer = ByteArray(chunkSize) var bytesRead: Int diff --git a/packages/react-native/ReactAndroid/src/main/jni/react/devsupport/JCxxInspectorPackagerConnectionWebSocket.cpp b/packages/react-native/ReactAndroid/src/main/jni/react/devsupport/JCxxInspectorPackagerConnectionWebSocket.cpp index 7de472736303..efd1948f7aa5 100644 --- a/packages/react-native/ReactAndroid/src/main/jni/react/devsupport/JCxxInspectorPackagerConnectionWebSocket.cpp +++ b/packages/react-native/ReactAndroid/src/main/jni/react/devsupport/JCxxInspectorPackagerConnectionWebSocket.cpp @@ -5,6 +5,8 @@ * LICENSE file in the root directory of this source tree. */ +#include + #include "JCxxInspectorPackagerConnectionWebSocket.h" using namespace facebook::jni; @@ -12,10 +14,35 @@ using namespace facebook::react::jsinspector_modern; namespace facebook::react::jsinspector_modern { +namespace { + +local_ref getReadOnlyByteBufferFromStringView( + std::string_view sv) { + auto buffer = JByteBuffer::wrapBytes( + const_cast(reinterpret_cast(sv.data())), + sv.size()); + + /** + * Return a read-only buffer that shares the underlying contents. + * This guards from accidential mutations on the Java side, since we did + * casting above. + * + * https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html#asReadOnlyBuffer-- + */ + static auto method = + buffer->javaClassStatic()->getMethod( + "asReadOnlyBuffer"); + return method(buffer); +} + +} // namespace + void JCxxInspectorPackagerConnectionWebSocket::send(std::string_view message) { static auto method = - javaClassStatic()->getMethod("send"); - method(self(), std::string(message)); + javaClassStatic()->getMethod)>( + "send"); + auto byteBuffer = getReadOnlyByteBufferFromStringView(message); + method(self(), byteBuffer); } void JCxxInspectorPackagerConnectionWebSocket::close() { diff --git a/packages/react-native/ReactAndroid/src/test/java/com/facebook/react/devsupport/CxxInspectorPackagerConnectionTest.kt b/packages/react-native/ReactAndroid/src/test/java/com/facebook/react/devsupport/CxxInspectorPackagerConnectionTest.kt new file mode 100644 index 000000000000..462317018ec7 --- /dev/null +++ b/packages/react-native/ReactAndroid/src/test/java/com/facebook/react/devsupport/CxxInspectorPackagerConnectionTest.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +package com.facebook.react.devsupport + +import okhttp3.internal.ws.RealWebSocket +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class CxxInspectorPackagerConnectionTest { + + @Test + fun testMaxQueueSizeEquality() { + val okHttpRealWebSocketClass = RealWebSocket::class.java + val okHttpMaxQueueSizeField = okHttpRealWebSocketClass.getDeclaredField("MAX_QUEUE_SIZE") + okHttpMaxQueueSizeField.isAccessible = true + + val okHttpMaxQueueSize = okHttpMaxQueueSizeField.getLong(null) + assertThat(okHttpMaxQueueSize).isNotNull + + assertThat(okHttpMaxQueueSize) + .isEqualTo(CxxInspectorPackagerConnection.Companion.MAX_QUEUE_SIZE) + } +} diff --git a/packages/react-native/ReactCommon/jsinspector-modern/TracingAgent.cpp b/packages/react-native/ReactCommon/jsinspector-modern/TracingAgent.cpp index 2c8b2030a7a3..956f6246c8b7 100644 --- a/packages/react-native/ReactCommon/jsinspector-modern/TracingAgent.cpp +++ b/packages/react-native/ReactCommon/jsinspector-modern/TracingAgent.cpp @@ -26,12 +26,8 @@ const uint16_t TRACE_EVENT_CHUNK_SIZE = 1000; /** * The maximum number of ProfileChunk trace events * that will be sent in a single CDP Tracing.dataCollected message. - * TODO(T219394401): Increase the size once we manage the queue on OkHTTP - side - * properly and avoid WebSocket disconnections when sending a message larger - * than 16MB. */ -const uint16_t PROFILE_TRACE_EVENT_CHUNK_SIZE = 1; +const uint16_t PROFILE_TRACE_EVENT_CHUNK_SIZE = 10; } // namespace