Skip to content

Commit e443c32

Browse files
committed
Bound the thread pools used by API host clients.
Replaced unbounded CachedThreadPools with bounded thread pools (max 200 threads) in both JdkHttpApiHostClient and JettyHttpApiHostClient. This prevents excessive thread creation during RPC retry storms, which can overwhelm the JVM and the Appserver, leading to masked INTERNAL_ERROR responses. The bounded pools will queue requests instead of spawning new threads indefinitely.
1 parent 7f28c34 commit e443c32

File tree

3 files changed

+125
-320
lines changed

3 files changed

+125
-320
lines changed

runtime/runtime_impl_jetty121/src/main/java/com/google/apphosting/runtime/http/HttpApiHostClient.java

Lines changed: 35 additions & 293 deletions
Original file line number | Diff line number | Diff line change
@@ -16,306 +16,48 @@
1616

1717
package com.google.apphosting.runtime.http;
1818

19-
import com.google.apphosting.base.protos.RuntimePb.APIRequest;
20-
import com.google.apphosting.base.protos.RuntimePb.APIResponse;
21-
import com.google.apphosting.base.protos.RuntimePb.APIResponse.ERROR;
22-
import com.google.apphosting.base.protos.RuntimePb.APIResponse.RpcError;
23-
import com.google.apphosting.base.protos.Status.StatusProto;
24-
import com.google.apphosting.base.protos.api_bytes.RemoteApiPb;
19+
import static com.google.apphosting.runtime.http.HttpApiHostClient.REQUEST_ENDPOINT;
20+
2521
import com.google.apphosting.runtime.anyrpc.APIHostClientInterface;
26-
import com.google.apphosting.runtime.anyrpc.AnyRpcCallback;
27-
import com.google.apphosting.runtime.anyrpc.AnyRpcClientContext;
28-
import com.google.apphosting.utils.runtime.ApiProxyUtils;
29-
import com.google.auto.value.AutoValue;
30-
import com.google.common.base.Preconditions;
31-
import com.google.common.collect.ImmutableMap;
22+
import com.google.apphosting.runtime.http.HttpApiHostClient.Config;
3223
import com.google.common.flogger.GoogleLogger;
33-
import com.google.protobuf.ByteString;
34-
import com.google.protobuf.CodedInputStream;
35-
import com.google.protobuf.ExtensionRegistry;
36-
import com.google.protobuf.UninitializedMessageException;
37-
import java.io.IOException;
38-
import java.util.Optional;
24+
import com.google.common.net.HostAndPort;
3925
import java.util.OptionalInt;
4026

