Skip to content

Commit 4e87a7a

Browse files
author
柳丰
committed
feat: support invoke cmd on kubernetes mode
Signed-off-by: imneov <[email protected]>
1 parent 0f9d0d3 commit 4e87a7a

File tree

2 files changed

+199
-6
lines changed

2 files changed

+199
-6
lines changed

cmd/invoke.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/spf13/cobra"
2424

25+
"github.com/dapr/cli/pkg/kubernetes"
2526
"github.com/dapr/cli/pkg/print"
2627
"github.com/dapr/cli/pkg/standalone"
2728
)
@@ -67,7 +68,6 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET
6768
} else if invokeData != "" {
6869
bytePayload = []byte(invokeData)
6970
}
70-
client := standalone.NewClient()
7171

7272
// TODO(@daixiang0): add Windows support
7373
if invokeSocket != "" {
@@ -79,11 +79,22 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET
7979
}
8080
}
8181

82-
response, err := client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket)
83-
if err != nil {
84-
err = fmt.Errorf("error invoking app %s: %s", invokeAppID, err)
85-
print.FailureStatusEvent(os.Stderr, err.Error())
86-
return
82+
var response string
83+
if kubernetesMode {
84+
response, err = kubernetes.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb)
85+
if err != nil {
86+
err = fmt.Errorf("error invoking app %s: %s", invokeAppID, err)
87+
print.FailureStatusEvent(os.Stderr, err.Error())
88+
return
89+
}
90+
} else {
91+
client := standalone.NewClient()
92+
response, err = client.Invoke(invokeAppID, invokeAppMethod, bytePayload, invokeVerb, invokeSocket)
93+
if err != nil {
94+
err = fmt.Errorf("error invoking app %s: %s", invokeAppID, err)
95+
print.FailureStatusEvent(os.Stderr, err.Error())
96+
return
97+
}
8798
}
8899

89100
if response != "" {
@@ -94,6 +105,7 @@ dapr invoke --unix-domain-socket --app-id target --method sample --verb GET
94105
}
95106

