@@ -60,9 +60,8 @@ Coordinator::Coordinator(std::size_t port, PlannerPtr planner)
6060
6161 handler_ = std::make_unique<Handler>(inbox_queue_, outbox_queue_, metastore_,
6262 planner_queue_, outbox_notify);
63- executor_ =
64- std::make_unique<Executor>(planner_queue_, outbox_queue_, metastore_,
65- *planner_, hint_store_, outbox_notify);
63+ executor_ = std::make_unique<Executor>(planner_queue_, outbox_queue_,
64+ metastore_, *planner_, outbox_notify);
6665}
6766
6867Coordinator::~Coordinator () {
@@ -108,12 +107,6 @@ std::optional<CopyOperationId> Coordinator::SubmitCopy(
108107 return std::nullopt ;
109108}
110109
111- void Coordinator::AddHint (setu::planner::hints::CompilerHint hint) {
112- hint_store_.AddHint (std::move (hint));
113- }
114-
115- void Coordinator::ClearHints () { hint_store_.Clear (); }
116-
117110void Coordinator::PlanExecuted (CopyOperationId copy_op_id) {
118111 LOG_DEBUG (" Plan executed for copy operation ID: {}" , copy_op_id);
119112
@@ -387,7 +380,8 @@ void Coordinator::Handler::HandleSubmitCopyRequest(
387380 metastore_.GetNumShardsForTensor (request.copy_spec .dst_name );
388381
389382 HandleShardSubmission (node_agent_identity, request.request_id ,
390- request.shard_id , request.copy_spec , expected_shards);
383+ request.shard_id , request.copy_spec , expected_shards,
384+ std::vector (request.hints ), request.hints_fingerprint );
391385}
392386
393387void Coordinator::Handler::HandleSubmitPullRequest (
@@ -413,17 +407,38 @@ void Coordinator::Handler::HandleSubmitPullRequest(
413407 metastore_.GetNumShardsForTensor (request.copy_spec .dst_name );
414408
415409 HandleShardSubmission (node_agent_identity, request.request_id ,
416- request.shard_id , request.copy_spec , expected_shards);
410+ request.shard_id , request.copy_spec , expected_shards,
411+ std::vector (request.hints ), request.hints_fingerprint );
417412}
418413
419414void Coordinator::Handler::HandleShardSubmission (
420415 const Identity& node_agent_identity, const RequestId& request_id,
421416 const ShardId& shard_id, const CopySpec& copy_spec,
422- std::size_t expected_shards) {
417+ std::size_t expected_shards,
418+ std::vector<setu::planner::hints::CompilerHint> hints,
419+ std::uint64_t hints_fingerprint) {
423420 using setu::commons::utils::AggregationParticipant;
424421
425422 CopyKey copy_key{copy_spec.src_name , copy_spec.dst_name };
426423
424+ // First-writer-wins hint storage: the first shard submission's hints
425+ // become authoritative for this operation.
426+ if (operation_hints_.find (copy_key) == operation_hints_.end ()) {
427+ // First shard for this operation — store its hints
428+ operation_hints_[copy_key] = std::move (hints);
429+ operation_fingerprints_[copy_key] = hints_fingerprint;
430+ } else {
431+ // Subsequent shard — verify fingerprint in debug mode
432+ if (setu::commons::Logger::log_level <= setu::commons::LogLevel::kDebug ) {
433+ ASSERT_VALID_RUNTIME (
434+ hints_fingerprint == operation_fingerprints_[copy_key],
435+ " SPMD hint mismatch for {} -> {}: shard {} sent fingerprint {} but "
436+ " first submission had {}" ,
437+ copy_spec.src_name , copy_spec.dst_name , shard_id, hints_fingerprint,
438+ operation_fingerprints_[copy_key]);
439+ }
440+ }
441+
427442 auto result = shard_aggregator_.Submit (
428443 copy_key, shard_id, copy_spec,
429444 AggregationParticipant{node_agent_identity, request_id}, expected_shards,
@@ -465,8 +480,14 @@ void Coordinator::Handler::HandleShardSubmission(
465480 // Store the shared state (will be accessed by HandleExecuteResponse)
466481 copy_operations_.emplace (copy_op_id, state);
467482
468- // Add to planner queue with copy_op_id and shared state
469- planner_queue_.push (PlannerTask{copy_op_id, result->payload , state});
483+ // Extract per-operation hints and clean up tracking maps
484+ auto op_hints = std::move (operation_hints_[copy_key]);
485+ operation_hints_.erase (copy_key);
486+ operation_fingerprints_.erase (copy_key);
487+
488+ // Add to planner queue with copy_op_id, shared state, and per-op hints
489+ planner_queue_.push (PlannerTask{copy_op_id, result->payload , state,
490+ HintStore (std::move (op_hints))});
470491
471492 // Send responses to all waiting participants with copy_op_id
472493 for (const auto & participant : result->participants ) {
@@ -583,6 +604,16 @@ void Coordinator::Handler::HandleDeregisterShardsRequest(
583604 tensor_names.contains (key.dst_name );
584605 });
585606
607+ // Clean up per-operation hint tracking for cancelled operations
608+ std::erase_if (operation_hints_, [&tensor_names](const auto & entry) {
609+ return tensor_names.contains (entry.first .src_name ) ||
610+ tensor_names.contains (entry.first .dst_name );
611+ });
612+ std::erase_if (operation_fingerprints_, [&tensor_names](const auto & entry) {
613+ return tensor_names.contains (entry.first .src_name ) ||
614+ tensor_names.contains (entry.first .dst_name );
615+ });
616+
586617 // Send error responses to cancelled participants
587618 for (const auto & participant : cancelled_participants) {
588619 LOG_INFO (
@@ -637,13 +668,11 @@ void Coordinator::Handler::HandleDeregisterShardsRequest(
637668Coordinator::Executor::Executor (Queue<PlannerTask>& planner_queue,
638669 Queue<OutboxMessage>& outbox_queue,
639670 MetaStore& metastore, Planner& planner,
640- HintStore& hint_store,
641671 OutboxNotifyFn outbox_notify)
642672 : planner_queue_(planner_queue),
643673 outbox_queue_(outbox_queue),
644674 metastore_(metastore),
645675 planner_(planner),
646- hint_store_(hint_store),
647676 outbox_notify_(std::move(outbox_notify)) {}
648677
649678void Coordinator::Executor::PushOutbox (OutboxMessage msg) {
@@ -676,9 +705,8 @@ void Coordinator::Executor::Loop() {
676705
677706 LOG_DEBUG (" Executor received task for copy_op_id: {}" , task.copy_op_id );
678707
679- auto hints = hint_store_.Snapshot ();
680708 auto t_compile_start = std::chrono::steady_clock::now ();
681- Plan plan = planner_.Compile (task.copy_spec , metastore_, hints);
709+ Plan plan = planner_.Compile (task.copy_spec , metastore_, task. hints );
682710 auto t_compile_end = std::chrono::steady_clock::now ();
683711
684712 LOG_DEBUG (" Compiled plan:\n {}" , plan);
0 commit comments