-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_pipeline.R
More file actions
177 lines (150 loc) · 4.79 KB
/
run_pipeline.R
File metadata and controls
177 lines (150 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# PREP ----
# Batch identifier: today's date, a hyphen, then ten distinct lowercase
# letters drawn at random (sample() draws without replacement by default).
batch_id <- paste(
  Sys.Date(),
  paste(sample(letters, 10), collapse = ""),
  sep = "-"
)

# Packages attached for the rest of the script.
library(httr)
library(jsonlite)
library(curl)

# Read the active configuration through the config package; every `config$...`
# lookup below refers to this object.
config <- config::get()
# Get users ----
# When `gather_from_controller_app` is enabled in the config, fetch the
# subscriber list from the controller app and write it to the participant
# data file so later stages read it from disk.
if (config$gather_from_controller_app) {
  # GATHER
  # Presumably defines get_subscribers_from_controller(); confirm in that file.
  source("R/gather/get_subscribers_from_controller.R")

  # Get subscribers for the list and write the data frame.
  subscribers_df <- get_subscribers_from_controller(
    api_url = config$controller_app_base_url,
    email_list_id = config$controller_app_list_id,
    api_token = config$controller_app_api_key
  )

  # Spell out FALSE: the shorthand `F` is an ordinary variable that can be
  # reassigned elsewhere.
  write.csv(subscribers_df, config$participant_data_file, row.names = FALSE)
}
# Get their records using the configured gather script, which is expected to
# save its data to config$data_file.
source(config$gather_bio_script)

# Check that the records have just been updated: the data file must have been
# modified within the last 30 seconds.
# The age is computed as "now minus mtime". The reverse order is never
# positive, which would make the `< 30` test pass for any file, however old.
data_file_age_secs <- as.numeric(
  difftime(Sys.time(), file.info(config$data_file)$mtime, units = "secs")
)

# isTRUE() also routes a missing file (mtime is NA) to the stop() branch
# instead of failing inside `if` with a "missing value" error.
if (isTRUE(data_file_age_secs < 30)) {
  print("Biodiversity data file has been updated")
} else {
  stop("Biodiversity data file has not been updated, check for issue in gather script")
}
# GENERATE ----
# Run the targets pipeline with the batch code exposed through the BATCH_ID
# environment variable.
Sys.setenv(BATCH_ID = batch_id)

# `finally` guarantees BATCH_ID is unset even when the pipeline errors, so a
# failed run does not leave the variable behind in the session. The error
# itself still propagates and stops the script.
invisible(
  tryCatch(
    targets::tar_make(),
    finally = Sys.unsetenv("BATCH_ID")
  )
)
# SEND ----
# Empty log of send outcomes, one row per recipient once the send loop has
# run: who was targeted, whether the send succeeded, and any error message.
# All four columns are character vectors.
status_log <- data.frame(
  user_id = character(0),
  email = character(0),
  status = character(0),
  message = character(0),
  stringsAsFactors = FALSE
)
source("R/send/send_notify_app.R")
library(blastula)
# Build the SMTP credentials object used for every send below.
# `mail_creds` selects the mode:
#   "envvar"    - username plus a password handed over via the SMTP_PASSWORD
#                 environment variable
#   "anonymous" - no authentication
# Any other value stops the script here. Otherwise `creds` would stay
# undefined and the problem would only show up later as a failure on every
# recipient.
if (identical(config$mail_creds, "envvar")) {
  # creds_envvar() is given the *name* of an environment variable, so the
  # configured password is exported under that name first.
  Sys.setenv(SMTP_PASSWORD = config$mail_password)
  creds <- creds_envvar(
    user = config$mail_username,
    pass_envvar = "SMTP_PASSWORD",
    host = config$mail_server,
    port = config$mail_port,
    use_ssl = config$mail_use_ssl
  )
} else if (identical(config$mail_creds, "anonymous")) {
  creds <- creds_anonymous(
    host = config$mail_server,
    port = config$mail_port,
    use_ssl = config$mail_use_ssl
  )
} else {
  stop(
    "Unsupported mail_creds setting in config: expected \"envvar\" or \"anonymous\".",
    call. = FALSE
  )
}
# Load the manifest written for this batch (one row per email to send) and
# the participant list.
meta_table <- read.csv(file.path("renders", batch_id, "meta_table.csv"))
# NOTE(review): `participants` is not referenced again in this script. It is
# kept because the read stops the run early when the participant file is
# missing, and sourced code may rely on it - confirm before removing.
participants <- read.csv(config$participant_data_file)

# Here we go through all the users and send each email.
if (!is.data.frame(meta_table)) {
  stop("meta_table must be a data frame.")
}

# Check the required columns once, before any email is attempted, rather than
# on every iteration.
required_columns <- c("content_key", "user_id", "file", "email")
if (!all(required_columns %in% colnames(meta_table))) {
  stop("meta_table is missing required columns.")
}

# Named sender address, built once. The summary report sent after the loop
# uses the same `sender`, so it must exist even if no row gets as far as
# sending.
sender <- config$mail_default_sender
names(sender) <- config$mail_default_name

# One result per manifest row, preallocated so the log is not regrown on
# every iteration.
send_results <- vector("list", nrow(meta_table))

# seq_len() yields an empty sequence for an empty manifest; 1:nrow() would
# iterate over 1 and 0.
for (i in seq_len(nrow(meta_table))) {
  # Extract this row's values before tryCatch so the error handler always
  # reports the recipient of the current row, never a stale or undefined one.
  content_key <- meta_table[i, "content_key"]
  user_id <- meta_table[i, "user_id"]
  file <- meta_table[i, "file"]
  email <- meta_table[i, "email"]

  send_results[[i]] <- tryCatch({
    # Debug: print the row currently being processed.
    print(paste("Processing row:", i))

    # Safety check: the rendered file must contain the address it is about
    # to be sent to.
    lines_read <- paste0(readLines(file), collapse = "")
    if (!grepl(email, lines_read, fixed = TRUE)) {
      stop("Target email address is different to email listed in footer")
    }

    # NOTE(review): the controller app is notified before the SMTP send, so
    # a notification can be recorded for an email that then fails to send -
    # confirm this order is intended.
    if (config$gather_from_controller_app) {
      send_notify_app(
        content_key = content_key,
        user_external_key = user_id,
        batch_id = batch_id,
        config = config
      )
    }

    # NOTE(review): cid_images() is reached with `:::`, i.e. it is not part
    # of blastula's exported API and may change between versions.
    email_obj <- blastula:::cid_images(file)
    smtp_send(
      email_obj,
      from = sender,
      to = email,
      subject = config$mail_default_subject,
      credentials = creds,
      verbose = FALSE
    )

    data.frame(
      user_id = user_id,
      email = email,
      status = "Success",
      message = "Email sent",
      stringsAsFactors = FALSE
    )
  }, error = function(e) {
    # Any failure for this row is logged and the loop moves on to the next
    # recipient.
    print("Email not sent")
    data.frame(
      user_id = user_id,
      email = email,
      status = "Failed",
      message = conditionMessage(e),
      stringsAsFactors = FALSE
    )
  })
}

# Append all results to the log in one step. With no rows this is a no-op.
status_log <- rbind(status_log, do.call(rbind, send_results))
# One human-readable summary line per logged send attempt.
# Built column-wise with sprintf() rather than apply() over rows: apply()
# first coerces the data frame to a character matrix, which can pad numeric
# ids. sprintf() also returns character(0) for an empty log.
status_lines <- sprintf(
  "User ID: %s | Email: %s | Status: %s | Message: %s",
  as.character(status_log$user_id),
  as.character(status_log$email),
  as.character(status_log$status),
  as.character(status_log$message)
)
# Compose a short summary of the batch: total attempts plus success and
# failure counts taken from status_log.
# NOTE(review): the parts are joined with single newlines, which Markdown
# treats as soft line breaks, so the counts may render as one paragraph -
# confirm the intended layout.
report_email <- compose_email(
  body = md(
    paste0(
      "### Email Send Summary",
      "\nTotal attempted: ", nrow(status_log),
      "\n✅ Success: ", sum(status_log$status == "Success"),
      "\n❌ Failed: ", sum(status_log$status == "Failed"),
      "\nPlease check logs for more information"
    )
  ),
  footer = Sys.time()
)

# Build the named sender here instead of relying on a `sender` variable left
# behind by the send loop, which is only assigned if a row gets that far.
report_sender <- config$mail_default_sender
names(report_sender) <- config$mail_default_name

# Send the report to the configured test recipient.
smtp_send(
  email = report_email,
  from = report_sender,
  to = config$mail_test_recipient, # your email
  subject = paste("Email Batch Status Report -", Sys.Date()),
  credentials = creds,
  verbose = FALSE
)