From 328691d0c930697b813f08bdb98917f231dd78d4 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Fri, 11 Jul 2025 12:32:28 -0400
Subject: [PATCH] Co-execution client for Google Cloud Platform Batch v1.

---
 dev-requirements.txt                          |   1 +
 docs/containers.rst                           | 158 ++++++++++
 docs/galaxy_conf.rst                          |   7 +-
 docs/gen_erd_diagrams.py                      |  16 +
 docs/job_destination_parameters_gcp.png       | Bin 0 -> 26025 bytes
 docs/job_managers.rst                         |   2 +-
 ...ar_gcp_coexecution_deployment.plantuml.svg | 170 ++++++++++
 ...ar_gcp_coexecution_deployment.plantuml.txt |  48 +++
 docs/pulsar_gcp_deployment.plantuml.svg       | 150 +++++++++
 docs/pulsar_gcp_deployment.plantuml.txt       |  39 +++
 pulsar/client/client.py                       | 191 +++++++++++-
 pulsar/client/container_job_config.py         | 292 ++++++++++++++++++
 pulsar/client/manager.py                      |   8 +-
 pulsar/managers/util/gcp_util.py              |  80 +++++
 pulsar/managers/util/tes.py                   |  76 +----
 requirements.txt                              |   1 +
 test/container_job_config_test.py             |  36 +++
 test/tes_test.py                              |  19 +-
 18 files changed, 1193 insertions(+), 101 deletions(-)
 create mode 100644 docs/gen_erd_diagrams.py
 create mode 100644 docs/job_destination_parameters_gcp.png
 create mode 100644 docs/pulsar_gcp_coexecution_deployment.plantuml.svg
 create mode 100644 docs/pulsar_gcp_coexecution_deployment.plantuml.txt
 create mode 100644 docs/pulsar_gcp_deployment.plantuml.svg
 create mode 100644 docs/pulsar_gcp_deployment.plantuml.txt
 create mode 100644 pulsar/client/container_job_config.py
 create mode 100644 pulsar/managers/util/gcp_util.py
 create mode 100644 test/container_job_config_test.py

diff --git a/dev-requirements.txt b/dev-requirements.txt
index b781363d..e209d613 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -31,6 +31,7 @@ types-pycurl
 types-requests
 types-psutil
 sentry-sdk
+types-google-cloud-ndb
 
 # For release build
diff --git a/docs/containers.rst b/docs/containers.rst
index 01d639df..570163ba 100644
--- a/docs/containers.rst
+++ b/docs/containers.rst
@@ -132,6 +132,164 @@ GA4GH TES
 
    GA4GH TES job execution with Conda dependencies for the tool and no message queue.
 
+A Galaxy job configuration (job_conf.yml) for using TES with Pulsar and RabbitMQ might look like:
+
+::
+
+    runners:
+      local:
+        load: galaxy.jobs.runners.local:LocalJobRunner
+      pulsar_tes:
+        load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
+        # RabbitMQ URL from Galaxy server.
+        amqp_url:
+        # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+        #galaxy_url:
+
+    execution:
+      default: pulsar_tes
+      environments:
+        local:
+          runner: local
+          local_slots: 1
+        pulsar_tes:
+          runner: pulsar_tes
+          # TES URL to use.
+          tes_url:
+          pulsar_app_config:
+            # This is the same RabbitMQ server, but specified via the host and
+            # port that your TES nodes will use to connect to it.
+            message_queue_url:
+
+    tools:
+      - class: local
+        environment: local
+
+For testing on a MacBook with RabbitMQ installed via Homebrew, Docker Desktop available,
+and a Funnel server running locally with its default configuration, a configuration might look like:
+
+::
+
+    runners:
+      local:
+        load: galaxy.jobs.runners.local:LocalJobRunner
+      pulsar_tes:
+        load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
+        # RabbitMQ URL from Galaxy server.
+        amqp_url: amqp://guest:guest@localhost:5672//
+        # Communicate to Pulsar nodes that Galaxy should be accessed on the Docker
+        # host - the MacBook.
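+        # (host.docker.internal is Docker Desktop's name for the host machine;
+        # the message_queue_url below relies on the same alias.)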
+        galaxy_url: http://host.docker.internal:8080/
+
+    execution:
+      default: pulsar_tes
+      environments:
+        local:
+          runner: local
+          local_slots: 1
+        pulsar_tes:
+          runner: pulsar_tes
+          # Funnel will run on 8000 by default.
+          tes_url: http://localhost:8000
+          pulsar_app_config:
+            message_queue_url: amqp://guest:guest@host.docker.internal:5672//
+
+    tools:
+      - class: local
+        environment: local
+
+
+Google Cloud Platform Batch
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. figure:: pulsar_gcp_coexecution_deployment.plantuml.svg
+
+   Google Cloud Platform Batch job execution with a biocontainer for the tool and no message queue.
+
+.. figure:: pulsar_gcp_deployment.plantuml.svg
+
+   Google Cloud Platform Batch job execution with Conda dependencies for the tool and no message queue.
+
+Pulsar job destination options for configuring these scenarios:
+
+.. figure:: job_destination_parameters_gcp.png
+
+A Galaxy job configuration (job_conf.yml) for using GCP with Pulsar and RabbitMQ might look like:
+
+::
+
+    runners:
+      local:
+        load: galaxy.jobs.runners.local:LocalJobRunner
+      pulsar_gcp:
+        load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
+        # RabbitMQ URL from Galaxy server.
+        amqp_url:
+        # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+        #galaxy_url:
+
+    execution:
+      default: pulsar_gcp
+      environments:
+        local:
+          runner: local
+          local_slots: 1
+        pulsar_gcp:
+          runner: pulsar_gcp
+          # GCP project ID to use (required).
+          project_id: project-id-here
+          # GCP region or zone to use (optional).
+          #region: us-central1
+          # Max walltime to use in seconds (defaults to 60 * 60 * 24).
+          #walltime_limit: 216000
+          # GCP credentials setup.
+          #credentials_file: ~/.config/gcloud/application_default_credentials.json
+          pulsar_app_config:
+            # RabbitMQ URL the execute nodes should use to connect to the AMQP server.
+            message_queue_url:
+
+    tools:
+      - class: local
+        environment: local
+
+For testing these configurations, John set up a production-ish RabbitMQ server on
+173.255.213.165 with user ``john`` and password ``password`` that is accessible from
+anywhere. John also opened the router ports to expose the MacBook and set Galaxy
+to bind to ``0.0.0.0`` using the ``bind`` option in the ``gunicorn`` section of
+``galaxy.yml``.
+
+The job configuration for this test setup looked something like:
+
+::
+
+    runners:
+      local:
+        load: galaxy.jobs.runners.local:LocalJobRunner
+      pulsar_gcp:
+        load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
+        amqp_url: "amqp://john:password@173.255.213.165/"
+        # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
+        galaxy_url: http://71.162.7.202:8080/
+
+    execution:
+      default: pulsar_gcp
+      environments:
+        local:
+          runner: local
+          local_slots: 1
+        pulsar_gcp:
+          runner: pulsar_gcp
+          project_id: tonal-bloom-123435
+          region: us-central1
+          walltime_limit: 216000
+          pulsar_app_config:
+            # RabbitMQ URL the execute nodes should use to connect to the AMQP server.
+            message_queue_url: "amqp://john:password@173.255.213.165/"
+
+    tools:
+      - class: local
+        environment: local
+
+
 AWS Batch
 ~~~~~~~~~~
diff --git a/docs/galaxy_conf.rst b/docs/galaxy_conf.rst
index 19af161e..be6026f8 100644
--- a/docs/galaxy_conf.rst
+++ b/docs/galaxy_conf.rst
@@ -90,10 +90,11 @@ making use of the HTTP transport method:
 .. literalinclude:: files/job_conf_sample_mq_rsync.xml
    :language: xml
 
-Targeting Apache Mesos (Prototype)
-``````````````````````````````````
+Targeting GCP Batch, Kubernetes, or TES
+```````````````````````````````````````
 
-See `commit message `_ for initial work on this and `this post on galaxy-dev `_.
+Check out :ref:`containers` for information on using Pulsar with these
+container-native execution environments.
 
 Generating Galaxy Metadata in Pulsar Jobs
 `````````````````````````````````````````
diff --git a/docs/gen_erd_diagrams.py b/docs/gen_erd_diagrams.py
new file mode 100644
index 00000000..56d120a5
--- /dev/null
+++ b/docs/gen_erd_diagrams.py
@@ -0,0 +1,16 @@
+import os
+import sys
+
+import erdantic as erd
+
+sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))
+
+from pulsar.client.container_job_config import GcpJobParams
+
+DOC_SOURCE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__)))
+class_to_diagram = {
+    GcpJobParams: "job_destination_parameters_gcp",
+}
+
+for clazz, diagram_name in class_to_diagram.items():
+    erd.draw(clazz, out=f"{DOC_SOURCE_DIR}/{diagram_name}.png")
\ No newline at end of file
diff --git a/docs/job_destination_parameters_gcp.png b/docs/job_destination_parameters_gcp.png
new file mode 100644
index 0000000000000000000000000000000000000000..6f4b4e9022a5846dd59cb6368d76ac8c42d8f8a6
GIT binary patch
literal 26025
(base85-encoded PNG payload omitted)
literal 0
HcmV?d00001

diff --git a/docs/pulsar_gcp_coexecution_deployment.plantuml.svg b/docs/pulsar_gcp_coexecution_deployment.plantuml.svg
new file mode 100644
--- /dev/null
+++ b/docs/pulsar_gcp_coexecution_deployment.plantuml.svg
@@ -0,0 +1,170 @@
+(SVG markup omitted; the rendered diagram shows galaxy, an Object Store, and a
+Google Cloud Platform cloud containing the Google Batch v1 API and a Job with
+"pulsar Task" and "biocontainer Task" components. Notes: "Pulsar Tasks run with
+a batch_v1.AllocationPolicy.Disk of type 'local-ssd' that is used for Pulsar's
+staging directory and shared across pulsar stagings and tool/biocontainer
+containers." and "Disk is unrestricted and does not need to be shared between
+Pulsar and Galaxy." Edges: "create, delete, get (for status)", "[manages]",
+"stage in and out".)
\ No newline at end of file
diff --git a/docs/pulsar_gcp_coexecution_deployment.plantuml.txt b/docs/pulsar_gcp_coexecution_deployment.plantuml.txt
new file mode 100644
index 00000000..d892ffde
--- /dev/null
+++ b/docs/pulsar_gcp_coexecution_deployment.plantuml.txt
@@ -0,0 +1,48 @@
+@startuml
+
+!include plantuml_options.txt
+
+component galaxy as "galaxy" {
+
+}
+
+storage disk as "Object Store" {
+
+}
+
+note as disknote
+    Disk is unrestricted and does
+    not need to be shared between
+    Pulsar and Galaxy.
+end note
+
+disk ... disknote
+
+cloud cluster as "Google Cloud Platform" {
+    queue api as "Google Batch v1 API" {
+
+    }
+
+    frame pod as "Job" {
+
+        component staging as "pulsar Task" {
+        }
+
+        component tool as "biocontainer Task" {
+        }
+
+    }
+
+    note as stagingnote
+        Pulsar Tasks run with a batch_v1.AllocationPolicy.Disk of type "local-ssd"
+        that is used for Pulsar's staging directory and shared across pulsar stagings and
+        tool/biocontainer containers.
+    end note
+    pod ... stagingnote
+}
+
+galaxy --> disk
+galaxy --> api : create, delete, get (for status)
+api -[dashed]-> pod : [manages]
+staging --> galaxy : stage in and out
+@enduml
diff --git a/docs/pulsar_gcp_deployment.plantuml.svg b/docs/pulsar_gcp_deployment.plantuml.svg
new file mode 100644
index 00000000..96f1297d
--- /dev/null
+++ b/docs/pulsar_gcp_deployment.plantuml.svg
@@ -0,0 +1,150 @@
+(SVG markup omitted; the rendered diagram shows galaxy, an Object Store, and a
+Google Cloud Platform cloud containing the Google Batch v1 API and a Job with a
+single "pulsar+conda Task" component. Note: "Disk is unrestricted and does not
+need to be shared between Pulsar and Galaxy." Edges: "create, delete, get (for
+status)", "[manages]", "stage in and out".)
\ No newline at end of file
diff --git a/docs/pulsar_gcp_deployment.plantuml.txt b/docs/pulsar_gcp_deployment.plantuml.txt
new file mode 100644
index 00000000..54cfe75b
--- /dev/null
+++ b/docs/pulsar_gcp_deployment.plantuml.txt
@@ -0,0 +1,39 @@
+@startuml
+
+!include plantuml_options.txt
+
+component galaxy as "galaxy" {
+
+}
+
+storage disk as "Object Store" {
+
+}
+
+note as disknote
+    Disk is unrestricted and does
+    not need to be shared between
+    Pulsar and Galaxy.
+end note
+
+disk ... disknote
+
+cloud cluster as "Google Cloud Platform" {
+    queue api as "Google Batch v1 API" {
+
+    }
+
+    frame pod as "Job" {
+
+        component staging as "pulsar+conda Task" {
+        }
+
+    }
+
+}
+
+galaxy --> disk
+galaxy --> api : create, delete, get (for status)
+api -[dashed]-> pod : [manages]
+staging --> galaxy : stage in and out
+@enduml
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index f480fdf3..0dcc785b 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -3,10 +3,9 @@
 from enum import Enum
 from typing import (
     Any,
+    cast,
     Callable,
     Dict,
-    List,
-    NamedTuple,
     Optional,
 )
 from typing_extensions import Protocol
@@ -17,9 +16,7 @@
     TesExecutor,
     TesState,
     TesTask,
-    tes_client_from_dict,
     tes_galaxy_instance_id,
-    tes_resources,
 )
 from pulsar.managers.util.pykube_util import (
     ensure_pykube,
@@ -33,6 +30,25 @@
     pykube_client_from_dict,
     stop_job,
 )
+from pulsar.client.container_job_config import (
+    CoexecutionContainerCommand,
+    container_command_to_gcp_runnable,
+    gcp_galaxy_instance_id,
+    gcp_job_request,
+    gcp_job_template,
+    parse_gcp_job_params,
+    parse_tes_job_params,
+    tes_client_from_params,
+    tes_resources,
+)
+from pulsar.managers.util.gcp_util import (
+    batch_v1,
+    delete_gcp_job,
+    ensure_client as ensure_gcp_client,
+    gcp_client,
+    get_gcp_job,
+)
+
 from pulsar.managers import status as manager_status
 from .action_mapper import (
     actions,
@@ -341,6 +357,10 @@ class BaseRemoteConfiguredJobClient(BaseJobClient):
     client_manager: ClientManagerProtocol
 
     def __init__(self, destination_params, job_id, client_manager):
+        if "jobs_directory" not in destination_params:
+            default_staging_directory = self.default_staging_directory(destination_params)
+            if default_staging_directory:
+                destination_params["jobs_directory"] = default_staging_directory
         super().__init__(destination_params, job_id)
         if not self.job_directory:
             error_message = "Message-queue based Pulsar client requires destination define a remote job_directory to stage files into."
@@ -374,6 +394,9 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params["setup_params"] = setup_params return launch_params + def default_staging_directory(self, destination_params): + return None + def get_pulsar_app_config( self, pulsar_app_config, @@ -394,7 +417,7 @@ def get_pulsar_app_config( if ( "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config ): - pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY + pulsar_app_config["staging_directory"] = self.default_staging_directory(self.destination_params) if self.amqp_key_prefix: pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix @@ -545,14 +568,6 @@ def kill(self): pass -class CoexecutionContainerCommand(NamedTuple): - image: str - command: str - args: List[str] - working_directory: str - ports: Optional[List[int]] = None - - class ExecutionType(str, Enum): # containers run one after each other with similar configuration # like in TES or AWS Batch @@ -565,6 +580,9 @@ class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient): execution_type: ExecutionType pulsar_container_image: str + def default_staging_directory(self, destination_params): + return CONTAINER_STAGING_DIRECTORY + def launch( self, command_line, @@ -725,6 +743,9 @@ class LaunchesTesContainersMixin(CoexecutionLaunchMixin): ensure_library_available = ensure_tes_client execution_type = ExecutionType.SEQUENTIAL + def default_staging_directory(self, destination_params): + return CONTAINER_STAGING_DIRECTORY + def _launch_containers( self, pulsar_submit_container: CoexecutionContainerCommand, @@ -749,7 +770,7 @@ def _launch_containers( name=name, executors=executors, volumes=volumes, - resources=tes_resources(self.destination_params) + resources=tes_resources(self._tes_job_params) ) created_task = self._tes_client.create_task(tes_task) return ExternalId(created_task.id) @@ -765,7 +786,7 @@ def _container_to_executor(self, container: CoexecutionContainerCommand) -> TesE @property def _tes_client(self) -> TesClient: - return tes_client_from_dict(self.destination_params) + return tes_client_from_params(self._tes_job_params) @property def _tes_job_name(self): @@ -791,6 +812,11 @@ def raw_check_complete(self) -> Dict[str, Any]: "complete": "true" if tes_state_is_complete(tes_state) else "false", # Ancient John, what were you thinking? } + @property + def _tes_job_params(self): + tes_job_params = parse_tes_job_params(self.destination_params) + return tes_job_params + class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin): """A client that co-executes pods via GA4GH TES and depends on amqp for status updates.""" @@ -999,6 +1025,141 @@ def _raw_check_complete(self): } +class LaunchesGcpContainersMixin(CoexecutionLaunchMixin): + ensure_library_available = ensure_gcp_client + # https://cloud.google.com/php/docs/reference/cloud-batch/latest/V1.Runnable.Barrier + # can we do barriers here to allow sequential? It would allow separate containers for startup + # and shutdown that don't run parallel to the job? 
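+    # (Assumption, untested: batch_v1 runnables within a single TaskSpec run in
+    # list order unless Runnable.background is set, so an explicit Barrier may
+    # not even be required to get sequential behavior.)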
+    execution_type = ExecutionType.PARALLEL
+
+    def default_staging_directory(self, destination_params):
+        ssd_name = destination_params.get("ssd_name", "pulsar_staging")
+        return f"/mnt/disks/{ssd_name}"
+
+    def _setup_gcp_batch_client_properties(self, destination_params):
+        self.instance_id = gcp_galaxy_instance_id(destination_params)
+
+    def _launch_containers(
+        self,
+        pulsar_submit_container: CoexecutionContainerCommand,
+        tool_container: Optional[CoexecutionContainerCommand],
+        pulsar_finish_container: Optional[CoexecutionContainerCommand]
+    ) -> None:
+        assert pulsar_finish_container is None
+        gcp_job_params = self._gcp_job_params
+        job = gcp_job_template(gcp_job_params)
+        runnable = container_command_to_gcp_runnable("pulsar-container", pulsar_submit_container)
+        job.task_groups[0].task_spec.runnables.append(runnable)
+
+        if tool_container:
+            tool_runnable = container_command_to_gcp_runnable("tool-container", tool_container)
+            job.task_groups[0].task_spec.runnables.append(tool_runnable)
+
+        job_name = self._job_name
+        create_request = gcp_job_request(gcp_job_params, job, job_name)
+        client = gcp_client(gcp_job_params.credentials_file)
+        job = client.create_job(create_request)
+
+    @property
+    def _job_name(self):
+        # Currently just the _k8s_job_prefix logic... which might be fine?
+        job_id = self.job_id
+        job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id)
+        return job_name
+
+    @property
+    def _gcp_job_params(self):
+        gcp_job_params = parse_gcp_job_params(self.destination_params)
+        return gcp_job_params
+
+
+class GcpMessageCoexecutionJobClient(BaseMessageCoexecutionJobClient, LaunchesGcpContainersMixin):
+    """A client that co-executes containers via GCP Batch and depends on amqp for status updates."""
+
+    def __init__(self, destination_params, job_id, client_manager):
+        super().__init__(destination_params, job_id, client_manager)
+        self._setup_gcp_batch_client_properties(destination_params)
+
+
+class GcpPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesGcpContainersMixin):
+    """A client that co-executes containers via GCP Batch and doesn't depend on amqp."""
+
+    def __init__(self, destination_params, job_id, client_manager):
+        super().__init__(destination_params, job_id, client_manager)
+        self._setup_gcp_batch_client_properties(destination_params)
+
+    def kill(self):
+        gcp_job_params = self._gcp_job_params
+        delete_gcp_job(gcp_job_params.project_id, gcp_job_params.region, self._job_name, gcp_job_params.credentials_file)
+
+    def clean(self):
+        pass
+
+    def full_status(self):
+        status = self.raw_check_complete()
+        return status
+
+    def raw_check_complete(self) -> Dict[str, Any]:
+        gcp_job_params = self._gcp_job_params
+        job = get_gcp_job(gcp_job_params.project_id, gcp_job_params.region, self._job_name, gcp_job_params.credentials_file)
+        status = job.status
+        state = status.state
+        return {
+            "status": gcp_state_to_pulsar_status(state),
+            "complete": "true" if gcp_state_is_complete(state) else "false",  # Ancient John, what were you thinking?
+        }
+
+
+def gcp_state_to_pulsar_status(state: Optional["batch_v1.JobStatus.State"]) -> str:
+    state = state or cast(batch_v1.JobStatus.State, batch_v1.JobStatus.State.STATE_UNSPECIFIED)
+    # STATE_UNSPECIFIED         Job state unspecified.
+    # QUEUED                    Job is admitted (validated and persisted) and waiting for resources.
+    # SCHEDULED                 Job is scheduled to run as soon as resource allocation is ready. The
+    #                           resource allocation may happen at a later time but with a high chance
+    #                           to succeed.
+    # RUNNING                   Resource allocation has been successful. At least one Task in the Job
+    #                           is RUNNING.
+    # SUCCEEDED                 All Tasks in the Job have finished successfully.
+    # FAILED                    At least one Task in the Job has failed.
+    # DELETION_IN_PROGRESS      The Job will be deleted, but has not been deleted yet. Typically this
+    #                           is because resources used by the Job are still being cleaned up.
+    # CANCELLATION_IN_PROGRESS  The Job cancellation is in progress because the resources used by the
+    #                           Job are still being cleaned up.
+    # CANCELLED                 The Job has been cancelled, the task executions were stopped, and the
+    #                           resources were cleaned up.
+    state_map = {
+        batch_v1.JobStatus.State.STATE_UNSPECIFIED: manager_status.FAILED,
+        batch_v1.JobStatus.State.QUEUED: manager_status.PREPROCESSING,
+        batch_v1.JobStatus.State.SCHEDULED: manager_status.PREPROCESSING,
+        batch_v1.JobStatus.State.RUNNING: manager_status.RUNNING,
+        batch_v1.JobStatus.State.SUCCEEDED: manager_status.COMPLETE,
+        batch_v1.JobStatus.State.FAILED: manager_status.FAILED,
+        batch_v1.JobStatus.State.DELETION_IN_PROGRESS: manager_status.FAILED,
+        batch_v1.JobStatus.State.CANCELLATION_IN_PROGRESS: manager_status.CANCELLED,
+        batch_v1.JobStatus.State.CANCELLED: manager_status.CANCELLED,
+    }
+    if state not in state_map:
+        log.warning(f"Unknown GCP state encountered [{state}]")
+        return manager_status.FAILED
+    else:
+        return state_map[state]
+
+
+def gcp_state_is_complete(state: Optional["batch_v1.JobStatus.State"]) -> bool:
+    state = state or cast(batch_v1.JobStatus.State, batch_v1.JobStatus.State.STATE_UNSPECIFIED)
+    state_map = {
+        batch_v1.JobStatus.State.STATE_UNSPECIFIED: True,
+        batch_v1.JobStatus.State.QUEUED: False,
+        batch_v1.JobStatus.State.SCHEDULED: False,
+        batch_v1.JobStatus.State.RUNNING: False,
+        batch_v1.JobStatus.State.SUCCEEDED: True,
+        batch_v1.JobStatus.State.FAILED: True,
+        batch_v1.JobStatus.State.DELETION_IN_PROGRESS: True,
+        batch_v1.JobStatus.State.CANCELLATION_IN_PROGRESS: True,
+        batch_v1.JobStatus.State.CANCELLED: True,
+    }
+    if state not in state_map:
+        log.warning(f"Unknown GCP state encountered [{state}]")
+        return True
+    else:
+        return state_map[state]
+
+
 class LaunchesAwsBatchContainersMixin(CoexecutionLaunchMixin):
     """..."""
     execution_type = ExecutionType.SEQUENTIAL
diff --git a/pulsar/client/container_job_config.py b/pulsar/client/container_job_config.py
new file mode 100644
index 00000000..2f63affa
--- /dev/null
+++ b/pulsar/client/container_job_config.py
@@ -0,0 +1,292 @@
+"""Setup config objects for Pulsar client container jobs.
+
+In a traditional batch Pulsar setup, jobs are configured per destination by
+configuring the manager that the Pulsar client connects to. In a container job
+setup, there is no Pulsar server component, so the Pulsar client itself is
+responsible for the job configuration. This module provides the necessary
+configuration objects and documents how to map Galaxy job environment
+configuration objects onto the container scheduling infrastructure.
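+
+For example (a sketch with a hypothetical project ID), a GCP destination's
+parameters can be parsed and expanded into a job template with::
+
+    params = parse_gcp_job_params({"project_id": "my-project"})
+    job = gcp_job_template(params)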
+""" +import base64 +import re +from typing import ( + Dict, + List, + NamedTuple, + Optional, +) + +from galaxy.util import listify +from pydantic import ( + BaseModel, + Field, +) +from pydantictes.models import TesResources +from typing_extensions import Literal + +from pulsar.managers.util.gcp_util import ( + batch_v1, + ensure_client as ensure_gcp_client, +) +from pulsar.managers.util.tes import TesClient + + +DEFAULT_GCP_WALLTIME_LIMIT = 60 * 60 * 24 # Default wall time limit in seconds + + +class CoexecutionContainerCommand(NamedTuple): + image: str + command: str + args: List[str] + working_directory: str + ports: Optional[List[int]] = None + + +def attribute_docs(gcp_class_name: str, attribute: str) -> Optional[str]: + """ + Extracts the documentation string for a given attribute from a class docstring. + + Args: + cls: The class object containing the docstring. + attr_name: The attribute name to extract documentation for. + + Returns: + A string containing the attribute's documentation, or None if not found. + """ + gcp_class = getattr(batch_v1, gcp_class_name, None) + if not gcp_class: + return None + + doc = gcp_class.__doc__ + if not doc: + return None + + lines = doc.expandtabs().splitlines() + inside_attributes = False + current_attr = None + current_lines: List[str] = [] + attr_docs = {} + + attr_pattern = re.compile(r" (\w*).*:.*") + for line in lines: + stripped = line.strip() + + if not inside_attributes: + if stripped == "Attributes:": + inside_attributes = True + continue + + if inside_attributes: + if line.startswith(" ") and not line.startswith(" "): # attr line + match = attr_pattern.match(line) + if match: + if current_attr: + # Save previous attribute + attr_docs[current_attr] = "\n".join(current_lines).strip() + current_lines = [] + current_attr = match.group(1) + else: + continue + elif line.startswith(" ") and current_attr: + current_lines.append(stripped) + + if current_attr and current_lines: + attr_docs[current_attr] = "\n".join(current_lines).strip() + + docs = attr_docs.get(attribute) + if docs: + return f"Docs from {gcp_class_name}.{attribute}:\n{docs}" + else: + return None + + +class GcpJobParams(BaseModel): + project_id: str = Field( + ..., description="GCP project ID to use for job creation." + ) + credentials_file: Optional[str] = Field( + None, description="Path to GCP service account credentials file." + ) + region: str = Field( + "us-central1", description="GCP region where the job will run." + ) + walltime_limit: int = Field( + DEFAULT_GCP_WALLTIME_LIMIT, + description=f"Maximum wall time for the job in seconds. Maps to TaskSpec.max_run_duration.\n{attribute_docs('TaskSpec', 'max_run_duration')}" + ) + retry_count: int = Field( + 2, description=f"Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.\n{attribute_docs('TaskSpec', 'max_retry_count')}" + ) + ssd_name: Optional[str] = Field( + None, description=f"Name of the SSD volume to be mounted in the task. Maps to Volume.device_name.\n{attribute_docs('Volume', 'device_name')}" + ) + disk_size: int = Field( + 375, description="Size of the shared local SSD disk in GB (must be a multiple of 375). Maps to AllocationPolicy.Disk.size_gb." + ) + machine_type: str = Field( + "n1-standard-1", description="Machine type for the job's VM." + ) + labels: Optional[Dict[str, str]] = Field(None) + + +def parse_gcp_job_params(params: dict) -> GcpJobParams: + """ + Parse GCP job parameters from a dictionary (e.g., Galaxy's job destination/environment params). 
+    """
+    return GcpJobParams(**params)
+
+
+def gcp_job_template(params: GcpJobParams) -> "batch_v1.Job":
+    ensure_gcp_client()
+
+    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/batch/create/create_with_ssd.py
+    task = batch_v1.TaskSpec()
+    task.max_retry_count = params.retry_count
+    task.max_run_duration = f"{params.walltime_limit}s"
+
+    ssd_name = params.ssd_name or "pulsar_staging"
+
+    volume = batch_v1.Volume()
+    volume.device_name = ssd_name
+    mount_path = f"/mnt/disks/{ssd_name}"
+    volume.mount_path = mount_path
+    task.volumes = [volume]
+
+    # Override the staging directory since we cannot set the location of this
+    # mount path the way we can in k8s, based on @jmchilton's initial testing.
+    environment = batch_v1.Environment(
+        variables={
+            "PULSAR_CONFIG_OVERRIDE_STAGING_DIRECTORY": mount_path,
+        }
+    )
+    task.environment = environment
+
+    # Tasks are grouped inside a job using TaskGroups.
+    # Currently, it's possible to have only one task group.
+    group = batch_v1.TaskGroup()
+    group.task_count = 1
+    group.task_spec = task
+
+    disk = batch_v1.AllocationPolicy.Disk()
+    disk.type_ = "local-ssd"
+    # The size of all the local SSDs in GB. Each local SSD is 375 GB,
+    # so this value must be a multiple of 375 GB.
+    # For example, for 2 local SSDs, set this value to 750 GB.
+    disk.size_gb = params.disk_size
+    assert disk.size_gb % 375 == 0, "disk_size must be a multiple of 375"
+
+    # Policies define what kind of virtual machines the tasks will run on.
+    # The allowed number of local SSDs depends on the machine type for your job's VMs.
+    # In this case, we default to the "n1-standard-1" machine type, which requires
+    # local SSDs to be attached manually.
+    # Read more about local disks here: https://cloud.google.com/compute/docs/disks/local-ssd#lssd_disk_options
+    policy = batch_v1.AllocationPolicy.InstancePolicy()
+    policy.machine_type = params.machine_type
+
+    attached_disk = batch_v1.AllocationPolicy.AttachedDisk()
+    attached_disk.new_disk = disk
+    attached_disk.device_name = ssd_name
+    policy.disks = [attached_disk]
+
+    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
+    instances.policy = policy
+
+    allocation_policy = batch_v1.AllocationPolicy()
+    allocation_policy.instances = [instances]
+
+    job = batch_v1.Job()
+    job.task_groups = [group]
+    job.allocation_policy = allocation_policy
+    job.labels = params.labels or {}
+    # We use Cloud Logging as it's an out of the box available option.
+    job.logs_policy = batch_v1.LogsPolicy()
+    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING  # type: ignore[assignment]
+
+    return job
+
+
+def gcp_job_request(params: GcpJobParams, job: "batch_v1.Job", job_name: str) -> "batch_v1.CreateJobRequest":
+    create_request = batch_v1.CreateJobRequest()
+    create_request.job = job
+    create_request.job_id = job_name
+    region = params.region
+    project_id = params.project_id
+    create_request.parent = f"projects/{project_id}/locations/{region}"
+    return create_request
+
+
+def container_command_to_gcp_runnable(name: str, container: CoexecutionContainerCommand) -> "batch_v1.Runnable":
+    runnable = batch_v1.Runnable()
+    runnable.container = batch_v1.Runnable.Container()
+    runnable.container.image_uri = container.image
+    runnable.container.commands = [container.command] + container.args
+    # Ports are not supported currently.
+    return runnable
+
+
+def gcp_galaxy_instance_id(destination_params: Dict[str, str]) -> Optional[str]:
+    return destination_params.get("galaxy_instance_id")
+
+
+class BasicAuth(BaseModel):
+    username: str = Field(..., description="Username for basic authentication.")
+    password: str = Field(..., description="Password for basic authentication.")
+
+
+class TesJobParams(TesResources):
+    tes_url: str = Field(..., description="URL of the TES service.")
+    authorization: Literal["none", "basic"] = Field(
+        "none", description="Authorization type for TES service."
+    )
+    basic_auth: Optional[BasicAuth] = Field(None, description="Authorization for TES service.")
+
+
+def parse_tes_job_params(params: dict) -> TesJobParams:
+    """
+    Parse TES job parameters from a dictionary (e.g., Galaxy's job destination/environment params).
+    """
+    legacy_style_keys = {
+        "tes_cpu_cores": "cpu_cores",
+        "tes_preemptible": "preemptible",
+        "tes_ram_gb": "ram_gb",
+        "tes_disk_gb": "disk_gb",
+        "tes_zones": "zones",
+        "tes_backend_parameters": "backend_parameters",
+        "tes_backend_parameters_strict": "backend_parameters_strict",
+        "tes_galaxy_instance_id": "galaxy_instance_id",
+    }
+    expanded_params = {}
+    for key, value in params.items():
+        if key in legacy_style_keys:
+            new_key = legacy_style_keys[key]
+            expanded_params[new_key] = value
+        else:
+            expanded_params[key] = value
+
+    if "zones" in expanded_params:
+        expanded_params["zones"] = listify(expanded_params["zones"])
+
+    return TesJobParams(**expanded_params)
+
+
+def tes_client_from_params(tes_params: TesJobParams) -> TesClient:
+    tes_url = tes_params.tes_url
+    assert tes_url
+    auth_type = tes_params.authorization  # Defaults to "none".
+
+    headers = {}
+
+    if auth_type == "basic":
+        basic_auth = tes_params.basic_auth
+        username = basic_auth.username if basic_auth else None
+        password = basic_auth.password if basic_auth else None
+        if username and password:
+            auth_string = f"{username}:{password}"
+            auth_base64 = base64.b64encode(auth_string.encode()).decode()
+            headers["Authorization"] = f"Basic {auth_base64}"
+
+    return TesClient(url=tes_url, headers=headers)
+
+
+def tes_resources(tes_params: TesJobParams) -> TesResources:
+    # TesJobParams subclasses TesResources, so just pass it through as is.
+    return tes_params
diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index 9c429734..2b239212 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -23,6 +23,8 @@
     BaseJobClient,
     InputCachingJobClient,
     JobClient,
+    GcpMessageCoexecutionJobClient,
+    GcpPollingCoexecutionJobClient,
     K8sMessageCoexecutionJobClient,
     K8sPollingCoexecutionJobClient,
     MessageCLIJobClient,
@@ -240,6 +242,8 @@ def get_client(self, destination_params, job_id, **kwargs):
             return K8sMessageCoexecutionJobClient(destination_params, job_id, self)
         elif destination_params.get("tes_url", False):
             return TesMessageCoexecutionJobClient(destination_params, job_id, self)
+        elif destination_params.get("project_id", False):
+            return GcpMessageCoexecutionJobClient(destination_params, job_id, self)
         else:
             return MessageJobClient(destination_params, job_id, self)
 
@@ -256,6 +260,8 @@ def get_client(self, destination_params, job_id, **kwargs):
             return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
         elif destination_params.get("tes_url", False):
             return TesPollingCoexecutionJobClient(destination_params, job_id, self)
+        elif destination_params.get("project_id", False):
+            return GcpPollingCoexecutionJobClient(destination_params, job_id, self)
         else:
             raise Exception("Unknown client configuration")
 
@@ -268,7 +274,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
         return ClientManager(**kwargs)  # TODO: Consider more separation here.
     elif kwargs.get('amqp_url', None):
         return MessageQueueClientManager(**kwargs)
-    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
+    elif kwargs.get("k8s_enabled") or kwargs.get("tes_enabled") or kwargs.get("gcp_batch_enabled"):
         return PollingJobClientManager(**kwargs)
     else:
         return ClientManager(**kwargs)
diff --git a/pulsar/managers/util/gcp_util.py b/pulsar/managers/util/gcp_util.py
new file mode 100644
index 00000000..cfba0b74
--- /dev/null
+++ b/pulsar/managers/util/gcp_util.py
@@ -0,0 +1,80 @@
+import logging
+from typing import (
+    Any,
+    Optional,
+)
+
+try:
+    from google.cloud import batch_v1  # type: ignore
+    from google.oauth2 import service_account  # type: ignore
+except ImportError as exc:
+    service_account = None  # type: ignore[assignment]
+    batch_v1 = None  # type: ignore[assignment]
+    GCP_IMPORT_MESSAGE = (
+        "The Python google-cloud-batch package is required to use "
+        "this feature; please install it or correct the "
+        f"following error:\nImportError {exc}"
+    )
+
+log = logging.getLogger(__name__)
+
+
+def ensure_client() -> None:
+    if batch_v1 is None:
+        raise Exception(GCP_IMPORT_MESSAGE)
+
+
+def gcp_client(credentials_file: Optional[str]) -> "batch_v1.BatchServiceClient":
+    if credentials_file:
+        credentials = service_account.Credentials.from_service_account_file(credentials_file)
+        client = batch_v1.BatchServiceClient(credentials=credentials)
+    else:
+        client = batch_v1.BatchServiceClient()
+    return client
+
+
+def get_gcp_job(
+    project_id: str,
+    region: str,
+    job_name: str,
+    credentials_file: Optional[str] = None,
+) -> "batch_v1.Job":
+    """
+    Retrieve a GCP Batch job by its name.
+
+    Args:
+        project_id: GCP project ID.
+        region: GCP region where the job is located.
+        job_name: Name of the job to retrieve.
+        credentials_file: Path to GCP service account credentials file (optional).
+
+    Returns:
+        The GCP Batch job object.
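+
+    Example (hypothetical identifiers)::
+
+        job = get_gcp_job("my-project", "us-central1", "pulsar-abc123")
+        print(job.status.state)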
+ """ + ensure_client() + client = gcp_client(credentials_file) + return client.get_job( + name=f"projects/{project_id}/locations/{region}/jobs/{job_name}" + ) + + +def delete_gcp_job( + project_id: str, + region: str, + job_name: str, + credentials_file: Optional[str] = None, +) -> Any: + ensure_client() + client = gcp_client(credentials_file) + return client.delete_job( + name=f"projects/{project_id}/locations/{region}/jobs/{job_name}" + ) + + +__all__ = ( + "ensure_client", + "gcp_client", + "batch_v1", + "get_gcp_job", + "delete_gcp_job", +) diff --git a/pulsar/managers/util/tes.py b/pulsar/managers/util/tes.py index 8b8989f5..8bdb3993 100644 --- a/pulsar/managers/util/tes.py +++ b/pulsar/managers/util/tes.py @@ -1,16 +1,9 @@ -import base64 from typing import ( Any, - cast, Dict, - List, Optional, ) -from galaxy.util import ( - asbool, - listify, -) IMPORT_MESSAGE = None try: @@ -40,73 +33,8 @@ def ensure_tes_client() -> None: raise Exception(IMPORT_MESSAGE) -def tes_client_from_dict(destination_params: Dict[str, Any]) -> TesClient: - tes_url = destination_params.get("tes_url") - auth_type = destination_params.get("authorization", "none") # Default to "none" - - headers = {} - - if auth_type == "basic": - basic_auth = destination_params.get("basic_auth", {}) - username = basic_auth.get("username") - password = basic_auth.get("password") - if username and password: - auth_string = f"{username}:{password}" - auth_base64 = base64.b64encode(auth_string.encode()).decode() - headers["Authorization"] = f"Basic {auth_base64}" - - return TesClient(url=tes_url, headers=headers) - - -def tes_resources(destination_params: Dict[str, Any]) -> TesResources: - cpu_cores: Optional[int] - preemptible: Optional[bool] - ram_gb: Optional[float] - disk_gb: Optional[float] - zones: Optional[List[str]] - backend_parameters: Optional[Dict[str, str]] = None - backend_parameters_strict: Optional[bool] - - raw_cpu_cores = destination_params.get("tes_cpu_cores") - cpu_cores = int(raw_cpu_cores) if raw_cpu_cores is not None else None - - raw_preemptible = destination_params.get("tes_preemptible") - preemptible = asbool(raw_preemptible) if raw_preemptible is not None else None - - raw_ram_gb = destination_params.get("tes_ram_gb") - ram_gb = float(raw_ram_gb) if raw_ram_gb is not None else None - - raw_disk_gb = destination_params.get("tes_disk_gb") - disk_gb = float(raw_disk_gb) if raw_disk_gb is not None else None - - raw_zones = destination_params.get("tes_zones") - zones = listify(raw_zones) if raw_zones is not None else None - - raw_backend_parameters = destination_params.get("tes_backend_parameters") - if raw_backend_parameters is not None: - backend_parameters = {} - for k, v in cast(dict, raw_backend_parameters).items(): - backend_parameters[str(k)] = str(v) - - raw_backend_parameters_strict = destination_params.get("tes_backend_parameters_strict") - if raw_backend_parameters_strict is not None: - backend_parameters_strict = asbool(raw_backend_parameters_strict) - else: - backend_parameters_strict = None - - return TesResources( - cpu_cores=cpu_cores, - preemptible=preemptible, - ram_gb=ram_gb, - disk_gb=disk_gb, - zones=zones, - backend_parameters=backend_parameters, - backend_parameters_strict=backend_parameters_strict, - ) - - -def tes_galaxy_instance_id(destinaton_params: Dict[str, Any]) -> Optional[str]: - return destinaton_params.get("tes_galaxy_instance_id") +def tes_galaxy_instance_id(destination_params: Dict[str, Any]) -> Optional[str]: + return destination_params.get("galaxy_instance_id") or 
destination_params.get("tes_galaxy_instance_id") __all__ = ( diff --git a/requirements.txt b/requirements.txt index 1db30fa4..5c135f1b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,7 @@ typing-extensions pydantic-tes>=0.1.5 pyjwt tuspy +google-cloud-batch ## Uncomment if using DRMAA queue manager. #drmaa diff --git a/test/container_job_config_test.py b/test/container_job_config_test.py new file mode 100644 index 00000000..1e35bf1f --- /dev/null +++ b/test/container_job_config_test.py @@ -0,0 +1,36 @@ +from pulsar.client.container_job_config import ( + GcpJobParams, + gcp_job_template, + parse_gcp_job_params, + DEFAULT_GCP_WALLTIME_LIMIT, + TesJobParams, +) + + +def test_docs(): + print(GcpJobParams.schema()) + print(TesJobParams.schema()) + + +def test_gcp_defaults(): + params = parse_gcp_job_params({"project_id": "moo"}) + assert params.project_id == "moo" + assert params.credentials_file is None + assert params.walltime_limit == DEFAULT_GCP_WALLTIME_LIMIT + + +def test_gcp_job_template(): + params = parse_gcp_job_params({"project_id": "moo"}) + job = gcp_job_template(params) + assert job is not None + assert len(job.task_groups) == 1 + task_group = job.task_groups[0] + task = task_group.task_spec + assert len(task.volumes) == 1 + + +def test_gcp_custom_walltime(): + custom_walltime = "7200" # 2 hours in seconds + params = parse_gcp_job_params(dict(project_id="moo", credentials_file="path/to/credentials.json", walltime_limit=custom_walltime)) + assert params.credentials_file == "path/to/credentials.json" + assert params.walltime_limit == int(custom_walltime) diff --git a/test/tes_test.py b/test/tes_test.py index 9b0849a3..ee9da63f 100644 --- a/test/tes_test.py +++ b/test/tes_test.py @@ -1,35 +1,40 @@ -from pulsar.managers.util.tes import tes_resources +from pulsar.client.container_job_config import ( + parse_tes_job_params, + tes_resources, +) def test_tes_resources_from_xml(): - resources = tes_resources({ + resources = tes_resources(parse_tes_job_params({ + "tes_url": "http://moo", "tes_cpu_cores": "2", "tes_preemptible": "true", "tes_ram_gb": "128.0", "tes_disk_gb": "512.0", "tes_zones": "us-west-1,us-east-1", - }) + })) assert resources.cpu_cores == 2 assert resources.preemptible is True assert resources.ram_gb == 128.0 assert resources.disk_gb == 512.0 assert resources.zones == ["us-west-1", "us-east-1"] assert resources.backend_parameters is None - assert resources.backend_parameters_strict is None + assert resources.backend_parameters_strict is False def test_tes_resources_from_yaml(): - resources = tes_resources({ + resources = tes_resources(parse_tes_job_params({ + "tes_url": "http://moo", "tes_cpu_cores": 4, "tes_ram_gb": 127.0, "tes_disk_gb": 513.0, "tes_zones": ["us-west-1", "us-east-1"], "tes_backend_parameters": {"moo": "cow"}, - }) + })) assert resources.cpu_cores == 4 assert resources.preemptible is None assert resources.ram_gb == 127.0 assert resources.disk_gb == 513.0 assert resources.zones == ["us-west-1", "us-east-1"] assert resources.backend_parameters == {"moo": "cow"} - assert resources.backend_parameters_strict is None + assert resources.backend_parameters_strict is False
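+
+
+def test_tes_params_legacy_key_mapping():
+    # A minimal sketch using only functions from this branch: the legacy
+    # "tes_"-prefixed destination keys are translated onto TesJobParams field
+    # names, and zones given as a comma-less string are listified.
+    params = parse_tes_job_params({
+        "tes_url": "http://moo",
+        "tes_zones": "us-west-1",
+    })
+    assert params.tes_url == "http://moo"
+    assert params.zones == ["us-west-1"]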