Skip to content

Commit cc8a049

Browse files
committed
WIP: document store
1 parent 285a07d commit cc8a049

File tree

7 files changed

+474
-18
lines changed

7 files changed

+474
-18
lines changed

commands_doc.go.wip

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package main
2+
3+
import (
4+
"encoding/csv"
5+
"strconv"
6+
"strings"
7+
"sync"
8+
"time"
9+
10+
"github.com/alash3al/redix/kvstore"
11+
"github.com/jeremywohl/flatten"
12+
"github.com/rs/xid"
13+
"github.com/tidwall/gjson"
14+
"github.com/tidwall/sjson"
15+
)
16+
17+
func dsetCommand(c Context) {
18+
if len(c.args) < 2 {
19+
c.WriteError("DSET command requires at least 2 arguments: DSET <collection> [<json> <json> ...]")
20+
return
21+
}
22+
23+
collection := c.args[0]
24+
25+
docs := map[string]string{}
26+
indexes := map[string]string{}
27+
28+
for _, j := range c.args[1:] {
29+
if !gjson.Valid(j) {
30+
continue
31+
}
32+
33+
randID := xid.New().String()
34+
parseddoc := gjson.Parse(j)
35+
docid := ""
36+
37+
if !parseddoc.Get("id").Exists() || parseddoc.Get("id").String() == "" {
38+
j, _ = sjson.Set(j, "id", randID)
39+
40+
docid = randID
41+
} else {
42+
docid = parseddoc.Get("id").String()
43+
}
44+
45+
if !parseddoc.Get("created_at").Exists() {
46+
j, _ = sjson.Set(j, "created_at", time.Now().UnixNano())
47+
}
48+
49+
j, _ = sjson.Set(j, "updated_at", time.Now().UnixNano())
50+
51+
flatdoc, err := flatten.FlattenString(j, "", flatten.DotStyle)
52+
if err != nil {
53+
continue
54+
}
55+
56+
docs[collection+"/{DOCUMENT}/{RAW}/"+docid] = j
57+
58+
gjson.Parse(flatdoc).ForEach(func(k, v gjson.Result) bool {
59+
// <collection>/{DOCUMENT}/{INDEX}/<docfield>/<docfieldvalue>/<sequence> = <docid>
60+
indexes[collection+"/{DOCUMENT}/{INDEX}/"+k.String()+"/"+v.Raw+"/"+xid.New().String()] = docid
61+
return true
62+
})
63+
}
64+
65+
c.db.MSet(docs)
66+
c.db.MSet(indexes)
67+
68+
c.WriteArray(len(docs) * 2)
69+
for k, v := range docs {
70+
k = strings.SplitN(k, "/{DOCUMENT}/{RAW}/", 2)[1]
71+
72+
c.WriteBulkString(k)
73+
c.WriteBulkString(v)
74+
}
75+
}
76+
77+
func dgetCommand(c Context) {
78+
if len(c.args) < 2 {
79+
c.WriteError("DGET requires at least 2 arguments: DGET <collection> <docid>")
80+
return
81+
}
82+
83+
collection, docid := c.args[0], c.args[1]
84+
85+
doc, err := c.db.Get(collection + "/{DOCUMENT}/{RAW}/" + docid)
86+
if err != nil {
87+
c.WriteError(err.Error())
88+
return
89+
}
90+
91+
c.WriteBulkString(doc)
92+
}
93+
94+
func dgetallCommand(c Context) {
95+
if len(c.args) < 1 {
96+
c.WriteError("DGETALL command requires at least one argument: DGETALL <collection> [<offset> <limit>]")
97+
return
98+
}
99+
100+
prefix := c.args[0] + "/{DOCUMENT}/{RAW}/"
101+
offset := prefix
102+
limit := 5
103+
104+
if len(c.args) > 1 && c.args[1] != "" {
105+
offset = prefix + c.args[1]
106+
}
107+
108+
if len(c.args) > 2 {
109+
limit, _ = strconv.Atoi(c.args[2])
110+
}
111+
112+
data := []string{}
113+
length := 0
114+
err := c.db.Scan(kvstore.ScannerOptions{
115+
FetchValues: true,
116+
IncludeOffset: offset == prefix,
117+
Prefix: prefix,
118+
Offset: offset,
119+
Handler: func(k, v string) bool {
120+
length++
121+
p := strings.SplitN(k, "/{DOCUMENT}/{RAW}/", 2)
122+
if len(p) < 2 {
123+
return true
124+
}
125+
data = append(data, p[1], v)
126+
if length >= limit {
127+
return false
128+
}
129+
return true
130+
},
131+
})
132+
133+
if err != nil {
134+
c.WriteError(err.Error())
135+
return
136+
}
137+
138+
c.WriteArray(len(data))
139+
140+
for _, v := range data {
141+
c.WriteBulkString(v)
142+
}
143+
}
144+
145+
func dfilterCommand(c Context) {
146+
if len(c.args) < 2 {
147+
c.WriteError("DFILTER command requires at least one argument: DFILTER <collection> <filter> [<offset> <limit>]")
148+
return
149+
}
150+
151+
prefix := c.args[0] + "/{DOCUMENT}/{INDEX}/"
152+
filters, err := (func() ([]string, error) {
153+
r := csv.NewReader(strings.NewReader(c.args[1]))
154+
r.Comma = ' '
155+
return r.Read()
156+
})()
157+
158+
if err != nil {
159+
c.WriteError(err.Error())
160+
return
161+
}
162+
163+
offset := prefix
164+
limit := 5
165+
166+
if len(c.args) > 1 && c.args[1] != "" {
167+
offset = prefix + c.args[1]
168+
}
169+
170+
if len(c.args) > 2 {
171+
limit, _ = strconv.Atoi(c.args[2])
172+
}
173+
174+
data := []string{}
175+
length := 0
176+
177+
// all := map[string][]string{}
178+
wg := sync.WaitGroup{}
179+
180+
for _, filter := range filters {
181+
parts := strings.SplitN(filter, ":", 2)
182+
if len(parts) < 2 {
183+
continue
184+
}
185+
wg.Add(1)
186+
go (func() {
187+
defer wg.Done()
188+
c.db.Scan(kvstore.ScannerOptions{
189+
FetchValues: true,
190+
IncludeOffset: offset == prefix,
191+
Prefix: prefix,
192+
Offset: offset,
193+
Handler: func(k, v string) bool {
194+
length++
195+
p := strings.SplitN(k, "/{DOCUMENT}/{RAW}/", 2)
196+
if len(p) < 2 {
197+
return true
198+
}
199+
data = append(data, p[1], v)
200+
if length >= limit {
201+
return false
202+
}
203+
return true
204+
},
205+
})
206+
})()
207+
}
208+
209+
wg.Wait()
210+
211+
c.WriteArray(len(data))
212+
213+
for _, v := range data {
214+
c.WriteBulkString(v)
215+
}
216+
}

