1+ % %% @doc A device that implements the singleton pattern for processes specific
2+ % %% to an individual node. This device uses the `local-name@1.0' device to
3+ % %% register processes with names locally, persistenting them across reboots.
4+ % %%
5+ % %% Definitions of singleton processes are expected to be found with their
6+ % %% names in the `node_processes' section of the node message.
7+ -module (dev_node_process ).
8+ -export ([info /1 ]).
9+ -include (" include/hb.hrl" ).
10+ -include_lib (" eunit/include/eunit.hrl" ).
11+
12+ % % @doc Register a default handler for the device. Inherits `keys' and `set'
13+ % % from the default device.
14+ info (_Opts ) ->
15+ #{
16+ default => fun lookup /4 ,
17+ excludes => [<<" set" >>, <<" keys" >>]
18+ }.
19+
20+ % % @doc Lookup a process by name.
21+ lookup (Name , _Base , Req , Opts ) ->
22+ ? event (node_process , {lookup , {name , Name }}),
23+ LookupRes =
24+ hb_ao :resolve (
25+ #{ <<" device" >> => <<" local-name@1.0" >> },
26+ #{ <<" path" >> => <<" lookup" >>, <<" key" >> => Name , <<" load" >> => true },
27+ Opts
28+ ),
29+ case LookupRes of
30+ {ok , ProcessID } ->
31+ hb_cache :read (ProcessID , Opts );
32+ {error , not_found } ->
33+ case hb_ao :get (<<" spawn" >>, Req , true , Opts ) of
34+ true ->
35+ spawn_register (Name , Opts );
36+ false ->
37+ {error , not_found }
38+ end
39+ end .
40+
41+ % % @doc Spawn a new process according to the process definition found in the
42+ % % node message, and register it with the given name.
43+ spawn_register (Name , Opts ) ->
44+ case hb_opts :get (node_processes , #{}, Opts ) of
45+ #{ Name := BaseDef } ->
46+ % We have found the base process definition. Augment it with the
47+ % node's address as necessary, then commit to the result.
48+ ? event (node_process , {registering , {name , Name }, {base_def , BaseDef }}),
49+ Signed = hb_message :commit (augment_definition (BaseDef , Opts ), Opts ),
50+ ID = hb_message :id (Signed , signed , Opts ),
51+ ? event (node_process , {spawned , {name , Name }, {process , Signed }}),
52+ % `POST` to the schedule device for the process to start its sequence.
53+ {ok , Assignment } =
54+ hb_ao :resolve (
55+ Signed ,
56+ #{
57+ <<" path" >> => <<" schedule" >>,
58+ <<" method" >> => <<" POST" >>,
59+ <<" body" >> => Signed
60+ },
61+ Opts
62+ ),
63+ ? event (node_process , {initialized , {name , Name }, {assignment , Assignment }}),
64+ RegResult =
65+ dev_local_name :direct_register (
66+ #{ <<" key" >> => Name , <<" value" >> => ID },
67+ Opts
68+ ),
69+ ? event (node_process , {registered , {name , Name }, {process_id , ID }}),
70+ case RegResult of
71+ {ok , _ } ->
72+ {ok , Signed };
73+ {error , Err } ->
74+ {error , #{
75+ <<" status" >> => 500 ,
76+ <<" body" >> => <<" Failed to register process." >>,
77+ <<" details" >> => Err
78+ }}
79+ end ;
80+ _ ->
81+ % We could not find the base process definition for the given name
82+ % in the node message.
83+ {error , not_found }
84+ end .
85+
86+ % % @doc Augment the given process definition with the node's address.
87+ augment_definition (BaseDef , Opts ) ->
88+ Address =
89+ hb_util :human_id (
90+ ar_wallet :to_address (
91+ hb_opts :get (priv_wallet , no_viable_wallet , Opts )
92+ )
93+ ),
94+ hb_ao :set (
95+ BaseDef ,
96+ #{
97+ <<" scheduler" >> => Address
98+ }
99+ ).
100+
101+ % %% Tests
102+
103+ % %% The name that should be used for the singleton process during tests.
104+ -define (TEST_NAME , <<" test-node-process" >>).
105+
106+ % % @doc Helper function to generate a test environment and its options.
107+ generate_test_opts () ->
108+ {ok , Script } = file :read_file (<<" test/test.lua" >>),
109+ generate_test_opts (#{
110+ ? TEST_NAME => #{
111+ <<" device" >> => <<" process@1.0" >>,
112+ <<" execution-device" >> => <<" lua@5.3a" >>,
113+ <<" scheduler-device" >> => <<" scheduler@1.0" >>,
114+ <<" script" >> => Script
115+ }
116+ }).
117+ generate_test_opts (Defs ) ->
118+ #{
119+ store =>
120+ [
121+ #{
122+ <<" store-module" >> => hb_store_fs ,
123+ <<" prefix" >> =>
124+ <<
125+ " cache-TEST-" ,
126+ (integer_to_binary (os :system_time (millisecond )))/binary
127+ >>
128+ }
129+ ],
130+ node_processes => Defs ,
131+ priv_wallet => ar_wallet :new ()
132+ }.
133+
134+ lookup_no_spawn_test () ->
135+ Opts = generate_test_opts (),
136+ ? assertEqual (
137+ {error , not_found },
138+ lookup (<<" name1" >>, #{}, #{}, Opts )
139+ ).
140+
141+ lookup_spawn_test () ->
142+ Opts = generate_test_opts (),
143+ Res1 = {_ , Process1 } =
144+ hb_ao :resolve (
145+ #{ <<" device" >> => <<" node-process@1.0" >> },
146+ ? TEST_NAME ,
147+ Opts
148+ ),
149+ ? assertMatch (
150+ {ok , #{ <<" device" >> := <<" process@1.0" >> }},
151+ Res1
152+ ),
153+ {ok , Process2 } = hb_ao :resolve (
154+ #{ <<" device" >> => <<" node-process@1.0" >> },
155+ ? TEST_NAME ,
156+ Opts
157+ ),
158+ ? assertEqual (Process1 , Process2 ).
159+
160+ % % @doc Test that a process can be spawned, executed upon, and its result retrieved.
161+ lookup_execute_test () ->
162+ Opts = generate_test_opts (),
163+ Res1 =
164+ hb_ao :resolve_many (
165+ [
166+ #{ <<" device" >> => <<" node-process@1.0" >> },
167+ ? TEST_NAME ,
168+ #{
169+ <<" path" >> => <<" schedule" >>,
170+ <<" method" >> => <<" POST" >>,
171+ <<" body" >> =>
172+ hb_message :commit (
173+ #{
174+ <<" path" >> => <<" compute" >>,
175+ <<" test-key" >> => <<" test-value" >>
176+ },
177+ Opts
178+ )
179+ }
180+ ],
181+ Opts
182+ ),
183+ ? assertMatch (
184+ {ok , #{ <<" slot" >> := 1 }},
185+ Res1
186+ ),
187+ ? assertMatch (
188+ 42 ,
189+ hb_ao :get (
190+ << ? TEST_NAME /binary , " /now/results/output/body" >>,
191+ #{ <<" device" >> => <<" node-process@1.0" >> },
192+ Opts
193+ )
194+ ).
0 commit comments