96107
func init() {
108+
InvokeCmd.Flags().BoolVarP(&kubernetesMode, "kubernetes", "k", false, "Invoke a method on a Dapr application in a Kubernetes cluster")
97109
InvokeCmd.Flags().StringVarP(&invokeAppID, "app-id", "a", "", "The application id to invoke")
98110
InvokeCmd.Flags().StringVarP(&invokeAppMethod, "method", "m", "", "The method to invoke")
99111
InvokeCmd.Flags().StringVarP(&invokeData, "data", "d", "", "The JSON serialized data string (optional)")

pkg/kubernetes/invoke.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package kubernetes
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"github.com/dapr/cli/pkg/api"
20+
"net/url"
21+
"strings"
22+
23+
core_v1 "k8s.io/api/core/v1"
24+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/util/net"
26+
k8s "k8s.io/client-go/kubernetes"
27+
"k8s.io/client-go/rest"
28+
)
29+
30+
type AppInfo struct {
31+
AppID string `csv:"APP ID" json:"appId" yaml:"appId"`
32+
HTTPPort string `csv:"HTTP PORT" json:"httpPort" yaml:"httpPort"`
33+
GRPCPort string `csv:"GRPC PORT" json:"grpcPort" yaml:"grpcPort"`
34+
AppPort string `csv:"APP PORT" json:"appPort" yaml:"appPort"`
35+
PodName string `csv:"POD NAME" json:"podName" yaml:"podName"`
36+
Namespace string `csv:"NAMESPACE" json:"namespace" yaml:"namespace"`
37+
}
38+
39+
type (
40+
DaprPod core_v1.Pod
41+
DaprAppList []*AppInfo
42+
)
43+
44+
// Invoke is a command to invoke a remote or local dapr instance.
45+
func Invoke(appID, method string, data []byte, verb string) (string, error) {
46+
client, err := Client()
47+
if err != nil {
48+
return "", err
49+
}
50+
51+
app, err := GetAppInfo(client, appID)
52+
if err != nil {
53+
return "", err
54+
}
55+
56+
return invoke(client.CoreV1().RESTClient(), app, method, data, verb)
57+
}
58+
59+
func invoke(client rest.Interface, app *AppInfo, method string, data []byte, verb string) (string, error) {
60+
res, err := app.Request(client.Verb(verb), method, data, verb)
61+
if err != nil {
62+
return "", fmt.Errorf("error get request: %w", err)
63+
}
64+
65+
result := res.Do(context.TODO())
66+
rawbody, err := result.Raw()
67+
if err != nil {
68+
return "", fmt.Errorf("error get raw: %w", err)
69+
}
70+
71+
if len(rawbody) > 0 {
72+
return string(rawbody), nil
73+
}
74+
75+
return "", nil
76+
}
77+
78+
func GetAppInfo(client k8s.Interface, appID string) (*AppInfo, error) {
79+
list, err := ListAppInfos(client, appID)
80+
if err != nil {
81+
return nil, err
82+
}
83+
if len(list) == 0 {
84+
return nil, fmt.Errorf("%s not found", appID)
85+
}
86+
app := list[0]
87+
return app, nil
88+
}
89+
90+
// List outputs plugins.
91+
func ListAppInfos(client k8s.Interface, appIDs ...string) (DaprAppList, error) {
92+
opts := v1.ListOptions{}
93+
podList, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), opts)
94+
if err != nil {
95+
return nil, fmt.Errorf("err get pods list:%w", err)
96+
}
97+
98+
fn := func(*AppInfo) bool {
99+
return true
100+
}
101+
if len(appIDs) > 0 {
102+
fn = func(a *AppInfo) bool {
103+
for _, id := range appIDs {
104+
if id != "" && a.AppID == id {
105+
return true
106+
}
107+
}
108+
return false
109+
}
110+
}
111+
112+
l := make(DaprAppList, 0)
113+
for _, p := range podList.Items {
114+
p := DaprPod(p)
115+
FindLoop:
116+
for _, c := range p.Spec.Containers {
117+
if c.Name == "daprd" {
118+
app := getAppInfoFromPod(&p)
119+
if fn(app) {
120+
l = append(l, app)
121+
}
122+
break FindLoop
123+
}
124+
}
125+
}
126+
127+
return l, nil
128+
}
129+
130+
func getAppInfoFromPod(p *DaprPod) (a *AppInfo) {
131+
for _, c := range p.Spec.Containers {
132+
if c.Name == "daprd" {
133+
a = &AppInfo{
134+
PodName: p.Name,
135+
Namespace: p.Namespace,
136+
}
137+
for i, arg := range c.Args {
138+
if arg == "--app-port" {
139+
port := c.Args[i+1]
140+
a.AppPort = port
141+
} else if arg == "--dapr-http-port" {
142+
port := c.Args[i+1]
143+
a.HTTPPort = port
144+
} else if arg == "--dapr-grpc-port" {
145+
port := c.Args[i+1]
146+
a.GRPCPort = port
147+
} else if arg == "--app-id" {
148+
id := c.Args[i+1]
149+
a.AppID = id
150+
}
151+
}
152+
}
153+
}
154+
155+
return
156+
}
157+
158+
func (a *AppInfo) Request(r *rest.Request, method string, data []byte, verb string) (*rest.Request, error) {
159+
r = r.Namespace(a.Namespace).
160+
Resource("pods").
161+
SubResource("proxy").
162+
SetHeader("Content-Type", "application/json").
163+
Name(net.JoinSchemeNamePort("", a.PodName, a.HTTPPort))
164+
if data != nil {
165+
r = r.Body(data)
166+
}
167+
168+
u, err := url.Parse(method)
169+
if err != nil {
170+
return nil, fmt.Errorf("error parse method %s: %w", method, err)
171+
}
172+
173+
suffix := fmt.Sprintf("v%s/invoke/%s/method/%s", api.RuntimeAPIVersion, a.AppID, u.Path)
174+
r = r.Suffix(suffix)
175+
176+
for k, vs := range u.Query() {
177+
r = r.Param(k, strings.Join(vs, ","))
178+
}
179+
180+
return r, nil
181+
}

0 commit comments

Comments
 (0)