Skip to content

Commit dfd4269

Browse files
author
saul-data
committed
fixed folders and file changes on distributed cache
1 parent 5396eaa commit dfd4269

File tree

11 files changed

+379
-142
lines changed

11 files changed

+379
-142
lines changed

app/mainapp/code_editor/dfs_cache/invalidate_cache.go

Lines changed: 101 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package dfscache
22

33
import (
4+
"errors"
45
"log"
56

6-
dpconfig "github.com/dataplane-app/dataplane/app/mainapp/config"
77
"github.com/dataplane-app/dataplane/app/mainapp/database"
88
"github.com/dataplane-app/dataplane/app/mainapp/database/models"
99
"github.com/dataplane-app/dataplane/app/mainapp/messageq"
@@ -51,62 +51,84 @@ func InvalidateCacheSingle(nodeID string, environmentID string, fileID string) e
5151
/*
5252
Delete a file, move file or folder name change - invalidate the node cache, remove all file level cache and remove the entire folder in each worker.
5353
*/
54-
func InvalidateCacheNode(nodeID string, pipelineID string, environmentID string) error {
55-
// Write to node level cache
56-
err := database.DBConn.Model(&models.CodeNodeCache{}).Where("node_id = ? and environment_id = ?", nodeID, environmentID).Update("cache_valid", false).Error
57-
58-
if err != nil {
59-
return err
60-
}
61-
62-
// Write to file level cache (file gets overwritten)
63-
err = database.DBConn.Where("node_id = ? and environment_id = ?", nodeID, environmentID).Delete(&models.CodeFilesCache{}).Error
64-
65-
if err != nil {
66-
return err
67-
}
54+
// func InvalidateCacheNode(msg models.WorkerTasks) error {
55+
56+
// if msg.EnvironmentID == "" {
57+
// return errors.New("Environment ID is needed.")
58+
// }
59+
// if msg.NodeID == "" {
60+
// return errors.New("Node ID is needed.")
61+
// }
62+
// if msg.PipelineID == "" {
63+
// return errors.New("Pipeline ID is needed.")
64+
// }
65+
// // Write to node level cache
66+
// err := database.DBConn.Model(&models.CodeNodeCache{}).Where("node_id = ? and environment_id = ?", msg.NodeID, msg.EnvironmentID).Update("cache_valid", false).Error
67+
68+
// if err != nil {
69+
// return err
70+
// }
71+
72+
// // Write to file level cache (file gets overwritten)
73+
// err = database.DBConn.Where("node_id = ? and environment_id = ?", msg.NodeID, msg.EnvironmentID).Delete(&models.CodeFilesCache{}).Error
74+
75+
// if err != nil {
76+
// return err
77+
// }
78+
79+
// // Write to file level cache (file gets overwritten)
80+
// err = database.DBConn.Where("node_id = ? and environment_id = ?", msg.NodeID, msg.EnvironmentID).Delete(&models.CodeRunFilesCache{}).Error
81+
82+
// if err != nil {
83+
// return err
84+
// }
85+
86+
// var response models.TaskResponse
87+
88+
// getWorkerGroup := models.PipelineNodes{}
89+
// err = database.DBConn.Select("worker_group").Where("node_id = ? and environment_id =?", msg.NodeID, msg.EnvironmentID).First(&getWorkerGroup).Error
90+
// if err != nil {
91+
// log.Println("Error getting worker groups for cache delete", err)
92+
// return err
93+
// }
94+
// channel := "DisributedStorageRemoval." + msg.EnvironmentID + "." + getWorkerGroup.WorkerGroup
95+
96+
// _, errnats := messageq.MsgReply(channel, msg, &response)
97+
98+
// if errnats != nil {
99+
// log.Println("Send to worker error nats:", errnats)
100+
// return errnats
101+
// }
102+
103+
// return nil
104+
// }
68105

69-
// Write to file level cache (file gets overwritten)
70-
err = database.DBConn.Where("node_id = ? and environment_id = ?", nodeID, environmentID).Delete(&models.CodeRunFilesCache{}).Error
106+
/*
107+
Delete or change pipeline.
108+
*/
109+
func InvalidateCachePipeline(msg models.WorkerTasks) error {
110+
// Write to node level cache
71111

72-
if err != nil {
73-
return err
112+
if msg.EnvironmentID == "" {
113+
return errors.New("Environment ID is needed.")
74114
}
75115

76-
var response models.TaskResponse
77-
78-
getWorkerGroup := models.PipelineNodes{}
79-
err = database.DBConn.Select("worker_group").Where("node_id = ? and environment_id =?", nodeID, environmentID).First(&getWorkerGroup).Error
80-
if err != nil {
81-
log.Println("Error getting worker groups for cache delete", err)
82-
return err
116+
if msg.PipelineID == "" {
117+
return errors.New("Pipeline ID is needed.")
83118
}
84-
channel := "DisributedStorageRemoval." + environmentID + "." + getWorkerGroup.WorkerGroup
85-
86-
_, errnats := messageq.MsgReply(channel, folderpath, &response)
87119

88-
if errnats != nil {
89-
log.Println("Send to worker error nats:", errnats)
90-
return errnats
120+
if msg.RunType == "" {
121+
return errors.New("Run type is needed.")
91122
}
92123

93-
return nil
94-
}
95-
96-
/*
97-
Delete or change pipeline.
98-
*/
99-
func InvalidateCachePipeline(environmentID string, pipelineID string) error {
100-
// Write to node level cache
101-
102124
updateQuery := `
103125
UPDATE code_node_cache
104126
SET cache_valid = false, updated_at = now()
105127
FROM pipeline_nodes
106128
WHERE code_node_cache.environment_id = ? and pipeline_nodes.pipeline_id =? and
107129
pipeline_nodes.node_id = code_node_cache.node_id and pipeline_nodes.environment_id = code_node_cache.environment_id;
108130
`
109-
err := database.DBConn.Exec(updateQuery, environmentID, pipelineID).Error
131+
err := database.DBConn.Exec(updateQuery, msg.EnvironmentID, msg.PipelineID).Error
110132

111133
if err != nil {
112134
return err
@@ -119,24 +141,36 @@ func InvalidateCachePipeline(environmentID string, pipelineID string) error {
119141
WHERE code_files_cache.environment_id = ? and pipeline_nodes.pipeline_id =? and
120142
pipeline_nodes.node_id = code_files_cache.node_id and pipeline_nodes.environment_id = code_files_cache.environment_id;
121143
`
122-
err = database.DBConn.Exec(deleteQuery, environmentID, pipelineID).Error
144+
err = database.DBConn.Exec(deleteQuery, msg.EnvironmentID, msg.PipelineID).Error
145+
146+
if err != nil {
147+
return err
148+
}
149+
150+
deleteQuery = `
151+
DELETE FROM code_run_files_cache
152+
USING pipeline_nodes
153+
WHERE code_run_files_cache.environment_id = ? and pipeline_nodes.pipeline_id =? and
154+
pipeline_nodes.node_id = code_run_files_cache.node_id and pipeline_nodes.environment_id = code_run_files_cache.environment_id;
155+
`
156+
err = database.DBConn.Exec(deleteQuery, msg.EnvironmentID, msg.PipelineID).Error
123157

124158
if err != nil {
125159
return err
126160
}
127161

128162
getWorkerGroups := []*models.PipelineNodes{}
129-
err = database.DBConn.Distinct("worker_group").Select("worker_group").Where("pipeline_id = ? and environment_id =? and worker_group <> ''", pipelineID, environmentID).Find(&getWorkerGroups).Error
163+
err = database.DBConn.Distinct("worker_group").Select("worker_group").Where("pipeline_id = ? and environment_id =? and worker_group <> ''", msg.PipelineID, msg.EnvironmentID).Find(&getWorkerGroups).Error
130164
if err != nil {
131165
log.Println("Error getting worker groups for cache delete", err)
132166
return err
133167
}
134168

135169
var response models.TaskResponse
136170
for _, x := range getWorkerGroups {
137-
channel := "DisributedStorageRemoval." + x.WorkerGroup
171+
channel := "DisributedStorageRemoval." + msg.EnvironmentID + "." + x.WorkerGroup
138172
// log.Println(channel)
139-
_, errnats := messageq.MsgReply(channel, pipelineID, &response)
173+
_, errnats := messageq.MsgReply(channel, msg, &response)
140174

141175
if errnats != nil {
142176
log.Println("Send to worker error nats:", errnats)
@@ -150,9 +184,24 @@ func InvalidateCachePipeline(environmentID string, pipelineID string) error {
150184
/*
151185
Delete or change deployment.
152186
*/
153-
func InvalidateCacheDeployment(environmentID string, folderpath string, pipelineID string, version string) error {
187+
func InvalidateCacheDeployment(msg models.WorkerTasks) error {
154188
// Write to node level cache
155189

190+
if msg.EnvironmentID == "" {
191+
return errors.New("Environment ID is needed.")
192+
}
193+
194+
if msg.PipelineID == "" {
195+
return errors.New("Pipeline ID is needed.")
196+
}
197+
198+
if msg.Version == "" {
199+
return errors.New("Version is needed.")
200+
}
201+
if msg.RunType == "" {
202+
return errors.New("Run type is needed.")
203+
}
204+
156205
updateQuery := `
157206
UPDATE deploy_code_node_cache
158207
SET cache_valid = false, updated_at = now()
@@ -166,7 +215,7 @@ func InvalidateCacheDeployment(environmentID string, folderpath string, pipeline
166215
deploy_pipeline_nodes.environment_id = deploy_code_node_cache.environment_id and
167216
deploy_pipeline_nodes.version = deploy_code_node_cache.version;
168217
`
169-
err := database.DBConn.Exec(updateQuery, environmentID, pipelineID, version).Error
218+
err := database.DBConn.Exec(updateQuery, msg.EnvironmentID, msg.PipelineID, msg.Version).Error
170219

171220
if err != nil {
172221
return err
@@ -185,28 +234,24 @@ func InvalidateCacheDeployment(environmentID string, folderpath string, pipeline
185234
deploy_pipeline_nodes.environment_id = deploy_code_files_cache.environment_id and
186235
deploy_pipeline_nodes.version = deploy_code_files_cache.version;
187236
`
188-
err = database.DBConn.Exec(deleteQuery, environmentID, pipelineID, version).Error
237+
err = database.DBConn.Exec(deleteQuery, msg.EnvironmentID, msg.PipelineID, msg.Version).Error
189238

190239
if err != nil {
191240
return err
192241
}
193242

194243
getWorkerGroups := []*models.DeployPipelineNodes{}
195-
err = database.DBConn.Distinct("worker_group").Select("worker_group").Where("pipeline_id = ? and environment_id =? and worker_group <> ''", pipelineID, environmentID).Find(&getWorkerGroups).Error
244+
err = database.DBConn.Distinct("worker_group").Select("worker_group").Where("pipeline_id = ? and environment_id =? and worker_group <> ''", msg.PipelineID, msg.EnvironmentID).Find(&getWorkerGroups).Error
196245
if err != nil {
197246
log.Println("Error getting worker groups for cache delete", err)
198247
return err
199248
}
200249

201-
if dpconfig.Debug == "true" {
202-
log.Println("folder to delete:", folderpath)
203-
}
204-
205250
var response models.TaskResponse
206251
for _, x := range getWorkerGroups {
207-
channel := "DisributedStorageRemoval." + x.WorkerGroup
252+
channel := "DisributedStorageRemoval." + msg.EnvironmentID + "." + x.WorkerGroup
208253
// log.Println(channel)
209-
_, errnats := messageq.MsgReply(channel, folderpath, &response)
254+
_, errnats := messageq.MsgReply(channel, msg, &response)
210255

211256
if errnats != nil {
212257
log.Println("Send to worker error nats:", errnats)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package filesystem
2+
3+
import "path/filepath"
4+
5+
/* Node level Folder structure: */
6+
func PipelineRunFolderNode(codeDirectory string, EnvironmentID string, PipelineID string, NodeID string) string {
7+
8+
return filepath.Join(codeDirectory, EnvironmentID, "pipeline", PipelineID, NodeID)
9+
}
10+
11+
func CodeRunFolderNode(codeDirectory string, EnvironmentID string, PipelineID string, NodeID string) string {
12+
13+
return filepath.Join(codeDirectory, EnvironmentID, "coderun", PipelineID, NodeID)
14+
}
15+
16+
func DeployRunFolderNode(codeDirectory string, EnvironmentID string, PipelineID string, Version string, NodeID string) string {
17+
18+
return filepath.Join(codeDirectory, EnvironmentID, "deployment", PipelineID, Version, NodeID)
19+
}
20+
21+
/* Pipeline level folder structure */
22+
func PipelineRunFolderPipeline(codeDirectory string, EnvironmentID string, PipelineID string) string {
23+
24+
return filepath.Join(codeDirectory, EnvironmentID, "pipeline", PipelineID)
25+
}
26+
27+
func CodeRunFolderPipeline(codeDirectory string, EnvironmentID string, PipelineID string) string {
28+
29+
return filepath.Join(codeDirectory, EnvironmentID, "coderun", PipelineID)
30+
}
31+
32+
func DeployRunFolderPipeline(codeDirectory string, EnvironmentID string, PipelineID string, Version string) string {
33+
34+
return filepath.Join(codeDirectory, EnvironmentID, "deployment", PipelineID, Version)
35+
}

app/mainapp/code_editor/runcode/run_code_file_request.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func RunCodeFile(workerGroup string, fileID string, envID string, pipelineID str
4242

4343
// ------ Obtain the file name
4444
filesdata := models.CodeFiles{}
45-
dberror := database.DBConn.Select("file_name").Where("file_id = ? and environment_id =? and level = ?", fileID, envID, "node_file").Find(&filesdata).Error
45+
dberror := database.DBConn.Select("file_name", "level", "folder_id").Where("file_id = ? and environment_id =? and level = ?", fileID, envID, "node_file").Find(&filesdata).Error
4646
if dberror != nil {
4747
rerror := "Code run obtain folder structure error:" + dberror.Error()
4848
WSLogError(envID, runid, rerror, models.CodeRun{})
@@ -52,11 +52,26 @@ func RunCodeFile(workerGroup string, fileID string, envID string, pipelineID str
5252
// parentfolderdata := envID + "/coderun/" + pipelineID + "/" + nodeID
5353
var err error
5454

55-
// // The folder structure will look like <environment ID>/coderun/<pipeline ID>/<node ID>
55+
/* The folder structure will look like <environment ID>/coderun/<pipeline ID>/<node ID>
56+
We also need to look for sub folders if the the fileID is a subfolder */
57+
if filesdata.Level != "node" {
5658

57-
// filesdata, parentfolderdata, filesdata.FolderID,
59+
// Look up folder structure
60+
newdir, err := filesystem.NodeLevelFolderConstructByID(database.DBConn, filesdata.FolderID, envID)
61+
// log.Println("sub folder", newdir[1:])
62+
63+
if err != nil {
64+
log.Println(err)
65+
return runSend, err
66+
}
67+
68+
// Remove the first slash on newdir so there are not two slashes.
69+
commands = append(commands, "python3 -u ${{nodedirectory}}"+newdir[1:]+filesdata.FileName)
70+
71+
} else {
72+
commands = append(commands, "python3 -u ${{nodedirectory}}"+filesdata.FileName)
73+
}
5874

59-
commands = append(commands, "python3 -u ${{nodedirectory}}"+filesdata.FileName)
6075
runSend, err = RunCodeServerWorker(envID, pipelineID, nodeID, workerGroup, runid, commands, replayRunID)
6176
if err != nil {
6277
/* Send back any local errors not happening on the remote worker */

0 commit comments

Comments
 (0)