|
1 | 1 | use crate::errors::EnqueueError; |
2 | 2 | use crate::schema::background_jobs; |
3 | | -use diesel::connection::LoadConnection; |
4 | 3 | use diesel::dsl::{exists, not}; |
5 | | -use diesel::pg::Pg; |
6 | 4 | use diesel::sql_types::{Int2, Jsonb, Text}; |
7 | 5 | use diesel::{ExpressionMethods, IntoSql, OptionalExtension, QueryDsl}; |
8 | | -use diesel_async::AsyncPgConnection; |
| 6 | +use diesel_async::{AsyncPgConnection, RunQueryDsl}; |
9 | 7 | use serde::de::DeserializeOwned; |
10 | 8 | use serde::Serialize; |
11 | 9 | use serde_json::Value; |
@@ -40,85 +38,28 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static { |
40 | 38 | /// Execute the task. This method should define its logic. |
41 | 39 | fn run(&self, ctx: Self::Context) -> impl Future<Output = anyhow::Result<()>> + Send; |
42 | 40 |
|
43 | | - #[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))] |
44 | | - fn enqueue( |
45 | | - &self, |
46 | | - conn: &mut impl LoadConnection<Backend = Pg>, |
47 | | - ) -> Result<Option<i64>, EnqueueError> { |
48 | | - let data = serde_json::to_value(self)?; |
49 | | - let priority = Self::PRIORITY; |
50 | | - |
51 | | - if Self::DEDUPLICATED { |
52 | | - Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority)?) |
53 | | - } else { |
54 | | - Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?)) |
55 | | - } |
56 | | - } |
57 | | - |
58 | 41 | #[allow(async_fn_in_trait)] |
59 | 42 | #[instrument(name = "swirl.enqueue", skip(self, conn), fields(message = Self::JOB_NAME))] |
60 | | - async fn async_enqueue( |
61 | | - &self, |
62 | | - conn: &mut AsyncPgConnection, |
63 | | - ) -> Result<Option<i64>, EnqueueError> { |
| 43 | + async fn enqueue(&self, conn: &mut AsyncPgConnection) -> Result<Option<i64>, EnqueueError> { |
64 | 44 | let data = serde_json::to_value(self)?; |
65 | 45 | let priority = Self::PRIORITY; |
66 | 46 |
|
67 | 47 | if Self::DEDUPLICATED { |
68 | | - Ok(async_enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?) |
| 48 | + Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority).await?) |
69 | 49 | } else { |
70 | 50 | Ok(Some( |
71 | | - async_enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?, |
| 51 | + enqueue_simple(conn, Self::JOB_NAME, &data, priority).await?, |
72 | 52 | )) |
73 | 53 | } |
74 | 54 | } |
75 | 55 | } |
76 | 56 |
|
77 | | -fn enqueue_deduplicated( |
78 | | - conn: &mut impl LoadConnection<Backend = Pg>, |
79 | | - job_type: &str, |
80 | | - data: &Value, |
81 | | - priority: i16, |
82 | | -) -> Result<Option<i64>, EnqueueError> { |
83 | | - use diesel::RunQueryDsl; |
84 | | - |
85 | | - let similar_jobs = background_jobs::table |
86 | | - .select(background_jobs::id) |
87 | | - .filter(background_jobs::job_type.eq(job_type)) |
88 | | - .filter(background_jobs::data.eq(data)) |
89 | | - .filter(background_jobs::priority.eq(priority)) |
90 | | - .for_update() |
91 | | - .skip_locked(); |
92 | | - |
93 | | - let deduplicated_select = diesel::select(( |
94 | | - job_type.into_sql::<Text>(), |
95 | | - data.into_sql::<Jsonb>(), |
96 | | - priority.into_sql::<Int2>(), |
97 | | - )) |
98 | | - .filter(not(exists(similar_jobs))); |
99 | | - |
100 | | - let id = diesel::insert_into(background_jobs::table) |
101 | | - .values(deduplicated_select) |
102 | | - .into_columns(( |
103 | | - background_jobs::job_type, |
104 | | - background_jobs::data, |
105 | | - background_jobs::priority, |
106 | | - )) |
107 | | - .returning(background_jobs::id) |
108 | | - .get_result::<i64>(conn) |
109 | | - .optional()?; |
110 | | - |
111 | | - Ok(id) |
112 | | -} |
113 | | - |
114 | | -async fn async_enqueue_deduplicated( |
| 57 | +async fn enqueue_deduplicated( |
115 | 58 | conn: &mut AsyncPgConnection, |
116 | 59 | job_type: &str, |
117 | 60 | data: &Value, |
118 | 61 | priority: i16, |
119 | 62 | ) -> Result<Option<i64>, EnqueueError> { |
120 | | - use diesel_async::RunQueryDsl; |
121 | | - |
122 | 63 | let similar_jobs = background_jobs::table |
123 | 64 | .select(background_jobs::id) |
124 | 65 | .filter(background_jobs::job_type.eq(job_type)) |
@@ -149,34 +90,12 @@ async fn async_enqueue_deduplicated( |
149 | 90 | Ok(id) |
150 | 91 | } |
151 | 92 |
|
152 | | -fn enqueue_simple( |
153 | | - conn: &mut impl LoadConnection<Backend = Pg>, |
154 | | - job_type: &str, |
155 | | - data: &Value, |
156 | | - priority: i16, |
157 | | -) -> Result<i64, EnqueueError> { |
158 | | - use diesel::RunQueryDsl; |
159 | | - |
160 | | - let id = diesel::insert_into(background_jobs::table) |
161 | | - .values(( |
162 | | - background_jobs::job_type.eq(job_type), |
163 | | - background_jobs::data.eq(data), |
164 | | - background_jobs::priority.eq(priority), |
165 | | - )) |
166 | | - .returning(background_jobs::id) |
167 | | - .get_result(conn)?; |
168 | | - |
169 | | - Ok(id) |
170 | | -} |
171 | | - |
172 | | -async fn async_enqueue_simple( |
| 93 | +async fn enqueue_simple( |
173 | 94 | conn: &mut AsyncPgConnection, |
174 | 95 | job_type: &str, |
175 | 96 | data: &Value, |
176 | 97 | priority: i16, |
177 | 98 | ) -> Result<i64, EnqueueError> { |
178 | | - use diesel_async::RunQueryDsl; |
179 | | - |
180 | 99 | let id = diesel::insert_into(background_jobs::table) |
181 | 100 | .values(( |
182 | 101 | background_jobs::job_type.eq(job_type), |
|
0 commit comments