forked from internetarchive/gowarc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwrite.go
More file actions
161 lines (131 loc) · 4.03 KB
/
write.go
File metadata and controls
161 lines (131 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package warc
import (
"bufio"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/internetarchive/gowarc/pkg/spooledtempfile"
)
// Compressor is the type of the compression stream held in Writer.Compressor.
// An implementation must accept writes, be closable, and be able to be
// pointed at a new destination writer through Reset.
type Compressor interface {
io.Writer // Embedded so the compressor can be used as an io.Writer and its underlying writer can be replaced via Reset.
io.Closer
// Reset takes the writer that subsequent output should go to.
// NOTE(review): presumably mirrors gzip.Writer.Reset (discard state, retarget) — confirm against the concrete implementations.
Reset(io.Writer)
}
// Writer writes WARC records to WARC files.
type Writer struct {
// Compressor is the compression stream associated with this writer.
// NOTE(review): presumably BufWriter writes through it when compression is enabled — confirm where the Writer is constructed.
Compressor Compressor
// BufWriter is the buffered writer that WriteRecord sends every byte of a record to.
BufWriter *bufio.Writer
// FileName is the name of the WARC file. WriteInfoRecord reports it in the
// WARC-Filename header with any ".open" suffix removed.
FileName string
// DigestAlgorithm is handed to GetDigest when a WARC-Block-Digest is computed.
DigestAlgorithm DigestAlgorithm
// ParallelGZIP is not read by the methods in this part of the file.
// NOTE(review): presumably selects a parallel gzip implementation — confirm.
ParallelGZIP bool
}
// RecordBatch is a structure that contains a bunch of
// records to be written at the same time, and a common
// capture timestamp. FeedbackChan is used to signal
// when the records have been written.
type RecordBatch struct {
// FeedbackChan carries the "batch written" signal; it transports no data.
FeedbackChan chan struct{}
// CaptureTime is the capture timestamp shared by every record in the batch.
// NOTE(review): the string format is not visible here — confirm against the code that fills it.
CaptureTime string
// Records are the records that belong to this batch.
Records []*Record
}
// Record represents a WARC record.
type Record struct {
// Header holds the record's header fields. WriteRecord fills in any of
// WARC-Date, WARC-Type, WARC-Record-ID, Content-Length and
// WARC-Block-Digest that are empty.
Header Header
// Content is the record's content block. WriteRecord rewinds it, copies it
// to the output and closes it before returning.
Content spooledtempfile.ReadWriteSeekCloser
// NOTE(review): WriteRecord currently always emits "WARC/1.1" and does not read this field.
Version string // WARC/1.0, WARC/1.1 ...
Offset int64 // Offset of the record start (-1 if WARC file type is not supported yet)
Size int64 // COMPRESSED size of the record (gzip member): header + deflate data + trailer. (-1 if WARC file type is not supported yet)
}
// WriteRecord writes a record to the underlying WARC file and flushes the data.
// A record consists of a version string, the record header followed by a
// record content block and two newlines:
//
//	Version CRLF
//	Header-Key: Header-Value CRLF
//	CRLF
//	Content
//	CRLF
//	CRLF
//
// Any of the WARC-Date, WARC-Type, WARC-Record-ID, Content-Length and
// WARC-Block-Digest headers that are empty on r.Header are filled in (and
// left set on r.Header) before anything is written. The version line is
// always "WARC/1.1".
//
// The returned recordID is the bare UUID generated for WARC-Record-ID; it is
// empty when r already carried a WARC-Record-ID header. r.Content is closed
// before WriteRecord returns, on success and on error alike.
func (w *Writer) WriteRecord(r *Record) (recordID string, err error) {
	defer r.Content.Close()

	// Add the mandatory headers.
	if r.Header.Get("WARC-Date") == "" {
		r.Header.Set("WARC-Date", time.Now().UTC().Format(time.RFC3339Nano))
	}

	if r.Header.Get("WARC-Type") == "" {
		r.Header.Set("WARC-Type", "resource")
	}

	if r.Header.Get("WARC-Record-ID") == "" {
		recordID = uuid.NewString()
		r.Header.Set("WARC-Record-ID", "<urn:uuid:"+recordID+">")
	}

	if r.Header.Get("Content-Length") == "" {
		r.Header.Set("Content-Length", strconv.Itoa(getContentLength(r.Content)))
	}

	// The digest is computed before the first byte goes to BufWriter so that
	// a failure here cannot leave a dangling version line in the buffer.
	if r.Header.Get("WARC-Block-Digest") == "" {
		if _, err := r.Content.Seek(0, io.SeekStart); err != nil {
			return recordID, fmt.Errorf("rewinding record content for digest: %w", err)
		}

		digest, err := GetDigest(r.Content, w.DigestAlgorithm)
		if err != nil {
			return recordID, err
		}

		r.Header.Set("WARC-Block-Digest", digest)
	}

	// Version line.
	if _, err := io.WriteString(w.BufWriter, "WARC/1.1\r\n"); err != nil {
		return recordID, err
	}

	// Header fields, then the blank line that ends the header section.
	for key, value := range r.Header {
		if _, err := fmt.Fprintf(w.BufWriter, "%s: %s\r\n", key, value); err != nil {
			return recordID, err
		}
	}

	if _, err := io.WriteString(w.BufWriter, "\r\n"); err != nil {
		return recordID, err
	}

	// Content block. The rewind must succeed: copying from a stale position
	// would emit a body shorter than the advertised Content-Length.
	if _, err := r.Content.Seek(0, io.SeekStart); err != nil {
		return recordID, fmt.Errorf("rewinding record content for write: %w", err)
	}

	written, err := io.Copy(w.BufWriter, r.Content)
	if err != nil {
		return recordID, err
	}

	if written > 0 {
		DataTotal.Add(written)
	}

	// Record terminator.
	if _, err := io.WriteString(w.BufWriter, "\r\n\r\n"); err != nil {
		return recordID, err
	}

	// Flush data
	if err := w.FlushAndCloseCompressor(); err != nil {
		return recordID, err
	}

	return recordID, nil
}
// WriteInfoRecord writes a warcinfo record to the WARC file and flushes the data.
//
// The record's content block is built from payload, one "key: value" line
// (CRLF-terminated) per entry; because payload is a map, the order of those
// lines is not fixed. The record carries WARC-Date (current UTC time),
// WARC-Filename (w.FileName without a ".open" suffix), WARC-Type "warcinfo",
// Content-Type "application/warc-fields" and a WARC-Block-Digest.
//
// It returns the record ID reported by WriteRecord. On any failure before
// WriteRecord is reached, the record's content is closed here, since
// WriteRecord (which normally closes it) never runs.
func (w *Writer) WriteInfoRecord(payload map[string]string) (recordID string, err error) {
	// Initialize the record
	infoRecord := NewRecord("", false)

	// Set the headers
	infoRecord.Header.Set("WARC-Date", time.Now().UTC().Format(time.RFC3339Nano))
	infoRecord.Header.Set("WARC-Filename", strings.TrimSuffix(w.FileName, ".open"))
	infoRecord.Header.Set("WARC-Type", "warcinfo")
	infoRecord.Header.Set("Content-Type", "application/warc-fields")

	// Write the payload
	for k, v := range payload {
		if _, err := fmt.Fprintf(infoRecord.Content, "%s: %s\r\n", k, v); err != nil {
			// Best-effort cleanup; the write error is the one worth reporting.
			_ = infoRecord.Content.Close()
			return recordID, fmt.Errorf("writing warcinfo payload: %w", err)
		}
	}

	// Rewind so the digest covers the payload just written, the same way
	// WriteRecord rewinds before computing a digest.
	if _, err := infoRecord.Content.Seek(0, io.SeekStart); err != nil {
		_ = infoRecord.Content.Close()
		return recordID, fmt.Errorf("rewinding warcinfo payload for digest: %w", err)
	}

	// Generate WARC-Block-Digest
	digest, err := GetDigest(infoRecord.Content, w.DigestAlgorithm)
	if err != nil {
		_ = infoRecord.Content.Close()
		return recordID, err
	}

	infoRecord.Header.Set("WARC-Block-Digest", digest)

	// Finally, write the record and flush the data. WriteRecord takes over
	// closing the content from here.
	return w.WriteRecord(infoRecord)
}