Skip to content
This repository was archived by the owner on Mar 4, 2025. It is now read-only.

Commit 549cbed

Browse files
committed
live, common: Add support for downloading live databases
1 parent 7cedd3c commit 549cbed

File tree

11 files changed

+372
-139
lines changed

11 files changed

+372
-139
lines changed

api/handlers.go

Lines changed: 194 additions & 101 deletions
Large diffs are not rendered by default.

api/main.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,9 @@ func checkAuth(w http.ResponseWriter, r *http.Request) (loggedInUser string, err
206206
}
207207

208208
// collectInfo is an internal function which:
209-
// 1. Authenticates incoming requests
210-
// 2. Extracts the database owner, name, and commit ID from the request
211-
// 3. Checks permissions
209+
// 1. Authenticates incoming requests
210+
// 2. Extracts the database owner, name, and commit ID from the request
211+
// 3. Checks permissions
212212
func collectInfo(w http.ResponseWriter, r *http.Request) (loggedInUser, dbOwner, dbName, commitID string, httpStatus int, err error) {
213213
// Authenticate the request
214214
loggedInUser, err = checkAuth(w, r)
@@ -241,9 +241,10 @@ func collectInfo(w http.ResponseWriter, r *http.Request) (loggedInUser, dbOwner,
241241
}
242242

243243
// collectInfoAndOpen is an internal function which:
244-
// 1. Calls collectInfo() to authenticate the request + collect the user/database/commit/etc details
245-
// 2. Fetches the database from Minio
246-
// 3. Opens the database, returning the connection handle
244+
// 1. Calls collectInfo() to authenticate the request + collect the user/database/commit/etc details
245+
// 2. Fetches the database from Minio
246+
// 3. Opens the database, returning the connection handle
247+
//
247248
// This function exists purely because this code is common to most of the handlers
248249
func collectInfoAndOpen(w http.ResponseWriter, r *http.Request) (sdb *sqlite.Conn, httpStatus int, err error) {
249250
// Authenticate the request and collect details for the requested database

common/live.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,10 @@ func MQCreateResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName, result
297297
return
298298
}
299299

300-
// MQErrorResponse sends an error message in response to an AMQP request
300+
// MQErrorResponse sends an error message in response to an AMQP request.
301301
// It is probably only useful for returning errors that occur before we've decoded the incoming AMQP
302302
// request to know what type it is
303-
func MQErrorResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, errMsg string) (err error) {
303+
func MQErrorResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, errMsg string) (err error) {
304304
// Construct the response
305305
resp := LiveDBErrorResponse{
306306
Node: nodeName,
@@ -327,7 +327,7 @@ func MQErrorResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string,
327327
}
328328
msg.Ack(false)
329329
if AmqpDebug {
330-
log.Printf("[NOT-YET-DETERMINED] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
330+
log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo)
331331
}
332332
return
333333
}
@@ -467,6 +467,7 @@ func MQQueryResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string,
467467
return
468468
}
469469

