Skip to content

Commit 1206606

Browse files
committed
WIP on merging 2 or more archives when tilesets are disjoint
* currently the algorithm is broken for any deduplication of tiles within the input archives
1 parent 2d9a839 commit 1206606

File tree

4 files changed

+191
-2
lines changed

4 files changed

+191
-2
lines changed

main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ var cli struct {
7070
} `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region"`
7171

7272
Merge struct {
73-
Output string `arg:"" help:"Output archive" type:"path"`
74-
Input []string `arg:"" help:"Input archives"`
73+
Archives []string `arg:"" name:"inputs_then_output" help:"One or more input archives, followed by the output name last."`
7574
} `cmd:"" help:"Merge multiple archives into a single archive" hidden:""`
7675

7776
Convert struct {
@@ -217,6 +216,11 @@ func main() {
217216
if err != nil {
218217
logger.Fatalf("Failed to convert %s, %v", path, err)
219218
}
219+
case "merge <inputs_then_output>":
220+
err := pmtiles.Merge(logger, cli.Merge.Archives)
221+
if err != nil {
222+
logger.Fatalf("Failed to merge, %v", err)
223+
}
220224
case "upload <input-pmtiles> <remote-pmtiles>":
221225
err := pmtiles.Upload(logger, cli.Upload.InputPmtiles, cli.Upload.Bucket, cli.Upload.RemotePmtiles, cli.Upload.MaxConcurrency)
222226

pmtiles/merge.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package pmtiles
2+
3+
import (
4+
"fmt"
5+
"github.com/RoaringBitmap/roaring/roaring64"
6+
"io"
7+
"log"
8+
"math"
9+
"os"
10+
"sort"
11+
)
12+
13+
// MergeEntry pairs a directory entry read from one input archive with enough
// provenance (which archive, and where in its tile data section the bytes
// live) to locate and copy the tile contents during the merge's write phase.
type MergeEntry struct {
	Entry       EntryV3
	InputIdx    int    // the index of the input archive 0...N
	InputOffset uint64 // the original offset of the entry in the archive's tile section
}
18+
19+
// MergeOp describes one contiguous read from a single input archive's tile
// data section. Entries that are adjacent in the same input are coalesced
// into one op so the copy phase issues fewer, larger reads.
type MergeOp struct {
	InputIdx    int
	InputOffset uint64
	Length      uint64
}
24+
25+
func Merge(logger *log.Logger, inputs []string) error {
26+
union := roaring64.New()
27+
var mergedEntries []MergeEntry
28+
29+
minLonE7 := int32(math.MaxInt32)
30+
minLatE7 := int32(math.MaxInt32)
31+
maxLonE7 := int32(math.MinInt32)
32+
maxLatE7 := int32(math.MinInt32)
33+
34+
var handles []*os.File
35+
var headers []HeaderV3
36+
37+
for archiveIdx, archive := range inputs[:len(inputs)-1] {
38+
f, _ := os.OpenFile(archive, os.O_RDONLY, 0666)
39+
handles = append(handles, f)
40+
41+
buf := make([]byte, HeaderV3LenBytes)
42+
_, _ = f.Read(buf)
43+
h, _ := DeserializeHeader(buf)
44+
headers = append(headers, h)
45+
46+
if !h.Clustered {
47+
return fmt.Errorf("Archive must be clustered")
48+
}
49+
50+
if archiveIdx > 0 {
51+
if h.TileType != headers[0].TileType {
52+
return fmt.Errorf("Tile types do not match")
53+
}
54+
if h.TileCompression != headers[0].TileCompression {
55+
return fmt.Errorf("Tile compressions do not match")
56+
}
57+
if h.InternalCompression != headers[0].InternalCompression {
58+
return fmt.Errorf("Internal compressions do not match")
59+
}
60+
}
61+
62+
if h.MinLonE7 < minLonE7 {
63+
minLonE7 = h.MinLonE7
64+
}
65+
if h.MinLatE7 < minLatE7 {
66+
minLatE7 = h.MinLatE7
67+
}
68+
if h.MaxLonE7 > maxLonE7 {
69+
maxLonE7 = h.MaxLonE7
70+
}
71+
if h.MaxLatE7 > maxLatE7 {
72+
maxLatE7 = h.MaxLatE7
73+
}
74+
75+
tileset := roaring64.New()
76+
_ = IterateEntries(h,
77+
func(offset uint64, length uint64) ([]byte, error) {
78+
return io.ReadAll(io.NewSectionReader(f, int64(offset), int64(length)))
79+
},
80+
func(e EntryV3) {
81+
tileset.AddRange(e.TileID, e.TileID+uint64(e.RunLength))
82+
mergedEntries = append(mergedEntries, MergeEntry{Entry: e, InputOffset: e.Offset, InputIdx: archiveIdx})
83+
})
84+
85+
if union.Intersects(tileset) {
86+
return fmt.Errorf("Tilesets intersect")
87+
}
88+
union.Or(tileset)
89+
}
90+
91+
// sort all MergeEntries
92+
sort.Slice(mergedEntries, func(i, j int) bool {
93+
return mergedEntries[i].Entry.TileID < mergedEntries[j].Entry.TileID
94+
})
95+
96+
// renumber the offsets
97+
acc := uint64(0)
98+
addressedTiles := uint64(0)
99+
tileContents := roaring64.New()
100+
for idx := range mergedEntries {
101+
// TODO: this algo is broken with any deduplication of tiles
102+
// need to bookkeep on the max seen offset in each input archive
103+
mergedEntries[idx].Entry.Offset = acc
104+
acc += uint64(mergedEntries[idx].Entry.Length)
105+
addressedTiles += uint64(mergedEntries[idx].Entry.RunLength)
106+
tileContents.Add(mergedEntries[idx].Entry.Offset)
107+
}
108+
109+
// construct a directory
110+
tmp := make([]EntryV3, len(mergedEntries))
111+
for i := range mergedEntries {
112+
tmp[i] = mergedEntries[i].Entry
113+
}
114+
115+
rootBytes, leavesBytes, _ := optimizeDirectories(tmp, 16384-HeaderV3LenBytes, Gzip)
116+
117+
var header HeaderV3
118+
119+
header.RootOffset = HeaderV3LenBytes
120+
header.RootLength = uint64(len(rootBytes))
121+
header.MetadataOffset = header.RootOffset + header.RootLength
122+
header.MetadataLength = headers[0].MetadataLength
123+
header.InternalCompression = headers[0].InternalCompression
124+
header.TileCompression = headers[0].TileCompression
125+
header.LeafDirectoryOffset = header.MetadataOffset + header.MetadataLength
126+
header.LeafDirectoryLength = uint64(len(leavesBytes))
127+
header.TileDataOffset = header.LeafDirectoryOffset + header.LeafDirectoryLength
128+
129+
header.MinLonE7 = minLonE7
130+
header.MinLatE7 = minLatE7
131+
header.MaxLonE7 = maxLonE7
132+
header.MaxLatE7 = maxLatE7
133+
134+
// although we can rely on the input header data,
135+
// it's cheap and more reliable to re-calculate these from scratch
136+
firstZ, _, _ := IDToZxy(mergedEntries[0].Entry.TileID)
137+
header.MinZoom = uint8(firstZ)
138+
lastEntry := mergedEntries[len(mergedEntries)-1].Entry
139+
lastZ, _, _ := IDToZxy(lastEntry.TileID + uint64(lastEntry.RunLength) - 1)
140+
header.MaxZoom = uint8(lastZ)
141+
// construct a new center
142+
143+
header.TileDataLength = acc
144+
header.AddressedTilesCount = addressedTiles
145+
header.TileEntriesCount = uint64(len(mergedEntries))
146+
header.TileContentsCount = tileContents.GetCardinality()
147+
148+
// optimize IO by batching
149+
var mergeOps []MergeOp
150+
for _, me := range mergedEntries {
151+
last := len(mergeOps) - 1
152+
entryLength := uint64(me.Entry.Length)
153+
if last >= 0 && (mergeOps[last].InputIdx == me.InputIdx) && (me.InputOffset == mergeOps[last].InputOffset+mergeOps[last].Length) {
154+
mergeOps[last].Length += entryLength
155+
} else {
156+
mergeOps = append(mergeOps, MergeOp{InputIdx: me.InputIdx, InputOffset: me.InputOffset, Length: entryLength})
157+
}
158+
}
159+
160+
output, _ := os.Create(inputs[len(inputs)-1])
161+
defer output.Close()
162+
163+
headerBytes := SerializeHeader(header)
164+
_, _ = output.Write(headerBytes)
165+
_, _ = output.Write(rootBytes)
166+
fmt.Println("Copying JSON metadata from first input element")
167+
firstHandle := handles[0]
168+
firstHandle.Seek(int64(headers[0].MetadataOffset), io.SeekStart)
169+
io.CopyN(output, firstHandle, int64(headers[0].MetadataLength))
170+
_, _ = output.Write(leavesBytes)
171+
172+
for _, op := range mergeOps {
173+
handle := handles[op.InputIdx]
174+
handle.Seek(int64(headers[op.InputIdx].TileDataOffset)+int64(op.InputOffset), io.SeekStart)
175+
io.CopyN(output, handle, int64(op.Length))
176+
}
177+
178+
for _, h := range handles {
179+
h.Close()
180+
}
181+
182+
return nil
183+
}

pmtiles/merge_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package pmtiles

pmtiles/verify.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func Verify(_ *log.Logger, file string) error {
153153
return fmt.Errorf("invalid: header MinZoom=%v does not match min tile z %v", header.MinZoom, z)
154154
}
155155

156+
// TODO this is technically kind of wrong
156157
if z, _, _ := IDToZxy(maxTileID); z != header.MaxZoom {
157158
return fmt.Errorf("invalid: header MaxZoom=%v does not match max tile z %v", header.MaxZoom, z)
158159
}

0 commit comments

Comments
 (0)