Skip to content

Commit e059d22

Browse files
authored
Revert "Revert connect logs PRs" (#3134)
1 parent 5619dec commit e059d22

33 files changed

+988
-9
lines changed

cmd/lint/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ var vocabWords = []string{
228228
"crn",
229229
"csharp",
230230
"csu",
231+
"datetime",
231232
"decrypt",
232233
"dek",
233234
"deserializer",
@@ -332,6 +333,7 @@ var vocabWords = []string{
332333
"vnet",
333334
"vpc",
334335
"whitelist",
336+
"wikipedia",
335337
"yaml",
336338
"yml",
337339
"zstd",

internal/connect/command.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
2525
cmd.AddCommand(newClusterCommand(cfg, prerunner))
2626
cmd.AddCommand(newCustomPluginCommand(cfg, prerunner))
2727
cmd.AddCommand(newEventCommand(prerunner))
28+
cmd.AddCommand(newLogsCommand(prerunner))
2829
cmd.AddCommand(newOffsetCommand(prerunner))
2930
cmd.AddCommand(newPluginCommand(cfg, prerunner))
3031

internal/connect/command_logs.go

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
package connect
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"net/url"
7+
"os"
8+
"regexp"
9+
"strings"
10+
"time"
11+
12+
"github.com/spf13/cobra"
13+
14+
"github.com/confluentinc/cli/v4/pkg/ccloudv2"
15+
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
16+
"github.com/confluentinc/cli/v4/pkg/config"
17+
"github.com/confluentinc/cli/v4/pkg/examples"
18+
"github.com/confluentinc/cli/v4/pkg/kafka"
19+
"github.com/confluentinc/cli/v4/pkg/output"
20+
)
21+
22+
type logsCommand struct {
23+
*pcmd.AuthenticatedCLICommand
24+
}
25+
26+
type logEntryOut struct {
27+
Timestamp string `human:"Timestamp" serialized:"timestamp"`
28+
Level string `human:"Level" serialized:"level"`
29+
TaskId string `human:"Task ID" serialized:"task_id"`
30+
Message string `human:"Message" serialized:"message"`
31+
Exception string `human:"Exception" serialized:"exception"`
32+
}
33+
34+
func newLogsCommand(prerunner pcmd.PreRunner) *cobra.Command {
35+
cmd := &cobra.Command{
36+
Use: "logs <id>",
37+
Short: "Manage logs for connectors.",
38+
Args: cobra.ExactArgs(1),
39+
Example: examples.BuildExampleString(
40+
examples.Example{
41+
Text: "Query connector logs with log level ERROR between the provided time window:",
42+
Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`,
43+
},
44+
examples.Example{
45+
Text: "Query connector logs with log level ERROR and WARN between the provided time window:",
46+
Code: `confluent connect logs lcc-123456 --level "ERROR|WARN" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`,
47+
},
48+
examples.Example{
49+
Text: "Query subsequent pages of connector logs for the same query by executing the command with next flag until \"No logs found for the current query\" is printed to the console:",
50+
Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --next`,
51+
},
52+
examples.Example{
53+
Text: "Query connector logs with log level ERROR and containing \"example error\" in logs between the provided time window, and store in file:",
54+
Code: `confluent connect logs lcc-123456 --level "ERROR" --search-text "example error" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --output-file errors.json`,
55+
},
56+
examples.Example{
57+
Text: "Query connector logs with log level ERROR and matching regex \"exa*\" in logs between the provided time window, and store in file:",
58+
Code: `confluent connect logs lcc-123456 --level "ERROR" --search-text "exa*" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --output-file errors.json`,
59+
},
60+
),
61+
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
62+
}
63+
64+
c := &logsCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}
65+
cmd.RunE = c.queryLogs
66+
cmd.Flags().String("start-time", "", "Start time for log query in ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) UTC datetime format (e.g., 2025-02-01T00:00:00Z).")
67+
cmd.Flags().String("end-time", "", "End time for log query in ISO 8601 (https://en.wikipedia.org/wiki/ISO_8601) UTC datetime format (e.g., 2025-02-01T23:59:59Z).")
68+
cmd.Flags().String("level", "ERROR", "Log level filter (INFO, WARN, ERROR). Defaults to ERROR. Use '|' to specify multiple levels (e.g., ERROR|WARN).")
69+
cmd.Flags().String("search-text", "", "Search text within logs.")
70+
cmd.Flags().String("output-file", "", "Output file path to append connector logs.")
71+
cmd.Flags().Bool("next", false, "Whether to fetch next page of logs after the next execution of the command.")
72+
73+
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
74+
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
75+
pcmd.AddOutputFlag(cmd)
76+
77+
cobra.CheckErr(cmd.MarkFlagRequired("start-time"))
78+
cobra.CheckErr(cmd.MarkFlagRequired("end-time"))
79+
80+
return cmd
81+
}
82+
83+
func (c *logsCommand) queryLogs(cmd *cobra.Command, args []string) error {
84+
connectorId := args[0]
85+
if connectorId == "" {
86+
return fmt.Errorf("connector ID cannot be empty")
87+
}
88+
89+
startTime, err := cmd.Flags().GetString("start-time")
90+
if err != nil {
91+
return err
92+
}
93+
94+
endTime, err := cmd.Flags().GetString("end-time")
95+
if err != nil {
96+
return err
97+
}
98+
99+
level, err := cmd.Flags().GetString("level")
100+
if err != nil {
101+
return err
102+
}
103+
levels := strings.Split(level, "|")
104+
for _, l := range levels {
105+
if l != "INFO" && l != "WARN" && l != "ERROR" {
106+
return fmt.Errorf("invalid log level: %s", l)
107+
}
108+
}
109+
110+
searchText, err := cmd.Flags().GetString("search-text")
111+
if err != nil {
112+
return err
113+
}
114+
115+
outputFile, err := cmd.Flags().GetString("output-file")
116+
if err != nil {
117+
return err
118+
}
119+
120+
next, err := cmd.Flags().GetBool("next")
121+
if err != nil {
122+
return err
123+
}
124+
125+
if err := validateTimeFormat(startTime); err != nil {
126+
return fmt.Errorf("invalid start-time format: %w", err)
127+
}
128+
129+
if err := validateTimeFormat(endTime); err != nil {
130+
return fmt.Errorf("invalid end-time format: %w", err)
131+
}
132+
133+
if endTime < startTime {
134+
return fmt.Errorf("end-time must be greater than start-time")
135+
}
136+
137+
startTimeParsed, _ := time.Parse(time.RFC3339, startTime)
138+
now := time.Now()
139+
maxAge := 72 * time.Hour
140+
if startTimeParsed.Before(now.Add(-maxAge)) {
141+
return fmt.Errorf("start-time cannot be older than 72 hours")
142+
}
143+
144+
currentLogQuery := &config.ConnectLogsQueryState{
145+
StartTime: startTime,
146+
EndTime: endTime,
147+
Level: level,
148+
SearchText: searchText,
149+
ConnectorId: connectorId,
150+
PageToken: "",
151+
}
152+
153+
kafkaCluster, err := kafka.GetClusterForCommand(c.V2Client, c.Context)
154+
if err != nil {
155+
return fmt.Errorf("Kafka cluster information not found: %w\nPlease ensure you have set a cluster context with 'confluent kafka cluster use <cluster-id>' or specify --cluster flag", err)
156+
}
157+
kafkaClusterId := kafkaCluster.GetId()
158+
159+
environmentId, err := c.Context.EnvironmentId()
160+
if err != nil {
161+
return fmt.Errorf("Environment ID not found: %w\nPlease ensure you have set an environment context with 'confluent environment use <env-id>' or specify --environment flag", err)
162+
}
163+
164+
connectorName, err := c.getConnectorName(connectorId, environmentId, kafkaClusterId)
165+
if connectorName == "" {
166+
return fmt.Errorf("Connector not found: %w", err)
167+
}
168+
169+
lastQueryPageToken, err := c.getPageTokenFromStoredQuery(next, currentLogQuery)
170+
if err != nil {
171+
return nil
172+
}
173+
174+
crn := fmt.Sprintf("crn://confluent.cloud/organization=%s/environment=%s/cloud-cluster=%s/connector=%s",
175+
c.Context.GetCurrentOrganization(),
176+
environmentId,
177+
kafkaClusterId,
178+
connectorName,
179+
)
180+
181+
logs, err := c.V2Client.SearchConnectorLogs(crn, startTime, endTime, levels, searchText, lastQueryPageToken)
182+
if err != nil {
183+
return fmt.Errorf("failed to query connector logs: %w", err)
184+
}
185+
186+
err = c.storeQueryInContext(logs, currentLogQuery)
187+
if err != nil {
188+
return err
189+
}
190+
191+
if outputFile != "" {
192+
return writeLogsToFile(outputFile, logs)
193+
}
194+
195+
if output.GetFormat(cmd).IsSerialized() {
196+
return output.SerializedOutput(cmd, logs.Data)
197+
}
198+
199+
return printHumanLogs(cmd, logs, connectorId)
200+
}
201+
202+
func (c *logsCommand) getPageTokenFromStoredQuery(next bool, currentLogQuery *config.ConnectLogsQueryState) (string, error) {
203+
lastLogQuery := c.Context.GetConnectLogsQueryState()
204+
var lastQueryPageToken string
205+
if next {
206+
if lastLogQuery != nil && (lastLogQuery.StartTime == currentLogQuery.StartTime &&
207+
lastLogQuery.EndTime == currentLogQuery.EndTime &&
208+
lastLogQuery.Level == currentLogQuery.Level &&
209+
lastLogQuery.SearchText == currentLogQuery.SearchText &&
210+
lastLogQuery.ConnectorId == currentLogQuery.ConnectorId) {
211+
lastQueryPageToken = lastLogQuery.PageToken
212+
if lastQueryPageToken == "" {
213+
output.Printf(false, "No logs found for the current query\n")
214+
return "", fmt.Errorf("No logs found for the current query")
215+
}
216+
} else {
217+
lastQueryPageToken = ""
218+
}
219+
} else {
220+
lastQueryPageToken = ""
221+
}
222+
return lastQueryPageToken, nil
223+
}
224+
225+
func (c *logsCommand) storeQueryInContext(logs *ccloudv2.LoggingSearchResponse, currentLogQuery *config.ConnectLogsQueryState) error {
226+
if logs.Metadata != nil {
227+
pageToken, err := extractPageToken(logs.Metadata.Next)
228+
currentLogQuery.SetPageToken(pageToken)
229+
if err != nil {
230+
return fmt.Errorf("failed to extract page token: %w", err)
231+
}
232+
} else {
233+
currentLogQuery.SetPageToken("")
234+
}
235+
236+
err := c.Context.SetConnectLogsQueryState(currentLogQuery)
237+
if err != nil {
238+
return fmt.Errorf("failed to set connect logs query state: %w", err)
239+
}
240+
if err := c.Config.Save(); err != nil {
241+
return err
242+
}
243+
return nil
244+
}
245+
246+
func (c *logsCommand) getConnectorName(connectorId, environmentId, kafkaClusterId string) (string, error) {
247+
connector, err := c.V2Client.GetConnectorExpansionById(connectorId, environmentId, kafkaClusterId)
248+
if err != nil {
249+
return "", err
250+
}
251+
connectorInfo := connector.GetInfo()
252+
return connectorInfo.GetName(), nil
253+
}
254+
255+
func writeLogsToFile(outputFile string, logs *ccloudv2.LoggingSearchResponse) error {
256+
file, err := os.OpenFile(outputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
257+
if err != nil {
258+
return fmt.Errorf("failed to open file %s: %w", outputFile, err)
259+
}
260+
defer file.Close()
261+
262+
for _, log := range logs.Data {
263+
logEntry := &logEntryOut{
264+
Timestamp: log.Timestamp,
265+
Level: log.Level,
266+
TaskId: log.TaskId,
267+
Message: log.Message,
268+
}
269+
if log.Exception != nil {
270+
logEntry.Exception = log.Exception.Stacktrace
271+
}
272+
data, err := json.Marshal(logEntry)
273+
if err != nil {
274+
return fmt.Errorf("failed to marshal log entry to JSON: %w", err)
275+
}
276+
277+
if _, err := file.Write(data); err != nil {
278+
return fmt.Errorf("failed to write log entry to file %s: %w", outputFile, err)
279+
}
280+
281+
if _, err := file.WriteString("\n"); err != nil {
282+
return fmt.Errorf("failed to write newline to file %s: %w", outputFile, err)
283+
}
284+
}
285+
286+
output.Printf(false, "Appended %d log entries to file: %s\n", len(logs.Data), outputFile)
287+
return nil
288+
}
289+
290+
func validateTimeFormat(timeStr string) error {
291+
pattern := `^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`
292+
match, err := regexp.MatchString(pattern, timeStr)
293+
if !match || err != nil {
294+
return fmt.Errorf("must be formatted as: YYYY-MM-DDTHH:MM:SSZ")
295+
}
296+
return nil
297+
}
298+
299+
func extractPageToken(urlStr string) (string, error) {
300+
parsedURL, err := url.Parse(urlStr)
301+
if err != nil {
302+
return "", err
303+
}
304+
305+
queryParams := parsedURL.Query()
306+
pageToken := queryParams.Get("page_token")
307+
308+
return pageToken, nil
309+
}
310+
311+
func printHumanLogs(cmd *cobra.Command, logs *ccloudv2.LoggingSearchResponse, connectorId string) error {
312+
list := output.NewList(cmd)
313+
for _, log := range logs.Data {
314+
logOut := &logEntryOut{
315+
Timestamp: log.Timestamp,
316+
Level: log.Level,
317+
TaskId: log.TaskId,
318+
Message: log.Message,
319+
}
320+
if log.Exception != nil {
321+
logOut.Exception = log.Exception.Stacktrace
322+
}
323+
list.Add(logOut)
324+
}
325+
326+
if len(logs.Data) == 0 {
327+
output.Println(false, "No logs found for the current query")
328+
return nil
329+
}
330+
331+
output.Printf(false, "Found %d log entries for connector %s:\n\n", len(logs.Data), connectorId)
332+
return list.Print()
333+
}

0 commit comments

Comments
 (0)