470+
// MQSendRequest is the main function used for sending requests to our AMQP backend
470471
func MQSendRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOwner, dbName, query string) (result []byte, err error) {
471472
// Create a temporary AMQP queue for receiving the response
472473
var q amqp.Queue

common/minio.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func RetrieveDatabaseFile(bucket, id string) (newDB string, err error) {
155155
if _, err = os.Stat(newDB); os.IsNotExist(err) {
156156
// * The database doesn't yet exist locally, so fetch it from Minio
157157

158-
// Check if a the database file is already being fetched from Minio by a different caller
158+
// Check if the database file is already being fetched from Minio by a different caller
159159
// eg check if there is a "<filename>.new" file already in the disk cache
160160
if _, err = os.Stat(newDB + ".new"); os.IsNotExist(err) {
161161
// * The database isn't already being fetched, so we're ok to proceed
@@ -168,9 +168,7 @@ func RetrieveDatabaseFile(bucket, id string) (newDB string, err error) {
168168
}
169169

170170
// Close the object handle when this function finishes
171-
defer func() {
172-
MinioHandleClose(userDB)
173-
}()
171+
defer MinioHandleClose(userDB)
174172

175173
// Create the needed directory path in the disk cache
176174
err = os.MkdirAll(filepath.Join(Conf.DiskCache.Directory, bucket), 0750)

common/postgresql.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1184,9 +1184,10 @@ func DisconnectPostgreSQL() {
11841184
// If a non-0 discID value is passed, it will only return the details for that specific discussion/MR. Otherwise it
11851185
// will return a list of all discussions or MRs for a given database
11861186
// Note - This returns a slice of DiscussionEntry, instead of a map. We use a slice because it lets us use an ORDER
1187-
// BY clause in the SQL and preserve the returned order (maps don't preserve order). If in future we no longer
1188-
// need to preserve the order, it might be useful to switch to using a map instead since they're often simpler
1189-
// to work with.
1187+
//
1188+
// BY clause in the SQL and preserve the returned order (maps don't preserve order). If in future we no longer
1189+
// need to preserve the order, it might be useful to switch to using a map instead since they're often simpler
1190+
// to work with.
11901191
func Discussions(dbOwner, dbFolder, dbName string, discType DiscussionType, discID int) (list []DiscussionEntry, err error) {
11911192
dbQuery := `
11921193
WITH u AS (
@@ -1288,8 +1289,9 @@ func Discussions(dbOwner, dbFolder, dbName string, discType DiscussionType, disc
12881289
// If a non-0 comID value is passed, it will only return the details for that specific comment in the discussion.
12891290
// Otherwise it will return a list of all comments for a given discussion
12901291
// Note - This returns a slice instead of a map. We use a slice because it lets us use an ORDER BY clause in the SQL
1291-
// and preserve the returned order (maps don't preserve order). If in future we no longer need to preserve the
1292-
// order, it might be useful to switch to using a map instead since they're often simpler to work with.
1292+
//
1293+
// and preserve the returned order (maps don't preserve order). If in future we no longer need to preserve the
1294+
// order, it might be useful to switch to using a map instead since they're often simpler to work with.
12931295
func DiscussionComments(dbOwner, dbFolder, dbName string, discID, comID int) (list []DiscussionCommentEntry, err error) {
12941296
dbQuery := `
12951297
WITH u AS (

common/sqlite.go

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,43 @@ func OpenSQLiteDatabaseDefensive(w http.ResponseWriter, r *http.Request, dbOwner
430430
}
431431
}
432432

433+
// Turn on cell size checking
434+
err = sdb.Exec("PRAGMA cell_size_check=ON")
435+
if err != nil {
436+
log.Printf("Error when running 'PRAGMA cell_size_check=on' on the LIVE database: %s", err)
437+
return
438+
}
439+
440+
// Turn off mmap
441+
var ok bool
442+
var results []string
443+
err = sdb.Select("PRAGMA mmap_size=0", func(s *sqlite.Stmt) error {
444+
// Retrieve a row from the pragma result
445+
var a string
446+
if err = s.Scan(&a); err != nil {
447+
// Error when reading the row, so return failure
448+
ok = false
449+
return err
450+
}
451+
452+
// If the returned row was the value 0, then we regard the change as successfully applied. Any other return
453+
// value is a failure
454+
switch a {
455+
case "0":
456+
ok = true
457+
default:
458+
ok = false
459+
results = append(results, a)
460+
}
461+
return nil
462+
})
463+
464+
// Check for a failure
465+
if !ok || err != nil {
466+
log.Printf("Error when disabling mmap on the LIVE database: %s\n", err)
467+
return
468+
}
469+
433470
// Set a SQLite authorizer which only allows SELECT statements to run
434471
err = sdb.SetAuthorizer(AuthorizerSelect, "SELECT authorizer")
435472
if err != nil {
@@ -459,6 +496,8 @@ func OpenSQLiteDatabaseDefensive(w http.ResponseWriter, r *http.Request, dbOwner
459496
// OpenSQLiteDatabaseLive is similar to OpenSQLiteDatabase(), but opens a live SQLite database and implements
460497
// the recommended defensive precautions for potentially malicious user provided SQL
461498
// queries: https://www.sqlite.org/security.html
499+
// TODO: De-duplicate/refactor the common code in this function and OpenSQLiteDatabaseDefensive() above. They're
500+
// mostly the same
462501
func OpenSQLiteDatabaseLive(baseDir, dbOwner, dbName string) (sdb *sqlite.Conn, err error) {
463502
dbPath := filepath.Join(baseDir, dbOwner, dbName, "live.sqlite")
464503
if _, err = os.Stat(dbPath); err != nil {
@@ -561,9 +600,6 @@ func OpenSQLiteDatabaseLive(baseDir, dbOwner, dbName string) (sdb *sqlite.Conn,
561600
}
562601
}
563602

564-
// FIXME: There's new info in the SQLite "security" page, with some new things to potentially do: https://www.sqlite.org/security.html
565-
// * Disable triggers
566-
// FIXME: Whatever improvements are made here, should probably also go into OpenSQLiteDatabaseDefensive()
567603
// Turn on cell size checking
568604
err = sdb.Exec("PRAGMA cell_size_check=ON")
569605
if err != nil {
@@ -814,6 +850,93 @@ func ReadSQLiteDBCSV(sdb *sqlite.Conn, dbTable string) ([][]string, error) {
814850
return resultSet, nil
815851
}
816852

853+
// SQLiteBackupLive is used by our AMQP backend nodes to refresh a live SQLite database back into Minio
854+
func SQLiteBackupLive(baseDir, dbOwner, dbName string) (err error) {
855+
dbPath := filepath.Join(baseDir, dbOwner, dbName, "live.sqlite")
856+
if _, err = os.Stat(dbPath); err != nil {
857+
return
858+
}
859+
860+
// Open the database on the local node
861+
// NOTE - OpenFullMutex seems like the right thing for ensuring multiple connections to a database file don't
862+
// screw things up, but it wouldn't be a bad idea to keep it in mind if weirdness shows up
863+
var sdb *sqlite.Conn
864+
sdb, err = sqlite.Open(dbPath, sqlite.OpenReadWrite|sqlite.OpenFullMutex)
865+
if err != nil {
866+
log.Printf("Couldn't open LIVE database: %s", err)
867+
return
868+
}
869+
if err = sdb.EnableExtendedResultCodes(true); err != nil {
870+
log.Printf("Couldn't enable extended result codes for LIVE database query! Error: %v\n", err.Error())
871+
return
872+
}
873+
defer sdb.Close()
874+
875+
// Generate unique temporary file name
876+
var f *os.File
877+
f, err = os.CreateTemp("", "sqliteBackup*.sqlite")
878+
if err != nil {
879+
return
880+
}
881+
tmpName := f.Name()
882+
err = f.Close()
883+
if err != nil {
884+
return
885+
}
886+
887+
// Generate the backup file
888+
err = sdb.Exec(fmt.Sprintf("VACUUM INTO '%s'", tmpName))
889+
if err != nil {
890+
return
891+
}
892+
893+
// Ensure the backup file was generated, and isn't 0 bytes (just in case)
894+
var fileInfo os.FileInfo
895+
fileInfo, err = os.Stat(tmpName)
896+
if err != nil {
897+
return
898+
}
899+
if fileInfo.Size() == 0 {
900+
err = errors.New("Generating backup live SQLite database failed. File size is 0")
901+
return
902+
}
903+
904+
// Sanity check the backup file
905+
var sdb2 *sqlite.Conn
906+
sdb2, err = sqlite.Open(tmpName, sqlite.OpenReadOnly)
907+
if err != nil {
908+
log.Printf("Couldn't open live database backup to sanity check it: %s", err)
909+
return
910+
}
911+
if err = sdb2.EnableExtendedResultCodes(true); err != nil {
912+
log.Printf("Couldn't enable extended result codes for live database backup integrity check! Error: %v", err.Error())
913+
sdb2.Close()
914+
return
915+
}
916+
// Pretty sure the '1' parameter below isn't needed. The SQLite docs mention "O(N)" (etc.) which is just Big O
917+
// notation, rather than trying to communicate a need for a number in the parameters
918+
err = sdb2.IntegrityCheck("main", 1, true )
919+
if err != nil {
920+
sdb2.Close()
921+
return
922+
}
923+
sdb2.Close()
924+
925+
// Copy the local backup file into Minio
926+
var z *os.File
927+
z, err = os.Open(tmpName)
928+
err = LiveStoreDatabaseMinio(z, dbOwner, dbName, fileInfo.Size())
929+
if err != nil {
930+
z.Close()
931+
return
932+
}
933+
z.Close()
934+
935+
// Delete the backup file from local disk
936+
os.Remove(tmpName)
937+
return
938+
}
939+
817940
// SQLiteExecuteQueryLive is used by our AMQP backend infrastructure to execute a user provided SQLite statement
818941
func SQLiteExecuteQueryLive(baseDir, dbOwner, dbName, loggedInUser, query string) (rowsChanged int, err error) {
819942
// Open the Live database on the local node

common/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ const MaxLicenceSize = 1
5555

5656
// MinioFolderChars is the number of leading characters of a files' sha256 used as the Minio folder name
5757
// eg: When set to 6, then "34f4255a737156147fbd0a44323a895d18ade79d4db521564d1b0dbb8764cbbc"
58-
// -> Minio folder: "34f425"
59-
// -> Minio filename: "5a737156147fbd0a44323a895d18ade79d4db521564d1b0dbb8764cbbc"
58+
//
59+
// -> Minio folder: "34f425"
60+
// -> Minio filename: "5a737156147fbd0a44323a895d18ade79d4db521564d1b0dbb8764cbbc"
6061
const MinioFolderChars = 6
6162

6263
// QuerySource is used internally to help choose the output format from a SQL query

common/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,7 @@ func DownloadDatabase(w http.ResponseWriter, r *http.Request, dbOwner, dbFolder,
907907
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, dbName))
908908
w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size))
909909
w.Header().Set("Content-Type", "application/x-sqlite3")
910-
bytesWritten, err = io.Copy(w, userDB)
910+
_, err = io.Copy(w, userDB)
911911
if err != nil {
912912
log.Printf("Error returning DB file: %v\n", err)
913913
return

db4s/main.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -779,18 +779,17 @@ func metadataGetHandler(w http.ResponseWriter, r *http.Request) {
779779

780780
// postHandler receives uploaded files from DB4S. To simulate a DB4S upload, the following curl command can be used:
781781
//
782-
// $ curl -kE ~/my.cert.pem -D headers.out -F file=@someupload.sqlite -F "branch=main" -F "commitmsg=stuff" \
783-
// -F "sourceurl=https://example.org" -F "lastmodified=2017-01-02T03:04:05Z" -F "licence=CC0" -F "public=true" \
784-
// https://db4s.dbhub.io:5550/someuser
782+
// $ curl -kE ~/my.cert.pem -D headers.out -F file=@someupload.sqlite -F "branch=main" -F "commitmsg=stuff" \
783+
// -F "sourceurl=https://example.org" -F "lastmodified=2017-01-02T03:04:05Z" -F "licence=CC0" -F "public=true" \
784+
// https://db4s.dbhub.io:5550/someuser
785785
//
786786
// Subsequent uploads to the same database name will need to include an additional "commit" field, with the value of
787787
// the commit ID last known to DB4S. An example curl command demonstrating this:
788788
//
789-
// $ curl -kE ~/my.cert.pem -D headers.out -F file=@someupload.sqlite -F "branch=main" -F "commitmsg=stuff" \
790-
// -F "sourceurl=https://example.org" -F "lastmodified=2017-01-02T03:04:05Z" -F "licence=CC0" -F "public=true" \
791-
// -F "commit=51d494f2c5eb6734ddaa204eccb9597b426091c79c951924ac83c72038f22b55" \
792-
// https://db4s.dbhub.io:5550/someuser
793-
//
789+
// $ curl -kE ~/my.cert.pem -D headers.out -F file=@someupload.sqlite -F "branch=main" -F "commitmsg=stuff" \
790+
// -F "sourceurl=https://example.org" -F "lastmodified=2017-01-02T03:04:05Z" -F "licence=CC0" -F "public=true" \
791+
// -F "commit=51d494f2c5eb6734ddaa204eccb9597b426091c79c951924ac83c72038f22b55" \
792+
// https://db4s.dbhub.io:5550/someuser
794793
func postHandler(w http.ResponseWriter, r *http.Request, userAcc string) {
795794
// Set the maximum accepted database size for uploading
796795
oversizeAllowed := false
@@ -864,8 +863,7 @@ func postHandler(w http.ResponseWriter, r *http.Request, userAcc string) {
864863

865864
// Returns a file requested by the client. An example curl command to simulate the request is:
866865
//
867-
// $ curl -OL -kE ~/my.cert.pem -D headers.out -G https://db4s.dbhub.io:5550/someuser/somedb.sqlite
868-
//
866+
// $ curl -OL -kE ~/my.cert.pem -D headers.out -G https://db4s.dbhub.io:5550/someuser/somedb.sqlite
869867
func retrieveDatabase(w http.ResponseWriter, r *http.Request, pageName string, userAcc string, dbOwner string,
870868
dbFolder string, dbName string, branchName string, commit string) (err error) {
871869
pageName += ":retrieveDatabase()"
@@ -970,8 +968,7 @@ func rootHandler(w http.ResponseWriter, r *http.Request) {
970968

971969
// Returns the list of databases available to the user. To simulate, the following curl command can be used:
972970
//
973-
// $ curl -kE ~/my.cert.pem -D headers.out -G https://db4s.dbhub.io:5550/someuser
974-
//
971+
// $ curl -kE ~/my.cert.pem -D headers.out -G https://db4s.dbhub.io:5550/someuser
975972
func userDatabaseList(userAcc string, user string) (dbList []byte, err error) {
976973

977974
// Structure to hold the results, to apply JSON marshalling to

live/main.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func main() {
134134
err = json.Unmarshal(msg.Body, &req)
135135
if err != nil {
136136
log.Println(err)
137-
err = com.MQErrorResponse(msg, ch, com.Conf.Live.Nodename, err.Error())
137+
err = com.MQErrorResponse("NOT-YET-DETERMINED", msg, ch, com.Conf.Live.Nodename, err.Error())
138138
if err != nil {
139139
log.Printf("Error: occurred on '%s' the main live node switch{} while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err)
140140
}
@@ -147,6 +147,23 @@ func main() {
147147

148148
// Handle each operation
149149
switch req.Operation {
150+
case "backup":
151+
err = com.SQLiteBackupLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName)
152+
if err != nil {
153+
err = com.MQErrorResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, err.Error())
154+
if err != nil {
155+
log.Printf("Error: occurred on '%s' in MQErrorResponse() while constructing an AMQP error message response: '%s'", com.Conf.Live.Nodename, err)
156+
}
157+
continue
158+
}
159+
160+
// Return a success message to the caller
161+
err = com.MQErrorResponse("BACKUP", msg, ch, com.Conf.Live.Nodename, "") // Use an empty error message to indicate success
162+
if err != nil {
163+
log.Printf("Error: occurred on '%s' in MQErrorResponse() while constructing the AMQP backup response: '%s'", com.Conf.Live.Nodename, err)
164+
}
165+
continue
166+
150167
case "columns":
151168
var columns []sqlite.Column
152169
columns, err = com.SQLiteGetColumnsLive(com.Conf.Live.StorageDir, req.DBOwner, req.DBName, req.Query) // We use the req.Query field to pass the table name

0 commit comments

Comments
 (0)