Skip to content
This repository was archived by the owner on Mar 4, 2025. It is now read-only.

Commit 40c1ef3

Browse files
committed
common, live: Refactor the AMQP response code, as it was getting messy
1 parent 549cbed commit 40c1ef3

File tree

4 files changed

+75
-298
lines changed

4 files changed

+75
-298
lines changed

api/handlers.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) {
170170
} else {
171171
// Send the columns request to our AMQP backend
172172
var rawResponse []byte
173-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "columns", loggedInUser, dbOwner, dbName, table)
173+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "columns", loggedInUser, dbOwner, dbName, table)
174174
if err != nil {
175175
jsonErr(w, err.Error(), http.StatusInternalServerError)
176176
log.Println(err)
@@ -393,7 +393,7 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) {
393393

394394
// Delete the database from our AMQP backend
395395
var rawResponse []byte
396-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "delete", loggedInUser, dbOwner, dbName, "")
396+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "delete", loggedInUser, dbOwner, dbName, "")
397397
if err != nil {
398398
jsonErr(w, err.Error(), http.StatusInternalServerError)
399399
log.Println(err)
@@ -646,7 +646,7 @@ func downloadHandler(w http.ResponseWriter, r *http.Request) {
646646

647647
// It's a live database, so we tell the AMQP backend to back up it up into Minio, which we then provide to the user
648648
var rawResponse []byte
649-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "backup", loggedInUser, dbOwner, dbName, "")
649+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "backup", loggedInUser, dbOwner, dbName, "")
650650
if err != nil {
651651
return
652652
}
@@ -683,7 +683,7 @@ func downloadHandler(w http.ResponseWriter, r *http.Request) {
683683

684684
// Make a record of the download
685685
err = com.LogDownload(dbOwner, dbFolder, dbName, loggedInUser, r.RemoteAddr, "api LIVE", userAgent,
686-
time.Now(), fmt.Sprintf("%s/%s" ,dbOwner, dbName))
686+
time.Now(), fmt.Sprintf("%s/%s", dbOwner, dbName))
687687
if err != nil {
688688
return
689689
}
@@ -772,7 +772,7 @@ func executeHandler(w http.ResponseWriter, r *http.Request) {
772772

773773
// Send the query execution request to our AMQP backend
774774
var rawResponse []byte
775-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "execute", loggedInUser, dbOwner, dbName, query)
775+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "execute", loggedInUser, dbOwner, dbName, query)
776776
if err != nil {
777777
return
778778
}
@@ -893,7 +893,7 @@ func indexesHandler(w http.ResponseWriter, r *http.Request) {
893893
} else {
894894
// Send the indexes request to our AMQP backend
895895
var rawResponse []byte
896-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "")
896+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "")
897897
if err != nil {
898898
jsonErr(w, err.Error(), http.StatusInternalServerError)
899899
log.Println(err)
@@ -1206,7 +1206,7 @@ func tablesHandler(w http.ResponseWriter, r *http.Request) {
12061206
} else {
12071207
// Send the columns request to our AMQP backend
12081208
var rawResponse []byte
1209-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "tables", loggedInUser, dbOwner, dbName, "")
1209+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "tables", loggedInUser, dbOwner, dbName, "")
12101210
if err != nil {
12111211
jsonErr(w, err.Error(), http.StatusInternalServerError)
12121212
log.Println(err)
@@ -1567,7 +1567,7 @@ func viewsHandler(w http.ResponseWriter, r *http.Request) {
15671567
} else {
15681568
// Send the columns request to our AMQP backend
15691569
var rawResponse []byte
1570-
rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "views", loggedInUser, dbOwner, dbName, "")
1570+
rawResponse, err = com.MQRequest(com.AmqpChan, liveNode, "views", loggedInUser, dbOwner, dbName, "")
15711571
if err != nil {
15721572
jsonErr(w, err.Error(), http.StatusInternalServerError)
15731573
log.Println(err)

common/live.go

Lines changed: 10 additions & 255 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@ func ConnectMQ() (channel *amqp.Channel, err error) {
115115
var conn *amqp.Connection
116116
if Conf.Environment.Environment == "production" {
117117
// If certificate/key files have been provided, then we can use mutual TLS (mTLS)
118-
// TODO: Getting mTLS working was pretty easy with Lets Encrypt certs. Do we still need the server-only TLS
119-
// fallback below?
120118
if Conf.MQ.CertFile != "" && Conf.MQ.KeyFile != "" {
121119
var cert tls.Certificate
122120
cert, err = tls.LoadX509KeyPair(Conf.MQ.CertFile, Conf.MQ.KeyFile)
@@ -130,7 +128,8 @@ func ConnectMQ() (channel *amqp.Channel, err error) {
130128
}
131129
log.Printf("%s connected to AMQP server using mutual TLS (mTLS): %v:%d\n", Conf.Live.Nodename, Conf.MQ.Server, Conf.MQ.Port)
132130
} else {
133-
// Fallback to just verifying the server certs for TLS
131+
// Fallback to just verifying the server certs for TLS. This is needed by the DB4S end point, as it
132+
// uses certs from our own CA, so mTLS won't easily work with it.
134133
conn, err = amqp.Dial(fmt.Sprintf("amqps://%s:%s@%s:%d/", Conf.MQ.Username, Conf.MQ.Password, Conf.MQ.Server, Conf.MQ.Port))
135134
if err != nil {
136135
return
@@ -154,7 +153,7 @@ func ConnectMQ() (channel *amqp.Channel, err error) {
154153
func LiveCreateDB(channel *amqp.Channel, dbOwner, dbName string) (err error) {
155154
// Send the database setup request to our AMQP backend
156155
var rawResponse []byte
157-
rawResponse, err = MQSendRequest(channel, "create_queue", "createdb", "", dbOwner, dbName, "")
156+
rawResponse, err = MQRequest(channel, "create_queue", "createdb", "", dbOwner, dbName, "")
158157
if err != nil {
159158
return
160159
}
@@ -189,7 +188,7 @@ func LiveCreateDB(channel *amqp.Channel, dbOwner, dbName string) (err error) {
189188
func LiveQueryDB(channel *amqp.Channel, nodeName, requestingUser, dbOwner, dbName, query string) (rows SQLiteRecordSet, err error) {
190189
// Send the query request to our AMQP backend
191190
var rawResponse []byte
192-
rawResponse, err = MQSendRequest(channel, nodeName, "query", requestingUser, dbOwner, dbName, query)
191+
rawResponse, err = MQRequest(channel, nodeName, "query", requestingUser, dbOwner, dbName, query)
193192
if err != nil {
194193
return
195194
}
@@ -209,16 +208,10 @@ func LiveQueryDB(channel *amqp.Channel, nodeName, requestingUser, dbOwner, dbNam
209208
return
210209
}
211210

212-
// MQColumnsResponse sends a columns list response
213-
func MQColumnsResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, columns []sqlite.Column, errMsg string) (err error) {
214-
// Construct the response
215-
resp := LiveDBColumnsResponse{
216-
Node: nodeName,
217-
Columns: columns,
218-
Error: errMsg,
219-
}
211+
// MQResponse sends an AMQP response back to its requester
212+
func MQResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, responseData interface{}) (err error) {
220213
var z []byte
221-
z, err = json.Marshal(resp)
214+
z, err = json.Marshal(responseData)
222215
if err != nil {
223216
log.Println(err)
224217
return
@@ -238,7 +231,7 @@ func MQColumnsResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string
238231
}
239232
msg.Ack(false)
240233
if AmqpDebug {
241-
log.Printf("[COLUMNS] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
234+
log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo)
242235
}
243236
return
244237
}
@@ -297,178 +290,8 @@ func MQCreateResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName, result
297290
return
298291
}
299292

300-
// MQErrorResponse sends an error message in response to an AMQP request.
301-
// It is probably only useful for returning errors that occur before we've decoded the incoming AMQP
302-
// request to know what type it is
303-
func MQErrorResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, errMsg string) (err error) {
304-
// Construct the response
305-
resp := LiveDBErrorResponse{
306-
Node: nodeName,
307-
Error: errMsg,
308-
}
309-
var z []byte
310-
z, err = json.Marshal(resp)
311-
if err != nil {
312-
log.Println(err)
313-
return
314-
}
315-
316-
// Send the message
317-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
318-
defer cancel()
319-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
320-
amqp.Publishing{
321-
ContentType: "text/json",
322-
CorrelationId: msg.CorrelationId,
323-
Body: z,
324-
})
325-
if err != nil {
326-
log.Println(err)
327-
}
328-
msg.Ack(false)
329-
if AmqpDebug {
330-
log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo)
331-
}
332-
return
333-
}
334-
335-
// MQDeleteResponse sends an error message in response to an AMQP database deletion request
336-
func MQDeleteResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, errMsg string) (err error) {
337-
// Construct the response
338-
resp := LiveDBErrorResponse{ // Yep, we're reusing this super basic error response instead of creating a new one
339-
Node: nodeName,
340-
Error: errMsg,
341-
}
342-
var z []byte
343-
z, err = json.Marshal(resp)
344-
if err != nil {
345-
log.Println(err)
346-
return
347-
}
348-
349-
// Send the message
350-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
351-
defer cancel()
352-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
353-
amqp.Publishing{
354-
ContentType: "text/json",
355-
CorrelationId: msg.CorrelationId,
356-
Body: z,
357-
})
358-
if err != nil {
359-
log.Println(err)
360-
}
361-
msg.Ack(false)
362-
if AmqpDebug {
363-
log.Printf("[DELETE] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
364-
}
365-
return
366-
}
367-
368-
// MQExecuteResponse sends a message in response to an AMQP database execute request
369-
func MQExecuteResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, rowsChanged int, errMsg string) (err error) {
370-
// Construct the response
371-
resp := LiveDBExecuteResponse{
372-
Node: nodeName,
373-
RowsChanged: rowsChanged,
374-
Error: errMsg,
375-
}
376-
var z []byte
377-
z, err = json.Marshal(resp)
378-
if err != nil {
379-
log.Println(err)
380-
return
381-
}
382-
383-
// Send the message
384-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
385-
defer cancel()
386-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
387-
amqp.Publishing{
388-
ContentType: "text/json",
389-
CorrelationId: msg.CorrelationId,
390-
Body: z,
391-
})
392-
if err != nil {
393-
log.Println(err)
394-
}
395-
msg.Ack(false)
396-
if AmqpDebug {
397-
log.Printf("[EXECUTE] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
398-
}
399-
return
400-
}
401-
402-
// MQIndexesResponse sends an indexes list response
403-
func MQIndexesResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, indexes []APIJSONIndex, errMsg string) (err error) {
404-
// Construct the response
405-
resp := LiveDBIndexesResponse{
406-
Node: nodeName,
407-
Indexes: indexes,
408-
Error: errMsg,
409-
}
410-
var z []byte
411-
z, err = json.Marshal(resp)
412-
if err != nil {
413-
log.Println(err)
414-
return
415-
}
416-
417-
// Send the message
418-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
419-
defer cancel()
420-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
421-
amqp.Publishing{
422-
ContentType: "text/json",
423-
CorrelationId: msg.CorrelationId,
424-
Body: z,
425-
})
426-
if err != nil {
427-
log.Println(err)
428-
}
429-
msg.Ack(false)
430-
if AmqpDebug {
431-
log.Printf("[INDEXES] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
432-
}
433-
return
434-
}
435-
436-
// MQQueryResponse sends a successful query response back
437-
func MQQueryResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, results SQLiteRecordSet, errMsg string) (err error) {
438-
// Construct the response
439-
resp := LiveDBQueryResponse{
440-
Node: nodeName,
441-
Results: results,
442-
Error: errMsg,
443-
}
444-
var z []byte
445-
z, err = json.Marshal(resp)
446-
if err != nil {
447-
log.Println(err)
448-
return
449-
}
450-
451-
// Send the message
452-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
453-
defer cancel()
454-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
455-
amqp.Publishing{
456-
ContentType: "text/json",
457-
CorrelationId: msg.CorrelationId,
458-
Body: z,
459-
})
460-
if err != nil {
461-
log.Println(err)
462-
}
463-
msg.Ack(false)
464-
if AmqpDebug {
465-
log.Printf("[QUERY] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
466-
}
467-
return
468-
}
469-
470-
// MQSendRequest is the main function used for sending requests to our AMQP backend
471-
func MQSendRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOwner, dbName, query string) (result []byte, err error) {
293+
// MQRequest is the main function used for sending requests to our AMQP backend
294+
func MQRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOwner, dbName, query string) (result []byte, err error) {
472295
// Create a temporary AMQP queue for receiving the response
473296
var q amqp.Queue
474297
q, err = channel.QueueDeclare("", false, false, true, false, nil)
@@ -528,71 +351,3 @@ func MQSendRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOw
528351
}
529352
return
530353
}
531-
532-
// MQTablesResponse sends a tables list response to an AMQP caller
533-
func MQTablesResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, tables []string, errMsg string) (err error) {
534-
// Construct the response
535-
resp := LiveDBTablesResponse{
536-
Node: nodeName,
537-
Tables: tables,
538-
Error: errMsg,
539-
}
540-
var z []byte
541-
z, err = json.Marshal(resp)
542-
if err != nil {
543-
log.Println(err)
544-
return
545-
}
546-
547-
// Send the message
548-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
549-
defer cancel()
550-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
551-
amqp.Publishing{
552-
ContentType: "text/json",
553-
CorrelationId: msg.CorrelationId,
554-
Body: z,
555-
})
556-
if err != nil {
557-
log.Println(err)
558-
}
559-
msg.Ack(false)
560-
if AmqpDebug {
561-
log.Printf("[TABLES] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
562-
}
563-
return
564-
}
565-
566-
// MQViewsResponse sends a views list response to an AMQP caller
567-
func MQViewsResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName string, views []string, errMsg string) (err error) {
568-
// Construct the response
569-
resp := LiveDBViewsResponse{
570-
Node: nodeName,
571-
Views: views,
572-
Error: errMsg,
573-
}
574-
var z []byte
575-
z, err = json.Marshal(resp)
576-
if err != nil {
577-
log.Println(err)
578-
return
579-
}
580-
581-
// Send the message
582-
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
583-
defer cancel()
584-
err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
585-
amqp.Publishing{
586-
ContentType: "text/json",
587-
CorrelationId: msg.CorrelationId,
588-
Body: z,
589-
})
590-
if err != nil {
591-
log.Println(err)
592-
}
593-
msg.Ack(false)
594-
if AmqpDebug {
595-
log.Printf("[VIEWS] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
596-
}
597-
return
598-
}

0 commit comments

Comments
 (0)