Skip to content

Commit c93938e

Browse files
authored
Add Bech32 and Prometheus metrics support
1 parent 3e4d16b commit c93938e

1 file changed

Lines changed: 89 additions & 9 deletions

File tree

src/apps/relay/RelayWebsocket.cpp

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "StrfryTemplates.h"
44
#include "app_git_version.h"
55
#include "Favicon.h"
6+
#include "Bech32Utils.h"
7+
#include "PrometheusMetrics.h"
68

79

810

@@ -19,7 +21,9 @@ static std::string preGenerateHttpResponse(const std::string &contentType, const
1921
return output;
2022
};
2123

24+
2225
static std::string preGenerateHttpRedirect(const std::string &location, const std::string &contentType, const std::string &content) {
26+
2327
std::string output = "HTTP/1.1 301 Moved Permanently\r\n";
2428
output += std::string("Content-Type: ") + contentType + "\r\n";
2529
output += "Access-Control-Allow-Origin: *\r\n";
@@ -32,6 +36,7 @@ static std::string preGenerateHttpRedirect(const std::string &location, const st
3236
return output;
3337
};
3438

39+
3540
void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
3641
struct Connection {
3742
uWS::WebSocket<uWS::SERVER> *websocket;
@@ -43,6 +48,10 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
4348
uint64_t bytesUpCompressed = 0;
4449
uint64_t bytesDown = 0;
4550
uint64_t bytesDownCompressed = 0;
51+
// Application bytes handed to uWS that have not yet been fully drained
52+
// to the kernel (either still queued in uWS or in-flight in a partial
53+
// send). Useful for diagnosing slow or stalled clients.
54+
uint64_t pendingOutbound = 0;
4655
} stats;
4756

4857
Connection(uWS::WebSocket<uWS::SERVER> *p, uint64_t connId_)
@@ -56,13 +65,21 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
5665
flat_hash_map<uint64_t, Connection*> connIdToConnection;
5766
uint64_t nextConnectionId = 1;
5867
bool gracefulShutdown = false;
68+
uint64_t serverStart = ::time(nullptr);
5969

6070
std::string tempBuf;
6171
tempBuf.reserve(cfg().events__maxEventSize + MAX_SUBID_SIZE + 100);
6272

6373

6474
auto supportedNips = []{
65-
tao::json::value output = tao::json::value::array({ 1, 2, 4, 9, 11, 22, 28, 40, 45, 70, 77 });
75+
tao::json::value output = tao::json::value::array({ 1, 2, 4, 9, 11, 28, 40, 70 });
76+
77+
if (cfg().relay__auth__enabled && cfg().relay__auth__serviceUrl.size() > 0) output.push_back(42);
78+
if (cfg().relay__maxFilterLimitCount > 0) output.push_back(45);
79+
if (cfg().relay__negentropy__enabled) output.push_back(77);
80+
81+
std::sort(output.get_array().begin(), output.get_array().end());
82+
6683
if (cfg().relay__info__nips.size() == 0) return output;
6784

6885
try {
@@ -78,6 +95,14 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
7895

7996
auto getServerInfoHttpResponse = [&supportedNips, ver = uint64_t(0), rendered = std::string("")]() mutable {
8097
if (ver != cfg().version()) {
98+
auto maybeNpub = [](std::string_view sv){
99+
if (sv.starts_with("npub1")) {
100+
return to_hex(decodeBech32Simple(sv));
101+
} else {
102+
return std::string(sv);
103+
}
104+
};
105+
81106
tao::json::value nip11 = tao::json::value({
82107
{ "supported_nips", supportedNips() },
83108
{ "software", "git+https://github.com/hoytech/strfry.git" },
@@ -93,10 +118,10 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
93118
if (cfg().relay__info__name.size()) nip11["name"] = cfg().relay__info__name;
94119
if (cfg().relay__info__description.size()) nip11["description"] = cfg().relay__info__description;
95120
if (cfg().relay__info__contact.size()) nip11["contact"] = cfg().relay__info__contact;
96-
if (cfg().relay__info__pubkey.size()) nip11["pubkey"] = cfg().relay__info__pubkey;
121+
if (cfg().relay__info__pubkey.size()) nip11["pubkey"] = maybeNpub(cfg().relay__info__pubkey);
97122
if (cfg().relay__info__icon.size()) nip11["icon"] = cfg().relay__info__icon;
98123
if (cfg().relay__info__banner.size()) nip11["banner"] = cfg().relay__info__banner;
99-
if (cfg().relay__info__self.size()) nip11["self"] = cfg().relay__info__self;
124+
if (cfg().relay__info__self.size()) nip11["self"] = maybeNpub(cfg().relay__info__self);
100125
if (cfg().relay__info__privacy.size()) nip11["privacy_policy"] = cfg().relay__info__privacy;
101126
if (cfg().relay__info__terms.size()) nip11["terms_of_service"] = cfg().relay__info__terms;
102127

@@ -107,16 +132,42 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
107132
return std::string_view(rendered); // memory only valid until next call
108133
};
109134

110-
auto getLandingPageHttpResponse = [&supportedNips, ver = uint64_t(0), rendered = std::string("")]() mutable {
111-
if (ver != cfg().version()) {
135+
auto getLandingPageHttpResponse = [&supportedNips, &serverStart, ver = uint64_t(0), lastUpdate = uint64_t(0), rendered = std::string("")]() mutable {
136+
if (ver != cfg().version() || (uint64_t)::time(nullptr) - lastUpdate > 3600) {
137+
auto maybeUrl = [](std::string_view inp){
138+
if (inp.starts_with("http://") || inp.starts_with("https://")) {
139+
std::string output = "<a href=\"";
140+
output += templarInternal::htmlEscape(inp, true);
141+
output += "\">";
142+
output += templarInternal::htmlEscape(inp, false);
143+
output += "</a>";
144+
return output;
145+
}
146+
return templarInternal::htmlEscape(inp, false);
147+
};
148+
149+
auto maybeNpub = [](std::string_view inp){
150+
try {
151+
if (inp.size() != 64) throw herr("invalid length for pubkey");
152+
return encodeBech32Simple("npub", hoytech::from_hex(inp));
153+
} catch(...) {
154+
}
155+
156+
return std::string(inp);
157+
};
158+
112159
struct {
113160
std::string supportedNips;
114161
std::string version;
115162
uint64_t negentropy;
116-
} ctx = { tao::json::to_string(supportedNips()), APP_GIT_VERSION, negentropy::PROTOCOL_VERSION - 0x60 };
163+
std::function<std::string(std::string_view)> maybeUrl;
164+
std::function<std::string(std::string_view)> maybeNpub;
165+
uint64_t uptime;
166+
} ctx = { tao::json::to_string(supportedNips()), APP_GIT_VERSION, negentropy::PROTOCOL_VERSION - 0x60, maybeUrl, maybeNpub, (uint64_t)::time(nullptr) - serverStart };
117167

118168
rendered = preGenerateHttpRedirect("https://www.soloco.nl/", "text/html", ::strfrytmpl::landing(ctx).str);
119169
ver = cfg().version();
170+
lastUpdate = ::time(nullptr);
120171
}
121172

122173
return std::string_view(rendered); // memory only valid until next call
@@ -245,6 +296,8 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
245296
<< " sliding=" << (compSlidingWindow ? 'Y' : 'N')
246297
;
247298

299+
PrometheusMetrics::getInstance().activeConnections.inc();
300+
248301
if (cfg().relay__enableTcpKeepalive) {
249302
int optval = 1;
250303
if (setsockopt(ws->getFd(), SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval))) {
@@ -264,13 +317,16 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
264317
<< " (" << code << "/" << (message ? std::string_view(message, length) : "-") << ")"
265318
<< " UP: " << renderSize(c->stats.bytesUp) << " (" << upComp << " compressed)"
266319
<< " DN: " << renderSize(c->stats.bytesDown) << " (" << downComp << " compressed)"
320+
<< " Pending: " << renderSize(c->stats.pendingOutbound)
267321
;
268322

269323
tpIngester.dispatch(connId, MsgIngester{MsgIngester::CloseConn{connId}});
270324

271325
connIdToConnection.erase(connId);
272326
delete c;
273327

328+
PrometheusMetrics::getInstance().activeConnections.dec();
329+
274330
if (gracefulShutdown) {
275331
LI << "Graceful shutdown in progress: " << connIdToConnection.size() << " connections remaining";
276332
if (connIdToConnection.size() == 0) {
@@ -298,10 +354,34 @@ void RelayServer::runWebsocket(ThreadPool<MsgWebsocket>::Thread &thr) {
298354
if (it == connIdToConnection.end()) return;
299355
auto &c = *it->second;
300356

357+
// Track bytes still inside uWS's outbound path (either queued or
358+
// partially sent). Increment before send(), decrement in the
359+
// completion callback. The payload size is smuggled through the
360+
// callback's user-data pointer so the callback captures nothing and
361+
// decays to a plain function pointer.
362+
const size_t payloadSize = payload.size();
363+
c.stats.pendingOutbound += payloadSize;
364+
301365
size_t compressedSize;
302-
auto cb = [](uWS::WebSocket<uWS::SERVER> *webSocket, void *data, bool cancelled, void *reserved){};
303-
c.websocket->send(payload.data(), payload.size(), opCode, cb, nullptr, true, &compressedSize);
304-
c.stats.bytesUp += payload.size();
366+
367+
auto cb = [](uWS::WebSocket<uWS::SERVER> *ws, void *data, bool /*cancelled*/, void * /*reserved*/){
368+
// uWS invokes this exactly once per send() on every path:
369+
// - immediate send success (ws != nullptr, cancelled=false)
370+
// - immediate send failure (ws != nullptr, cancelled=true)
371+
// - queued drain success (ws != nullptr, cancelled=false)
372+
// - socket teardown (onEnd flush) (ws == nullptr, cancelled=true)
373+
// The teardown path runs after our onDisconnection handler has
374+
// already logged and deleted the Connection, so we must not
375+
// dereference anything via ws in that case. Gate on ws.
376+
if (!ws) return;
377+
auto *conn = static_cast<Connection*>(ws->getUserData());
378+
if (!conn) return;
379+
conn->stats.pendingOutbound -= reinterpret_cast<uintptr_t>(data);
380+
};
381+
c.websocket->send(payload.data(), payloadSize, opCode, cb,
382+
reinterpret_cast<void*>(static_cast<uintptr_t>(payloadSize)),
383+
true, &compressedSize);
384+
c.stats.bytesUp += payloadSize;
305385
c.stats.bytesUpCompressed += compressedSize;
306386
};
307387

0 commit comments

Comments
 (0)