2626import java .util .ArrayList ;
2727import java .util .Collection ;
2828import java .util .Collections ;
29+ import java .util .Comparator ;
2930import java .util .EnumSet ;
3031import java .util .HashMap ;
3132import java .util .HashSet ;
3233import java .util .List ;
3334import java .util .Map ;
3435import java .util .Optional ;
3536import java .util .Set ;
37+ import java .util .concurrent .Executors ;
38+ import java .util .concurrent .ScheduledExecutorService ;
39+ import java .util .concurrent .TimeUnit ;
3640import java .util .stream .Collectors ;
3741
3842import javax .inject .Inject ;
43+ import javax .naming .ConfigurationException ;
3944
4045import org .apache .cloudstack .acl .RoleType ;
4146import org .apache .cloudstack .api .ApiConstants ;
5964import org .apache .cloudstack .framework .extensions .api .UnregisterExtensionCmd ;
6065import org .apache .cloudstack .framework .extensions .api .UpdateCustomActionCmd ;
6166import org .apache .cloudstack .framework .extensions .api .UpdateExtensionCmd ;
67+ import org .apache .cloudstack .framework .extensions .command .GetExtensionEntryPointChecksumCommand ;
6268import org .apache .cloudstack .framework .extensions .dao .ExtensionCustomActionDao ;
6369import org .apache .cloudstack .framework .extensions .dao .ExtensionCustomActionDetailsDao ;
6470import org .apache .cloudstack .framework .extensions .dao .ExtensionDao ;
7177import org .apache .cloudstack .framework .extensions .vo .ExtensionResourceMapDetailsVO ;
7278import org .apache .cloudstack .framework .extensions .vo .ExtensionResourceMapVO ;
7379import org .apache .cloudstack .framework .extensions .vo .ExtensionVO ;
80+ import org .apache .cloudstack .managed .context .ManagedContextRunnable ;
81+ import org .apache .cloudstack .management .ManagementServerHost ;
82+ import org .apache .cloudstack .utils .identity .ManagementServerNode ;
7483import org .apache .commons .collections .CollectionUtils ;
7584import org .apache .commons .collections .MapUtils ;
7685import org .apache .commons .lang3 .EnumUtils ;
7988
8089import com .cloud .agent .AgentManager ;
8190import com .cloud .agent .api .Answer ;
91+ import com .cloud .agent .api .Command ;
8292import com .cloud .agent .api .RunCustomActionAnswer ;
8393import com .cloud .agent .api .RunCustomActionCommand ;
94+ import com .cloud .alert .AlertManager ;
95+ import com .cloud .cluster .ClusterManager ;
96+ import com .cloud .cluster .ManagementServerHostVO ;
97+ import com .cloud .cluster .dao .ManagementServerHostDao ;
8498import com .cloud .dc .ClusterVO ;
8599import com .cloud .dc .dao .ClusterDao ;
86100import com .cloud .event .ActionEvent ;
95109import com .cloud .hypervisor .ExternalProvisioner ;
96110import com .cloud .hypervisor .Hypervisor ;
97111import com .cloud .org .Cluster ;
112+ import com .cloud .serializer .GsonHelper ;
98113import com .cloud .utils .Pair ;
99114import com .cloud .utils .component .ManagerBase ;
100115import com .cloud .utils .component .PluggableService ;
116+ import com .cloud .utils .concurrency .NamedThreadFactory ;
101117import com .cloud .utils .db .EntityManager ;
102118import com .cloud .utils .db .Filter ;
119+ import com .cloud .utils .db .GlobalLock ;
103120import com .cloud .utils .db .SearchBuilder ;
104121import com .cloud .utils .db .SearchCriteria ;
105122import com .cloud .utils .db .Transaction ;
@@ -154,6 +171,17 @@ public class ExtensionsManagerImpl extends ManagerBase implements ExtensionsMana
154171 @ Inject
155172 EntityManager entityManager ;
156173
// DAO used by the entry-point sync check to list the management servers that are in the Up state.
174+ @ Inject
175+ ManagementServerHostDao managementServerHostDao ;
176+
// Used by the entry-point sync check to execute the checksum command on a peer management server.
177+ @ Inject
178+ ClusterManager clusterManager ;
179+
// Used to send an alert when the entry-point of an extension is found to be out of sync.
180+ @ Inject
181+ AlertManager alertManager ;
182+
// Scheduler for the periodic entry-point sync check; created in configure() and scheduled in start().
183+ private ScheduledExecutorService entryPointSyncCheckExecutor ;
184+
157185 protected String getExtensionSafeName (String name ) {
158186 return name .replaceAll ("[^a-zA-Z0-9._-]" , "_" ).toLowerCase ();
159187 }
@@ -969,6 +997,37 @@ private Map<String, Object> getExternalAccessDetails(Map<String, String> actionD
969997 return externalDetails ;
970998 }
971999
1000+ @ Override
1001+ public String handleGetExtensionEntryPointChecksumCommand (GetExtensionEntryPointChecksumCommand cmd ) {
1002+ final String extensionName = cmd .getExtensionName ();
1003+ final String extensionRelativeEntryPointPath = cmd .getExtensionRelativeEntryPointPath ();
1004+ logger .debug ("Received GetExtensionEntryPointChecksumCommand from MS: {} for extension [id: {}, name: {}, relativeEntryPoint: {}]" ,
1005+ cmd .getMsId (), cmd .getExtensionId (), extensionName , extensionRelativeEntryPointPath );
1006+ return externalProvisioner .getChecksumForExtensionEntryPoint (extensionName , extensionRelativeEntryPointPath );
1007+ }
1008+
1009+ @ Override
1010+ public boolean start () {
1011+ long syncCheckInitialDelay = 120 ;
1012+ long syncCheckInterval = 600 ;
1013+ logger .debug ("Scheduling extensions entrypoint sync check task with initial delay={}s and interval={}s" ,
1014+ syncCheckInitialDelay , syncCheckInterval );
1015+ entryPointSyncCheckExecutor .scheduleWithFixedDelay (new EntryPointSyncCheckWorker (),
1016+ syncCheckInitialDelay , syncCheckInterval , TimeUnit .SECONDS );
1017+ return true ;
1018+ }
1019+
1020+ @ Override
1021+ public boolean configure (String name , Map <String , Object > params ) throws ConfigurationException {
1022+ try {
1023+ entryPointSyncCheckExecutor = Executors .newScheduledThreadPool (1 ,
1024+ new NamedThreadFactory ("Extension-EntryPoint-Sync-Check" ));
1025+ } catch (final Exception e ) {
1026+ throw new ConfigurationException ("Unable to to configure ExtensionsManagerImpl" );
1027+ }
1028+ return true ;
1029+ }
1030+
9721031 @ Override
9731032 public List <Class <?>> getCommands () {
9741033 List <Class <?>> cmds = new ArrayList <>();
@@ -986,4 +1045,94 @@ public List<Class<?>> getCommands() {
9861045 cmds .add (UnregisterExtensionCmd .class );
9871046 return cmds ;
9881047 }
1048+
1049+ public class EntryPointSyncCheckWorker extends ManagedContextRunnable {
1050+ protected void sendExtensionEntryPointOutOfSyncAlert (Extension extension ) {
1051+ String msg = String .format ("Entry-point for %s are out of sync across management servers" ,
1052+ extension );
1053+ alertManager .sendAlert (AlertManager .AlertType .ALERT_TYPE_USERVM , 0L , 0L , msg , msg );
1054+ }
1055+
1056+ protected void updateExtensionSync (Extension extension , boolean sync ) {
1057+ if (!sync ) {
1058+ sendExtensionEntryPointOutOfSyncAlert (extension );
1059+ }
1060+ if (extension .isEntryPointSync () == sync ) {
1061+ return ;
1062+ }
1063+ ExtensionVO extensionVO = extensionDao .createForUpdate (extension .getId ());
1064+ extensionVO .setEntryPointSync (sync );
1065+ extensionDao .update (extension .getId (), extensionVO );
1066+ }
1067+
1068+ protected String getChecksumFromMSPeer (Extension extension , ManagementServerHostVO msHost ) {
1069+ final String msPeer = Long .toString (msHost .getMsid ());
1070+ logger .debug ("Retrieving checksum for {} from MS: {}" , extension , msPeer );
1071+ final Command [] cmds = new Command [1 ];
1072+ cmds [0 ] = new GetExtensionEntryPointChecksumCommand (ManagementServerNode .getManagementServerId (),
1073+ extension .getId (), extension .getName (), extension .getRelativeEntryPoint ());
1074+ return clusterManager .execute (msPeer , 0L , GsonHelper .getGson ().toJson (cmds ), true );
1075+ }
1076+
1077+ protected void checkSyncForOrchestrator (Extension extension , List <ManagementServerHostVO > msHosts ) {
1078+ if (CollectionUtils .isEmpty (msHosts )) {
1079+ updateExtensionSync (extension , true );
1080+ return ;
1081+ }
1082+ String checksum = externalProvisioner .getChecksumForExtensionEntryPoint (extension .getName (),
1083+ extension .getRelativeEntryPoint ());
1084+ if (StringUtils .isBlank (checksum )) {
1085+ updateExtensionSync (extension , false );
1086+ return ;
1087+ }
1088+ for (ManagementServerHostVO msHost : msHosts ) {
1089+ final String msPeerChecksum = getChecksumFromMSPeer (extension , msHost );
1090+ if (!checksum .equals (msPeerChecksum )) {
1091+ logger .warn ("Entry-point checksum for {} is different [msid: {}, checksum: {}] and [msid: {}, checksum: {}]" ,
1092+ extension , ManagementServerNode .getManagementServerId (), checksum ,
1093+ msHost .getMsid (), msPeerChecksum );
1094+ updateExtensionSync (extension , false );
1095+ return ;
1096+ }
1097+ }
1098+ updateExtensionSync (extension , true );
1099+ }
1100+
1101+ protected void runCleanupForLongestRunningManagementServer () {
1102+ try {
1103+ List <ManagementServerHostVO > msHosts = managementServerHostDao .listBy (ManagementServerHost .State .Up );
1104+ msHosts .sort (Comparator .comparingLong (ManagementServerHostVO ::getRunid ));
1105+ ManagementServerHostVO msHost = msHosts .remove (0 );
1106+ if (msHost == null || (msHost .getMsid () != ManagementServerNode .getManagementServerId ())) {
1107+ logger .debug ("Skipping the extensions entrypoint sync check on this management server" );
1108+ return ;
1109+ }
1110+ List <ExtensionVO > extensions = extensionDao .listAll ();
1111+ for (ExtensionVO extension : extensions ) {
1112+ if (!Extension .Type .Orchestrator .equals (extension .getType ())) {
1113+ continue ;
1114+ }
1115+ checkSyncForOrchestrator (extension , msHosts );
1116+ }
1117+ } catch (Exception e ) {
1118+ logger .warn ("Cleanup task failed to cleanup old webhook deliveries" , e );
1119+ }
1120+ }
1121+
1122+ @ Override
1123+ protected void runInContext () {
1124+ GlobalLock gcLock = GlobalLock .getInternLock ("ExtensionEntryPointSyncCheck" );
1125+ try {
1126+ if (gcLock .lock (3 )) {
1127+ try {
1128+ runCleanupForLongestRunningManagementServer ();
1129+ } finally {
1130+ gcLock .unlock ();
1131+ }
1132+ }
1133+ } finally {
1134+ gcLock .releaseRef ();
1135+ }
1136+ }
1137+ }
9891138}
0 commit comments