commands_strings.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ func setCommand(c Context) {
2626
}
2727

2828
ttlVal, _ := strconv.Atoi(ttl)
29+
if ttlVal < 0 {
30+
ttlVal = 0
31+
}
2932

3033
if err := c.db.Set(k, v, ttlVal); err != nil {
3134
c.WriteError(err.Error())

helpers.go renamed to helpers_db.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"path/filepath"
99
"strings"
1010

11-
"github.com/alash3al/redix/kvstore/bolt"
12-
13-
"github.com/alash3al/redix/kvstore/badger"
14-
1511
"github.com/alash3al/redix/kvstore"
12+
"github.com/alash3al/redix/kvstore/badger"
13+
"github.com/alash3al/redix/kvstore/bolt"
14+
"github.com/alash3al/redix/kvstore/level"
1615
)
1716

1817
// selectDB - load/fetches the requested db
@@ -41,5 +40,7 @@ func openDB(engine, dbpath string) (kvstore.DB, error) {
4140
return badger.OpenBadger(dbpath)
4241
case "bolt", "boltdb":
4342
return bolt.OpenBolt(dbpath)
43+
case "level", "leveldb":
44+
return level.OpenLevelDB(dbpath)
4445
}
4546
}

init.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func init() {
3636
return
3737
}
3838

39-
color.Cyan(redixBrand)
39+
color.Blue(redixBrand)
4040

4141
databases = new(sync.Map)
4242
changelog = pubsub.NewBroker()
@@ -53,7 +53,9 @@ func init() {
5353
if !f.IsDir() {
5454
continue
5555
}
56+
5657
name := filepath.Base(f.Name())
58+
5759
_, err := selectDB(name)
5860
if err != nil {
5961
log.Println(color.RedString(err.Error()))

0 commit comments

Comments
 (0)