diff --git a/Loop Deployed Object/Loop Deployed Object.step b/Loop Deployed Object/Loop Deployed Object.step index e1eb95cd..4c7fa543 100644 --- a/Loop Deployed Object/Loop Deployed Object.step +++ b/Loop Deployed Object/Loop Deployed Object.step @@ -1 +1 @@ -{"creationTimeStamp":"2024-06-14T18:32:21.570Z","modifiedTimeStamp":"2025-06-04T10:31:35.974Z","createdBy":"Ethan.Kavanaugh@sas.com","modifiedBy":"Remco.Gooijer@sas.com","name":"Loop Deployed Object.step","displayName":"Loop Deployed Object.step","localDisplayName":"Loop Deployed Object.step","properties":{},"links":[{"method":"GET","rel":"self","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step"},{"method":"GET","rel":"alternate","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step.summary"},{"method":"GET","rel":"up","href":"/dataFlows/steps","uri":"/dataFlows/steps","type":"application/vnd.sas.collection","itemType":"application/vnd.sas.data.flow.step.summary"},{"method":"PUT","rel":"update","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step","responseType":"application/vnd.sas.data.flow.step"},{"method":"DELETE","rel":"delete","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a"},{"method":"POST","rel":"copy","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a/copy","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a/copy","responseType":"application/vnd.sas.data.flow.step"},{"method":"GET","rel":"transferExport","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","responseType":"application/vnd.sas.transfer.object"},{"method":"PUT","rel":"transferImportUpdate","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.transfer.object","responseType":"application/vnd.sas.summary"}],"metadataVersion":0.0,"version":2,"type":"code","flowMetadata":{"inputPorts":[{"name":"_input_parameters","displayName":"_input_parameters","localDisplayName":"_input_parameters","minEntries":1,"maxEntries":1,"defaultEntries":0,"type":"table"}],"outputPorts":[{"name":"_output","displayName":"_output","localDisplayName":"_output","minEntries":1,"maxEntries":1,"defaultEntries":0,"type":"table","supportsView":false,"requiresStructure":false}]},"ui":"{\n\t\"showPageContentOnly\": true,\n\t\"pages\": [\n\t\t{\n\t\t\t\"id\": \"page1\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"Deployed Object Properties\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_input_parameters\",\n\t\t\t\t\t\"type\": \"inputtable\",\n\t\t\t\t\t\"label\": \"Select the source table:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_output\",\n\t\t\t\t\t\"type\": \"outputtable\",\n\t\t\t\t\t\"label\": \"Output table label 1:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_jobname\",\n\t\t\t\t\t\"type\": \"textfield\",\n\t\t\t\t\t\"label\": \"Specify the deployed object name: (Case sensitive!)\",\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t}\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t\"id\": \"page3\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"Scheduling Properties\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"concurrent_jobs\",\n\t\t\t\t\t\"type\": \"numstepper\",\n\t\t\t\t\t\"label\": \"Number of concurrent processes:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"integer\": true,\n\t\t\t\t\t\"min\": 1,\n\t\t\t\t\t\"max\": 10,\n\t\t\t\t\t\"stepsize\": 1\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"wait_for_processes\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Wait for processes to finish\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"abort_on_exception\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Abort on exception\",\n\t\t\t\t\t\"visible\": \"$wait_for_processes\",\n\t\t\t\t\t\"enabled\": \"$wait_for_processes\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_delayed\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Delayed execution\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_delayedseconds\",\n\t\t\t\t\t\"type\": \"numstepper\",\n\t\t\t\t\t\"label\": \"Delayed seconds:\",\n\t\t\t\t\t\"required\": false,\n\t\t\t\t\t\"integer\": true,\n\t\t\t\t\t\"min\": 1,\n\t\t\t\t\t\"max\": 5,\n\t\t\t\t\t\"stepsize\": 1,\n\t\t\t\t\t\"enabled\": \"$_delayed\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t}\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t\"id\": \"page2\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"About\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"text2\",\n\t\t\t\t\t\"type\": \"text\",\n\t\t\t\t\t\"text\": \"Loop Deployed Object custom step\\n=========================\\n\\nThis custom step executes, in parallel, one deployed flow or deployed SAS program for a given set of parameters. \\nBoth can be stored in SAS Content and/or on a file system.\\n\\nFor that to work it needs\\n* An input dataset where the column names are treated as macro variables and the rows as macro variable values. You can have as much columns as you need.\\n* The name of the deployed flow or deployed SAS program. \\n\\nMake sure that the column(s) in the input table have the same name(s) as the macro variable(s) used by the deployed flow or deployed SAS program.\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"section1\",\n\t\t\t\t\t\"type\": \"section\",\n\t\t\t\t\t\"label\": \"Changelog\",\n\t\t\t\t\t\"open\": false,\n\t\t\t\t\t\"visible\": \"\",\n\t\t\t\t\t\"children\": [\n\t\t\t\t\t\t{\n\t\t\t\t\t\t\t\"id\": \"text1\",\n\t\t\t\t\t\t\t\"type\": \"text\",\n\t\t\t\t\t\t\t\"text\": \"* Version 1.95 (17JUN2025)\\n- Hard exit on crucial missing components.\\n\\n* Version 1.9 (03OCT2024)\\n - Removed a bug where the custom step would never finish.\\n\\n* Version 1,8 (16MAY2024)\\n - Added the 'Abort on exception' option.\\n\\n* Version 1.7 (18MAR2024)\\n - Name change from 'Loop' to 'Loop Deployed Object'\\n\\n* Version 1.0 - version 1.6 \\n - Were released internally.\",\n\t\t\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t\t\t}\n\t\t\t\t\t]\n\t\t\t\t}\n\t\t\t]\n\t\t}\n\t],\n\t\"syntaxversion\": \"1.3.0\",\n\t\"values\": {\n\t\t\"_input_parameters\": {\n\t\t\t\"library\": \"\",\n\t\t\t\"table\": \"\"\n\t\t},\n\t\t\"_output\": {\n\t\t\t\"library\": \"\",\n\t\t\t\"table\": \"\"\n\t\t},\n\t\t\"_jobname\": \"\",\n\t\t\"concurrent_jobs\": 4,\n\t\t\"wait_for_processes\": true,\n\t\t\"abort_on_exception\": false,\n\t\t\"_delayed\": false,\n\t\t\"_delayedseconds\": 1\n\t}\n}","templates":{"SAS":"/*\n\tInitialize the variables.\n*/\n%let nbr_job_rows = 0;\n%let nbr_parameter_rows = 0;\n%let nbr_rows = 0;\n%let job_uri =;\n\n/*\n\tInput for the input table validation.\n\tCount the number of rows. This number needs to be bigger then 0.\n*/\ndata _NULL_;\n\tset \n\t\t&_input_parameters end=last;\n\tif last then call symputx(\"nbr_parameter_rows\", _n_);\nrun;\n\n/*\n\tThe following macro is responsible for retrieving the URI of the deployed flow.\n\n\tNote that the loop custom step is build around the assumption that the name of the job definition is unique and exists.\n\tIn case it is not, the loop custom step will abort its operation.\n*/\n%macro get_job_uri\n(\n\tjob_name=\t\n);\n\tdata _null_; \n\t\tcall symputx('viyaHost', kreverse(ksubstr(ksubstr(kreverse(\"&_BASEURL.\"), 2), kfind(ksubstr(kreverse(\"&_BASEURL.\"), 2), '/') + 1)));\n\trun;\n\n\t/*\n\t\tSetup and execute the http request.\n\t*/\n\tfilename joburi temp;\n\tproc http\n\t\turl = \"&viyahost./jobExecution/jobRequests\"\n\t\tout= joburi\n\t\tmethod='get'\n\t\t\toauth_bearer = sas_services\n\t\t\t%if &job_name ne %then %do;\n\t\t\t\tquery= ('filter' = \"eq('name', '&job_name')\");\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t;\n\t\t\t%end;\n\t\theaders\n\t\t\t'Accept'= 'application/vnd.sas.collection+json';\n\trun;\t\n\n\t/*\n\t\tProcess the response file, if it exists.\n\t*/\n\t%if %sysfunc(fexist(joburi)) %then %do;\n\n\t\tlibname joburi json;\n\t\t/*\n\t\t\tCheck that the items table exists.\n\t\t*/\n\t\t%if %sysfunc(exist(joburi.items)) %then %do;\n\t\t\t/*\n\t\t\t\tSelect the jobs for which the URI exists.\n\t\t\t*/\n\t\t\tdata job_uri;\n\t\t\t\tset joburi.items\n\t\t\t\t(\n\t\t\t\t\tkeep = \n\t\t\t\t\t\tname \n\t\t\t\t\t\tjobDefinitionUri \n\t\t\t\t\twhere = \n\t\t\t\t\t\t(\n\t\t\t\t\t\t\tjobDefinitionUri ~= ''\n\t\t\t\t\t\t)\n\t\t\t\t);\n\t\t\trun;\n\t\t\tlibname joburi;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The items table in the response file does not exist. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%end;\n\t%else %do;\n\t\t%put ERROR: The response file does not exist. Aborting process.;\n\t\t%abort exit;\n\t%end;\n\n\t/*\n\t\tAssign the job uri to the 'job_uri' macro variable.\n\t*/\n\t%if %sysfunc(exist(work.job_uri)) %then %do;\n\t\t/*\n\t\t\tCount the number of jobs returned.\n\n\t\t\tOnly continue in case the number equals to 1.\n\t\t*/\n\t\tproc sql noprint;\n\t\t\tselect \n\t\t\t\tcount(*) into :nbr_job_rows \n\t\t\tfrom \n\t\t\t\twork.job_uri;\n\t\tquit;\n\n\t\t/*\n\t\t\tAbort in case the number of returned values isn't exactly one.\n\t\t*/\n\t\t%if &nbr_job_rows NE 1 %then %do;\n\t\t\t%put ERROR: 0 or more then 1 job encountered with name &job_name. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\t\t%else %do;\n\t\t\tdata _null_;\n\t\t\t\tset\n\t\t\t\t\twork.job_uri;\n\t\t\t\tcall symputx(\"job_uri\", kstrip(jobDefinitionUri));\n\t\t\trun;\t\t\n\t\t%end;\n\t%end;\n\t%else %do;\n\t\t%put ERROR: job_uri table does not exist. Aborting process.;\n\t\t%abort exit;\n\t%end;\n%mend;\n%get_job_uri(\n\tjob_name = &_jobname\n);\n\n/*\n\tOnly start the process in case the parameter table contains more than 0 rows.\n*/\n%if &nbr_parameter_rows eq 0 %then %do;\n\t%put ERROR: Not enough parameters (&nbr_parameter_rows) to run the loop step. Aborting process.;\n\t%abort exit;\n%end;\n%else %do;\n\t/*\n\t\tThe number of rows in the input table check out. Continue to run the process.\n\t*/\n\tdata _null_; \n\t\tcall symputx('viyaHost', kreverse(ksubstr(ksubstr(kreverse(\"&_BASEURL.\"), 2), kfind(ksubstr(kreverse(\"&_BASEURL.\"), 2), '/') + 1)));\n\trun;\n\n\t/*\n\t\tThe following macro is responsible for creating the JSON request for the job\n\t\tand the given set parameters/macro variables.\n\t*/\n\t%macro createJsonRequest(\n\t\trow=\t\n\t);\n\t\t/*\n\t\t\tthe value assigned to the macro variable 'process_name' is used to make the process visible in environment manager.\n\t\t*/\n\t\t%let process_name = Running '&_jobname' for input table row &row;\n\t\tdata _null_;\n\t\t\tlength \n\t\t\t\tparameter $32767\n\t\t\t\tvalue $32767;\n\n\t\t\t/*\n\t\t\t\tOpen the custom step input dataset.\n\t\t\t*/\n\t\t\tdsid = open(\"&_input_parameters\");\n\n\t\t\t/*\n\t\t\t\tNote that the request file reference is defined out-side of this macro!\n\t\t\t*/\t\n\t\t\tif (dsid > 0) then do;\n\t\t\t\tfile request;\n\t\t\t\tput '{';\n\t\t\t\tput '\"name\": ' \"\"\"&process_name\"\"\" ',';\n\t\t\t\tput '\"jobDefinitionUri\": ' \"\"\"&job_uri\"\"\" ',';\n\t\t\t\tput '\"arguments\": {';\n\t\t\t\n\t\t\t\t/*\n\t\t\t\t\tFetch the 'row'-th record.\n\t\t\t\t*/\n\t\t\t\trc = fetchobs(dsid, &row);\n\n\t\t\t\t/*\n\t\t\t\t\tRetrieve the number of columns.\n\t\t\t\t*/\n\t\t\t\tcolumns = attrn(dsid,'nvars');\n\t\t\t\t\n\t\t\t\t/*\n\t\t\t\t\tFor each column and value pair create an argument entry in the JSON file.\n\t\t\t\t*/\n\t\t\t\tdo index = 1 to columns;\n\t\t\t\t\t/*\n\t\t\t\t\t\tGet the column name and the column type.\n\t\t\t\t\t*/\n\t\t\t\t\tvname = kstrip(varname(dsid, index));\n\t\t\t\t\tvtype = vartype(dsid, index);\n\t\t\t\t\n\t\t\t\t\t/*\n\t\t\t\t\t\tDepending on data type, handle the value appropriately.\n\t\t\t\t\t*/\n\t\t\t\t\tif (vtype~='C') then do;\t\n\t\t\t\t\t\t/*\n\t\t\t\t\t\t\tNumeric values.\n\t\t\t\t\t\t*/\n\t\t\t\t\t\tvalue = put(getvarn(dsid, index), best32.);\n\t\t\t\t\t\tif index < columns then do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\",';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\t\telse do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\"';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\tend;\n\t\t\t\t\telse do;\n\t\t\t\t\t\t/*\n\t\t\t\t\t\t\tCharacter values.\n\t\t\t\t\t\t*/\n\t\t\t\t\t\tvalue = getvarc(dsid, index);\n\n\t\t\t\t\t\tif index < columns then do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\",';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\t\t\t\t\n\t\t\t\t\t\telse do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\"';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\tend;\n\t\t\t\tend;\n\t\t\t\tput '}';\n\t\t\t\tput '}';\n\t\t\tend;\n\t\t\telse do;\n\t\t\t\tput \"ERROR%QUOTE(:) Input table could not be opened.\";\n\t\t\tend;\n\t\t\tdsid = close(dsid);\n\t\trun;\n\t%mend;\n\n\t/*\n\t\tThe following macro is responsible for creating and assigning the input columns, and their properties, to the global macro variable 'column_line', to be \n\t\tused in the step that creates the output table.\n\t*/\n\t%let columns_line=;\n\t%macro addColumns(\n\t\tds=\n\t);\n\t\t/*\n\t\t\tTry to open the dataset.\n\t\t*/\n\t\t%let dsid = %sysfunc(open(&ds, i));\n\t\t%if (&dsid > 0) %then %do;\n\n\t\t\t/*\n\t\t\t\tRetrieve the number of columns.\n\t\t\t*/\n\t\t\t%let columns = %sysfunc(attrn(&dsid, nvars));\n\n\t\t\t/*\n\t\t\t\tFor each column:\n\t\t\t*/\n\t\t\t%do i = 1 %to &columns;\n\t\t\t\t/*\n\t\t\t\t\tRetrieve the name, type, length and format.\n\t\t\t\t*/\n\t\t\t\t%let vname = %sysfunc(varname(&dsid, &i));\n\t\t\t\t%let vtype = %sysfunc(vartype(&dsid, &i));\n\t\t\t\t%let vlength = %sysfunc(varlen(&dsid, &i));\n\t\t\t\t%let vformat = %sysfunc(varfmt(&dsid, &i));\n\t\n\t\t\t\t/*\n\t\t\t\t\tBased on the column type and the existance of a format, create the column definition.\n\t\t\t\t*/\n\t\t\t\t%if &vtype EQ C %then %do;\n\t\t\t\t\t%if &vformat eq %then %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=$&vlength;);\n\t\t\t\t\t%end;\n\t\t\t\t\t%else %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=$&vlength format=&vformat;);\n\t\t\t\t\t%end;\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t%if &vformat eq %then %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=&vlength;);\n\t\t\t\t\t%end;\n\t\t\t\t\t%else %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=&vlength format=&vformat;);\n\t\t\t\t\t%end;\n\t\t\t\t%end;\t\n\t\t\t%end;\t\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t\t%else %do;\n\t\t\t/*\n\t\t\t\tThe table could not be opened. Specify the error message.\n\t\t\t*/\n\t\t\t%put %sysfunc(sysmsg());\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t%mend;\n\t%addColumns(\n\t\tds=&_input_parameters\n\t);\n\n\t/*\n\t\tCreate the status tables that will hold all execution statuses for all executed job requests.\n\t\tBased on this table you can create a polling mechanism to validate that each process has finished successfully.\n\t*/\n\tdata &_output;\n\t\tattrib id length=$36.;\n\t\tattrib job_name length=$128.;\n\t\t/*\n\t\t\tAdd input parameters/variables to the output table.\n\t\t\t\n\t\t\tAssigning the value to the macro variable, 'column_line', is performed by the 'addColumns' macro.\n\t\t*/\n\t\t&columns_line;\n\t\tattrib state length=$32.;\n\t\tattrib start_dttm length=8 format=datetime22.3;\n\t\tattrib end_dttm length=8 format=datetime22.3;\n\t\tstop;\n\trun;\n\n\t%macro addColumnValues(\n\t\tds=,\n\t\trow=\n\t);\n\n\t\t%let values_line=;\n\t\t%let dsid = %sysfunc(open(&ds, i));\n\t\t%if (&dsid > 0) %then %do;\n\n\t\t\t%let columns = %sysfunc(attrn(&dsid, nvars));\n\t\t\t%let rc = %sysfunc(fetchobs(&dsid, &row));\n\n\t\t\t%do i = 1 %to &columns;\n\t\t\t\t%let vname = %sysfunc(varname(&dsid, &i));\n\t\t\t\t%let vtype = %sysfunc(vartype(&dsid, &i));\n\n\t\t\t\t%if &vtype EQ C %then %do;\n\t\t\t\t\t%let value = %BQUOTE(%sysfunc(getvarc(&dsid, &i)));\n\t\t\t\t\t%let values_line = %str(&values_line &vname=%str(%\")&value%str(%\"););\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t%let value = %sysfunc(getvarn(&dsid, &i));\n\t\t\t\t\t%let values_line = %str(&values_line &vname=&value;);\n\t\t\t\t%end;\n\t\t\t%end;\n\t\t\t/*\n\t\t\t\tAssign the generated line to the macro variable so that it can be used in a data step.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tcall symputx('columnValues_line', strip(\"&Values_line\"));\n\t\t\trun;\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put %sysfunc(sysmsg());\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t%mend;\t\n\n\t%macro exec_job\n\t(\n\t\tjob_uri=,\n\t\trow=\n\t);\n\n\t\t%if %unquote(%str(%')&job_uri%str(%')) ne '' %then %do;\n\n\t\t\t/*\n\t\t\t\tIn case the user asked for a delayed execution...\n\t\t\t*/\t\t\n\t\t\t%if &_delayed eq 1 %then %do;\n\t\t\t\t%put NOTE: Delayed executing, waiting for &_delayedseconds seconds.;\n\t\t\t\tdata _null_;\n\t\t\t\t\tsleeping = sleep(&_delayedseconds, 1);\n\t\t\t\trun;\n\t\t\t%end;\n\n\t\t\t/*\n\t\t\t\tCreate the JSON request file.\n\t\t\t*/\n\t\t\t%let process_name = Flow instance for parameter table row &row;\n\t\t\tfilename request temp;\n\t\t\t\n\t\t\t/*\n\t\t\t\tCreate the JSON request.\n\t\t\t\t\n\t\t\t\tThe file reference 'request' is used to create the resulting JSON file.\n\t\t\t*/\n\t\t\t%createJSONrequest(\n\t\t\t\trow=&row\n\t\t\t);\n\n\t\t\t/*\n\t\t\t\tSetup and execute the HTTP request.\n\t\t\t*/\n\t\t\tfilename response temp;\n\t\t\tproc http\n\t\t\t\turl = \"&viyahost./jobExecution/jobs?submitter=workflow\"\n\t\t\t\tin = request\n\t\t\t\tout= response\n\t\t\t\tmethod='post'\n\t\t\t\t\toauth_bearer = sas_services;\n\t\t\t\theaders\n\t\t\t\t\t'Content-Type'='application/vnd.sas.job.execution.job.request+json'\n\t\t\t\t\t'Accept'='application/vnd.sas.job.execution.job+json';\n\t\t\trun;\n\n\t\t\t%if %sysfunc(fexist(response)) %then %do;\n\n\t\t\t\t%let columnValues_line=;\n\t\t\t\t%addColumnValues(\n\t\t\t\t\tds=&_input_parameters,\n\t\t\t\t\trow=&row\n\t\t\t\t);\n\n\t\t\t\t/*\n\t\t\t\t\tAdd the state, should be 'running', of the current job to the execute_status table.\n\t\t\t\t\n\t\t\t\t\tNote that the column specification can differ. This will cause warnings during the PROC APPEND step.\n\t\t\t\t\tThat is why the resulting table firstly needs to be created based on the internally defined status table.\n\t\t\t\t*/\n\t\t\t\tlibname resp json fileref=response;\n\t\t\t\tdata work.root(drop=creationTimeStamp);\n\t\t\t\t\tattrib job_name length=$128.;\n\t\t\t\t\tattrib end_dttm length=8 format=datetime22.3;\t\n\t\t\t\t\tattrib start_dttm length=8 format=datetime22.3;\n\t\t\t\t\tset \n\t\t\t\t\t\t&_output(obs=0)\n\t\t\t\t\t\tresp.root(keep=id state creationTimeStamp);\n\n\t\t\t\t\tstart_dttm = input(creationTimeStamp, E8601DZ.);\n\t\t\t\t\tjob_name = \"&process_name\";\n\t\t\t\t\t/*\n\t\t\t\t\t\tAdd the current input parameters to the output/status table.\n\t\t\t\t\t*/\n\t\t\t\t\t&columnValues_line;\n\t\t\t\trun;\n\t\t\t\tproc append data=work.root base=&_output force; run;\n\t\t\t\tlibname resp;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t%put ERROR: The response file does not exist. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: No URI provided. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\n\t%macro update_status\n\t(\n\t\tId=\n\t);\n\t\t/*\n\t\t\tThis process only makes sense when the execute_status table exists.\n\t\t*/\n\t\t%if %sysfunc(exist(&_output, data)) %then %do;\n\t\t\t/*\n\t\t\t\tSetup and execute the HTTP request.\n\t\t\t*/\n\t\t\tfilename response temp;\n\t\t\tproc http\n\t\t\t\turl=\"&viyahost./jobExecution/jobs/&Id\"\n\t\t\t\tout=response\n\t\t\t\tmethod=\"get\"\n\t\t\t\t\toauth_bearer = sas_services;\n\t\t\t\theaders\n\t\t\t\t\t'Accept'='application/json';\n\t\t\trun;\n\n\t\t\t%if %sysfunc(fexist(response)) %then %do;\n\t\t\t\tlibname status json fileref=response;\n\n\t\t\t\t/*\n\t\t\t\t\tUpdate the output table with the values from the REST API.\n\t\t\t\t\tNote that the root table will always contain one row.\n\t\t\t\t*/\n\t\t\t\tproc sql noprint;\n\t\t\t\t\tupdate \n\t\t\t\t\t\t&_output\n\t\t\t\t\tset \t\n\t\t\t\t\t\tstate = \n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\tselect \n\t\t\t\t\t\t\t\t\tkstrip(state) \n\t\t\t\t\t\t\t\tfrom \n\t\t\t\t\t\t\t\t\tstatus.root\n\t\t\t\t\t\t\t),\n\t\t\t\t\t\tend_dttm = \n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\tselect \n\t\t\t\t\t\t\t\t\tinput(modifiedTimeStamp, E8601DZ.)\n\t\t\t\t\t\t\t\tfrom\n\t\t\t\t\t\t\t\t\tstatus.root\n\t\t\t\t\t\t\t)\n\t\t\t\t\twhere\n\t\t\t\t\t\tid = \"&id\";\n\t\t\t\tquit;\n\t\t\t\tlibname status clear;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t/*\n\t\t\t\t\tNo abort, might be a hick-up...\n\t\t\t\t*/\n\t\t\t\t%put ERROR: Unable to update the status table. No response file encountered.;\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The status table, &_output, does not exist.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\n\t/*\n\t\tThe following macro is responsible for:\n\t\t- Maintaining the job waiting queue.\n\t\t- Executing the jobs\n\t\t- Updating the status table.\n\t*/\n\t%macro exec_jobs;\n\t\t%put NOTE: Initial number of jobs in queue: &nbr_parameter_rows.;\n\n\t\t/*\n\t\t\tInitialize macro variables.\n\t\t*/\n\t\t%let index = 1;\n\t\t%do %while (&index le &nbr_parameter_rows);\n\t\t\t/*\n\t\t\t\tCheck for the number running jobs, to be less or equal to\n\t\t\t\tthe number of concurrent jobs.\n\n\t\t\t\tNote that in case that the _output table is empty, the macro variable 'running_jobs' is never reset. \n\t\t\t\tThat is the reason why the macro variable is initialized to 0 each time the following code is run.\n\t\t\t*/\n\t\t\t%let running_jobs = 0;\n\t\t\tdata _null_;\n\t\t\t\tset \n\t\t\t\t\t&_output(\n\t\t\t\t\t\twhere=(kupcase(state)='RUNNING')\n\t\t\t\t\t) end=last;\n\t\t\t\tif last then call symput(\"running_jobs\", _n_);\n\t\t\trun;\n\t\t\t%put NOTE: Number of running jobs: &running_jobs;\n\t\t\t\n\t\t\t/*\n\t\t\t\tIf the number of running jobs is less then the amount of jobs allowed to run concurrenly,\n\t\t\t\tstart a job and register it in the status table.\n\t\t\t*/\n\t\t\t%if (&running_jobs lt &concurrent_jobs) %then %do;\n\t\t\t\t%put NOTE: Adding new job from the queue;\n\t\t\t\t%exec_job(\n\t\t\t\t\tjob_uri = &job_uri,\n\t\t\t\t\trow = &index\n\t\t\t\t);\n\t\t\t\t/*\n\t\t\t\t\tincrement the index parameter, ready to start the next job. \n\t\t\t\t*/\n\t\t\t\t%let index = %eval(&index + 1);\t\n\t\t\t%end;\n\n\t\t\t/*\n\t\t\t\tOnly update the status of jobs with state 'running'.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tset\n\t\t\t\t\t&_output(where=(kupcase(state)='RUNNING'));\n\n\t\t\t\tcall execute(\n\t\t\t\t\tcats(\n\t\t\t\t\t\t'%update_status(Id= %STR(',\n\t\t\t\t\t\tkSTRIP(id),\n\t\t\t\t\t\t'))'\n\t\t\t\t\t)\n\t\t\t\t);\n\t\t\trun;\n\t\t\t/*\n\t\t\t\tWait 1/2 second for the next iteration.\n\n\t\t\t\tThis will keep the log file size down.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tslept = sleep(.50, 1);\n\t\t\trun;\n\t\t%end;\n\t%mend;\n\t%exec_jobs;\t\n%end;\n\n/*\n\tThe folllwing section is there to wait for the processes, just started, to be finished before continuing\n\twith the next (custom) step.\n*/\n%if &wait_for_processes %then %do;\n\n\t%macro check_for_completion;\n\t\t/*\n\t\t\tUpdate for all processes the status and determain if all processes have finished.\n\t\t*/\n\t\t%if %sysfunc(exist(&_output, data)) %then %do;\n\t\t\t%let still_running = -1;\n\t\t\t%let wait_cycles = 0;\n\n\t\t\t%do %while (&still_running ne 0);\n\t\t\t\t/*\n\t\t\t\t\tUpdate the status for all running jobs.\n\t\t\t\t*/\n\t\t\t\tdata _null_;\n\t\t\t\t\tset\n\t\t\t\t\t\t&_output\n\t\t\t\t\t\t(\n\t\t\t\t\t\t\twhere=(kupcase(state) = 'RUNNING')\n\t\t\t\t\t\t);\n\n\t\t\t\t\t\tcall execute\n\t\t\t\t\t\t(\n\t\t\t\t\t\t\tcats\n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\t'%update_status(Id= %STR(',\n\t\t\t\t\t\t\t\tkSTRIP(id),\n\t\t\t\t\t\t\t\t'))'\n\t\t\t\t\t\t\t)\n\t\t\t\t\t\t);\n\t\t\t\trun;\n\t\t\n\t\t\t\t/*\n\t\t\t\t\tCount the number of jobs that are running.\n\t\t\t\t*/\n\t\t\t\tproc sql noprint;\n\t\t\t\t\tselect \n\t\t\t\t\t\tcount(*) into :still_running \n\t\t\t\t\tfrom \n\t\t\t\t\t\t&_output \n\t\t\t\t\twhere \n\t\t\t\t\t\tkupcase(state) = 'RUNNING';\n\t\t\t\tquit;\n\t\t\t\t%put NOTE: Number of jobs that are running: &still_running;\n\t\t\n\t\t\t\t/*\n\t\t\t\t\tSleep for one second before checking for completion again.\n\t\t\t\t*/\n\t\t\t\tdata _null_;\n\t\t\t\t\tslept = sleep(1, 1);\n\t\t\t\trun;\n\t\t\t\t%let wait_cycles = %eval(&wait_cycles + 1);\n\t\t\t%end;\n\t\t\t%put NOTE: Total number of wait cycles of 1 second: &wait_cycles;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The table status table, &_output, does not exist.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\t%check_for_completion;\n\n\t/*\n\t\tAborting in case of one or more processes failed only makes sense \n\t\tif the custom step is set to wait for all processes to finish.\n\t*/\n\t%macro abort_on_failure;\n\t\t%let exception = 0;\n\t\t%if &abort_on_exception eq 1 %then %do;\n\t\t\tproc sql noprint;\n\t\t\t\tselect \n\t\t\t\t\tcount(*) into :exception trimmed\n\t\t\t\tfrom \n\t\t\t\t\t&_output \n\t\t\t\twhere \n\t\t\t\t\tkupcase(state) ne 'COMPLETED';\n\t\t\tquit;\n\t\t\t\t\n\t\t\t%if &exception ne 0 %then %do;\n\t\t\t\t%put ERROR: &exception exception(s) encountered. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t%put NOTE: No exceptions encountered. Continuing with the rest of the flow.;\n\t\t\t%end;\n\t\t%end;\n\t%mend;\n\t%abort_on_failure;\n%end;\n\n/*\n\tRemove all macro variables from memory.\n*/\n%symdel nbr_job_rows /NOWARN;\n%symdel nbr_parameter_rows /NOWARN;\n%symdel nbr_rows /NOWARN;\n%symdel job_uri /NOWARN;\n%symdel columns_line /NOWARN;\n/*\n\tRemove all macros from memory.\n*/\n%SYSMACDELETE get_job_uri / NOWARN;\n%SYSMACDELETE createJsonRequest / NOWARN;\n%SYSMACDELETE addColumns / NOWARN;\n%SYSMACDELETE addColumnValues / NOWARN;\n%SYSMACDELETE exec_job / NOWARN;\n%SYSMACDELETE update_status / NOWARN;\n%SYSMACDELETE exec_jobs / NOWARN;\n%SYSMACDELETE check_for_completion / NOWARN;\n%SYSMACDELETE abort_on_failure / NOWARN;"}} \ No newline at end of file +{"creationTimeStamp":"2024-06-14T18:32:21.570114Z","createdBy":"Ethan.Kavanaugh@sas.com","modifiedTimeStamp":"2025-09-28T15:54:12.220647Z","modifiedBy":"Remco.Gooijer@sas.com","name":"Loop Deployed Object.step","displayName":"Loop Deployed Object.step","localDisplayName":"Loop Deployed Object.step","links":[{"method":"GET","rel":"self","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step"},{"method":"GET","rel":"alternate","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step.summary"},{"method":"GET","rel":"up","href":"/dataFlows/steps","uri":"/dataFlows/steps","type":"application/vnd.sas.collection","itemType":"application/vnd.sas.data.flow.step.summary"},{"method":"PUT","rel":"update","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.data.flow.step","responseType":"application/vnd.sas.data.flow.step"},{"method":"DELETE","rel":"delete","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a"},{"method":"POST","rel":"copy","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a/copy","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a/copy","responseType":"application/vnd.sas.data.flow.step"},{"method":"GET","rel":"transferExport","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","responseType":"application/vnd.sas.transfer.object"},{"method":"PUT","rel":"transferImportUpdate","href":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","uri":"/dataFlows/steps/cceba973-bfed-4e95-ba94-29c0f467a96a","type":"application/vnd.sas.transfer.object","responseType":"application/vnd.sas.summary"}],"metadataVersion":1,"version":2,"type":"code","flowMetadata":{"inputPorts":[{"name":"_input_parameters","displayName":"_input_parameters","localDisplayName":"_input_parameters","minEntries":1,"maxEntries":1,"defaultEntries":0,"type":"table"}],"outputPorts":[{"name":"_output","displayName":"_output","localDisplayName":"_output","minEntries":1,"maxEntries":1,"defaultEntries":0,"type":"table"}]},"ui":"{\n\t\"showPageContentOnly\": true,\n\t\"pages\": [\n\t\t{\n\t\t\t\"id\": \"page1\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"Deployed Object Properties\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_input_parameters\",\n\t\t\t\t\t\"type\": \"inputtable\",\n\t\t\t\t\t\"label\": \"Select the source table:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_output\",\n\t\t\t\t\t\"type\": \"outputtable\",\n\t\t\t\t\t\"label\": \"Output table label 1:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_jobname\",\n\t\t\t\t\t\"type\": \"textfield\",\n\t\t\t\t\t\"label\": \"Specify the deployed object name: (Case sensitive!)\",\n\t\t\t\t\t\"placeholder\": \"\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t}\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t\"id\": \"page3\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"Scheduling Properties\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"concurrent_jobs\",\n\t\t\t\t\t\"type\": \"numstepper\",\n\t\t\t\t\t\"label\": \"Number of concurrent processes:\",\n\t\t\t\t\t\"required\": true,\n\t\t\t\t\t\"integer\": true,\n\t\t\t\t\t\"min\": 1,\n\t\t\t\t\t\"max\": 10,\n\t\t\t\t\t\"stepsize\": 1\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"wait_for_processes\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Wait for processes to finish\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"abort_on_exception\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Abort on exception\",\n\t\t\t\t\t\"visible\": \"$wait_for_processes\",\n\t\t\t\t\t\"enabled\": \"$wait_for_processes\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_delayed\",\n\t\t\t\t\t\"type\": \"checkbox\",\n\t\t\t\t\t\"label\": \"Delayed execution\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"_delayedseconds\",\n\t\t\t\t\t\"type\": \"numstepper\",\n\t\t\t\t\t\"label\": \"Delayed seconds:\",\n\t\t\t\t\t\"required\": false,\n\t\t\t\t\t\"integer\": true,\n\t\t\t\t\t\"min\": 1,\n\t\t\t\t\t\"max\": 5,\n\t\t\t\t\t\"stepsize\": 1,\n\t\t\t\t\t\"enabled\": \"$_delayed\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t}\n\t\t\t]\n\t\t},\n\t\t{\n\t\t\t\"id\": \"page2\",\n\t\t\t\"type\": \"page\",\n\t\t\t\"label\": \"About\",\n\t\t\t\"children\": [\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"text2\",\n\t\t\t\t\t\"type\": \"text\",\n\t\t\t\t\t\"text\": \"Loop Deployed Object custom step\\n=========================\\n\\nThis custom step executes, in parallel, one deployed flow or deployed SAS program for a given set of parameters. \\nBoth can be stored in SAS Content and/or on a file system.\\n\\nFor that to work it needs\\n* An input dataset where the column names are treated as macro variables and the rows as macro variable values. You can have as much columns as you need.\\n* The name of the deployed flow or deployed SAS program. \\n\\nMake sure that the column(s) in the input table have the same name(s) as the macro variable(s) used by the deployed flow or deployed SAS program.\",\n\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t},\n\t\t\t\t{\n\t\t\t\t\t\"id\": \"section1\",\n\t\t\t\t\t\"type\": \"section\",\n\t\t\t\t\t\"label\": \"Changelog\",\n\t\t\t\t\t\"open\": false,\n\t\t\t\t\t\"visible\": \"\",\n\t\t\t\t\t\"children\": [\n\t\t\t\t\t\t{\n\t\t\t\t\t\t\t\"id\": \"text1\",\n\t\t\t\t\t\t\t\"type\": \"text\",\n\t\t\t\t\t\t\t\"text\": \"* Version 1.96 (28SEP2025)\\n- Removed small issue.\\n\\n* Version 1.95 (17JUN2025)\\n- Hard exit on crucial missing components.\\n\\n* Version 1.9 (03OCT2024)\\n - Removed a bug where the custom step would never finish.\\n\\n* Version 1,8 (16MAY2024)\\n - Added the 'Abort on exception' option.\\n\\n* Version 1.7 (18MAR2024)\\n - Name change from 'Loop' to 'Loop Deployed Object'\\n\\n* Version 1.0 - version 1.6 \\n - Were released internally.\",\n\t\t\t\t\t\t\t\"visible\": \"\"\n\t\t\t\t\t\t}\n\t\t\t\t\t]\n\t\t\t\t}\n\t\t\t]\n\t\t}\n\t],\n\t\"syntaxversion\": \"1.3.0\",\n\t\"values\": {\n\t\t\"_input_parameters\": {\n\t\t\t\"library\": \"\",\n\t\t\t\"table\": \"\"\n\t\t},\n\t\t\"_output\": {\n\t\t\t\"library\": \"\",\n\t\t\t\"table\": \"\"\n\t\t},\n\t\t\"_jobname\": \"\",\n\t\t\"concurrent_jobs\": 4,\n\t\t\"wait_for_processes\": true,\n\t\t\"abort_on_exception\": false,\n\t\t\"_delayed\": false,\n\t\t\"_delayedseconds\": 1\n\t}\n}","localUi":"{\"pages\":[{\"children\":[{\"id\":\"_input_parameters\",\"label\":\"Select the source table:\",\"placeholder\":\"\",\"required\":true,\"type\":\"inputtable\",\"visible\":\"\"},{\"id\":\"_output\",\"label\":\"Output table label 1:\",\"placeholder\":\"\",\"required\":true,\"type\":\"outputtable\",\"visible\":\"\"},{\"id\":\"_jobname\",\"label\":\"Specify the deployed object name: (Case sensitive!)\",\"placeholder\":\"\",\"required\":true,\"type\":\"textfield\",\"visible\":\"\"}],\"id\":\"page1\",\"label\":\"Deployed Object Properties\",\"type\":\"page\"},{\"children\":[{\"id\":\"concurrent_jobs\",\"integer\":true,\"label\":\"Number of concurrent processes:\",\"max\":10,\"min\":1,\"required\":true,\"stepsize\":1,\"type\":\"numstepper\"},{\"id\":\"wait_for_processes\",\"label\":\"Wait for processes to finish\",\"type\":\"checkbox\",\"visible\":\"\"},{\"enabled\":\"$wait_for_processes\",\"id\":\"abort_on_exception\",\"label\":\"Abort on exception\",\"type\":\"checkbox\",\"visible\":\"$wait_for_processes\"},{\"id\":\"_delayed\",\"label\":\"Delayed execution\",\"type\":\"checkbox\",\"visible\":\"\"},{\"enabled\":\"$_delayed\",\"id\":\"_delayedseconds\",\"integer\":true,\"label\":\"Delayed seconds:\",\"max\":5,\"min\":1,\"required\":false,\"stepsize\":1,\"type\":\"numstepper\",\"visible\":\"\"}],\"id\":\"page3\",\"label\":\"Scheduling Properties\",\"type\":\"page\"},{\"children\":[{\"id\":\"text2\",\"text\":\"Loop Deployed Object custom step\\n=========================\\n\\nThis custom step executes, in parallel, one deployed flow or deployed SAS program for a given set of parameters. \\nBoth can be stored in SAS Content and/or on a file system.\\n\\nFor that to work it needs\\n* An input dataset where the column names are treated as macro variables and the rows as macro variable values. You can have as much columns as you need.\\n* The name of the deployed flow or deployed SAS program. \\n\\nMake sure that the column(s) in the input table have the same name(s) as the macro variable(s) used by the deployed flow or deployed SAS program.\",\"type\":\"text\",\"visible\":\"\"},{\"children\":[{\"id\":\"text1\",\"text\":\"* Version 1.96 (28SEP2025)\\n- Removed small issue.\\n\\n* Version 1.95 (17JUN2025)\\n- Hard exit on crucial missing components.\\n\\n* Version 1.9 (03OCT2024)\\n - Removed a bug where the custom step would never finish.\\n\\n* Version 1,8 (16MAY2024)\\n - Added the 'Abort on exception' option.\\n\\n* Version 1.7 (18MAR2024)\\n - Name change from 'Loop' to 'Loop Deployed Object'\\n\\n* Version 1.0 - version 1.6 \\n - Were released internally.\",\"type\":\"text\",\"visible\":\"\"}],\"id\":\"section1\",\"label\":\"Changelog\",\"open\":false,\"type\":\"section\",\"visible\":\"\"}],\"id\":\"page2\",\"label\":\"About\",\"type\":\"page\"}],\"showPageContentOnly\":true,\"syntaxversion\":\"1.3.0\",\"values\":{\"_delayed\":false,\"_delayedseconds\":1,\"_input_parameters\":{\"library\":\"\",\"table\":\"\"},\"_jobname\":\"\",\"_output\":{\"library\":\"\",\"table\":\"\"},\"abort_on_exception\":false,\"concurrent_jobs\":4,\"wait_for_processes\":true}}","templates":{"SAS":"/*\n\tInitialize the variables.\n*/\n%let nbr_job_rows = 0;\n%let nbr_parameter_rows = 0;\n%let nbr_rows = 0;\n%let job_uri =;\n\n/*\n\tInput for the input table validation.\n\tCount the number of rows. This number needs to be bigger then 0.\n*/\ndata _NULL_;\n\tset \n\t\t&_input_parameters end=last;\n\tif last then call symputx(\"nbr_parameter_rows\", _n_);\nrun;\n\ndata _null_; \n\tcall symputx('viyaHost', kreverse(ksubstr(ksubstr(kreverse(\"&_BASEURL.\"), 2), kfind(ksubstr(kreverse(\"&_BASEURL.\"), 2), '/') + 1)));\nrun;\n\n/*\n\tThe following macro is responsible for retrieving the URI of the deployed flow.\n\n\tNote that the loop custom step is build around the assumption that the name of the job definition is unique and exists.\n\tIn case it is not, the loop custom step will abort its operation.\n*/\n%macro get_job_uri\n(\n\tjob_name=\t\n);\n\n\t%if %unquote(%str(%')&job_name%str(%')) ne '' %then %do;\n\t\t/*\n\t\t\tSetup and execute the http request.\n\t\t*/\n\t\tfilename joburi temp;\n\t\tproc http\n\t\t\turl = \"&viyahost./jobExecution/jobRequests\"\n\t\t\tout= joburi\n\t\t\tmethod='get'\n\t\t\t\toauth_bearer = sas_services\n\t\t\t\t%if &job_name ne %then %do;\n\t\t\t\t\tquery= ('filter' = \"eq('name', '&job_name')\");\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t;\n\t\t\t\t%end;\n\t\t\theaders\n\t\t\t\t'Accept'= 'application/vnd.sas.collection+json';\n\t\trun;\t\n\n\t\t/*\n\t\t\tProcess the response file, if it exists.\n\t\t*/\n\t\t%if %sysfunc(fexist(joburi)) %then %do;\n\n\t\t\tlibname joburi json;\n\t\t\t/*\n\t\t\t\tCheck that the items table exists.\n\t\t\t*/\n\t\t\t%if %sysfunc(exist(joburi.items)) %then %do;\n\t\t\t\t/*\n\t\t\t\t\tSelect the jobs for which the URI exists.\n\n\t\t\t\t\tNote that for this to work, the columns 'name' and 'jobDefinitionUri' need\n\t\t\t\t\tto exist in the response file!\n\t\t\t\t*/\n\t\t\t\t%let ds = %sysfunc(open(joburi.items));\n\t\t\t\t%if &ds > 0 %then %do;\n\t\t\t\t\t%let dsName = %sysfunc(varnum(&ds, name));\n\t\t\t\t\t%let dsUri = %sysfunc(varnum(&ds, jobDefinitionUri));\n\t\t\t\t\t%let rc = %sysfunc(close(&ds));\n\n\t\t\t\t\t%if &dsName and &dsUri %then %do;\n\t\t\t\t\t\tdata job_uri;\n\t\t\t\t\t\tset joburi.items\n\t\t\t\t\t\t(\n\t\t\t\t\t\t\tkeep = \n\t\t\t\t\t\t\t\tname \n\t\t\t\t\t\t\t\tjobDefinitionUri \n\t\t\t\t\t\t\twhere = \n\t\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\t\tjobDefinitionUri ~= ''\n\t\t\t\t\t\t\t\t)\n\t\t\t\t\t\t);\n\t\t\t\t\t\trun;\n\t\t\t\t\t\tlibname joburi;\n\t\t\t\t\t%end;\n\t\t\t\t\t%else %do;\n\t\t\t\t\t\t%put ERROR: Importants columns do not exist in the response file. Aborting process.;\n\t\t\t\t\t\t%abort exit;\n\t\t\t\t\t%end;\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t%put ERROR: Unable to open the 'items' array. Aborting process.;\n\t\t\t\t\t%abort exit;\n\t\t\t\t%end;\t\t\t\t\t\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t%put ERROR: The items table in the response file does not exist. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The response file does not exist. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\n\t\t/*\n\t\t\tAssign the job uri to the 'job_uri' macro variable.\n\t\t*/\n\t\t%if %sysfunc(exist(work.job_uri)) %then %do;\n\t\t\t/*\n\t\t\t\tCount the number of jobs returned.\n\n\t\t\t\tOnly continue in case the number equals to 1.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tif 0 then set work.job_uri nobs=rows;\n\t\t\t\tcall symputx('nbr_job_rows', rows);\n\t\t\trun;\n\n\t\t\t/*\n\t\t\t\tAbort in case the number of returned values isn't exactly one.\n\t\t\t*/\n\t\t\t%if &nbr_job_rows NE 1 %then %do;\n\t\t\t\t%put ERROR: &nbr_job_rows jobs encountered with name &job_name. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\tdata _null_;\n\t\t\t\t\tset\n\t\t\t\t\t\twork.job_uri;\n\t\t\t\t\tcall symputx(\"job_uri\", kstrip(jobDefinitionUri));\n\t\t\t\trun;\t\t\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: job_uri table does not exist. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%end;\n\t%else %do;\n\t\t%put ERROR: No deployed object name provided. Aborting process.;\n\t\t%abort exit;\n\t%end;\n%mend;\n%get_job_uri(\n\tjob_name = &_jobname\n);\n\n/*\n\tOnly start the process in case the parameter table contains more than 0 rows.\n*/\n%if &nbr_parameter_rows eq 0 %then %do;\n\t%put ERROR: Not enough parameters (&nbr_parameter_rows) to run the loop step. Aborting process.;\n\t%abort exit;\n%end;\n%else %do;\n\t/*\n\t\tThe number of rows in the input table check out. Continue to run the process.\n\t*/\n\tdata _null_; \n\t\tcall symputx('viyaHost', kreverse(ksubstr(ksubstr(kreverse(\"&_BASEURL.\"), 2), kfind(ksubstr(kreverse(\"&_BASEURL.\"), 2), '/') + 1)));\n\trun;\n\n\t/*\n\t\tThe following macro is responsible for creating the JSON request for the job\n\t\tand the given set parameters/macro variables.\n\t*/\n\t%macro createJsonRequest(\n\t\trow=\t\n\t);\n\t\t/*\n\t\t\tthe value assigned to the macro variable 'process_name' is used to make the process visible in environment manager.\n\t\t*/\n\t\t%let process_name = Running '&_jobname' for input table row &row;\n\t\tdata _null_;\n\t\t\tlength \n\t\t\t\tparameter $32767\n\t\t\t\tvalue $32767;\n\n\t\t\t/*\n\t\t\t\tOpen the custom step input dataset.\n\t\t\t*/\n\t\t\tdsid = open(\"&_input_parameters\");\n\n\t\t\t/*\n\t\t\t\tNote that the request file reference is defined out-side of this macro!\n\t\t\t*/\t\n\t\t\tif (dsid > 0) then do;\n\t\t\t\tfile request;\n\t\t\t\tput '{';\n\t\t\t\tput '\"name\": ' \"\"\"&process_name\"\"\" ',';\n\t\t\t\tput '\"jobDefinitionUri\": ' \"\"\"&job_uri\"\"\" ',';\n\t\t\t\tput '\"arguments\": {';\n\t\t\t\n\t\t\t\t/*\n\t\t\t\t\tFetch the 'row'-th record.\n\t\t\t\t*/\n\t\t\t\trc = fetchobs(dsid, &row);\n\n\t\t\t\t/*\n\t\t\t\t\tRetrieve the number of columns.\n\t\t\t\t*/\n\t\t\t\tcolumns = attrn(dsid,'nvars');\n\t\t\t\t\n\t\t\t\t/*\n\t\t\t\t\tFor each column and value pair create an argument entry in the JSON file.\n\t\t\t\t*/\n\t\t\t\tdo index = 1 to columns;\n\t\t\t\t\t/*\n\t\t\t\t\t\tGet the column name and the column type.\n\t\t\t\t\t*/\n\t\t\t\t\tvname = kstrip(varname(dsid, index));\n\t\t\t\t\tvtype = vartype(dsid, index);\n\t\t\t\t\n\t\t\t\t\t/*\n\t\t\t\t\t\tDepending on data type, handle the value appropriately.\n\t\t\t\t\t*/\n\t\t\t\t\tif (vtype~='C') then do;\t\n\t\t\t\t\t\t/*\n\t\t\t\t\t\t\tNumeric values.\n\t\t\t\t\t\t*/\n\t\t\t\t\t\tvalue = put(getvarn(dsid, index), best32.);\n\t\t\t\t\t\tif index < columns then do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\",';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\t\telse do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\"';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\tend;\n\t\t\t\t\telse do;\n\t\t\t\t\t\t/*\n\t\t\t\t\t\t\tCharacter values.\n\t\t\t\t\t\t*/\n\t\t\t\t\t\tvalue = getvarc(dsid, index);\n\n\t\t\t\t\t\tif index < columns then do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\",';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\t\t\t\t\n\t\t\t\t\t\telse do;\n\t\t\t\t\t\t\tparameter = '\"' || kstrip(vname) || '\": \"' || kstrip(value) || '\"';\n\t\t\t\t\t\t\tput parameter;\n\t\t\t\t\t\tend;\n\t\t\t\t\tend;\n\t\t\t\tend;\n\t\t\t\tput '}';\n\t\t\t\tput '}';\n\t\t\tend;\n\t\t\telse do;\n\t\t\t\tput \"ERROR%QUOTE(:) Input table could not be opened.\";\n\t\t\tend;\n\t\t\tdsid = close(dsid);\n\t\trun;\n\t%mend;\n\n\t/*\n\t\tThe following macro is responsible for creating and assigning the input columns, and their properties, to the global macro variable 'column_line', to be \n\t\tused in the step that creates the output table.\n\t*/\n\t%let columns_line=;\n\t%macro addColumns(\n\t\tds=\n\t);\n\t\t/*\n\t\t\tTry to open the dataset.\n\t\t*/\n\t\t%let dsid = %sysfunc(open(&ds, i));\n\t\t%if (&dsid > 0) %then %do;\n\n\t\t\t/*\n\t\t\t\tRetrieve the number of columns.\n\t\t\t*/\n\t\t\t%let columns = %sysfunc(attrn(&dsid, nvars));\n\n\t\t\t/*\n\t\t\t\tFor each column:\n\t\t\t*/\n\t\t\t%do i = 1 %to &columns;\n\t\t\t\t/*\n\t\t\t\t\tRetrieve the name, type, length and format.\n\t\t\t\t*/\n\t\t\t\t%let vname = %sysfunc(varname(&dsid, &i));\n\t\t\t\t%let vtype = %sysfunc(vartype(&dsid, &i));\n\t\t\t\t%let vlength = %sysfunc(varlen(&dsid, &i));\n\t\t\t\t%let vformat = %sysfunc(varfmt(&dsid, &i));\n\t\n\t\t\t\t/*\n\t\t\t\t\tBased on the column type and the existance of a format, create the column definition.\n\t\t\t\t*/\n\t\t\t\t%if &vtype EQ C %then %do;\n\t\t\t\t\t%if &vformat eq %then %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=$&vlength;);\n\t\t\t\t\t%end;\n\t\t\t\t\t%else %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=$&vlength format=&vformat;);\n\t\t\t\t\t%end;\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t%if &vformat eq %then %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=&vlength;);\n\t\t\t\t\t%end;\n\t\t\t\t\t%else %do;\n\t\t\t\t\t\t%let columns_line = %str(&columns_line attrib &vname length=&vlength format=&vformat;);\n\t\t\t\t\t%end;\n\t\t\t\t%end;\t\n\t\t\t%end;\t\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t\t%else %do;\n\t\t\t/*\n\t\t\t\tThe table could not be opened. Specify the error message.\n\t\t\t*/\n\t\t\t%put %sysfunc(sysmsg());\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t%mend;\n\t%addColumns(\n\t\tds=&_input_parameters\n\t);\n\n\t/*\n\t\tCreate the status tables that will hold all execution statuses for all executed job requests.\n\t\tBased on this table you can create a polling mechanism to validate that each process has finished successfully.\n\t*/\n\tdata &_output;\n\t\tattrib id length=$36.;\n\t\tattrib job_name length=$128.;\n\t\t/*\n\t\t\tAdd input parameters/variables to the output table.\n\t\t\t\n\t\t\tAssigning the value to the macro variable, 'column_line', is performed by the 'addColumns' macro.\n\t\t*/\n\t\t&columns_line;\n\t\tattrib state length=$32.;\n\t\tattrib start_dttm length=8 format=datetime22.3;\n\t\tattrib end_dttm length=8 format=datetime22.3;\n\t\tstop;\n\trun;\n\n\t%macro addColumnValues(\n\t\tds=,\n\t\trow=\n\t);\n\n\t\t%let values_line=;\n\t\t%let dsid = %sysfunc(open(&ds, i));\n\t\t%if (&dsid > 0) %then %do;\n\n\t\t\t%let columns = %sysfunc(attrn(&dsid, nvars));\n\t\t\t%let rc = %sysfunc(fetchobs(&dsid, &row));\n\n\t\t\t%do i = 1 %to &columns;\n\t\t\t\t%let vname = %sysfunc(varname(&dsid, &i));\n\t\t\t\t%let vtype = %sysfunc(vartype(&dsid, &i));\n\n\t\t\t\t%if &vtype EQ C %then %do;\n\t\t\t\t\t%let value = %BQUOTE(%sysfunc(getvarc(&dsid, &i)));\n\t\t\t\t\t%let values_line = %str(&values_line &vname=%str(%\")&value%str(%\"););\n\t\t\t\t%end;\n\t\t\t\t%else %do;\n\t\t\t\t\t%let value = %sysfunc(getvarn(&dsid, &i));\n\t\t\t\t\t%let values_line = %str(&values_line &vname=&value;);\n\t\t\t\t%end;\n\t\t\t%end;\n\t\t\t/*\n\t\t\t\tAssign the generated line to the macro variable so that it can be used in a data step.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tcall symputx('columnValues_line', strip(\"&Values_line\"));\n\t\t\trun;\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put %sysfunc(sysmsg());\n\t\t\t%let dsid = %sysfunc(close(&dsid));\n\t\t%end;\n\t%mend;\t\n\n\t%macro exec_job\n\t(\n\t\tjob_uri=,\n\t\trow=\n\t);\n\n\t\t%if %unquote(%str(%')&job_uri%str(%')) ne '' %then %do;\n\n\t\t\t/*\n\t\t\t\tIn case the user asked for a delayed execution...\n\t\t\t*/\t\t\n\t\t\t%if &_delayed eq 1 %then %do;\n\t\t\t\t%put NOTE: Delayed executing, waiting for &_delayedseconds seconds.;\n\t\t\t\tdata _null_;\n\t\t\t\t\tsleeping = sleep(&_delayedseconds, 1);\n\t\t\t\trun;\n\t\t\t%end;\n\n\t\t\t/*\n\t\t\t\tCreate the JSON request file.\n\t\t\t*/\n\t\t\t%let process_name = Flow instance for parameter table row &row;\n\t\t\tfilename request temp;\n\t\t\t\n\t\t\t/*\n\t\t\t\tCreate the JSON request.\n\t\t\t\t\n\t\t\t\tThe file reference 'request' is used to create the resulting JSON file.\n\t\t\t*/\n\t\t\t%createJSONrequest(\n\t\t\t\trow=&row\n\t\t\t);\n\n\t\t\t/*\n\t\t\t\tSetup and execute the HTTP request.\n\t\t\t*/\n\t\t\tfilename response temp;\n\t\t\tproc http\n\t\t\t\turl = \"&viyahost./jobExecution/jobs?submitter=workflow\"\n\t\t\t\tin = request\n\t\t\t\tout= response\n\t\t\t\tmethod='post'\n\t\t\t\t\toauth_bearer = sas_services;\n\t\t\t\theaders\n\t\t\t\t\t'Content-Type'='application/vnd.sas.job.execution.job.request+json'\n\t\t\t\t\t'Accept'='application/vnd.sas.job.execution.job+json';\n\t\t\trun;\n\n\t\t\t%if %sysfunc(fexist(response)) %then %do;\n\n\t\t\t\t%let columnValues_line=;\n\t\t\t\t%addColumnValues(\n\t\t\t\t\tds=&_input_parameters,\n\t\t\t\t\trow=&row\n\t\t\t\t);\n\n\t\t\t\t/*\n\t\t\t\t\tAdd the state, should be 'running', of the current job to the execute_status table.\n\t\t\t\t\n\t\t\t\t\tNote that the column specification can differ. This will cause warnings during the PROC APPEND step.\n\t\t\t\t\tThat is why the resulting table firstly needs to be created based on the internally defined status table.\n\t\t\t\t*/\n\t\t\t\tlibname resp json fileref=response;\n\t\t\t\tdata work.root(drop=creationTimeStamp);\n\t\t\t\t\tattrib job_name length=$128.;\n\t\t\t\t\tattrib end_dttm length=8 format=datetime22.3;\t\n\t\t\t\t\tattrib start_dttm length=8 format=datetime22.3;\n\t\t\t\t\tset \n\t\t\t\t\t\t&_output(obs=0)\n\t\t\t\t\t\tresp.root(keep=id state creationTimeStamp);\n\n\t\t\t\t\tstart_dttm = input(creationTimeStamp, E8601DZ.);\n\t\t\t\t\tjob_name = \"&process_name\";\n\t\t\t\t\t/*\n\t\t\t\t\t\tAdd the current input parameters to the output/status table.\n\t\t\t\t\t*/\n\t\t\t\t\t&columnValues_line;\n\t\t\t\trun;\n\t\t\t\tproc append data=work.root base=&_output force; run;\n\t\t\t\tlibname resp;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t%put ERROR: The response file does not exist. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: No URI provided. Aborting process.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\n\t%macro update_status\n\t(\n\t\tId=\n\t);\n\t\t/*\n\t\t\tThis process only makes sense when the execute_status table exists.\n\t\t*/\n\t\t%if %sysfunc(exist(&_output, data)) %then %do;\n\t\t\t/*\n\t\t\t\tSetup and execute the HTTP request.\n\t\t\t*/\n\t\t\tfilename response temp;\n\t\t\tproc http\n\t\t\t\turl=\"&viyahost./jobExecution/jobs/&Id\"\n\t\t\t\tout=response\n\t\t\t\tmethod=\"get\"\n\t\t\t\t\toauth_bearer = sas_services;\n\t\t\t\theaders\n\t\t\t\t\t'Accept'='application/json';\n\t\t\trun;\n\n\t\t\t%if %sysfunc(fexist(response)) %then %do;\n\t\t\t\tlibname status json fileref=response;\n\n\t\t\t\t/*\n\t\t\t\t\tUpdate the output table with the values from the REST API.\n\t\t\t\t\tNote that the root table will always contain one row.\n\t\t\t\t*/\n\t\t\t\tproc sql noprint;\n\t\t\t\t\tupdate \n\t\t\t\t\t\t&_output\n\t\t\t\t\tset \t\n\t\t\t\t\t\tstate = \n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\tselect \n\t\t\t\t\t\t\t\t\tkstrip(state) \n\t\t\t\t\t\t\t\tfrom \n\t\t\t\t\t\t\t\t\tstatus.root\n\t\t\t\t\t\t\t),\n\t\t\t\t\t\tend_dttm = \n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\tselect \n\t\t\t\t\t\t\t\t\tinput(modifiedTimeStamp, E8601DZ.)\n\t\t\t\t\t\t\t\tfrom\n\t\t\t\t\t\t\t\t\tstatus.root\n\t\t\t\t\t\t\t)\n\t\t\t\t\twhere\n\t\t\t\t\t\tid = \"&id\";\n\t\t\t\tquit;\n\t\t\t\tlibname status clear;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t/*\n\t\t\t\t\tNo abort, might be a hick-up...\n\t\t\t\t*/\n\t\t\t\t%put ERROR: Unable to update the status table. No response file encountered.;\n\t\t\t%end;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The status table, &_output, does not exist.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\n\t/*\n\t\tThe following macro is responsible for:\n\t\t- Maintaining the job waiting queue.\n\t\t- Executing the jobs\n\t\t- Updating the status table.\n\t*/\n\t%macro exec_jobs;\n\t\t%put NOTE: Initial number of jobs in queue: &nbr_parameter_rows.;\n\n\t\t/*\n\t\t\tInitialize macro variables.\n\t\t*/\n\t\t%let index = 1;\n\t\t%do %while (&index le &nbr_parameter_rows);\n\t\t\t/*\n\t\t\t\tCheck for the number running jobs, to be less or equal to\n\t\t\t\tthe number of concurrent jobs.\n\n\t\t\t\tNote that in case that the _output table is empty, the macro variable 'running_jobs' is never reset. \n\t\t\t\tThat is the reason why the macro variable is initialized to 0 each time the following code is run.\n\t\t\t*/\n\t\t\t%let running_jobs = 0;\n\t\t\tdata _null_;\n\t\t\t\tset \n\t\t\t\t\t&_output(\n\t\t\t\t\t\twhere=(kupcase(state)='RUNNING')\n\t\t\t\t\t) end=last;\n\t\t\t\tif last then call symput(\"running_jobs\", _n_);\n\t\t\trun;\n\t\t\t%put NOTE: Number of running jobs: &running_jobs;\n\t\t\t\n\t\t\t/*\n\t\t\t\tIf the number of running jobs is less then the amount of jobs allowed to run concurrenly,\n\t\t\t\tstart a job and register it in the status table.\n\t\t\t*/\n\t\t\t%if (&running_jobs lt &concurrent_jobs) %then %do;\n\t\t\t\t%put NOTE: Adding new job from the queue;\n\t\t\t\t%exec_job(\n\t\t\t\t\tjob_uri = &job_uri,\n\t\t\t\t\trow = &index\n\t\t\t\t);\n\t\t\t\t/*\n\t\t\t\t\tincrement the index parameter, ready to start the next job. \n\t\t\t\t*/\n\t\t\t\t%let index = %eval(&index + 1);\t\n\t\t\t%end;\n\n\t\t\t/*\n\t\t\t\tOnly update the status of jobs with state 'running'.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tset\n\t\t\t\t\t&_output(where=(kupcase(state)='RUNNING'));\n\n\t\t\t\tcall execute(\n\t\t\t\t\tcats(\n\t\t\t\t\t\t'%update_status(Id= %STR(',\n\t\t\t\t\t\tkSTRIP(id),\n\t\t\t\t\t\t'))'\n\t\t\t\t\t)\n\t\t\t\t);\n\t\t\trun;\n\t\t\t/*\n\t\t\t\tWait 1/2 second for the next iteration.\n\n\t\t\t\tThis will keep the log file size down.\n\t\t\t*/\n\t\t\tdata _null_;\n\t\t\t\tslept = sleep(.50, 1);\n\t\t\trun;\n\t\t%end;\n\t%mend;\n\t%exec_jobs;\t\n%end;\n\n/*\n\tThe folllwing section is there to wait for the processes, just started, to be finished before continuing\n\twith the next (custom) step.\n*/\n%if &wait_for_processes %then %do;\n\n\t%macro check_for_completion;\n\t\t/*\n\t\t\tUpdate for all processes the status and determain if all processes have finished.\n\t\t*/\n\t\t%if %sysfunc(exist(&_output, data)) %then %do;\n\t\t\t%let still_running = -1;\n\t\t\t%let wait_cycles = 0;\n\n\t\t\t%do %while (&still_running ne 0);\n\t\t\t\t/*\n\t\t\t\t\tUpdate the status for all running jobs.\n\t\t\t\t*/\n\t\t\t\tdata _null_;\n\t\t\t\t\tset\n\t\t\t\t\t\t&_output\n\t\t\t\t\t\t(\n\t\t\t\t\t\t\twhere=(kupcase(state) = 'RUNNING')\n\t\t\t\t\t\t);\n\n\t\t\t\t\t\tcall execute\n\t\t\t\t\t\t(\n\t\t\t\t\t\t\tcats\n\t\t\t\t\t\t\t(\n\t\t\t\t\t\t\t\t'%update_status(Id= %STR(',\n\t\t\t\t\t\t\t\tkSTRIP(id),\n\t\t\t\t\t\t\t\t'))'\n\t\t\t\t\t\t\t)\n\t\t\t\t\t\t);\n\t\t\t\trun;\n\t\t\n\t\t\t\t/*\n\t\t\t\t\tCount the number of jobs that are running.\n\t\t\t\t*/\n\t\t\t\tproc sql noprint;\n\t\t\t\t\tselect \n\t\t\t\t\t\tcount(*) into :still_running \n\t\t\t\t\tfrom \n\t\t\t\t\t\t&_output \n\t\t\t\t\twhere \n\t\t\t\t\t\tkupcase(state) = 'RUNNING';\n\t\t\t\tquit;\n\t\t\t\t%put NOTE: Number of jobs that are running: &still_running;\n\t\t\n\t\t\t\t/*\n\t\t\t\t\tSleep for one second before checking for completion again.\n\t\t\t\t*/\n\t\t\t\tdata _null_;\n\t\t\t\t\tslept = sleep(1, 1);\n\t\t\t\trun;\n\t\t\t\t%let wait_cycles = %eval(&wait_cycles + 1);\n\t\t\t%end;\n\t\t\t%put NOTE: Total number of wait cycles of 1 second: &wait_cycles;\n\t\t%end;\n\t\t%else %do;\n\t\t\t%put ERROR: The table status table, &_output, does not exist.;\n\t\t\t%abort exit;\n\t\t%end;\n\t%mend;\n\t%check_for_completion;\n\n\t/*\n\t\tAborting in case of one or more processes failed only makes sense \n\t\tif the custom step is set to wait for all processes to finish.\n\t*/\n\t%macro abort_on_failure;\n\t\t%let exception = 0;\n\t\t%if &abort_on_exception eq 1 %then %do;\n\t\t\tproc sql noprint;\n\t\t\t\tselect \n\t\t\t\t\tcount(*) into :exception trimmed\n\t\t\t\tfrom \n\t\t\t\t\t&_output \n\t\t\t\twhere \n\t\t\t\t\tkupcase(state) ne 'COMPLETED';\n\t\t\tquit;\n\t\t\t\t\n\t\t\t%if &exception ne 0 %then %do;\n\t\t\t\t%put ERROR: &exception exception(s) encountered. Aborting process.;\n\t\t\t\t%abort exit;\n\t\t\t%end;\n\t\t\t%else %do;\n\t\t\t\t%put NOTE: No exceptions encountered. Continuing with the rest of the flow.;\n\t\t\t%end;\n\t\t%end;\n\t%mend;\n\t%abort_on_failure;\n%end;\n\n/*\n\tRemove all macro variables from memory.\n*/\n%symdel nbr_job_rows /NOWARN;\n%symdel nbr_parameter_rows /NOWARN;\n%symdel nbr_rows /NOWARN;\n%symdel job_uri /NOWARN;\n%symdel columns_line /NOWARN;\n/*\n\tRemove all macros from memory.\n*/\n%SYSMACDELETE get_job_uri / NOWARN;\n%SYSMACDELETE createJsonRequest / NOWARN;\n%SYSMACDELETE addColumns / NOWARN;\n%SYSMACDELETE addColumnValues / NOWARN;\n%SYSMACDELETE exec_job / NOWARN;\n%SYSMACDELETE update_status / NOWARN;\n%SYSMACDELETE exec_jobs / NOWARN;\n%SYSMACDELETE check_for_completion / NOWARN;\n%SYSMACDELETE abort_on_failure / NOWARN;"},"eTag":"W/\"1759074852220647000\""} \ No newline at end of file diff --git a/Loop Deployed Object/README.md b/Loop Deployed Object/README.md index b1eb67ce..90c1e84a 100644 --- a/Loop Deployed Object/README.md +++ b/Loop Deployed Object/README.md @@ -102,6 +102,7 @@ Note that: ## Change Log +Version 1.96 (28SEP2025): Removed a small issue. Version 1.95 (17JUN2025): Hard exit in case crucial components are missing. diff --git a/Loop Deployed Object/img/UI_description.PNG b/Loop Deployed Object/img/UI_description.PNG index b53e692e..8eb28f04 100644 Binary files a/Loop Deployed Object/img/UI_description.PNG and b/Loop Deployed Object/img/UI_description.PNG differ