-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfedora.go
More file actions
233 lines (211 loc) · 8.56 KB
/
fedora.go
File metadata and controls
233 lines (211 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"slices"
"strings"
"time"
"github.com/go-stomp/stomp"
"github.com/nvkp/turtle"
)
const (
	// stompTimeout is how long the STOMP consumer waits for the next message
	// before it stops listening.
	stompTimeout = 15 * time.Second

	// baseURL is the root of the Fedora REST API used for object requests;
	// hyraxBase is the root of the public Hyrax UI for archival documents.
	// TO DO: retrieve from environment variables
	baseURL   = "http://localhost:8080/fcrepo/rest"
	hyraxBase = "https://ec2-35-89-4-51.us-west-2.compute.amazonaws.com/concern/archival_documents"
)

var (
	// rdfTargets holds predicate/object pairs used to decide whether a Fedora
	// object's RDF metadata describes an object we want to process.
	rdfTargets = map[string]map[string]string{
		"model": {
			"predicate": "info:fedora/fedora-system:def/model#hasModel",
			"object":    "ArchivalDocument",
		},
	}

	// rdfFields maps each metadata field we extract from a Fedora object to the
	// RDF predicate that carries it.
	rdfFields = map[string]string{
		"title": "http://purl.org/dc/terms/title",
		"aoURI": "http://www.w3.org/2000/01/rdf-schema#seeAlso",
		"refID": "http://purl.org/dc/terms/identifier",
	}
)
// stompMessage is the part of a STOMP message's JSON body that we decode.
// Only the "id" and "type" fields are read; anything else in the body is ignored.
type stompMessage struct {
	MsgId string   `json:"id"`   // the message's "id" field
	Type  []string `json:"type"` // Fedora action types, e.g. "Create", "Update", "Delete"; checked by filterTypes
}
// rdfTriple is a single RDF statement (subject, predicate, object) as decoded
// by turtle.Unmarshal in processRDF; the `turtle` tags map each field to its
// position in the triple.
type rdfTriple struct {
	Subject   string `turtle:"subject"`
	Predicate string `turtle:"predicate"`
	Object    string `turtle:"object"`
}
// generateStompConsumer subscribes to queueName on conn (client acknowledgement
// mode) and launches a goroutine that forwards each received message down the
// returned channel.
//
// Each message is acknowledged after the receiver has taken it from the channel.
// This is necessary because, with stomp.AckClient, unacknowledged messages remain
// in the queue. The goroutine stops when no message has arrived for stompTimeout,
// or when the subscription channel is closed. It then unsubscribes and closes the
// returned channel, so a receiver can simply range over it.
//
// An error is returned only if the subscription itself fails.
// TO DO: refactor with interface for testing
func generateStompConsumer(conn *stomp.Conn, queueName string) (chan *stomp.Message, error) {
	sub, err := conn.Subscribe(queueName, stomp.AckClient)
	if err != nil {
		// TO DO: improve error handling
		return nil, fmt.Errorf("could not subscribe to %s: %w", queueName, err)
	}
	log.Println("Subscribed to", queueName)

	// Unbuffered: each message is handed to the receiver before it is acked.
	messages := make(chan *stomp.Message)

	go func() {
		// Deferred calls run in reverse order. We unsubscribe first and then close
		// the channel, so a receiver only sees the close once the subscription is gone.
		defer close(messages)
		defer sub.Unsubscribe()

		// Created here rather than before Subscribe, so it cannot leak if subscribing fails.
		timer := time.NewTimer(stompTimeout)
		defer timer.Stop()

		for {
			select {
			case msg, ok := <-sub.C:
				if !ok {
					// The subscription channel was closed. Without this check the loop
					// would spin on nil messages until the timer fired.
					return
				}
				// Skip empty messages. This avoids sending downstream the timeout
				// error that STOMP delivers with an empty message when there are
				// none to read.
				// TO DO: handle other errors from the queue (msg.Err)
				if msg == nil || msg.Body == nil {
					continue
				}
				messages <- msg
				// Necessary to acknowledge receipt, or the messages remain in the queue.
				if msg.ShouldAck() {
					if err := conn.Ack(msg); err != nil {
						// TO DO: implement error channel
						// Report the Ack error itself; msg.Err is normally nil for a
						// delivered message, and dereferencing it would panic.
						log.Printf("Ack error: %v", err)
					}
				}
				// Restart the full wait for the next message. Stop and drain first, so
				// that a tick which fired while we were blocked on the send above is
				// discarded instead of ending the loop early. This is required before
				// Go 1.23 and harmless after.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(stompTimeout)
			case <-timer.C:
				// No message for stompTimeout: time's up.
				return
			}
		}
	}()
	return messages, nil
}
// filterTypes reports whether any of the action types attached to a Fedora
// message is one we act on ("Create" or "Update"). Messages carrying only other
// types, such as "Follow" or "Delete", are ignored for now.
func filterTypes(typeSlice []string) bool {
	// TO DO: Replace with environment variable
	// "Update" is useful for testing; in production this should be "Create" only.
	hasCreate := slices.Contains(typeSlice, "Create")
	hasUpdate := slices.Contains(typeSlice, "Update")
	return hasCreate || hasUpdate
}
// updateURI turns a Fedora object URI into the URL of the matching Hyrax work page.
// The work ID is taken to be the last path segment of uri: everything after the
// final "/", or the whole string if it has none. A trailing slash therefore yields
// an empty ID.
func updateURI(uri string) string {
	workID := uri[strings.LastIndex(uri, "/")+1:]
	return hyraxBase + "/" + workID
}
// basicAuth returns the credential part of an HTTP Basic Authorization header:
// base64 (standard, padded) of "username:password", without the "Basic " prefix.
func basicAuth(username, password string) string {
	return base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
}
// filterTriples reports whether the triples describing one Fedora object contain
// the predicate/object pair configured in rdfTargets["model"]. In other words, it
// reports whether this object is of the type we want to process.
func filterTriples(data []rdfTriple) bool {
	wantPredicate := rdfTargets["model"]["predicate"]
	wantObject := rdfTargets["model"]["object"]
	return slices.ContainsFunc(data, func(t rdfTriple) bool {
		return t.Predicate == wantPredicate && t.Object == wantObject
	})
}
// fedora2ASpace extracts the title, archival object URI and ref ID from the
// triples of an archival document. The predicates are looked up in rdfFields.
// The result is used to populate the digital archival object record and link it
// to the parent archival object.
//
// If a predicate occurs more than once, the last occurrence wins. Fields with no
// matching triple keep their zero value.
func fedora2ASpace(data []rdfTriple) AOMetadata {
	titlePred, uriPred, refIDPred := rdfFields["title"], rdfFields["aoURI"], rdfFields["refID"]

	var md AOMetadata
	for i := range data {
		obj := data[i].Object
		switch pred := data[i].Predicate; {
		case pred == titlePred:
			md.Title = obj
		case pred == uriPred:
			md.URI = obj
		case pred == refIDPred:
			md.RefID = obj
		}
	}
	return md
}
// processRDF decodes Turtle-encoded RDF (such as a Fedora API response body)
// into rdfTriple structs. On success the slice is never nil, even when it is
// empty. On failure the error includes the raw input for diagnosis.
func processRDF(data []byte) ([]rdfTriple, error) {
	parsed := []rdfTriple{}
	if err := turtle.Unmarshal(data, &parsed); err != nil {
		return nil, fmt.Errorf("couldn't unpack RDF data: %s, %w", string(data), err)
	}
	return parsed, nil
}
// makeFedoraRequest builds, but does not send, a GET request for the RDF
// description of a Fedora object. objectURI is appended verbatim to baseURL.
// The request asks for Turtle and carries HTTP Basic credentials.
// TO DO: the credentials are hard-coded; read them from configuration.
func makeFedoraRequest(objectURI string) (*http.Request, error) {
	// TO DO: confirm proper construction of URL
	objectURL := baseURL + objectURI

	req, err := http.NewRequest(http.MethodGet, objectURL, nil)
	if err != nil {
		return nil, fmt.Errorf("can't create Fedora request for URI %s: %w", objectURL, err)
	}
	req.Header.Add("Accept", "text/turtle")
	req.Header.Add("Authorization", "Basic "+basicAuth("fedoraAdmin", "fedoraAdmin"))
	return req, nil
}
// fetchFedoraMessages connects to the STOMP broker at localhost:61613 (STOMP 1.2)
// and subscribes to /queue/fedora. It returns the channel produced by
// generateStompConsumer.
//
// If subscribing fails, the broker connection is closed before the error is
// returned, so it does not leak.
// NOTE(review): on success the connection is never disconnected from here,
// even after the consumer has finished; consider giving it an owner that closes it.
func fetchFedoraMessages() (chan *stomp.Message, error) {
	// STOMP code based on https://github.com/birkland/fcr-listen/tree/master
	// TO DO: check config options
	conn, err := stomp.Dial("tcp", "localhost:61613",
		stomp.ConnOpt.AcceptVersion(stomp.V12),
		stomp.ConnOpt.HeartBeat(10*time.Second, 5*time.Second))
	if err != nil {
		return nil, fmt.Errorf("unable to connect to Fedora queue: %w", err)
	}
	messages, err := generateStompConsumer(conn, "/queue/fedora")
	if err != nil {
		// Nothing will ever use this connection. A disconnect failure is ignored
		// because the subscription error is the one worth reporting.
		_ = conn.Disconnect()
		return nil, fmt.Errorf("retrieving messages from Fedora queue failed! %w", err)
	}
	return messages, nil
}
// fetchFedoraObjects drains messages, which are expected to come from the Fedora
// queue, and blocks until that channel is closed.
//
// For each message whose action type passes filterTypes and whose object has not
// been seen yet, it builds a metadata request. The request is handed to
// generateWorker, which returns a dedicated response channel for that object
// (fan-out). The caller merges these channels later (fan-in).
//
// A single channel, errorChan, receives all errors. Sends on it block unless the
// caller drains it concurrently (or it is buffered).
//
// Returns a map from Fedora object URI to that object's response channel.
func fetchFedoraObjects(client *http.Client, messages chan *stomp.Message, errorChan chan error) map[string]chan response {
	fedoraObjMap := make(map[string]chan response)
	for msg := range messages {
		// URI of the Fedora object associated with this message.
		objectURI := msg.Header.Get("org.fcrepo.jms.identifier")
		_, seen := fedoraObjMap[objectURI]

		// The JSON body holds the Fedora action type(s): Create, Update, Delete, etc.
		var parsedMessage stompMessage
		if err := json.Unmarshal(msg.Body, &parsedMessage); err != nil {
			// Pass parse errors back for logging, but only for URIs without a
			// request already underway; a useful result must not be overridden.
			if !seen {
				errorChan <- fmt.Errorf("cannot parse JSON from STOMP message %s: %w", string(msg.Body), err)
			}
			continue
		}

		// Ignore action types we don't handle, and objects already being fetched.
		if !filterTypes(parsedMessage.Type) || seen {
			continue
		}
		// Skip identifiers pointing to a file upload.
		// TO DO: check header for Content-Type
		if strings.HasSuffix(objectURI, "original") {
			continue
		}
		// Without an identifier the request would silently target the repository root.
		if objectURI == "" {
			errorChan <- fmt.Errorf("STOMP message has no org.fcrepo.jms.identifier header: %s", string(msg.Body))
			continue
		}

		fedoraRequest, err := makeFedoraRequest(objectURI)
		if err != nil {
			errorChan <- err
			continue
		}
		// These requests are made concurrently; the channels are merged later.
		fedoraObjMap[objectURI] = generateWorker(client, fedoraRequest, errorChan)
	}
	return fedoraObjMap
}