41-
/** A client of the APIHost service over HTTP. */
42-
abstract class HttpApiHostClient implements APIHostClientInterface {
27+
/** Makes instances of {@link HttpApiHostClient}. */
28+
public class HttpApiHostClientFactory {
4329
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
30+
private HttpApiHostClientFactory() {}
4431

4532
/**
46-
* Extra timeout that will be used for the HTTP request. If the API timeout is 5 seconds, the HTTP
47-
* request will have a timeout of 5 + {@value #DEFAULT_EXTRA_TIMEOUT_SECONDS} seconds. Usually
48-
* another timeout will happen first, either the API timeout on the server or the TimedFuture
49-
* timeout on the client, but this one enables us to clean up the HttpClient if the server is
50-
* unresponsive.
51-
*/
52-
static final double DEFAULT_EXTRA_TIMEOUT_SECONDS = 2.0;
53-
54-
static final ImmutableMap<String, String> HEADERS =
55-
ImmutableMap.of(
56-
"X-Google-RPC-Service-Endpoint", "app-engine-apis",
57-
"X-Google-RPC-Service-Method", "/VMRemoteAPI.CallRemoteAPI");
58-
static final String CONTENT_TYPE_VALUE = "application/octet-stream";
59-
static final String REQUEST_ENDPOINT = "/rpc_http";
60-
static final String DEADLINE_HEADER = "X-Google-RPC-Service-Deadline";
61-
62-
private static final int UNKNOWN_ERROR_CODE = 1;
63-
64-
// TODO: study the different limits that we have for different transports and
65-
// make them more consistent, as well as sharing definitions like this one.
66-
/** The maximum size in bytes that we will allow in a request or a response payload. */
67-
static final int MAX_PAYLOAD = 50 * 1024 * 1024;
68-
69-
/**
70-
* Extra bytes that we allow in the HTTP content, basically to support serializing the other proto
71-
* fields besides the payload.
33+
* Creates a new HttpApiHostClient instance to talk to the HTTP-based API server on the given host
34+
* and port. This method is called reflectively from ApiHostClientFactory.
35+
*
36+
* <p>The maximum number of concurrent connections can be configured by setting the {@code
37+
* APPENGINE_API_MAX_CONNECTIONS} environment variable to a positive integer. If set, this
38+
* value overrides the {@code maxConcurrentRpcs} parameter; invalid or non-positive values are ignored.
39+
*
40+
* @param hostAndPort The host and port of the API server.
41+
* @param maxConcurrentRpcs The default maximum number of concurrent RPCs, used if the
42+
* environment variable is not set or does not hold a positive integer.
43+
* @return A new {@link APIHostClientInterface} instance.
7244
*/
73-
static final int EXTRA_CONTENT_BYTES = 4096;
74-
75-
@AutoValue
76-
abstract static class Config {
77-
abstract double extraTimeoutSeconds();
78-
79-
abstract OptionalInt maxConnectionsPerDestination();
80-
81-
/** For testing that we handle missing Content-Length correctly. */
82-
abstract boolean ignoreContentLength();
83-
84-
/**
85-
* Treat {@link java.nio.channels.ClosedChannelException} as indicating cancellation. We know
86-
* that this happens occasionally in a test that generates many interrupts. But we don't know if
87-
* there are other reasons for which it might arise, so for now we do not do this in production.
88-
*
89-
* <p>See <a href="http://b/70494739#comment31">this bug</a> for further background.
90-
*/
91-
abstract boolean treatClosedChannelAsCancellation();
92-
93-
static Builder builder() {
94-
return new AutoValue_HttpApiHostClient_Config.Builder()
95-
.setExtraTimeoutSeconds(DEFAULT_EXTRA_TIMEOUT_SECONDS)
96-
.setIgnoreContentLength(false)
97-
.setTreatClosedChannelAsCancellation(false);
98-
}
99-
100-
abstract Builder toBuilder();
101-
102-
@AutoValue.Builder
103-
abstract static class Builder {
104-
abstract Builder setMaxConnectionsPerDestination(OptionalInt value);
105-
106-
abstract Builder setExtraTimeoutSeconds(double value);
107-
108-
abstract Builder setIgnoreContentLength(boolean value);
109-
110-
abstract Builder setTreatClosedChannelAsCancellation(boolean value);
111-
112-
abstract Config build();
113-
}
114-
}
115-
116-
private final Config config;
117-
118-
HttpApiHostClient(Config config) {
119-
this.config = config;
120-
}
121-
122-
Config config() {
123-
return config;
124-
}
125-
126-
static HttpApiHostClient create(String url, Config config) {
127-
if (System.getenv("APPENGINE_API_CALLS_USING_JDK_CLIENT") != null) {
128-
logger.atInfo().log("Using JDK HTTP client for API calls");
129-
return JdkHttpApiHostClient.create(url, config);
130-
} else {
131-
return JettyHttpApiHostClient.create(url, config);
132-
}
133-
}
134-
135-
static class Context implements AnyRpcClientContext {
136-
private final long startTimeMillis;
137-
138-
private int applicationError;
139-
private String errorDetail;
140-
private StatusProto status;
141-
private Throwable exception;
142-
private Optional<Long> deadlineNanos = Optional.empty();
143-
144-
Context() {
145-
this.startTimeMillis = System.currentTimeMillis();
146-
}
147-
148-
@Override
149-
public int getApplicationError() {
150-
return applicationError;
151-
}
152-
153-
void setApplicationError(int applicationError) {
154-
this.applicationError = applicationError;
155-
}
156-
157-
@Override
158-
public String getErrorDetail() {
159-
return errorDetail;
160-
}
161-
162-
void setErrorDetail(String errorDetail) {
163-
this.errorDetail = errorDetail;
164-
}
165-
166-
@Override
167-
public Throwable getException() {
168-
return exception;
169-
}
170-
171-
void setException(Throwable exception) {
172-
this.exception = exception;
173-
}
174-
175-
@Override
176-
public long getStartTimeMillis() {
177-
return startTimeMillis;
178-
}
179-
180-
@Override
181-
public StatusProto getStatus() {
182-
return status;
183-
}
184-
185-
void setStatus(StatusProto status) {
186-
this.status = status;
187-
}
188-
189-
@Override
190-
public void setDeadline(double seconds) {
191-
Preconditions.checkArgument(seconds >= 0);
192-
double nanos = 1_000_000_000 * seconds;
193-
Preconditions.checkArgument(nanos <= Long.MAX_VALUE);
194-
this.deadlineNanos = Optional.of((long) nanos);
195-
}
196-
197-
Optional<Long> getDeadlineNanos() {
198-
return deadlineNanos;
199-
}
200-
201-
@Override
202-
public void startCancel() {
203-
logger.atWarning().log("Canceling HTTP API call has no effect");
204-
}
205-
}
206-
207-
@Override
208-
public Context newClientContext() {
209-
return new Context();
210-
}
211-
212-
static void communicationFailure(
213-
Context context, String errorDetail, AnyRpcCallback<APIResponse> callback, Throwable cause) {
214-
context.setApplicationError(0);
215-
context.setErrorDetail(errorDetail);
216-
context.setStatus(
217-
StatusProto.newBuilder()
218-
.setSpace("RPC")
219-
.setCode(UNKNOWN_ERROR_CODE)
220-
.setCanonicalCode(UNKNOWN_ERROR_CODE)
221-
.setMessage(errorDetail)
222-
.build());
223-
context.setException(cause);
224-
callback.failure();
225-
}
226-
227-
// This represents a timeout of our HTTP request. We don't usually expect this, because we
228-
// include a timeout in the API call which the server should respect. However, this fallback
229-
// logic ensures that we will get an appropriate and timely exception if the server is very slow
230-
// to respond for some reason.
231-
// ApiProxyImpl will normally have given up before this happens, so the main purpose of the
232-
// timeout is to free up resources from the failed HTTP request.
233-
static void timeout(AnyRpcCallback<APIResponse> callback) {
234-
APIResponse apiResponse =
235-
APIResponse.newBuilder()
236-
.setError(APIResponse.ERROR.RPC_ERROR_VALUE)
237-
.setRpcError(RpcError.DEADLINE_EXCEEDED)
238-
.build();
239-
callback.success(apiResponse);
240-
// This is "success" in the sense that we got back a response, but one that will provoke
241-
// an ApiProxy.ApiDeadlineExceededException.
242-
}
243-
244-
static void cancelled(AnyRpcCallback<APIResponse> callback) {
245-
APIResponse apiResponse = APIResponse.newBuilder().setError(ERROR.CANCELLED_VALUE).build();
246-
callback.success(apiResponse);
247-
// This is "success" in the sense that we got back a response, but one that will provoke
248-
// an ApiProxy.CancelledException.
249-
}
250-
251-
@Override
252-
public void call(AnyRpcClientContext ctx, APIRequest req, AnyRpcCallback<APIResponse> cb) {
253-
Context context = (Context) ctx;
254-
ByteString payload = req.getPb();
255-
if (payload.size() > MAX_PAYLOAD) {
256-
requestTooBig(cb);
257-
return;
258-
}
259-
RemoteApiPb.Request requestPb =
260-
RemoteApiPb.Request.newBuilder()
261-
.setServiceName(req.getApiPackage())
262-
.setMethod(req.getCall())
263-
.setRequest(payload)
264-
.setRequestId(req.getSecurityTicket())
265-
.setTraceContext(req.getTraceContext().toByteString())
266-
.build();
267-
send(requestPb.toByteArray(), context, cb);
268-
}
269-
270-
static void receivedResponse(
271-
byte[] responseBytes,
272-
int responseLength,
273-
Context context,
274-
AnyRpcCallback<APIResponse> callback) {
275-
logger.atFine().log("Response size %d", responseLength);
276-
CodedInputStream input = CodedInputStream.newInstance(responseBytes, 0, responseLength);
277-
RemoteApiPb.Response responsePb;
278-
try {
279-
responsePb = RemoteApiPb.Response.parseFrom(input, ExtensionRegistry.getEmptyRegistry());
280-
} catch (UninitializedMessageException | IOException e) {
281-
String errorDetail = "Failed to parse RemoteApiPb.Response";
282-
logger.atWarning().withCause(e).log("%s", errorDetail);
283-
communicationFailure(context, errorDetail, callback, e);
284-
return;
285-
}
286-
287-
if (responsePb.hasApplicationError()) {
288-
RemoteApiPb.ApplicationError applicationError = responsePb.getApplicationError();
289-
context.setApplicationError(applicationError.getCode());
290-
context.setErrorDetail(applicationError.getDetail());
291-
context.setStatus(StatusProto.getDefaultInstance());
292-
callback.failure();
293-
return;
294-
}
295-
296-
APIResponse apiResponse =
297-
APIResponse.newBuilder()
298-
.setError(ApiProxyUtils.remoteApiErrorToApiResponseError(responsePb).getNumber())
299-
.setPb(responsePb.getResponse())
300-
.build();
301-
callback.success(apiResponse);
302-
}
303-
304-
abstract void send(byte[] requestBytes, Context context, AnyRpcCallback<APIResponse> callback);
305-
306-
private static void requestTooBig(AnyRpcCallback<APIResponse> cb) {
307-
APIResponse apiResponse =
308-
APIResponse.newBuilder().setError(ERROR.REQUEST_TOO_LARGE_VALUE).build();
309-
cb.success(apiResponse);
310-
// This is "success" in the sense that we got back a response, but one that will provoke
311-
// an ApiProxy.RequestTooLargeException.
312-
}
313-
314-
static void responseTooBig(AnyRpcCallback<APIResponse> cb) {
315-
APIResponse apiResponse =
316-
APIResponse.newBuilder().setError(ERROR.RESPONSE_TOO_LARGE_VALUE).build();
317-
cb.success(apiResponse);
318-
// This is "success" in the sense that we got back a response, but one that will provoke
319-
// an ApiProxy.ResponseTooLargeException.
45+
public static APIHostClientInterface create(
46+
HostAndPort hostAndPort, OptionalInt maxConcurrentRpcs) {
47+
String url = "http://" + hostAndPort + REQUEST_ENDPOINT;
48+
String maxConnectionsEnv = System.getenv("APPENGINE_API_MAX_CONNECTIONS");
49+
if (maxConnectionsEnv != null) {
50+
try {
51+
int maxConnections = Integer.parseInt(maxConnectionsEnv);
52+
if (maxConnections > 0) {
53+
maxConcurrentRpcs = OptionalInt.of(maxConnections);
54+
}
55+
} catch (NumberFormatException e) {
56+
logger.atWarning().withCause(e).log(
57+
"Failed to parse APPENGINE_API_MAX_CONNECTIONS: %s", maxConnectionsEnv);
58+
}
59+
}
60+
Config config = Config.builder().setMaxConnectionsPerDestination(maxConcurrentRpcs).build();
61+
return HttpApiHostClient.create(url, config);
32062
}
32163
}

0 commit comments

Comments
 (0)