@@ -655,7 +655,7 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th
655655 final Command [] cmds = checkForCommandsAndTag (commands );
656656
657657 //check what agent is returned.
658- final AgentAttache agent = getAttache (hostId );
658+ AgentAttache agent = getAttache (hostId );
659659 if (agent == null || agent .isClosed ()) {
660660 throw new AgentUnavailableException ("agent not logged into this management server" , hostId );
661661 }
@@ -665,7 +665,41 @@ public Answer[] send(final Long hostId, final Commands commands, int timeout) th
665665
666666 reconcileCommandService .persistReconcileCommands (hostId , req .getSequence (), cmds );
667667
668- final Answer [] answers = agent .send (req , wait );
668+ final int retries = Math .max (0 , getAgentSendRetryCount ());
669+ final int intervalMs = Math .max (0 , getAgentSendRetryIntervalMs ());
670+ AgentUnavailableException last = null ;
671+ Answer [] answers = null ;
672+
673+ for (int attempt = 0 ; attempt <= retries ; attempt ++) {
674+ if (attempt > 0 && intervalMs > 0 ) {
675+ sleepRetry (intervalMs );
676+ }
677+
678+ try {
679+ agent = resolveAttacheForRetry (hostId , agent , attempt > 0 );
680+ } catch (AgentUnavailableException e ) {
681+ last = e ;
682+ continue ;
683+ }
684+
685+ if (isForwardWithoutPeer (agent , hostId )) {
686+ last = new AgentUnavailableException ("Unable to find peer" , hostId );
687+ agent = null ;
688+ continue ;
689+ }
690+
691+ try {
692+ answers = agent .send (req , wait );
693+ break ;
694+ } catch (AgentUnavailableException e ) {
695+ last = e ;
696+ agent = null ;
697+ }
698+ }
699+
700+ if (answers == null ) {
701+ throw (last != null ) ? last : new AgentUnavailableException ("agent not logged into this management server" , hostId );
702+ }
669703
670704 reconcileCommandService .processAnswers (req .getSequence (), cmds , answers );
671705
@@ -705,7 +739,7 @@ protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableExce
705739
706740 @ Override
707741 public long send (final Long hostId , final Commands commands , final Listener listener ) throws AgentUnavailableException {
708- final AgentAttache agent = getAttache (hostId );
742+ AgentAttache agent = getAttache (hostId );
709743 if (agent .isClosed ()) {
710744 throw new AgentUnavailableException (String .format (
711745 "Agent [id: %d, name: %s] is closed" ,
@@ -717,10 +751,134 @@ public long send(final Long hostId, final Commands commands, final Listener list
717751 final Request req = new Request (hostId , agent .getName (), _nodeId , cmds , commands .stopOnError (), true );
718752 req .setSequence (agent .getNextSequence ());
719753
720- agent .send (req , listener );
754+ final int retries = Math .max (0 , getAgentSendRetryCount ());
755+ final int intervalMs = Math .max (0 , getAgentSendRetryIntervalMs ());
756+ AgentUnavailableException last = null ;
757+
758+ for (int attempt = 0 ; attempt <= retries ; attempt ++) {
759+ if (attempt > 0 && intervalMs > 0 ) {
760+ sleepRetry (intervalMs );
761+ }
762+
763+ try {
764+ agent = resolveAttacheForRetry (hostId , agent , attempt > 0 );
765+ } catch (AgentUnavailableException e ) {
766+ last = e ;
767+ continue ;
768+ }
769+
770+ if (isForwardWithoutPeer (agent , hostId )) {
771+ last = new AgentUnavailableException ("Unable to find peer" , hostId );
772+ agent = null ;
773+ continue ;
774+ }
775+
776+ try {
777+ agent .send (req , listener );
778+ last = null ;
779+ break ;
780+ } catch (AgentUnavailableException e ) {
781+ last = e ;
782+ agent = null ;
783+ }
784+ }
785+
786+ if (last != null ) {
787+ throw last ;
788+ }
721789 return req .getSequence ();
722790 }
723791
792+ protected int getAgentSendRetryCount () {
793+ return 0 ;
794+ }
795+
796+ protected int getAgentSendRetryIntervalMs () {
797+ return 0 ;
798+ }
799+
800+ protected AgentAttache resolveAttacheForRetry (final Long hostId , final AgentAttache current , final boolean forceReload )
801+ throws AgentUnavailableException {
802+ AgentAttache agent = current ;
803+ if (forceReload || agent == null || agent .isClosed ()) {
804+ agent = findAttache (hostId );
805+ }
806+ if (agent == null || agent .isClosed ()) {
807+ agent = getAttache (hostId );
808+ }
809+ return agent ;
810+ }
811+
812+ protected boolean isForwardWithoutPeer (final AgentAttache agent , final Long hostId ) {
813+ if (agent == null || hostId == null ) {
814+ return false ;
815+ }
816+ if (!(this instanceof ClusteredAgentManagerImpl ) || !agent .forForward ()) {
817+ return false ;
818+ }
819+ final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl )this ;
820+ return clusteredMgr .getPeerName (hostId ) == null ;
821+ }
822+
823+ protected void sleepRetry (final int intervalMs ) {
824+ try {
825+ Thread .sleep (intervalMs );
826+ } catch (InterruptedException ie ) {
827+ Thread .currentThread ().interrupt ();
828+ }
829+ }
830+
831+ protected AgentAttache resolveAttacheForSend (final Long hostId , final AgentAttache agent ) throws AgentUnavailableException {
832+ if (hostId == null || agent == null ) {
833+ return agent ;
834+ }
835+
836+ // Only clustered forwarding attaches need peer resolution.
837+ if (!(this instanceof ClusteredAgentManagerImpl ) || !agent .forForward ()) {
838+ return agent ;
839+ }
840+
841+ final ClusteredAgentManagerImpl clusteredMgr = (ClusteredAgentManagerImpl )this ;
842+ if (clusteredMgr .getPeerName (hostId ) != null ) {
843+ return agent ;
844+ }
845+
846+ final int retries = Math .max (0 , clusteredMgr .getPeerLookupRetryCount ());
847+ final int intervalMs = Math .max (0 , clusteredMgr .getPeerLookupRetryIntervalMs ());
848+ if (retries <= 0 ) {
849+ return agent ;
850+ }
851+
852+ for (int attempt = 1 ; attempt <= retries ; attempt ++) {
853+ if (intervalMs > 0 ) {
854+ try {
855+ Thread .sleep (intervalMs );
856+ } catch (InterruptedException ie ) {
857+ Thread .currentThread ().interrupt ();
858+ break ;
859+ }
860+ }
861+
862+ final AgentAttache current = findAttache (hostId );
863+ if (current == null || current .isClosed ()) {
864+ continue ;
865+ }
866+
867+ // If the agent reconnected locally while we were retrying, use it.
868+ if (!current .forForward ()) {
869+ return current ;
870+ }
871+
872+ // If the host ownership mapping updated to a remote MS, forwarding can proceed.
873+ if (clusteredMgr .getPeerName (hostId ) != null ) {
874+ return current ;
875+ }
876+ }
877+
878+ // Preserve the original error semantics, but fail before we persist reconcile commands.
879+ throw new AgentUnavailableException ("Unable to find peer" , hostId );
880+ }
881+
724882 public void removeAgent (final AgentAttache attache , final Status nextState ) {
725883 if (attache == null ) {
726884 return ;
0 commit comments