@@ -1572,7 +1572,8 @@ static int track_schedule_info (std::shared_ptr<resource_ctx_t> &ctx,
15721572}
15731573
15741574static int parse_R (std::shared_ptr<resource_ctx_t > &ctx, const char *R,
1575- std::string &jgf, int64_t &starttime, uint64_t &duration)
1575+ std::string &R_graph_fmt, int64_t &starttime, uint64_t &duration,
1576+ std::string &format)
15761577{
15771578 int rc = 0 ;
15781579 int version = 0 ;
@@ -1581,6 +1582,8 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
15811582 uint64_t et = 0 ;
15821583 json_t *o = NULL ;
15831584 json_t *graph = NULL ;
1585+ json_t *R_lite = NULL ;
1586+ json_t *nodelist = NULL ;
15841587 json_error_t error;
15851588 char *jgf_str = NULL ;
15861589
@@ -1590,9 +1593,11 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
15901593 errno = EINVAL;
15911594 goto out;
15921595 }
1593- if ( (rc = json_unpack (o, " {s:i s:{s:I s:I} s?:o}" ,
1596+ if ( (rc = json_unpack (o, " {s:i s:{s:o s:o s: I s:I} s?:o}" ,
15941597 " version" , &version,
15951598 " execution" ,
1599+ " R_lite" , &R_lite,
1600+ " nodelist" , &nodelist,
15961601 " starttime" , &st,
15971602 " expiration" , &et,
15981603 " scheduling" , &graph)) < 0 ) {
@@ -1609,20 +1614,22 @@ static int parse_R (std::shared_ptr<resource_ctx_t> &ctx, const char *R,
16091614 static_cast <intmax_t > (st), static_cast <intmax_t > (et));
16101615 goto freemem_out;
16111616 }
1612- if (graph == NULL ) {
1613- rc = -1 ;
1614- errno = ENOENT;
1615- flux_log (ctx->h , LOG_ERR, " %s: no scheduling key in R" , __FUNCTION__);
1616- goto freemem_out;
1617- }
1618- if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0 )))) {
1619- rc = -1 ;
1620- errno = ENOMEM;
1621- flux_log (ctx->h , LOG_ERR, " %s: json_dumps" , __FUNCTION__);
1622- goto freemem_out;
1617+ if (graph != NULL ) {
1618+ if ( !(jgf_str = json_dumps (graph, JSON_INDENT (0 )))) {
1619+ rc = -1 ;
1620+ errno = ENOMEM;
1621+ flux_log (ctx->h , LOG_ERR, " %s: json_dumps" , __FUNCTION__);
1622+ goto freemem_out;
1623+ }
1624+ R_graph_fmt = jgf_str;
1625+ free (jgf_str);
1626+ format = " jgf" ;
1627+ } else {
1628+ // Use the rv1exec reader
1629+ R_graph_fmt = R;
1630+ format = " rv1exec" ;
16231631 }
1624- jgf = jgf_str;
1625- free (jgf_str);
1632+
16261633 starttime = static_cast <int64_t > (st);
16271634 duration = et - st;
16281635
@@ -1714,18 +1721,33 @@ static int run (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17141721}
17151722
17161723static int run (std::shared_ptr<resource_ctx_t > &ctx, int64_t jobid,
1717- const std::string &jgf, int64_t at, uint64_t duration)
1724+ const std::string &R, int64_t at, uint64_t duration,
1725+ std::string &format)
17181726{
17191727 int rc = 0 ;
17201728 dfu_traverser_t &tr = *(ctx->traverser );
17211729 std::shared_ptr<resource_reader_base_t > rd;
1722- if ((rd = create_resource_reader (" jgf" )) == nullptr ) {
1730+ if (format == " jgf" ) {
1731+ if ((rd = create_resource_reader (" jgf" )) == nullptr ) {
1732+ rc = -1 ;
1733+ flux_log (ctx->h , LOG_ERR, " %s: create JGF reader (id=%jd)" ,
1734+ __FUNCTION__, static_cast <intmax_t > (jobid));
1735+ goto out;
1736+ }
1737+ } else if (format == " rv1exec" ) {
1738+ if ((rd = create_resource_reader (" rv1exec" )) == nullptr ) {
1739+ rc = -1 ;
1740+ flux_log (ctx->h , LOG_ERR, " %s: create rv1exec reader (id=%jd)" ,
1741+ __FUNCTION__, static_cast <intmax_t > (jobid));
1742+ goto out;
1743+ }
1744+ } else {
17231745 rc = -1 ;
1724- flux_log (ctx->h , LOG_ERR, " %s: create_resource_reader (id=%jd)" ,
1725- __FUNCTION__, static_cast <intmax_t > (jobid));
1746+ flux_log (ctx->h , LOG_ERR, " %s: create rv1exec reader (id=%jd)" ,
1747+ __FUNCTION__, static_cast <intmax_t > (jobid));
17261748 goto out;
17271749 }
1728- if ((rc = tr.run (jgf , ctx->writers , rd, jobid, at, duration)) < 0 ) {
1750+ if ((rc = tr.run (R , ctx->writers , rd, jobid, at, duration)) < 0 ) {
17291751 flux_log (ctx->h , LOG_ERR, " %s: dfu_traverser_t::run (id=%jd): %s" ,
17301752 __FUNCTION__, static_cast <intmax_t > (jobid),
17311753 ctx->traverser ->err_message ().c_str ());
@@ -1795,15 +1817,15 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
17951817 uint64_t duration = 0 ;
17961818 std::chrono::time_point<std::chrono::system_clock> start;
17971819 std::chrono::duration<double > elapsed;
1798- std::string jgf ;
1799- std::string R2 ;
1820+ std::string R_graph_fmt ;
1821+ std::string format ;
18001822
18011823 start = std::chrono::system_clock::now ();
1802- if ( (rc = parse_R (ctx, R, jgf , at, duration)) < 0 ) {
1824+ if ( (rc = parse_R (ctx, R, R_graph_fmt , at, duration, format )) < 0 ) {
18031825 flux_log_error (ctx->h , " %s: parsing R" , __FUNCTION__);
18041826 goto done;
18051827 }
1806- if ( (rc = run (ctx, jobid, jgf , at, duration)) < 0 ) {
1828+ if ( (rc = run (ctx, jobid, R_graph_fmt , at, duration, format )) < 0 ) {
18071829 flux_log_error (ctx->h , " %s: run" , __FUNCTION__);
18081830 goto done;
18091831 }
0 commit comments