Skip to content

Commit 11cd360

Browse files
feat(v1alpha api): Allow passing parameters in Run Workflow API call (#323)
Changes the Run workflow API implementation to allow passing parameters and improve performance. Previously, the request was synchronously waiting for repo-proxy to create a hook by retrieving data from the git provider and then schedule the workflow on the plumber. This had performance issues, and it did not support passing parameters to be used in the pipeline. The new implementation goes directly to the plumber, which only saves the request in the database before responding, and all hook processing is done asynchronously. This should improve performance, and also the plumber supports passing parameters out-of-the-box.
1 parent 5e719e2 commit 11cd360

40 files changed

+481
-598
lines changed

docs/docs/reference/api.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ Parameters:
100100
- `reference` (**required**) - git reference for the desired branch, tag, or pull request--e.g. *refs/heads/master*, *refs/tags/v1.0*, or *refs/pull/123*.
101101
- `commit_sha` (*optional*) - Commit sha of the desired commit.
102102
- `pipeline_file` (*optional*) - The path within the repository to the YAML file that contains the pipeline definition. The default value is *.semaphore/semaphore.yml*.
103+
- `parameters`: (key-values) specify parameter values that will be available in all jobs of the initial pipeline and can be used in the same way as the parameters from the [parameterized promotions](../using-semaphore/promotions#parameters).
103104

104105
Response:
105106

@@ -108,17 +109,25 @@ HTTP status: 200
108109

109110
{
110111
"workflow_id": "32a689e0-9082-4c5b-a648-bb3dc645452d",
111-
"pipeline_id": "2abeb1a9-eb4a-4834-84b8-cb7806aec063",
112-
"hook_id": "ff7d57ef-92c5-4fcd-9c0c-6ae9e24bfcec"
112+
"pipeline_id": "2abeb1a9-eb4a-4834-84b8-cb7806aec063"
113113
}
114114
```
115115

116116
Example:
117117

118118
```shell
119-
curl -i -H "Authorization: Token {api_token}" \
120-
-d "project_id={project_id}&reference={reference}" \
121-
-X POST "https://<organization-url>.semaphoreci.com/api/v1alpha/plumber-workflows"
119+
curl -X POST --location "https://<organization-url>.semaphoreci.com/api/v1alpha/plumber-workflows" \
120+
-H "Authorization: Token {api_token}" \
121+
-H "Content-Type: application/json" \
122+
-d $'{
123+
"project_id": "my_project_id",
124+
"reference": "refs/heads/master",
125+
"pipeline_file": "/.semaphore/deploy.yml",
126+
"parameters": {
127+
"PARAM_NAME": "PARAM_VALUE",
128+
"PARAM_NAME_2": "PARAM_VALUE_2"
129+
}
130+
}'
122131
```
123132

124133
### Describe a workflow

github_hooks/lib/internal_api/repo_proxy/repo_proxy_server.rb

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,64 @@ class RepoProxyServer < RepoProxyService::Service
5151
end
5252
end
5353

54+
define_rpc :create_blank do |req, logger|
55+
project = ::Project.find(req.project_id)
56+
user = ::User.find(req.requester_id)
57+
58+
payload_builder = InternalApi::RepoProxy::PayloadFactory.create(req.git.reference, req.git.commit_sha)
59+
payload = payload_builder.call(project, user)
60+
61+
params = ActionController::Parameters.new
62+
params["hash_id"] = project.id
63+
params["payload"] = payload.to_json
64+
65+
workflow = ::Semaphore::RepoHost::Hooks::Recorder.record_hook(params, project)
66+
workflow.update(:result => ::Workflow::RESULT_OK)
67+
68+
branch = ::Branch.find_or_create_for_workflow(workflow)
69+
branch.unarchive
70+
workflow.update(:branch_id => branch.id)
71+
72+
if workflow.payload.pull_request?
73+
branch.update(:pull_request_mergeable => true)
74+
workflow.update(
75+
:commit_author => payload["commit_author"],
76+
:commit_sha => payload["merge_commit_sha"],
77+
:git_ref => payload["semaphore_ref"]
78+
)
79+
end
80+
81+
workflow.update(:ppl_id => req.pipeline_id)
82+
workflow.update(:wf_id => req.wf_id)
83+
workflow.update(:state => Workflow::STATE_LAUNCHING)
84+
85+
InternalApi::RepoProxy::CreateBlankResponse.new(
86+
:hook_id => workflow.id,
87+
:wf_id => req.wf_id,
88+
:pipeline_id => req.pipeline_id,
89+
:branch_id => branch.id,
90+
:repo => InternalApi::RepoProxy::CreateBlankResponse::Repo.new(
91+
:owner => branch.project.repository.owner,
92+
:repo_name => branch.project.repository.name,
93+
:branch_name => branch.name,
94+
:commit_sha => workflow.commit_sha,
95+
:repository_id => branch.project.repository.id
96+
)
97+
)
98+
99+
rescue ::InternalApi::RepoProxy::PrPayload::PrNotMergeableError => e
100+
raise GRPC::Aborted, e.message
101+
rescue ::InternalApi::RepoProxy::PayloadFactory::InvalidReferenceError => e
102+
raise GRPC::InvalidArgument, e.message
103+
rescue ::RepoHost::RemoteException::NotFound
104+
raise GRPC::NotFound, "Reference not found on GitHub #{req.git.reference} #{req.git.commit_sha}"
105+
rescue ::RepoHost::RemoteException::Unknown => e
106+
logger.error("Unknown error", error: e.message)
107+
raise GRPC::Internal, "Unknown error"
108+
rescue ::ActiveRecord::RecordNotFound => e
109+
raise GRPC::NotFound, e.message
110+
end
111+
54112
define_rpc :create do |req, logger|
55113
project = ::Project.find(req.project_id)
56114

@@ -102,8 +160,6 @@ def create_for_github_project(req, logger)
102160
workflow.branch_name
103161
end
104162

105-
integration_token, = ::Semaphore::ProjectIntegrationToken.new.project_token(branch.project)
106-
107163
client = InternalApi::PlumberWF::WorkflowService::Stub.new(App.plumber_internal_url, :this_channel_is_insecure)
108164
request = InternalApi::PlumberWF::ScheduleRequest.new(
109165
:service => InternalApi::PlumberWF::ScheduleRequest::ServiceType::GIT_HUB,

github_hooks/spec/lib/internal_api/repo_proxy/repo_proxy_server_spec.rb

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,170 @@
217217
end
218218
end
219219

220+
describe "#create_blank" do
221+
let(:user) { FactoryBot.create(:user, :github_connection) }
222+
let(:repository) do
223+
FactoryBot.create(
224+
:repository,
225+
name: "sandbox",
226+
owner: "renderedtext",
227+
integration_type: "github_app"
228+
)
229+
end
230+
let(:project) { FactoryBot.create(:project, repository: repository) }
231+
let(:workflow) { FactoryBot.create(:workflow_with_branch, project: project) }
232+
let(:branch) { workflow.branch }
233+
234+
let(:git) do
235+
InternalApi::RepoProxy::CreateBlankRequest::Git.new(
236+
reference: "refs/heads/main",
237+
commit_sha: "abc123"
238+
)
239+
end
240+
241+
let(:req) do
242+
InternalApi::RepoProxy::CreateBlankRequest.new(
243+
project_id: project.id,
244+
requester_id: user.id,
245+
pipeline_id: "pipeline-id",
246+
wf_id: "workflow-id",
247+
git: git
248+
)
249+
end
250+
251+
let(:payload_hash) do
252+
{
253+
"commit_author" => "[email protected]",
254+
"merge_commit_sha" => "abc123",
255+
"semaphore_ref" => "refs/merge"
256+
}
257+
end
258+
259+
before do
260+
payload = instance_double(InternalApi::RepoProxy::PrPayload, call: payload_hash)
261+
allow(InternalApi::RepoProxy::PayloadFactory).to receive(:create)
262+
.with(req.git.reference, req.git.commit_sha)
263+
.and_return(payload)
264+
265+
allow(Semaphore::RepoHost::Hooks::Recorder).to receive(:record_hook)
266+
.and_return(workflow)
267+
268+
allow(Branch).to receive(:find_or_create_for_workflow).with(workflow).and_return(branch)
269+
allow(branch).to receive(:unarchive)
270+
allow(branch).to receive(:update)
271+
allow(workflow).to receive(:update)
272+
end
273+
274+
it "creates a blank hook and returns the expected response" do
275+
allow(payload_hash).to receive(:pull_request?).and_return(false)
276+
allow(workflow).to receive(:payload).and_return(payload_hash)
277+
expect(workflow).to receive(:update).with(state: Workflow::STATE_LAUNCHING)
278+
result = server.create_blank(req, call)
279+
280+
expect(result).to be_a(InternalApi::RepoProxy::CreateBlankResponse)
281+
expect(result.hook_id).to eq(workflow.id)
282+
expect(result.wf_id).to eq(req.wf_id)
283+
expect(result.pipeline_id).to eq(req.pipeline_id)
284+
expect(result.branch_id).to eq(branch.id)
285+
286+
repo = result.repo
287+
expect(repo.owner).to eq(repository.owner)
288+
expect(repo.repo_name).to eq(repository.name)
289+
expect(repo.branch_name).to eq(branch.name)
290+
expect(repo.commit_sha).to eq(workflow.commit_sha)
291+
expect(repo.repository_id).to eq(repository.id)
292+
end
293+
294+
context "when pull request is not mergeable" do
295+
before do
296+
allow(InternalApi::RepoProxy::PayloadFactory).to receive(:create).and_raise(
297+
InternalApi::RepoProxy::PrPayload::PrNotMergeableError.new("PR not mergeable")
298+
)
299+
end
300+
301+
it "raises GRPC::Aborted" do
302+
expect do
303+
server.create_blank(req, call)
304+
end.to raise_error(GRPC::Aborted, /PR not mergeable/)
305+
end
306+
end
307+
308+
context "when reference is invalid" do
309+
before do
310+
allow(InternalApi::RepoProxy::PayloadFactory).to receive(:create)
311+
.and_raise(InternalApi::RepoProxy::PayloadFactory::InvalidReferenceError.new("Invalid ref"))
312+
end
313+
314+
it "raises GRPC::InvalidArgument" do
315+
expect do
316+
server.create_blank(req, call)
317+
end.to raise_error(GRPC::InvalidArgument, /Invalid ref/)
318+
end
319+
end
320+
321+
context "when reference is not found on GitHub" do
322+
before do
323+
allow(InternalApi::RepoProxy::PayloadFactory).to receive(:create)
324+
.and_raise(RepoHost::RemoteException::NotFound)
325+
end
326+
327+
it "raises GRPC::NotFound" do
328+
expect do
329+
server.create_blank(req, call)
330+
end.to raise_error(GRPC::NotFound, /Reference not found/)
331+
end
332+
end
333+
334+
context "when unknown error occurs" do
335+
before do
336+
allow(Semaphore::RepoHost::Hooks::Recorder).to receive(:record_hook)
337+
.and_raise(RepoHost::RemoteException::Unknown.new("Boom"))
338+
end
339+
340+
it "raises GRPC::Internal" do
341+
expect do
342+
server.create_blank(req, call)
343+
end.to raise_error(GRPC::Internal, /Unknown error/)
344+
end
345+
end
346+
347+
context "when the user is not found" do
348+
before do
349+
@invalid_req = InternalApi::RepoProxy::CreateBlankRequest.new(
350+
project_id: project.id,
351+
requester_id: "invalid-user-id",
352+
pipeline_id: "pipeline-id",
353+
wf_id: "workflow-id",
354+
git: git
355+
)
356+
end
357+
358+
it "raises GRPC::NotFound for missing user" do
359+
expect do
360+
server.create_blank(@invalid_req, call)
361+
end.to raise_error(GRPC::NotFound, /Couldn't find User/)
362+
end
363+
end
364+
365+
context "when the project is not found" do
366+
before do
367+
@invalid_req = InternalApi::RepoProxy::CreateBlankRequest.new(
368+
project_id: "invalid-project-id",
369+
requester_id: user.id,
370+
pipeline_id: "pipeline-id",
371+
wf_id: "workflow-id",
372+
git: git
373+
)
374+
end
375+
376+
it "raises GRPC::NotFound for missing project" do
377+
expect do
378+
server.create_blank(@invalid_req, call)
379+
end.to raise_error(GRPC::NotFound, /Couldn't find Project/)
380+
end
381+
end
382+
end
383+
220384
describe "#create" do
221385
before "when unknown remote error is raised" do
222386
allow(InternalApi::RepoProxy::PayloadFactory).to receive(

plumber/ppl/lib/ppl/actions/schedule_impl.ex

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ defmodule Ppl.Actions.ScheduleImpl do
7979

8080
# Schedule
8181

82-
def schedule(ctx, top_level?, initial_request?, task_workflow?) do
82+
def schedule(ctx, top_level?, initial_request?, start_in_conceived?) do
8383
log_run_request(ctx)
8484

8585
ctx
86-
|> prepare_request_multi(top_level?, initial_request?, task_workflow?)
86+
|> prepare_request_multi(top_level?, initial_request?, start_in_conceived?)
8787
|> persist_request
8888
|> case do
8989
{:ok, %{ppl_req: ppl_req}} ->
@@ -92,7 +92,7 @@ defmodule Ppl.Actions.ScheduleImpl do
9292
retry_count: publish_retry_count(), timeout_ms: publish_timeout()),
9393

9494
predicate <- fn query -> query |> where(ppl_id: ^ppl_req.id) end,
95-
:ok <- execute_first_state_with_predicate(predicate, task_workflow?),
95+
:ok <- execute_first_state_with_predicate(predicate, start_in_conceived?),
9696
do: response(ppl_req)
9797
# Idempotency -> return {:ok, ...}
9898
{:error, :ppl_req, {:request_token_exists, request_token}, _} ->
@@ -136,32 +136,32 @@ defmodule Ppl.Actions.ScheduleImpl do
136136
|> Map.put("suppressed_attributes", attribute_list)
137137
end
138138

139-
def prepare_request_multi(ctx, top_level?, initial_request?, task_workflow?) do
139+
def prepare_request_multi(ctx, top_level?, initial_request?, start_in_conceived?) do
140140
ctx = RequestReviser.revise(ctx)
141141

142142
Multi.new()
143143
# insert pipeline request
144144
|> Multi.run(:ppl_req, fn _, _ ->
145145
Metrics.benchmark("Ppl.schedule_break_down", ["insert_request"], fn ->
146-
PplRequestsQueries.insert_request(ctx, top_level?, initial_request?, task_workflow?)
146+
PplRequestsQueries.insert_request(ctx, top_level?, initial_request?, start_in_conceived?)
147147
end)
148148
end)
149149
# insert pipeline based on that request
150150
|> Multi.run(:ppl, fn _, %{ppl_req: ppl_req} ->
151151
Metrics.benchmark("Ppl.schedule_break_down", ["insert_pipeline"], fn ->
152-
PplsQueries.insert(ppl_req, "", task_workflow?)
152+
PplsQueries.insert(ppl_req, "", start_in_conceived?)
153153
end)
154154
end)
155155
# update pipeline to include wf_number
156156
|> Multi.run(:wf_num, fn _, %{ppl_req: ppl_req, ppl: ppl} ->
157157
Metrics.benchmark("Ppl.schedule_break_down", ["set_wf_num"], fn ->
158-
set_workflow_number(ppl, ppl_req, task_workflow?)
158+
set_workflow_number(ppl, ppl_req, start_in_conceived?)
159159
end)
160160
end)
161161
# insert pipeline sub init for this pipeline
162162
|> Multi.run(:ppl_sub_init, fn _, %{ppl_req: ppl_req} ->
163163
Metrics.benchmark("Ppl.schedule_break_down", ["insert_subinit"], fn ->
164-
PplSubInitsQueries.insert(ppl_req, "regular", task_workflow?)
164+
PplSubInitsQueries.insert(ppl_req, "regular", start_in_conceived?)
165165
end)
166166
end)
167167
# save inital_request separately for easier debug
@@ -179,25 +179,25 @@ defmodule Ppl.Actions.ScheduleImpl do
179179
end
180180

181181
# promotions
182-
def set_workflow_number(ppl, req = %{request_args: %{"wf_number" => num}}, task_workflow?)
182+
def set_workflow_number(ppl, req = %{request_args: %{"wf_number" => num}}, start_in_conceived?)
183183
when is_integer(num) and num > 0 do
184184
with service <- Map.get(req.request_args, "service"),
185-
{:ok, _ppl} <- update_ppl(ppl, service, num, task_workflow?),
185+
{:ok, _ppl} <- update_ppl(ppl, service, num, start_in_conceived?),
186186
do: {:ok, num}
187187
end
188188
# partial rebuilds
189-
def set_workflow_number(ppl = %{partial_rebuild_of: val}, ppl_req, task_workflow?)
189+
def set_workflow_number(ppl = %{partial_rebuild_of: val}, ppl_req, start_in_conceived?)
190190
when is_binary(val) and val != "" do
191191
with {:ok, l_wf} <- calculate_wf_num(ppl, ppl_req),
192192
service <- Map.get(ppl_req.request_args, "service"),
193-
{:ok, _ppl} <- update_ppl(ppl, service, l_wf.wf_number + 1, task_workflow?),
193+
{:ok, _ppl} <- update_ppl(ppl, service, l_wf.wf_number + 1, start_in_conceived?),
194194
do: {:ok, l_wf.wf_number + 1}
195195
end
196196
# regular schedule and wf_rebuild
197-
def set_workflow_number(ppl, ppl_req, task_workflow?) do
197+
def set_workflow_number(ppl, ppl_req, start_in_conceived?) do
198198
with {:ok, l_wf} <- read_from_latest_wf_table(ppl, ppl_req),
199199
service <- Map.get(ppl_req.request_args, "service"),
200-
{:ok, _ppl} <- update_ppl(ppl, service, l_wf.wf_number + 1, task_workflow?),
200+
{:ok, _ppl} <- update_ppl(ppl, service, l_wf.wf_number + 1, start_in_conceived?),
201201
{:ok, _} <- LatestWfsQueries.insert_or_update(l_wf, ppl_req, l_wf.wf_number + 1),
202202
do: {:ok, l_wf.wf_number + 1}
203203
end
@@ -224,8 +224,8 @@ defmodule Ppl.Actions.ScheduleImpl do
224224
defp get_initial_wf_ppl(%{wf_id: wf_id}, _ppl),
225225
do: PplsQueries.get_initial_wf_ppl(wf_id)
226226

227-
defp update_ppl(ppl, service, wf_num, task_workflow?) do
228-
with_repo_data? = !task_workflow?
227+
defp update_ppl(ppl, service, wf_num, start_in_conceived?) do
228+
with_repo_data? = !start_in_conceived?
229229

230230
ppl
231231
|> Ppls.changeset(%{wf_number: wf_num}, service == "listener_proxy", with_repo_data?)

0 commit comments

Comments
 (0)