Skip to content

Commit 1ffae0f

Browse files
committed
Add outbox wake-up notification to Coordinator Gateway
Uses an inproc PAIR socket self-pipe pattern so that Gateway's zmq::poll() returns immediately when Handler or Executor pushes a message to the outbox queue, eliminating the poll-timeout latency.
1 parent a07f840 commit 1ffae0f

2 files changed

Lines changed: 97 additions & 22 deletions

File tree

csrc/setu/coordinator/Coordinator.cpp

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ Coordinator::Coordinator(std::size_t port, PlannerPtr planner)
5353
planner_(planner) {
5454
gateway_ = std::make_unique<Gateway>(zmq_context_, port_, inbox_queue_,
5555
outbox_queue_);
56+
57+
auto outbox_notify = [this]() { gateway_->NotifyOutbox(); };
58+
5659
handler_ = std::make_unique<Handler>(inbox_queue_, outbox_queue_, metastore_,
57-
planner_queue_);
58-
executor_ = std::make_unique<Executor>(planner_queue_, outbox_queue_,
59-
metastore_, *planner_, hint_store_);
60+
planner_queue_, outbox_notify);
61+
executor_ =
62+
std::make_unique<Executor>(planner_queue_, outbox_queue_, metastore_,
63+
*planner_, hint_store_, outbox_notify);
6064
}
6165

