@@ -60,9 +60,8 @@ Coordinator::Coordinator(std::size_t port, PlannerPtr planner)
6060
6161 handler_ = std::make_unique<Handler>(inbox_queue_, outbox_queue_, metastore_,
6262 planner_queue_, outbox_notify);
63- executor_ =
64- std::make_unique<Executor>(planner_queue_, outbox_queue_, metastore_,
65- *planner_, hint_store_, outbox_notify);
63+ executor_ = std::make_unique<Executor>(planner_queue_, outbox_queue_,
64+ metastore_, *planner_, outbox_notify);
6665}
6766
6867Coordinator::~Coordinator () {
@@ -108,12 +107,6 @@ std::optional<CopyOperationId> Coordinator::SubmitCopy(
108107 return std::nullopt ;
109108}
110109
111- void Coordinator::AddHint (setu::planner::hints::CompilerHint hint) {
112- hint_store_.AddHint (std::move (hint));
113- }
114-
115- void Coordinator::ClearHints () { hint_store_.Clear (); }
116-
117110void Coordinator::PlanExecuted (CopyOperationId copy_op_id) {
118111 LOG_DEBUG (" Plan executed for copy operation ID: {}" , copy_op_id);
119112
@@ -387,7 +380,8 @@ void Coordinator::Handler::HandleSubmitCopyRequest(
387380 metastore_.GetNumShardsForTensor (request.copy_spec .dst_name );
388381
389382 HandleShardSubmission (node_agent_identity, request.request_id ,
390- request.shard_id , request.copy_spec , expected_shards);
383+ request.shard_id , request.copy_spec , expected_shards,
384+ std::vector (request.hints ), request.hints_fingerprint );
391385}
392386
393387void Coordinator::Handler::HandleSubmitPullRequest (
@@ -413,18 +407,21 @@ void Coordinator::Handler::HandleSubmitPullRequest(
413407 metastore_.GetNumShardsForTensor (request.copy_spec .dst_name );
414408
415409 HandleShardSubmission (node_agent_identity, request.request_id ,
416- request.shard_id , request.copy_spec , expected_shards);
410+ request.shard_id , request.copy_spec , expected_shards,
411+ std::vector (request.hints ), request.hints_fingerprint );
417412}
418413
419414void Coordinator::Handler::HandleShardSubmission (
420415 const Identity& node_agent_identity, const RequestId& request_id,
421416 const ShardId& shard_id, const CopySpec& copy_spec,
422- std::size_t expected_shards) {
417+ std::size_t expected_shards,
418+ std::vector<setu::planner::hints::CompilerHint> hints,
419+ std::uint64_t hints_fingerprint) {
423420 using setu::commons::utils::AggregationParticipant;
424421
425422 CopyKey copy_key{copy_spec.src_name , copy_spec.dst_name };
426423
427- auto result = shard_aggregator_. Submit (
424+ auto result = pending_dispatch_. SubmitShard (
428425 copy_key, shard_id, copy_spec,
429426 AggregationParticipant{node_agent_identity, request_id}, expected_shards,
430427 [](const CopySpec& stored, const CopySpec& incoming) {
@@ -437,7 +434,8 @@ void Coordinator::Handler::HandleShardSubmission(
437434 *incoming.dst_selection == *stored.dst_selection ,
438435 " Shard submission {} -> {}: destination selection mismatch" ,
439436 incoming.src_name , incoming.dst_name );
440- });
437+ },
438+ std::move (hints), hints_fingerprint);
441439
442440 if (!result.has_value ()) {
443441 return ;
@@ -459,14 +457,15 @@ void Coordinator::Handler::HandleShardSubmission(
459457 }
460458
461459 // Create shared state with submitter identities
462- auto state = std::make_shared<CopyOperationState>(result-> payload ,
463- std::move (submitters));
460+ auto state =
461+ std::make_shared<CopyOperationState>(result-> spec , std::move (submitters));
464462
465463 // Store the shared state (will be accessed by HandleExecuteResponse)
466464 copy_operations_.emplace (copy_op_id, state);
467465
468- // Add to planner queue with copy_op_id and shared state
469- planner_queue_.push (PlannerTask{copy_op_id, result->payload , state});
466+ // Add to planner queue with copy_op_id, shared state, and per-op hints
467+ planner_queue_.push (PlannerTask{copy_op_id, result->spec , state,
468+ HintStore (std::move (result->hints ))});
470469
471470 // Send responses to all waiting participants with copy_op_id
472471 for (const auto & participant : result->participants ) {
@@ -588,15 +587,25 @@ void Coordinator::Handler::HandleDeregisterShardsRequest(
588587 metastore_.MarkTensorDeregistered (name);
589588 }
590589
591- // Cancel partial entries in the shard aggregator for these tensors.
590+ // Cancel partial entries in the pending dispatch for these tensors.
592591 // This cleans up groups that will never complete because the shards are
593592 // going away.
594593 auto cancelled_participants =
595- shard_aggregator_ .CancelIf ([&tensor_names](const CopyKey& key) {
594+ pending_dispatch_ .CancelIf ([&tensor_names](const CopyKey& key) {
596595 return tensor_names.contains (key.src_name ) ||
597596 tensor_names.contains (key.dst_name );
598597 });
599598
599+ // Clean up per-operation hint tracking for cancelled operations
600+ std::erase_if (operation_hints_, [&tensor_names](const auto & entry) {
601+ return tensor_names.contains (entry.first .src_name ) ||
602+ tensor_names.contains (entry.first .dst_name );
603+ });
604+ std::erase_if (operation_fingerprints_, [&tensor_names](const auto & entry) {
605+ return tensor_names.contains (entry.first .src_name ) ||
606+ tensor_names.contains (entry.first .dst_name );
607+ });
608+
600609 // Send error responses to cancelled participants
601610 for (const auto & participant : cancelled_participants) {
602611 LOG_INFO (
@@ -651,13 +660,11 @@ void Coordinator::Handler::HandleDeregisterShardsRequest(
651660Coordinator::Executor::Executor (Queue<PlannerTask>& planner_queue,
652661 Queue<OutboxMessage>& outbox_queue,
653662 MetaStore& metastore, Planner& planner,
654- HintStore& hint_store,
655663 OutboxNotifyFn outbox_notify)
656664 : planner_queue_(planner_queue),
657665 outbox_queue_(outbox_queue),
658666 metastore_(metastore),
659667 planner_(planner),
660- hint_store_(hint_store),
661668 outbox_notify_(std::move(outbox_notify)) {}
662669
663670void Coordinator::Executor::PushOutbox (OutboxMessage msg) {
@@ -690,9 +697,8 @@ void Coordinator::Executor::Loop() {
690697
691698 LOG_DEBUG (" Executor received task for copy_op_id: {}" , task.copy_op_id );
692699
693- auto hints = hint_store_.Snapshot ();
694700 auto t_compile_start = std::chrono::steady_clock::now ();
695- Plan plan = planner_.Compile (task.copy_spec , metastore_, hints);
701+ Plan plan = planner_.Compile (task.copy_spec , metastore_, task. hints );
696702 auto t_compile_end = std::chrono::steady_clock::now ();
697703
698704 LOG_DEBUG (" Compiled plan:\n {}" , plan);
0 commit comments