-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfcrepo_tools.py
More file actions
167 lines (143 loc) · 4.72 KB
/
fcrepo_tools.py
File metadata and controls
167 lines (143 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import logging
import subprocess
from datetime import datetime as dt
from pathlib import Path
from typing import List
import click
import requests
from pyoxigraph import NamedNode, RdfFormat, Store, parse, serialize
from pythonjsonlogger.json import JsonFormatter
from requests import HTTPError
from yaml import Loader, load
from pytools.fcrepo_to_bulkrax import FedoraGraph
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
class GraphPart:
def __init__(self, dirs: List[str | Path], store: str):
self.dirs = [Path(d) for d in dirs]
self.g = Store(store)
def add_nodes(self, ttl: str | Path):
logging.info(f"Adding {ttl} to graph.")
self.g.bulk_load(path=ttl)
def walk(self):
for d in self.dirs:
for p in d.rglob("*"):
if p.is_file() and (p.name.endswith(".ttl") or p.name.endswith(".nt")):
self.add_nodes(p)
def parse_list(self, p_list: List[Path | str]):
for p in p_list:
if str(p).endswith(".ttl"):
self.add_nodes(p)
def delete_object(session, uri):
    """Issue an HTTP DELETE for *uri* via *session*, logging (not raising) failures.

    A 204 No Content response counts as success; any other status is
    escalated through raise_for_status() and the resulting HTTPError is
    logged so a batch of deletions can continue past individual failures.
    """
    try:
        r = session.delete(uri)
        if r.status_code != 204:
            r.raise_for_status()
    except HTTPError as err:
        # Use the exception's bound response: the original logged the local
        # `r`, which is unbound if session.delete() itself raised HTTPError.
        logging.error(f"Error deleting {uri}: {err.response.text}")
@click.group()
def main():
    """Entry point grouping the fcrepo maintenance subcommands."""
    pass
@main.command()
@click.option(
    "--remote-path", help="Path to OCFL root directory on the remote, to sync FROM."
)
@click.option(
    "--local-path", default="./", help="Path on the local machine, to sync TO "
)
@click.option("--rsync", default="rsync", help="Local path to rsync command.")
def rsync_ocfl(remote_path, local_path, rsync):
    """Mirror the .nt files of a remote OCFL tree into a local directory.

    Equivalent to:
      rsync -rvzWP -f'+ *.nt' -f'+ */' -f'- *' REMOTE LOCAL
    i.e. include every .nt file and every directory, exclude everything
    else (binaries are skipped).
    """
    # Filter rules: keep all .nt files and all directories, drop the rest.
    command = [
        rsync,
        "-rvzWP",
        "-f",
        "+ *.nt",
        "-f",
        "+ */",
        "-f",
        "- *",
        # "--dry-run",
        remote_path,
        local_path,
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode == 0:
        print("Rsync completed successfully.")
    else:
        # The original silently ignored failures; surface rsync's stderr so
        # an interrupted or misconfigured sync is not mistaken for success.
        logging.error(
            f"Rsync failed with exit code {completed.returncode}: {completed.stderr}"
        )
@main.command()
@click.option("--ttl", help="Path to TTL file to modify.")
def remove_audits(ttl):
    """Strip every ldp:contains child from a container TTL except rest/prod.

    Rewrites *ttl* in place: parses it, drops each triple whose predicate is
    ldp:contains and whose object is not <http://localhost:8984/rest/prod>,
    then serializes the surviving triples back using the file's own @prefix
    declarations.
    """
    logging.info("Removing all child notes except rest/prod")
    g = parse(path=ttl)
    # Derive prefixes from original TTL file
    # Each declaration looks like "@prefix ex: <http://example/> ." —
    # p[1][:-1] drops the trailing ':' from the prefix name and p[2][1:-1]
    # strips the surrounding angle brackets from the IRI.
    with open(ttl) as f:
        rdf = f.read()
    prefixes = [r for r in rdf.split("\n") if r.startswith("@prefix")]
    prefix_dict = {}
    for prefix in prefixes:
        p = prefix.split()
        prefix_dict[p[1][:-1]] = p[2][1:-1]
    # Remove every child except the prod object
    keep = [
        node
        for node in g
        if not (
            node.predicate == NamedNode("http://www.w3.org/ns/ldp#contains")
            and node.object != NamedNode("http://localhost:8984/rest/prod")
        )
    ]
    # Save modified graph
    # NOTE(review): serialize(prefixes=...) needs a recent pyoxigraph —
    # confirm the pinned version supports the keyword.
    serialize(input=keep, output=ttl, format=RdfFormat.TURTLE, prefixes=prefix_dict)
@main.command()
@click.option("--root", help="Root path to repository files.")
@click.option("--output", help="Path for saving RDF store.")
def parse_graph(root: str, output: str):
    """Walk *root* for RDF files and load them into the store at *output*."""
    builder = GraphPart([root], output)
    builder.walk()
    logging.info("Saving graph.")
@main.command()
@click.option(
    "--objects",
    help="Path to text file containing list of objects to remove, one URI per line.",
)
def remove_orphans(objects):
    """DELETE every object listed in the file (one URI per line)."""
    with open(objects) as fh:
        candidates = [line.strip() for line in fh]
    session = requests.Session()
    # Skip blank lines; rewrite localhost so the host network is reachable.
    for target in (c for c in candidates if c):
        target = target.replace("localhost", "127.0.0.1")
        logging.info(f"Deleting object {target}")
        delete_object(session, target)
@main.command()
@click.option(
    "--config",
    default="./fcrepo_to_bulkrax.yml",
    help="Path to config file (YAML) containing options for fcrepo_to_bulkrax tool.",
)
@click.option(
    "--admin-set",
    default="",
    help="Title of admin set (to override value in YAML file)",
)
@click.option("--dry-run", is_flag=True)
def extract_to_bulkrax(config, admin_set, dry_run):
    """Build a FedoraGraph from the YAML config and prepare Bulkrax imports.

    CLI flags override the YAML: --admin-set replaces options["admin_set"]
    when non-empty, and --dry-run is always passed through. A dry run also
    attaches a timestamped JSON-formatted debug log under
    options["output_path"].
    """
    # NOTE(review): yaml.load with the full Loader can construct arbitrary
    # Python objects; acceptable only because the config is operator-owned.
    with open(config) as f:
        options = load(f, Loader=Loader)
    if admin_set:
        options["admin_set"] = admin_set
    options["dry_run"] = dry_run
    if dry_run:
        # Dry runs log at DEBUG to their own JSON file next to the output.
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        handler = logging.FileHandler(
            f"{options['output_path']}/{dt.now().timestamp()}.dry_run.log", mode="w"
        )
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    graph = FedoraGraph(**options)
    graph.prepare_imports()
# Run the click CLI when executed directly as a script.
if __name__ == "__main__":
    main()