Skip to content

Commit 0dc2188

Browse files
WestonsideWeston Tuescher
andauthored
feat: add "headless" json-rpc based REPL (#5040)
* feat: Created the headless repl file sucessfully and modified existing methods so that it can work independently. * chore: added in suggested changes to the main go file * chore: removed unsused functions * chore: make fmt main * fix: make fmt and delete loadfile function * fix: remove unused import Co-authored-by: Weston Tuescher <westontuescher@Westons-MacBook-Pro.local>
1 parent 2879293 commit 0dc2188

File tree

1 file changed

+352
-0
lines changed

1 file changed

+352
-0
lines changed

internal/cmd/headless_repl/main.go

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"io/ioutil"
9+
"net/rpc"
10+
"net/rpc/jsonrpc"
11+
"os"
12+
"os/signal"
13+
"sync"
14+
"syscall"
15+
16+
"github.com/influxdata/flux"
17+
"github.com/influxdata/flux/dependencies"
18+
"github.com/influxdata/flux/dependencies/feature"
19+
"github.com/influxdata/flux/dependency"
20+
"github.com/influxdata/flux/execute"
21+
"github.com/influxdata/flux/execute/executetest"
22+
"github.com/influxdata/flux/fluxinit"
23+
"github.com/influxdata/flux/internal/spec"
24+
"github.com/influxdata/flux/interpreter"
25+
"github.com/influxdata/flux/lang"
26+
"github.com/influxdata/flux/libflux/go/libflux"
27+
"github.com/influxdata/flux/memory"
28+
"github.com/influxdata/flux/repl"
29+
"github.com/influxdata/flux/runtime"
30+
"github.com/influxdata/flux/semantic"
31+
"github.com/influxdata/flux/values"
32+
)
33+
34+
type ScopeHolder struct {
35+
ctx context.Context
36+
37+
scope values.Scope
38+
itrp *interpreter.Interpreter
39+
analyzer *libflux.Analyzer
40+
importer interpreter.Importer
41+
42+
cancelMu sync.Mutex
43+
cancelFunc context.CancelFunc
44+
45+
resChan chan string
46+
}
47+
48+
func New(ctx context.Context) *ScopeHolder {
49+
scope := values.NewScope()
50+
importer := runtime.StdLib()
51+
for _, p := range runtime.PreludeList {
52+
pkg, err := importer.ImportPackageObject(p)
53+
if err != nil {
54+
panic(err)
55+
}
56+
pkg.Range(scope.Set)
57+
}
58+
59+
analyzer, err := libflux.NewAnalyzerWithOptions(libflux.NewOptions(ctx))
60+
if err != nil {
61+
panic(err)
62+
}
63+
64+
repl := &ScopeHolder{
65+
ctx: ctx,
66+
scope: scope,
67+
itrp: interpreter.NewInterpreter(nil, &lang.ExecOptsConfig{}),
68+
analyzer: analyzer,
69+
importer: importer,
70+
}
71+
72+
return repl
73+
}
74+
75+
type rwCloser struct {
76+
io.ReadCloser
77+
io.WriteCloser
78+
}
79+
80+
func (rw rwCloser) Close() error {
81+
err := rw.ReadCloser.Close()
82+
if err := rw.WriteCloser.Close(); err != nil {
83+
return err
84+
}
85+
return err
86+
}
87+
88+
type Response struct {
89+
Result string
90+
}
91+
92+
type Request struct {
93+
A string `json:"input"`
94+
}
95+
96+
type Service struct {
97+
c chan string
98+
res chan string
99+
}
100+
101+
func (s *Service) DidOutput(req Request, resp *Response) error {
102+
s.c <- req.A
103+
result := <-s.res
104+
*resp = Response{result}
105+
return nil
106+
}
107+
108+
type API int
109+
110+
func (r *ScopeHolder) Run() {
111+
s := rpc.NewServer()
112+
c := make(chan string)
113+
//for the input result
114+
calc_chan := make(chan string)
115+
r.resChan = calc_chan
116+
117+
serv := Service{c, calc_chan}
118+
s.Register(&serv)
119+
sigs := make(chan os.Signal, 1)
120+
signal.Notify(sigs, syscall.SIGINT)
121+
go func() {
122+
for range sigs {
123+
r.cancel()
124+
}
125+
}()
126+
127+
go s.ServeCodec(jsonrpc.NewServerCodec(rwCloser{os.Stdin, os.Stdout})) //somehow need to get the input that is being
128+
for {
129+
res := <-c
130+
r.InputAndPrintError(res)
131+
}
132+
133+
}
134+
135+
func (r *ScopeHolder) cancel() {
136+
r.cancelMu.Lock()
137+
defer r.cancelMu.Unlock()
138+
if r.cancelFunc != nil {
139+
r.cancelFunc()
140+
r.cancelFunc = nil
141+
}
142+
}
143+
144+
func (r *ScopeHolder) setCancel(cf context.CancelFunc) {
145+
r.cancelMu.Lock()
146+
defer r.cancelMu.Unlock()
147+
r.cancelFunc = cf
148+
}
149+
func (r *ScopeHolder) clearCancel() {
150+
r.setCancel(nil)
151+
}
152+
153+
func (r *ScopeHolder) Input(t string) (*libflux.FluxError, error) {
154+
a, err := r.executeLine(t)
155+
return a, err
156+
}
157+
158+
// input processes a line of input and prints the result.
159+
func (r *ScopeHolder) InputAndPrintError(t string) {
160+
if fluxError, err := r.executeLine(t); err != nil {
161+
if fluxError != nil {
162+
163+
fluxError.Print()
164+
} else {
165+
fmt.Println("Error:", err)
166+
}
167+
}
168+
}
169+
170+
func (r *ScopeHolder) Eval(t string) ([]interpreter.SideEffect, error) {
171+
s, _, err := r.evalWithFluxError(t)
172+
return s, err
173+
}
174+
175+
func (r *ScopeHolder) evalWithFluxError(t string) ([]interpreter.SideEffect, *libflux.FluxError, error) {
176+
if t == "" {
177+
return nil, nil, nil
178+
}
179+
180+
if t[0] == '@' {
181+
q, err := LoadQuery(t)
182+
if err != nil {
183+
return nil, nil, err
184+
}
185+
t = q
186+
}
187+
188+
pkg, fluxError, err := r.analyzeLine(t)
189+
if err != nil {
190+
return nil, fluxError, err
191+
}
192+
193+
ctx, span := dependency.Inject(r.ctx, execute.DefaultExecutionDependencies())
194+
defer span.Finish()
195+
196+
x, err := r.itrp.Eval(ctx, pkg, r.scope, r.importer)
197+
return x, nil, err
198+
}
199+
200+
// executeLine processes a line of input.
201+
// If the input evaluates to a valid value, that value is returned.
202+
func (r *ScopeHolder) executeLine(t string) (*libflux.FluxError, error) {
203+
ses, fluxError, err := r.evalWithFluxError(t)
204+
if err != nil {
205+
return fluxError, err
206+
}
207+
208+
for _, se := range ses {
209+
if _, ok := se.Node.(*semantic.ExpressionStatement); ok {
210+
if t, ok := se.Value.(*flux.TableObject); ok {
211+
now, ok := r.scope.Lookup("now")
212+
if !ok {
213+
return nil, fmt.Errorf("now option not set")
214+
}
215+
nowTime, err := now.Function().Call(r.ctx, nil)
216+
if err != nil {
217+
return nil, err
218+
}
219+
s, err := spec.FromTableObject(r.ctx, t, nowTime.Time().Time())
220+
if err != nil {
221+
return nil, err
222+
}
223+
if err := r.doQuery(r.ctx, s); err != nil {
224+
return nil, err
225+
}
226+
} else {
227+
228+
var a []byte
229+
buf := bytes.NewBuffer(a)
230+
values.Display(buf, se.Value)
231+
r.resChan <- buf.String()
232+
}
233+
}
234+
}
235+
return nil, nil
236+
}
237+
238+
func (r *ScopeHolder) analyzeLine(t string) (*semantic.Package, *libflux.FluxError, error) {
239+
pkg, fluxError := r.analyzer.AnalyzeString(t)
240+
if fluxError != nil {
241+
return nil, fluxError, fluxError.GoError()
242+
}
243+
244+
bs, err := pkg.MarshalFB()
245+
if err != nil {
246+
return nil, nil, err
247+
}
248+
x, err := semantic.DeserializeFromFlatBuffer(bs)
249+
return x, nil, err
250+
}
251+
252+
func (r *ScopeHolder) doQuery(ctx context.Context, spec *flux.Spec) error {
253+
// Setup cancel context
254+
ctx, cancelFunc := context.WithCancel(ctx)
255+
r.setCancel(cancelFunc)
256+
defer cancelFunc()
257+
defer r.clearCancel()
258+
259+
c := repl.Compiler{
260+
Spec: spec,
261+
}
262+
263+
program, err := c.Compile(ctx, runtime.Default)
264+
if err != nil {
265+
return err
266+
}
267+
alloc := &memory.ResourceAllocator{}
268+
269+
qry, err := program.Start(ctx, alloc)
270+
if err != nil {
271+
return err
272+
}
273+
defer qry.Done()
274+
275+
for result := range qry.Results() {
276+
tables := result.Tables()
277+
fmt.Println("Result:", result.Name())
278+
if err := tables.Do(func(tbl flux.Table) error {
279+
_, err := execute.NewFormatter(tbl, nil).WriteTo(os.Stdout)
280+
return err
281+
}); err != nil {
282+
return err
283+
}
284+
}
285+
qry.Done()
286+
return qry.Err()
287+
}
288+
289+
// LoadQuery returns the Flux query q, except for two special cases:
290+
// if q is exactly "-", the query will be read from stdin;
291+
// and if the first character of q is "@",
292+
// the @ prefix is removed and the contents of the file specified by the rest of q are returned.
293+
func LoadQuery(q string) (string, error) {
294+
if q == "-" {
295+
data, err := ioutil.ReadAll(os.Stdin)
296+
if err != nil {
297+
return "", err
298+
}
299+
return string(data), nil
300+
}
301+
302+
if len(q) > 0 && q[0] == '@' {
303+
data, err := ioutil.ReadFile(q[1:])
304+
if err != nil {
305+
return "", err
306+
}
307+
308+
return string(data), nil
309+
}
310+
311+
return q, nil
312+
}
313+
314+
const DefaultInfluxDBHost = "http://localhost:9999"
315+
316+
func injectDependencies(ctx context.Context) (context.Context, *dependency.Span) {
317+
deps := dependencies.NewDefaultDependencies(DefaultInfluxDBHost)
318+
return dependency.Inject(ctx, deps)
319+
}
320+
321+
func runE(args []string) error {
322+
323+
ctx, close := context.Background(), func() {}
324+
325+
defer close()
326+
327+
// Defer initialization until other common errors
328+
// have already passed to avoid a long load time
329+
// for a simple unrelated error.
330+
fluxinit.FluxInit()
331+
ctx, span := injectDependencies(ctx)
332+
defer span.Finish()
333+
334+
flagger := executetest.TestFlagger{}
335+
336+
ctx = feature.Dependency{Flagger: flagger}.Inject(ctx)
337+
338+
var opts []repl.Option
339+
opts = append(opts, repl.EnableSuggestions())
340+
return replE(ctx, opts...)
341+
342+
}
343+
344+
func replE(ctx context.Context, opts ...repl.Option) error {
345+
r := New(ctx)
346+
r.Run()
347+
return nil
348+
}
349+
350+
func main() {
351+
runE(nil)
352+
}

0 commit comments

Comments
 (0)