6266
Coordinator::~Coordinator() {
@@ -136,9 +140,28 @@ Coordinator::Gateway::~Gateway() {
136140
void Coordinator::Gateway::InitSockets() {
137141
node_agent_socket_ = ZmqHelper::CreateAndBindSocket(
138142
zmq_context_, zmq::socket_type::router, port_);
143+
144+
// Create inproc PAIR sockets for self-pipe wakeup pattern
145+
wakeup_recv_ =
146+
std::make_shared<zmq::socket_t>(*zmq_context_, zmq::socket_type::pair);
147+
wakeup_send_ =
148+
std::make_shared<zmq::socket_t>(*zmq_context_, zmq::socket_type::pair);
149+
wakeup_recv_->bind("inproc://gateway-wakeup");
150+
wakeup_send_->connect("inproc://gateway-wakeup");
151+
}
152+
153+
void Coordinator::Gateway::NotifyOutbox() {
154+
// Send a single byte to wake the Gateway from zmq::poll().
155+
// Uses dontwait to avoid blocking the caller if the pipe is full.
156+
zmq::message_t signal(1);
157+
static_cast<char*>(signal.data())[0] = 'W';
158+
[[maybe_unused]] auto result =
159+
wakeup_send_->send(std::move(signal), zmq::send_flags::dontwait);
139160
}
140161

141162
void Coordinator::Gateway::CloseSockets() {
163+
if (wakeup_send_) wakeup_send_->close();
164+
if (wakeup_recv_) wakeup_recv_->close();
142165
if (node_agent_socket_) {
143166
node_agent_socket_->close();
144167
}
@@ -163,8 +186,9 @@ void Coordinator::Gateway::Stop() {
163186
void Coordinator::Gateway::Loop() {
164187
running_ = true;
165188
while (running_) {
166-
// Poll for incoming messages from NodeAgents
167-
auto ready = Comm::PollForRead({node_agent_socket_}, kPollTimeoutMs);
189+
// Poll for incoming messages from NodeAgents OR wakeup signal
190+
auto ready =
191+
Comm::PollForRead({node_agent_socket_, wakeup_recv_}, kPollTimeoutMs);
168192

169193
for (const auto& socket : ready) {
170194
if (socket == node_agent_socket_) {
@@ -175,6 +199,12 @@ void Coordinator::Gateway::Loop() {
175199
if (status == boost::queue_op_status::closed) {
176200
return;
177201
}
202+
} else if (socket == wakeup_recv_) {
203+
// Drain all wakeup signals (there may be multiple)
204+
zmq::message_t drain;
205+
while (
206+
wakeup_recv_->recv(drain, zmq::recv_flags::dontwait).has_value()) {
207+
}
178208
}
179209
}
180210

@@ -198,11 +228,18 @@ void Coordinator::Gateway::Loop() {
198228
Coordinator::Handler::Handler(Queue<InboxMessage>& inbox_queue,
199229
Queue<OutboxMessage>& outbox_queue,
200230
MetaStore& metastore,
201-
Queue<PlannerTask>& planner_queue)
231+
Queue<PlannerTask>& planner_queue,
232+
OutboxNotifyFn outbox_notify)
202233
: inbox_queue_(inbox_queue),
203234
outbox_queue_(outbox_queue),
204235
metastore_(metastore),
205-
planner_queue_(planner_queue) {}
236+
planner_queue_(planner_queue),
237+
outbox_notify_(std::move(outbox_notify)) {}
238+
239+
void Coordinator::Handler::PushOutbox(OutboxMessage msg) {
240+
outbox_queue_.push(std::move(msg));
241+
outbox_notify_();
242+
}
206243

207244
void Coordinator::Handler::Start() {
208245
if (running_.load()) {
@@ -274,12 +311,12 @@ void Coordinator::Handler::HandleRegisterTensorShardRequest(
274311
if (shard_metadata_ptr) {
275312
RegisterTensorShardCoordinatorResponse response(
276313
request.request_id, ErrorCode::kSuccess, *shard_metadata_ptr);
277-
outbox_queue_.push(OutboxMessage{node_agent_identity, response});
314+
PushOutbox(OutboxMessage{node_agent_identity, response});
278315
} else {
279316
LOG_ERROR("Failed to register tensor shard: {}", request.tensor_shard_spec);
280317
RegisterTensorShardCoordinatorResponse response(
281318
request.request_id, ErrorCode::kInvalidArguments);
282-
outbox_queue_.push(OutboxMessage{node_agent_identity, response});
319+
PushOutbox(OutboxMessage{node_agent_identity, response});
283320
return;
284321
}
285322

@@ -305,7 +342,7 @@ void Coordinator::Handler::HandleRegisterTensorShardRequest(
305342
for (const auto& [owner_id, shard_ids] : owner_to_shard_ids) {
306343
Identity owner_identity = to_string(owner_id) + "_dealer";
307344
AllocateTensorRequest allocate_request(shard_ids);
308-
outbox_queue_.push(OutboxMessage{owner_identity, allocate_request});
345+
PushOutbox(OutboxMessage{owner_identity, allocate_request});
309346
}
310347
}
311348
}
@@ -370,9 +407,9 @@ void Coordinator::Handler::HandleShardSubmission(
370407
CopyOperationId copy_op_id = GenerateUUID();
371408

372409
LOG_INFO(
373-
"All shards submitted for {} -> {}, "
410+
"All {} shards submitted for {} -> {}, "
374411
"copy_op_id={}, adding to planner queue",
375-
copy_spec.src_name, copy_spec.dst_name, copy_op_id);
412+
expected_shards, copy_spec.src_name, copy_spec.dst_name, copy_op_id);
376413

377414
// Collect submitter identities
378415
std::vector<Identity> submitters;
@@ -395,7 +432,7 @@ void Coordinator::Handler::HandleShardSubmission(
395432
for (const auto& participant : result->participants) {
396433
SubmitCopyResponse response(participant.request_id, copy_op_id,
397434
ErrorCode::kSuccess);
398-
outbox_queue_.push(OutboxMessage{participant.identity, response});
435+
PushOutbox(OutboxMessage{participant.identity, response});
399436
}
400437
}
401438

@@ -428,7 +465,7 @@ void Coordinator::Handler::HandleExecuteResponse(
428465
// Send CopyOperationFinishedRequest to all SUBMITTERS
429466
for (const auto& submitter_identity : state->submitters) {
430467
CopyOperationFinishedRequest finish_req(response.copy_op_id);
431-
outbox_queue_.push(OutboxMessage{submitter_identity, finish_req});
468+
PushOutbox(OutboxMessage{submitter_identity, finish_req});
432469
}
433470

434471
copy_operations_.erase(it);
@@ -449,7 +486,7 @@ void Coordinator::Handler::HandleGetTensorSpecRequest(
449486

450487
GetTensorSpecResponse response(request.request_id, ErrorCode::kSuccess,
451488
*tensor_spec);
452-
outbox_queue_.push(OutboxMessage{node_agent_identity, response});
489+
PushOutbox(OutboxMessage{node_agent_identity, response});
453490
}
454491

455492
//==============================================================================
@@ -458,12 +495,19 @@ void Coordinator::Handler::HandleGetTensorSpecRequest(
458495
Coordinator::Executor::Executor(Queue<PlannerTask>& planner_queue,
459496
Queue<OutboxMessage>& outbox_queue,
460497
MetaStore& metastore, Planner& planner,
461-
HintStore& hint_store)
498+
HintStore& hint_store,
499+
OutboxNotifyFn outbox_notify)
462500
: planner_queue_(planner_queue),
463501
outbox_queue_(outbox_queue),
464502
metastore_(metastore),
465503
planner_(planner),
466-
hint_store_(hint_store) {}
504+
hint_store_(hint_store),
505+
outbox_notify_(std::move(outbox_notify)) {}
506+
507+
void Coordinator::Executor::PushOutbox(OutboxMessage msg) {
508+
outbox_queue_.push(std::move(msg));
509+
outbox_notify_();
510+
}
467511

468512
void Coordinator::Executor::Start() {
469513
if (running_.load()) {
@@ -486,11 +530,14 @@ void Coordinator::Executor::Loop() {
486530
while (running_) {
487531
try {
488532
PlannerTask task = planner_queue_.pull();
533+
auto t_after_dequeue = std::chrono::steady_clock::now();
489534

490535
LOG_DEBUG("Executor received task for copy_op_id: {}", task.copy_op_id);
491536

492537
auto hints = hint_store_.Snapshot();
538+
auto t_compile_start = std::chrono::steady_clock::now();
493539
Plan plan = planner_.Compile(task.copy_spec, metastore_, hints);
540+
auto t_compile_end = std::chrono::steady_clock::now();
494541

495542
LOG_DEBUG("Compiled plan:\n{}", plan);
496543

@@ -503,16 +550,24 @@ void Coordinator::Executor::Loop() {
503550

504551
ExecuteRequest execute_request(task.copy_op_id, std::move(node_plan));
505552

506-
outbox_queue_.push(OutboxMessage{node_identity, execute_request});
553+
PushOutbox(OutboxMessage{node_identity, execute_request});
507554
}
508555

509556
// Set expected responses
510557
// memory order release so Handler thread can pick it up (using memory
511558
// order aqcuire)
512559
task.state->expected_responses.store(fragments.size(),
513560
std::memory_order_release);
514-
LOG_DEBUG("Executor: Set expected_responses={} for copy_op_id={}",
515-
fragments.size(), task.copy_op_id);
561+
562+
auto t_end = std::chrono::steady_clock::now();
563+
auto to_us = [](auto d) {
564+
return std::chrono::duration_cast<std::chrono::microseconds>(d).count();
565+
};
566+
LOG_INFO(
567+
"Executor: copy_op_id={}, compile={}us, fragment+dispatch={}us, "
568+
"total={}us",
569+
task.copy_op_id, to_us(t_compile_end - t_compile_start),
570+
to_us(t_end - t_compile_end), to_us(t_end - t_after_dequeue));
516571

517572
} catch (const boost::concurrent::sync_queue_is_closed&) {
518573
return;

csrc/setu/coordinator/Coordinator.h

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ class Coordinator {
161161
void Start();
162162
void Stop();
163163

164+
/// @brief Wake the Gateway from its poll() call so it drains the outbox
165+
/// immediately. Thread-safe: can be called from any thread.
166+
void NotifyOutbox();
167+
164168
private:
165169
void InitSockets();
166170
void CloseSockets();
@@ -174,24 +178,35 @@ class Coordinator {
174178

175179
ZmqSocketPtr node_agent_socket_;
176180

181+
/// Inproc PAIR sockets for self-pipe wakeup pattern.
182+
/// When a producer pushes to outbox_queue_, it sends a byte on
183+
/// wakeup_send_ which causes zmq::poll() (watching wakeup_recv_) to
184+
/// return immediately.
185+
ZmqSocketPtr wakeup_recv_;
186+
ZmqSocketPtr wakeup_send_;
187+
177188
std::thread thread_;
178189
std::atomic<bool> running_{false};
179190
};
180191

181192
//============================================================================
182193
// Handler: Processes incoming requests (pure business logic, no ZMQ)
183194
//============================================================================
195+
using OutboxNotifyFn = std::function<void()>;
196+
184197
struct Handler {
185198
Handler(Queue<InboxMessage>& inbox_queue,
186199
Queue<OutboxMessage>& outbox_queue, MetaStore& metastore,
187-
Queue<PlannerTask>& planner_queue);
200+
Queue<PlannerTask>& planner_queue, OutboxNotifyFn outbox_notify);
188201

189202
void Start();
190203
void Stop();
191204

192205
private:
193206
void Loop();
194207

208+
void PushOutbox(OutboxMessage msg);
209+
195210
void HandleRegisterTensorShardRequest(
196211
const Identity& node_agent_identity,
197212
const RegisterTensorShardRequest& request);
@@ -228,6 +243,7 @@ class Coordinator {
228243
Queue<OutboxMessage>& outbox_queue_;
229244
MetaStore& metastore_;
230245
Queue<PlannerTask>& planner_queue_;
246+
OutboxNotifyFn outbox_notify_;
231247

232248
/// Aggregates shard submissions per (src, dst) pair until all expected
233249
/// shards arrive
@@ -249,19 +265,23 @@ class Coordinator {
249265
struct Executor {
250266
Executor(Queue<PlannerTask>& planner_queue,
251267
Queue<OutboxMessage>& outbox_queue, MetaStore& metastore,
252-
Planner& planner, HintStore& hint_store);
268+
Planner& planner, HintStore& hint_store,
269+
OutboxNotifyFn outbox_notify);
253270

254271
void Start();
255272
void Stop();
256273

257274
private:
258275
void Loop();
259276

277+
void PushOutbox(OutboxMessage msg);
278+
260279
Queue<PlannerTask>& planner_queue_;
261280
Queue<OutboxMessage>& outbox_queue_;
262281
MetaStore& metastore_;
263282
Planner& planner_;
264283
HintStore& hint_store_;
284+
OutboxNotifyFn outbox_notify_;
265285

266286
std::thread thread_;
267287
std::atomic<bool> running_{false};

0 commit comments

Comments
 (0)